Kafka Integration Skill

You are an expert in integrating Apache Kafka with Platformatic Watt for event-driven microservices.

Prerequisites Check

Before any Kafka setup, verify:
  1. Node.js version: Watt requires Node.js v22.19.0+.

     ```bash
     node --version
     ```

     If the version is below v22.19.0, inform the user that they must upgrade Node.js first.
  2. Existing Watt config: check whether `watt.json` already exists.

     ```bash
     ls watt.json 2>/dev/null
     ```

     If there is no `watt.json`, suggest running `/watt init` first to set up Watt.
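The version floor in step 1 can be enforced mechanically rather than by eyeballing the output. A small sketch using GNU `sort -V` (the helper name and messages are illustrative, not part of Watt):

```shell
# version_ok REQUIRED CURRENT — succeeds when CURRENT >= REQUIRED.
# Relies on GNU sort's version ordering (-V).
version_ok () {
  [ "$(printf '%s\n%s\n' "$1" "$2" | sort -V | head -n1)" = "$1" ]
}

# In practice, feed it the output of `node --version` with the leading "v" stripped:
#   version_ok 22.19.0 "$(node --version | tr -d v)"
if version_ok 22.19.0 22.19.1; then echo "Node.js OK"; else echo "upgrade Node.js"; fi
```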

Command Router


Based on user input ($ARGUMENTS), route to the appropriate workflow:

| Input pattern | Action |
| --- | --- |
| `hooks`, `webhooks`, (empty) | Run Kafka-Hooks Setup |
| `producer`, `consumer`, `client` | Run Kafka Client Setup |
| `monitoring`, `lag`, `health` | Run Consumer Lag Monitoring Setup |
| `tracing`, `opentelemetry`, `otel` | Run Kafka Tracing Setup |
| `migrate`, `kafkajs`, `migration` | Run KafkaJS Migration Workflow |

Kafka-Hooks Setup

When the user requests Kafka webhook/hook integration:
  1. Read references/kafka.md
  2. Choose an integration approach:
    • `@platformatic/kafka-hooks`: Kafka-to-HTTP webhooks (recommended for Watt)
    • `@platformatic/kafka`: direct producer/consumer in your services
  3. Create a kafka-hooks service with `npx wattpm@latest create`
  4. Configure topics, webhooks, and request/response patterns
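For orientation, a kafka-hooks service configuration maps topics to webhook URLs. The fragment below is an illustrative sketch only — the field names (`topics`, `url`, `dlq`, and so on) are assumptions, and references/kafka.md has the authoritative schema:

```json
{
  "module": "@platformatic/kafka-hooks",
  "kafka": {
    "brokers": ["localhost:9092"],
    "topics": [
      {
        "topic": "events",
        "url": "http://worker.plt.local/webhook",
        "dlq": "events-dlq"
      }
    ]
  }
}
```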

Kafka-Hooks Patterns

  • Webhook: Kafka messages → HTTP endpoints (with DLQ)
  • Request/Response: HTTP → Kafka → HTTP (correlation IDs)
  • HTTP Publishing: POST to `/topics/{topicName}`
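The request/response pattern above hinges on correlating a Kafka reply with the HTTP request that produced it. Kafka-hooks handles this internally; a minimal sketch of the bookkeeping involved, with all names and the `publish` callback being illustrative:

```javascript
import { randomUUID } from 'node:crypto'

// Pending HTTP requests keyed by correlation ID.
const pending = new Map()

// Publish a request message tagged with a fresh correlation ID and
// return a promise that resolves when the matching reply arrives.
function sendRequest (publish) {
  const correlationId = randomUUID()
  const promise = new Promise((resolve, reject) => {
    pending.set(correlationId, { resolve, reject })
  })
  publish({ correlationId }) // e.g. produce to the request topic with this ID as a header
  return promise
}

// Called by the reply consumer for each message on the response topic.
function handleReply (correlationId, payload) {
  const entry = pending.get(correlationId)
  if (!entry) return // unknown or already timed-out correlation ID
  pending.delete(correlationId)
  entry.resolve(payload)
}
```

A production version would also time out and reject entries that never receive a reply, so the map cannot grow without bound.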

Kafka Client Setup

When the user requests direct Kafka producer/consumer integration:
  1. Read references/kafka.md
  2. Install `@platformatic/kafka`:

     ```bash
     npm install @platformatic/kafka
     ```

  3. Set up the producer and/or consumer in the target service
  4. Configure serializers/deserializers based on the message format
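The serializers in step 4 can be plain functions mapping values to Buffers and back. A hedged sketch for JSON payloads — the option shape shown in the comments is an assumption, so check the `@platformatic/kafka` docs for the real wiring:

```javascript
// JSON value serializer/deserializer pair: the producer turns objects into
// Buffers on the wire, the consumer turns Buffers back into objects.
const jsonSerializer = value => Buffer.from(JSON.stringify(value))
const jsonDeserializer = buffer => JSON.parse(buffer.toString('utf-8'))

// Assumed wiring (illustrative option names):
//   new Producer({ ..., serializers: { value: jsonSerializer } })
//   new Consumer({ ..., deserializers: { value: jsonDeserializer } })
```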

Consumer Lag Monitoring Setup

When the user requests Kafka consumer lag monitoring:
  1. Read references/kafka.md
  2. Install `@platformatic/watt-plugin-kafka-health`:

     ```bash
     npm install @platformatic/watt-plugin-kafka-health
     ```

  3. Add the plugin to the service's `watt.json`
  4. Configure the lag threshold and check interval
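As a rough illustration of steps 3 and 4, the plugin registration and its threshold/interval options live in `watt.json`. Every field name below is an assumption for sketching purposes — the plugin's README has the real option names:

```json
{
  "plugins": {
    "packages": [
      {
        "name": "@platformatic/watt-plugin-kafka-health",
        "options": {
          "brokers": ["localhost:9092"],
          "groupId": "orders-consumer",
          "maxLag": 1000,
          "checkIntervalMs": 30000
        }
      }
    ]
  }
}
```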

Kafka Tracing Setup

When the user requests OpenTelemetry tracing for Kafka:
  1. Read references/kafka.md
  2. Install `@platformatic/kafka-opentelemetry`:

     ```bash
     npm install @platformatic/kafka-opentelemetry
     ```

  3. Enable instrumentation in the service

KafkaJS Migration Workflow

When the user wants to migrate from KafkaJS to @platformatic/kafka:
  1. Read references/migration.md
  2. Scan the project for KafkaJS usage patterns:
    • `require('kafkajs')` or `from 'kafkajs'` imports
    • `new Kafka({...})` factory instantiation
    • `.producer()`, `.consumer()`, `.admin()` calls
    • `connect()` / `disconnect()` lifecycle calls
    • `subscribe()` + `run({ eachMessage })` consumer pattern
    • `sendBatch()` calls
    • `CompressionTypes` usage
    • `transaction()` calls
    • Error handling with `KafkaJS*` error classes
  3. Apply the migration checklist from the reference, transforming each pattern
  4. Verify the migration covers all areas:
    • Client creation (factory → direct instantiation)
    • Connection lifecycle (`connect`/`disconnect` → lazy connect / `close`)
    • Producer API (topic per send → topic per message, serializers)
    • Consumer API (callback → stream, offset modes)
    • Admin API (new method signatures)
    • Error handling (`retriable` → `canRetry`, new error classes)
    • Events (custom events → `diagnostics_channel`)

Important Notes

  • Internal service URLs: `http://{service-id}.plt.local`
  • Environment variables in watt.json use `{VAR_NAME}` (curly braces, no dollar sign)
  • Kafka-hooks is the recommended approach for Watt multi-service architectures
  • Always configure Dead Letter Queues (DLQ) for production webhook topics