# Kafka Integration Skill
You are an expert in integrating Apache Kafka with Platformatic Watt for event-driven microservices.
## Prerequisites Check

Before any Kafka setup, verify:

- **Node.js Version**: Watt requires Node.js v22.19.0+

  ```bash
  node --version
  ```

  If below v22.19.0, inform the user they must upgrade Node.js first.

- **Existing Watt Config**: Check if `watt.json` already exists

  ```bash
  ls watt.json 2>/dev/null
  ```

  If there is no `watt.json`, suggest running `/watt init` first to set up Watt.
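The version gate above can be sketched as a small helper. This is a hypothetical function defined here for illustration, not part of Watt or wattpm:

```javascript
// Hypothetical helper: compare the running Node.js version against
// Watt's v22.19.0 minimum. Not part of Watt itself.
function meetsMinimum(version, minimum = '22.19.0') {
  const parse = (v) => v.replace(/^v/, '').split('.').map(Number);
  const [cur, min] = [parse(version), parse(minimum)];
  for (let i = 0; i < 3; i++) {
    if ((cur[i] ?? 0) !== (min[i] ?? 0)) return (cur[i] ?? 0) > (min[i] ?? 0);
  }
  return true; // an exactly equal version satisfies the minimum
}

// process.version is the running Node.js version, e.g. 'v22.19.0'
console.log(meetsMinimum(process.version));
```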
## Command Router

Based on user input ($ARGUMENTS), route to the appropriate workflow:

| Input Pattern | Action |
|---|---|
|  | Run Kafka-Hooks Setup |
|  | Run Kafka Client Setup |
|  | Run Consumer Lag Monitoring Setup |
|  | Run Kafka Tracing Setup |
|  | Run KafkaJS Migration Workflow |
## Kafka-Hooks Setup

When the user requests Kafka webhook/hook integration:

- Read references/kafka.md
- Choose an integration approach:
  - @platformatic/kafka-hooks: Kafka-to-HTTP webhooks (recommended for Watt)
  - @platformatic/kafka: direct producer/consumer in your services
- Create the kafka-hooks service with `npx wattpm@latest create`
- Configure topics, webhooks, and request/response patterns
## Kafka-Hooks Patterns

- Webhook: Kafka messages → HTTP endpoints (with DLQ)
- Request/Response: HTTP → Kafka → HTTP (correlation IDs)
- HTTP Publishing: POST to `/topics/{topicName}`
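The request/response pattern above can be sketched in plain JavaScript. `awaitReply` and `onReply` are hypothetical names used only to illustrate correlation-ID matching; this is not kafka-hooks' implementation:

```javascript
// Sketch: each HTTP request publishes to Kafka with a correlation ID and
// parks a pending promise; when the reply arrives on the response topic,
// the ID routes the payload back to the waiting caller.
const pending = new Map();
let nextId = 0;

// Hypothetical helper: register a pending request and get its promise.
function awaitReply(correlationId = String(nextId++)) {
  const promise = new Promise((resolve) => pending.set(correlationId, resolve));
  return { correlationId, promise };
}

// Hypothetical helper: called when a reply message is consumed.
function onReply(correlationId, payload) {
  const resolve = pending.get(correlationId);
  if (resolve) {
    pending.delete(correlationId); // each ID resolves exactly once
    resolve(payload);
  }
}
```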
## Kafka Client Setup

When the user requests direct Kafka producer/consumer integration:

- Read references/kafka.md
- Install `@platformatic/kafka`:

  ```bash
  npm install @platformatic/kafka
  ```

- Set up the producer and/or consumer in the target service
- Configure serializers/deserializers based on the message format
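Conceptually, serializers and deserializers exist because Kafka carries raw bytes: values are encoded on produce and decoded on consume. The function shapes below are illustrative assumptions, not @platformatic/kafka's exact serializer interface:

```javascript
// Illustrative JSON codec: value object -> Buffer on the way in,
// Buffer -> value object on the way out. Shapes are assumptions,
// not the library's actual API.
const jsonSerializer = (value) => Buffer.from(JSON.stringify(value));
const jsonDeserializer = (buffer) => JSON.parse(buffer.toString('utf8'));

const wire = jsonSerializer({ orderId: 42, status: 'created' });
console.log(Buffer.isBuffer(wire)); // true
console.log(jsonDeserializer(wire)); // { orderId: 42, status: 'created' }
```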
## Consumer Lag Monitoring Setup

When the user requests Kafka consumer lag monitoring:

- Read references/kafka.md
- Install `@platformatic/watt-plugin-kafka-health`:

  ```bash
  npm install @platformatic/watt-plugin-kafka-health
  ```

- Add the plugin to the service's `watt.json`
- Configure the lag threshold and check interval
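The quantity being monitored is consumer lag: per partition, the broker's latest offset minus the group's committed offset. A hypothetical helper (not the plugin's API) makes the threshold check concrete:

```javascript
// Hypothetical helper: total lag across partitions, where lag per
// partition = latestOffset - committedOffset (clamped at zero).
function totalLag(partitions) {
  return partitions.reduce(
    (sum, { latestOffset, committedOffset }) =>
      sum + Math.max(0, latestOffset - committedOffset),
    0
  );
}

const partitions = [
  { partition: 0, latestOffset: 120, committedOffset: 100 },
  { partition: 1, latestOffset: 80, committedOffset: 80 },
];
console.log(totalLag(partitions)); // 20
// A health plugin would compare this against a configured threshold:
console.log(totalLag(partitions) > 50 ? 'unhealthy' : 'healthy'); // healthy
```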
## Kafka Tracing Setup

When the user requests OpenTelemetry tracing for Kafka:

- Read references/kafka.md
- Install `@platformatic/kafka-opentelemetry`:

  ```bash
  npm install @platformatic/kafka-opentelemetry
  ```

- Enable instrumentation in the service
## KafkaJS Migration Workflow

When the user wants to migrate from KafkaJS to @platformatic/kafka:

- Read references/migration.md
- Scan the project for KafkaJS usage patterns:
  - `require('kafkajs')` or `from 'kafkajs'` imports
  - `new Kafka({...})` factory instantiation
  - `.producer()`, `.consumer()`, `.admin()` calls
  - `connect()` / `disconnect()` lifecycle calls
  - `subscribe()` + `run({ eachMessage })` consumer pattern
  - `sendBatch()` calls
  - `CompressionTypes` usage
  - `transaction()` calls
  - Error handling with `KafkaJS*` error classes
- Apply the migration checklist from the reference, transforming each pattern
- Verify the migration covers all areas:
  - Client creation (factory → direct instantiation)
  - Connection lifecycle (`connect`/`disconnect` → lazy connect / `close`)
  - Producer API (topic per send → topic per message, serializers)
  - Consumer API (callback → stream, offset modes)
  - Admin API (new method signatures)
  - Error handling (`retriable` → `canRetry`, new error classes)
  - Events (custom events → `diagnostics_channel`)
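One checklist item above, "topic per send → topic per message", can be sketched as plain data reshaping: KafkaJS takes the topic once per `send()` batch, while the target shape carries it on each message. The helper name is hypothetical and this is not either library's API:

```javascript
// Hypothetical transform: spread a batch-level topic onto each message,
// turning a KafkaJS-style send payload into per-message topic entries.
function toPerMessageTopic({ topic, messages }) {
  return messages.map((message) => ({ topic, ...message }));
}

const kafkajsSend = {
  topic: 'events',
  messages: [
    { key: 'a', value: '1' },
    { key: 'b', value: '2' },
  ],
};
console.log(toPerMessageTopic(kafkajsSend));
// [ { topic: 'events', key: 'a', value: '1' },
//   { topic: 'events', key: 'b', value: '2' } ]
```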
## Important Notes

- Internal service URLs: `http://{service-id}.plt.local`
- Environment variables in watt.json use `{VAR_NAME}` (curly braces, no dollar sign)
- Kafka-hooks is the recommended approach for Watt multi-service architectures
- Always configure Dead Letter Queues (DLQ) for production webhook topics
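To illustrate the `{VAR_NAME}` placeholder syntax, here is a hypothetical helper. Watt performs this substitution itself when loading watt.json, and `PLT_KAFKA_BROKER` is an invented variable name; this sketch only shows how the syntax resolves:

```javascript
// Hypothetical helper: resolve {VAR_NAME} placeholders against an
// environment map. Watt does this internally; shown for illustration only.
function interpolate(text, env) {
  // Matches uppercase env-var-style names only; unset variables stay as-is.
  return text.replace(/\{([A-Z][A-Z0-9_]*)\}/g, (match, name) => env[name] ?? match);
}

console.log(interpolate('{PLT_KAFKA_BROKER}', { PLT_KAFKA_BROKER: 'broker:9092' }));
// broker:9092
```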