ingestion-pipeline-doctor-nodejs

Pipeline Doctor

Quick reference for PostHog's ingestion pipeline framework and its convention-checking agents.

Architecture overview

The ingestion pipeline processes events through a typed, composable step chain:
Kafka message
  → messageAware()
    → parse headers/body
    → sequentially() for preprocessing
    → filterMap() to enrich context (e.g., team lookup)
    → teamAware()
      → groupBy(token:distinctId)
        → concurrently() for per-entity processing
      → gather()
      → pipeBatch() for batch operations
      → handleIngestionWarnings()
    → handleResults()
  → handleSideEffects()
  → build()
See `nodejs/src/ingestion/analytics/joined-ingestion-pipeline.ts` for the real implementation.
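
To make the "typed, composable step chain" idea concrete, here is a minimal sketch of how a chain can carry types from one step to the next and short-circuit on a non-ok result. Everything in it (`chainFrom`, `pipe`, the `Result` and `Step` types, the `parse` and `toGroupKey` steps) is hypothetical and kept synchronous for brevity. It is not the builder API from the real pipeline, which should be read from the file referenced above.

```typescript
// Hypothetical, simplified types for illustration only.
type Result<T> = { type: 'ok'; value: T } | { type: 'drop'; reason: string }

type Step<In, Out> = (input: In) => Result<Out>

interface Chain<In, Out> {
    pipe<Next>(step: Step<Out, Next>): Chain<In, Next>
    build(): Step<In, Out>
}

// Each pipe() call returns a new chain whose output type is the output type
// of the step that was just added.
function chainFrom<In, Out>(run: Step<In, Out>): Chain<In, Out> {
    return {
        pipe<Next>(step: Step<Out, Next>): Chain<In, Next> {
            return chainFrom<In, Next>((input: In) => {
                const result = run(input)
                // A non-ok result skips every later step.
                return result.type === 'ok' ? step(result.value) : result
            })
        },
        build(): Step<In, Out> {
            return run
        },
    }
}

// Hypothetical message and event shapes.
interface RawMessage {
    body: string
}

interface ParsedEvent {
    token: string
    distinctId: string
}

const parse: Step<RawMessage, ParsedEvent> = (message) => {
    const [token, distinctId] = message.body.split(':')
    if (!token || !distinctId) {
        return { type: 'drop', reason: 'malformed message' }
    }
    return { type: 'ok', value: { token, distinctId } }
}

// Builds the same kind of key the diagram groups by (token:distinctId).
const toGroupKey: Step<ParsedEvent, string> = (event) => ({
    type: 'ok',
    value: `${event.token}:${event.distinctId}`,
})

const pipeline = chainFrom(parse).pipe(toGroupKey).build()

const accepted = pipeline({ body: 'phc_abc:user-1' })
const rejected = pipeline({ body: 'not-a-valid-message' })
```

In this sketch `pipe` only accepts a step whose input type matches the previous step's output type, so wiring steps in the wrong order fails at compile time rather than at runtime.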

Key file locations

| What | Where |
| --- | --- |
| Step type | `nodejs/src/ingestion/pipelines/steps.ts` |
| Result types | `nodejs/src/ingestion/pipelines/results.ts` |
| Doc-test chapters | `nodejs/src/ingestion/pipelines/docs/*.test.ts` |
| Joined pipeline | `nodejs/src/ingestion/analytics/joined-ingestion-pipeline.ts` |
| Doctor agents | `.claude/agents/ingestion/` |
| Test helpers | `nodejs/src/ingestion/pipelines/docs/helpers.ts` |

Which agent to use

| Concern | Agent | When to use |
| --- | --- | --- |
| Step structure | `pipeline-step-doctor` | Factory pattern, type extension, config injection, naming |
| Result handling | `pipeline-result-doctor` | ok/dlq/drop/redirect, side effects, ingestion warnings |
| Composition | `pipeline-composition-doctor` | Builder chain, concurrency, grouping, branching, retries |
| Testing | `pipeline-testing-doctor` | Test helpers, assertions, fake timers, doc-test style |

Quick convention reference

- Steps: Factory function returning a named inner function. Generic `<T extends Input>` for type extension. No `any`. Config via closure.
- Results: Use the `ok()`, `dlq()`, `drop()`, `redirect()` constructors. Side effects as promises in `ok(value, [effects])`. Warnings as third parameter.
- Composition: `messageAware` wraps the pipeline. `handleResults` goes inside `messageAware`, `handleSideEffects` after it. `groupBy` + `concurrently` for per-entity work. `gather` before batch steps.
- Testing: Step tests call the factory directly. Use the `consumeAll()` / `collectBatches()` helpers. Fake timers for async. Type guards for result assertions. No `any`.
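
The step and result conventions above can be sketched roughly as follows. This is an illustration under stated assumptions, not the framework's code: `ok`, `drop`, and `isOkResult` are local stand-ins written for this example (the real constructors and types live in `nodejs/src/ingestion/pipelines/results.ts` and may have different signatures), and `createNameFilterStep`, its config fields, and the event shapes are hypothetical. The step is kept synchronous for brevity; real steps may well be async.

```typescript
// Local stand-ins so the sketch is self-contained (assumed shapes).
interface IngestionWarning {
    type: string
}

interface OkResult<T> {
    type: 'ok'
    value: T
    sideEffects: Promise<unknown>[]
    warnings: IngestionWarning[]
}

interface DropResult {
    type: 'drop'
    reason: string
}

type StepResult<T> = OkResult<T> | DropResult

// Side effects are the second argument, warnings the third.
function ok<T>(value: T, sideEffects: Promise<unknown>[] = [], warnings: IngestionWarning[] = []): OkResult<T> {
    return { type: 'ok', value, sideEffects, warnings }
}

function drop(reason: string): DropResult {
    return { type: 'drop', reason }
}

// Type guard: lets tests narrow a result without casting or `any`.
function isOkResult<T>(result: StepResult<T>): result is OkResult<T> {
    return result.type === 'ok'
}

// Minimum input the step needs; callers may pass richer types.
interface EventInput {
    event: { name: string }
}

// Hypothetical config, captured by the closure below.
interface NameFilterConfig {
    blockedNames: ReadonlySet<string>
    maxNameLength: number
    recordSeen: (name: string) => Promise<void>
}

// Factory returning a named inner function. T extends EventInput, so any
// extra fields on the input survive in the output type.
function createNameFilterStep<T extends EventInput>(config: NameFilterConfig) {
    return function nameFilterStep(input: T): StepResult<T> {
        if (config.blockedNames.has(input.event.name)) {
            return drop('blocked_event_name')
        }
        if (input.event.name.length > config.maxNameLength) {
            // Value still passes through, with a warning attached.
            return ok(input, [], [{ type: 'event_name_too_long' }])
        }
        // The side effect is returned as a promise, not awaited in the step.
        return ok(input, [config.recordSeen(input.event.name)])
    }
}

// Step tests call the factory directly.
interface EventWithTeam extends EventInput {
    teamId: number
}

const step = createNameFilterStep<EventWithTeam>({
    blockedNames: new Set(['$snapshot']),
    maxNameLength: 10,
    recordSeen: () => Promise.resolve(),
})

const result = step({ event: { name: 'pageview' }, teamId: 1 })
if (isOkResult(result)) {
    // Narrowed: `teamId` is still visible because T was preserved.
    console.log(result.value.teamId)
}
```

Returning the side effect as a promise instead of awaiting it inside the step leaves the decision of when to wait for it to whatever runs the chain, which matches the placement of `handleSideEffects` at the end of the diagram.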

Running all doctors

Ask Claude to "run all pipeline doctors on my recent changes" to get a comprehensive review across all 4 concern areas.