developing-kafka-python-client

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese
<HARD-GATE> Do NOT generate any code, scaffold any project, or modify any file until you have explicitly asked and received answers for questions #1 (existing app or greenfield), #2 (target environment), and #3 (producer, consumer, or both). If the user's prompt partially answers some questions, still confirm your understanding before generating. This applies to EVERY prompt regardless of how specific it appears. </HARD-GATE>
Begin by announcing: "Using the Confluent Kafka Python Client skill to guide this project."
<HARD-GATE> 在明确询问并获取以下问题的答案之前,请勿生成任何代码、搭建任何项目或修改任何文件:问题1(现有应用还是全新项目)、问题2(目标环境)、问题3(生产者、消费者还是两者兼具)。如果用户的提示部分回答了某些问题,仍需在生成内容前确认你的理解。无论提示看起来多么具体,此规则均适用于所有场景。 </HARD-GATE>
首先告知用户:"使用Confluent Kafka Python Client技能指导此项目。"

Confluent Kafka Python Client Creation

Confluent Kafka Python客户端创建

Generate a production-ready Python project for producing to and/or consuming from Kafka using
confluent-kafka-python
. Supports three target environments: Confluent Cloud (managed), Local Docker (open-source Kafka), and WarpStream (Kafka-compatible, object-storage-backed), and two producer styles: AsyncIO (non-blocking) and Synchronous (blocking). The generated code follows Confluent's best practices.
使用
confluent-kafka-python
生成可用于生产环境的Python项目,实现向Kafka生产消息和/或从Kafka消费消息。支持三种目标环境:Confluent Cloud(托管式)、Local Docker(开源Kafka)和WarpStream(兼容Kafka、基于对象存储),以及两种生产者类型:AsyncIO(非阻塞)和Synchronous(阻塞)。生成的代码遵循Confluent的最佳实践。

Step 1: Gather Requirements

步骤1:收集需求

Before generating any code, work through the questions below. Skip any question the user has already answered explicitly in their prompt — do not re-ask just for form's sake. For example, "build a producer and consumer on Confluent Cloud with an async producer" already answers #2, #3, and #4; only #1, #5, #6, #7, and #8 remain.
Mandatory confirmation gate — do not skip, even if the user answered every question. Before writing any file, you MUST send one message that:
  1. Recaps the answers you extracted as a short bulleted list (e.g., "Target: Confluent Cloud · Components: producer + consumer · Producer style: async · From scratch: yes").
  2. Asks any remaining open questions inline.
  3. Explicitly asks the user to confirm or correct before you proceed.
Then STOP and wait for the user's reply. Do not generate files in the same turn as the recap, and do not proceed on the assumption that a fully-specified prompt implies consent to generate immediately — the recap catches misinterpretations of the prompt and is required even when questions #1–#8 are all pre-answered. The only way to skip the gate is if the user has already confirmed the recap earlier in this conversation.
Do not assume defaults for #1, #2, or #3 — if any of these are not answered by the prompt, you must ask.
  1. Are you adding Kafka to an existing application, or starting from scratch?
    • If the user has existing Python code (mentions an existing project, has a
      main.py
      , uses Flask/FastAPI/Django, etc.), do not scaffold a new project. Instead: (a) identify their existing producer or data-sending code, (b) ask whether they already have schemas registered in Schema Registry, (c) add Schema Registry integration to their existing code following the patterns in the reference files. Generate only the files they are missing (e.g.,
      common.py
      ,
      schemas/value.schema.json
      ) and modify their existing code inline.
    • If the user already produces to Kafka without Schema Registry (schemaless), help them migrate: (1) generate a JSON Schema from their existing message structure, (2) register it, and (3) replace their raw
      producer.produce()
      calls with serializer-backed calls. Do not discard their existing code.
    • If starting from scratch, proceed with the full scaffold below.
  2. Target environment? — Confluent Cloud, local Kafka (Docker), or WarpStream. Always prompt for this, even if the user didn't mention it. If they mention "open source", "local", "docker", "self-hosted", or just want to try Kafka without a cloud account, choose local Docker. If they mention "Confluent Cloud", "CC", or have existing cloud credentials, choose Confluent Cloud. If they mention "WarpStream", choose WarpStream. Default to Confluent Cloud if they confirm they don't have a preference, but always ask first.
    • If WarpStream: Read
      references/warpstream-optimization.md
      and apply the librdkafka overrides from that reference. Key changes: disable idempotence, dramatically increase batch sizes and in-flight requests, set large fetch sizes, add
      ws_az=<az>
      to
      client.id
      for zone-aware routing. Prefer null message keys for sticky partitioning unless entity-based ordering is required.
  3. Producer, consumer, or both?
  4. Async or synchronous producer? (Only if producer is requested.) Help the user choose:
    • AsyncIO Producer (
      AIOProducer
      ): Use when code runs under an event loop — FastAPI/Starlette, aiohttp, Sanic, asyncio workers — and must not block.
    • Synchronous Producer (
      Producer
      ): Use for scripts, batch jobs, and highest-throughput pipelines where the user controls threads/processes and can call
      poll()
      /
      flush()
      directly. If the user mentions an async framework (FastAPI, aiohttp, Sanic) or uses
      asyncio
      , default to AsyncIO. If they mention scripts, batch, ETL, or don't have a preference, default to Synchronous.
  5. Do you have an existing schema you'd like to use? If yes, ask the user to paste it or provide the file path, then use it as the
    schemas/value.schema.json
    instead of generating one. If no, proceed to ask about their data fields.
  6. What kind of data are you producing? (Only if the user doesn't have an existing schema. Get field names and types so you can generate a matching JSON Schema and sample data.)
  7. Topic name? (Default:
    demo-topic
    )
  8. Consumer group ID? (Only if consumer; default:
    python-consumer-group
    )
Don't ask about Schema Registry — always include it. For Confluent Cloud and local Docker, always use JSON Schema. If the target is WarpStream, ask which Schema Registry implementation they are using — WarpStream's built-in schema registry only supports Avro and Protobuf (
GET /schemas/types
returns
["AVRO","PROTOBUF"]
), so if they are using it, ask whether they prefer Avro or Protobuf (default to Avro). If they are using a different SR (e.g., Confluent Cloud Schema Registry), JSON Schema is fine.
在生成任何代码之前,先完成以下问题的沟通。如果用户已在提示中明确回答了某个问题,可跳过该问题——不要为了走流程而重复询问。例如,用户提示"在Confluent Cloud上构建生产者和消费者,使用异步生产者",已经回答了问题2、3、4;只需询问问题1、5、6、7、8即可。
强制确认环节——请勿跳过,即使用户已回答所有问题。在编写任何文件之前,你必须发送一条消息,包含:
  1. 以简短项目符号列表形式总结你提取到的答案(例如:"目标环境:Confluent Cloud · 组件:生产者+消费者 · 生产者类型:异步 · 项目类型:全新项目")。
  2. 列出所有未回答的问题。
  3. 明确请求用户确认或纠正上述内容。
然后停止操作,等待用户回复。请勿在发送总结的同一轮次生成文件,也不要假设提示已完全明确就直接生成内容——总结环节可以避免对提示的误解,即使问题1-8都已预先回答,此环节也是必需的。唯一可跳过此环节的情况是用户在此前的对话中已确认过总结内容。
不要为问题1、2或3假设默认值——如果这些问题未在提示中得到回答,必须询问用户。
  1. 你是要为现有应用添加Kafka功能,还是从零开始创建项目?
    • 如果用户有现有Python代码(提及现有项目、存在
      main.py
      、使用Flask/FastAPI/Django等),请勿搭建新项目。而是:(a) 识别其现有的生产者或数据发送代码;(b) 询问他们是否已在Schema Registry中注册了Schema;(c) 按照参考文件中的模式,为其现有代码添加Schema Registry集成。仅生成他们缺失的文件(例如
      common.py
      schemas/value.schema.json
      )并内联修改其现有代码。
    • 如果用户已在不使用Schema Registry的情况下向Kafka生产消息(无Schema),帮助他们迁移:(1) 根据其现有消息结构生成JSON Schema;(2) 注册该Schema;(3) 将其原始的
      producer.produce()
      调用替换为基于序列化器的调用。请勿丢弃其现有代码。
    • 如果是从零开始创建项目,请按照下文的完整搭建流程进行。
  2. 目标环境是什么?——Confluent Cloud、本地Kafka(Docker)还是WarpStream。无论用户是否提及,都必须询问此问题。如果用户提到"开源"、"本地"、"docker"、"自托管",或只是想在没有云账户的情况下试用Kafka,选择Local Docker。如果用户提到"Confluent Cloud"、"CC",或已有云凭证,选择Confluent Cloud。如果用户提到"WarpStream",选择WarpStream。如果用户确认没有偏好,默认选择Confluent Cloud,但必须先询问用户。
    • 如果是WarpStream: 阅读
      references/warpstream-optimization.md
      并应用其中的librdkafka配置覆盖项。关键更改:禁用幂等性、大幅增加批处理大小和在途请求数、设置较大的获取大小、在
      client.id
      中添加
      ws_az=<az>
      以实现区域感知路由。除非需要基于实体的顺序,否则优先使用空消息键以实现粘性分区。
  3. 需要生产者、消费者还是两者兼具?
  4. 使用异步还是同步生产者?(仅当需要生产者时询问)帮助用户选择:
    • AsyncIO生产者
      AIOProducer
      ):适用于代码运行在事件循环中的场景——FastAPI/Starlette、aiohttp、Sanic、asyncio工作进程——且不能阻塞。
    • 同步生产者
      Producer
      ):适用于脚本、批处理作业和高吞吐量管道,用户可直接控制线程/进程并调用
      poll()
      /
      flush()
      。 如果用户提到异步框架(FastAPI、aiohttp、Sanic)或使用
      asyncio
      ,默认选择AsyncIO。如果用户提到脚本、批处理、ETL,或没有偏好,默认选择Synchronous
  5. 你是否有要使用的现有Schema? 如果有,请用户粘贴Schema内容或提供文件路径,然后将其用作
    schemas/value.schema.json
    ,无需生成新的Schema。如果没有,继续询问用户的数据字段信息。
  6. 你要生产的数据是什么类型?(仅当用户没有现有Schema时询问)获取字段名称和类型,以便生成匹配的JSON Schema和示例数据。
  7. Topic名称是什么?(默认值:
    demo-topic
  8. 消费者组ID是什么?(仅当需要消费者时询问;默认值:
    python-consumer-group
无需询问是否使用Schema Registry——所有项目都必须包含Schema Registry。对于Confluent Cloud和Local Docker,始终使用JSON Schema。如果目标环境是WarpStream,询问用户使用的是哪种Schema Registry实现——WarpStream内置的Schema Registry仅支持Avro和Protobuf(
GET /schemas/types
返回
["AVRO","PROTOBUF"]
),因此如果用户使用该内置Registry,询问他们偏好Avro还是Protobuf(默认选择Avro)。如果用户使用其他Registry(例如Confluent Cloud Schema Registry),则可使用JSON Schema。

Common Agent Mistakes

常见Agent错误

ThoughtReality
"The user mentioned FastAPI, so I know it's async — skip the questions"Still confirm. They might want a sync background worker alongside FastAPI.
"I'll use Avro since it's more widely used"This skill uses JSON Schema by default. Exception: WarpStream's built-in schema registry only supports Avro and Protobuf — if the user is using WarpStream SR, use Avro by default. If they're using a different SR (e.g., Confluent Cloud SR), JSON Schema is fine regardless of the Kafka environment.
"I'll skip Schema Registry to keep it simple"Schema Registry is non-negotiable. Every project includes it.
"I'll use
auto.register.schemas=True
for convenience"
Always
False
. Explicit registration is a core principle.
"I'll create a producer in
produce()
— it's cleaner"
One producer instance, created in
main()
, passed as a parameter. Always.
"The user wants sync, so the consumer should be sync too"Consumer is always async (
AIOConsumer
). This is a deliberate design decision.
"I'll add
headers=
to the AIOProducer for schema ID"
AIOProducer.produce()
raises
NotImplementedError
on headers. Only sync producers use headers.
"I'll swap
AsyncJSONSerializer
for
AsyncAvroSerializer
and keep the call site the same"
JSONSerializer
takes
schema_str
first;
AvroSerializer
takes
schema_registry_client
first. Calling positionally across formats raises
TypeError: ... got multiple values for argument 'schema_registry_client'
. Always pass both as kwargs.
"I'll set
message.max.bytes=64000000
on the producer config and
fetch.max.bytes=50242880
on the consumer — they're independent"
Not in librdkafka.
message.max.bytes
is a client-global config (unlike Java's per-role
max.request.size
), so when
get_kafka_config()
is shared between producer and consumer, the consumer inherits it. librdkafka enforces
fetch.max.bytes >= message.max.bytes
at consumer construction and raises
KafkaError{_INVALID_ARG, ... "fetch.max.bytes must be >= message.max.bytes"}
. Use the WarpStream librdkafka consumer values in
references/warpstream-optimization.md
(
fetch.max.bytes=67108864
) — they're sized to satisfy this constraint.
错误想法实际情况
"用户提到了FastAPI,所以我知道是异步的——跳过问题"仍需确认。用户可能希望在FastAPI之外使用同步后台工作进程。
"我将使用Avro,因为它更广泛使用"本技能默认使用JSON Schema。例外情况:WarpStream内置的Schema Registry仅支持Avro和Protobuf——如果用户使用WarpStream SR,默认使用Avro。如果用户使用其他SR(例如Confluent Cloud SR),无论Kafka环境如何,JSON Schema都是可行的。
"我将跳过Schema Registry以简化操作"Schema Registry是必须的。每个项目都包含它。
"我将设置
auto.register.schemas=True
以方便使用"
始终设置为
False
。显式注册是核心原则。
"我将在
produce()
中创建生产者——这样更简洁"
仅创建一个生产者实例,在
main()
中创建,作为参数传递。必须遵循此规则。
"用户想要同步生产者,所以消费者也应该是同步的"消费者始终是异步的(
AIOConsumer
)。这是经过深思熟虑的设计决策。
"我将为AIOProducer添加
headers=
以传递Schema ID"
AIOProducer.produce()
会因
headers=
参数抛出
NotImplementedError
。只有同步生产者使用headers。
"我将把
AsyncJSONSerializer
替换为
AsyncAvroSerializer
,并保持调用方式不变"
JSONSerializer
首先接受
schema_str
AvroSerializer
首先接受
schema_registry_client
。跨格式使用 positional 参数会抛出
TypeError: ... got multiple values for argument 'schema_registry_client'
。始终使用关键字参数传递。
"我将在生产者配置中设置
message.max.bytes=64000000
,在消费者配置中设置
fetch.max.bytes=50242880
——它们是独立的"
在librdkafka中并非如此。
message.max.bytes
客户端全局配置(与Java的按角色设置
max.request.size
不同),因此当
get_kafka_config()
在生产者和消费者之间共享时,消费者会继承该配置。librdkafka在消费者构建时强制要求
fetch.max.bytes >= message.max.bytes
,否则会抛出
KafkaError{_INVALID_ARG, ... "fetch.max.bytes must be >= message.max.bytes"}
。使用
references/warpstream-optimization.md
中的WarpStream librdkafka消费者值(
fetch.max.bytes=67108864
)——这些值满足该约束条件。

Step 1b: Confirm Understanding

步骤1b:确认理解

After gathering all answers, present a confirmation summary before generating any code:
Before I generate the project, let me confirm:
- Project type: [Greenfield scaffold / Migration of existing code]
- Environment: [Confluent Cloud (SASL_SSL) / Local Docker (PLAINTEXT) / WarpStream]
- Schema format: [JSON Schema / Avro / Protobuf] (Avro or Protobuf if using WarpStream's built-in SR)
- Components: [Producer only / Consumer only / Both]
- Producer style: [AsyncIO (AIOProducer) / Synchronous (Producer)] (if applicable)
- Schema: [brief description of user's data fields]
- Topic: [topic name]
- Consumer group: [group ID] (if consumer)

Does this look right?
Wait for user confirmation before proceeding to Step 2. If the user corrects anything, update your understanding and re-confirm.
收集所有答案后,在生成任何代码之前,呈现确认总结:
在我生成项目之前,请确认以下信息:
- 项目类型:[全新项目搭建 / 现有代码迁移]
- 环境:[Confluent Cloud (SASL_SSL) / Local Docker (PLAINTEXT) / WarpStream]
- Schema格式:[JSON Schema / Avro / Protobuf](使用WarpStream内置SR时为Avro或Protobuf)
- 组件:[仅生产者 / 仅消费者 / 两者兼具]
- 生产者类型:[AsyncIO (AIOProducer) / Synchronous (Producer)](如适用)
- Schema:[用户数据字段的简要描述]
- Topic:[Topic名称]
- 消费者组:[组ID](如涉及消费者)

以上信息是否正确?
等待用户确认后再进行步骤2。如果用户纠正了任何内容,更新你的理解并重新确认。

Step 2: Generate the Project

步骤2:生成项目

Decision Flowchart

决策流程图

dot
digraph decisions {
  "Q1: Existing app?" -> "Migration path:\nmodify existing code" [label="yes"];
  "Q1: Existing app?" -> "Q2: Environment?" [label="no / greenfield"];
  "Q2: Environment?" -> "Cloud config\n(SASL_SSL)" [label="Confluent Cloud"];
  "Q2: Environment?" -> "Local Docker config\n(PLAINTEXT) + docker-compose.yml" [label="local / docker / OSS"];
  "Q2: Environment?" -> "WarpStream config\n(apply overrides from\nreferences/warpstream-optimization.md)" [label="WarpStream"];
  "Cloud config\n(SASL_SSL)" -> "Q3: Components?";
  "Local Docker config\n(PLAINTEXT) + docker-compose.yml" -> "Q3: Components?";
  "WarpStream config\n(apply overrides from\nreferences/warpstream-optimization.md)" -> "Which SR?\n(WarpStream SR → Avro/Protobuf;\nother SR → JSON Schema)";
  "Which SR?\n(WarpStream SR → Avro/Protobuf;\nother SR → JSON Schema)" -> "Q3: Components?";
  "Q3: Components?" -> "Q4: Async or sync?" [label="producer requested"];
  "Q3: Components?" -> "Generate consumer\n(always async AIOConsumer)" [label="consumer only"];
  "Q4: Async or sync?" -> "AIOProducer path\nAsyncJSONSerializer\n(no headers support)" [label="async / event-loop"];
  "Q4: Async or sync?" -> "Producer path\nJSONSerializer\n(header-based schema ID)" [label="sync / batch / ETL"];
}
Create this file structure in the user's chosen directory:
<project-dir>/
├── producer.py          # (if requested)
├── consumer.py          # (if requested)
├── common.py            # shared config loading + verification helpers
├── schemas/
│   └── value.schema.json # JSON Schema (or value.avsc / value.proto when using WarpStream SR)
├── tests/
│   └── test_project.py  # unit tests (always generated)
├── .env.example         # template for credentials
├── requirements.txt
├── docker-compose.yml   # (local Docker path only)
dot
digraph decisions {
  "Q1: 现有应用?" -> "迁移路径:\n修改现有代码" [label="是"];
  "Q1: 现有应用?" -> "Q2: 环境?" [label="否 / 全新项目"];
  "Q2: 环境?" -> "云配置\n(SASL_SSL)" [label="Confluent Cloud"];
  "Q2: 环境?" -> "本地Docker配置\n(PLAINTEXT) + docker-compose.yml" [label="本地 / docker / 开源"];
  "Q2: 环境?" -> "WarpStream配置\n(应用references/warpstream-optimization.md中的覆盖项)" [label="WarpStream"];
  "云配置\n(SASL_SSL)" -> "Q3: 组件?";
  "本地Docker配置\n(PLAINTEXT) + docker-compose.yml" -> "Q3: 组件?";
  "WarpStream配置\n(应用references/warpstream-optimization.md中的覆盖项)" -> "使用哪种SR?\n(WarpStream SR → Avro/Protobuf;\n其他SR → JSON Schema)";
  "使用哪种SR?\n(WarpStream SR → Avro/Protobuf;\n其他SR → JSON Schema)" -> "Q3: 组件?";
  "Q3: 组件?" -> "Q4: 异步还是同步?" [label="需要生产者"];
  "Q3: 组件?" -> "生成消费者\n(始终为异步AIOConsumer)" [label="仅消费者"];
  "Q4: 异步还是同步?" -> "AIOProducer路径\nAsyncJSONSerializer\n(不支持headers)" [label="异步 / 事件循环"];
  "Q4: 异步还是同步?" -> "Producer路径\nJSONSerializer\n(基于header的Schema ID)" [label="同步 / 批处理 / ETL"];
}
在用户选择的目录中创建以下文件结构:
<项目目录>/
├── producer.py          # (如果需要)
├── consumer.py          # (如果需要)
├── common.py            # 共享配置加载 + 验证助手
├── schemas/
│   └── value.schema.json # JSON Schema(使用WarpStream SR时为value.avsc / value.proto)
├── tests/
│   └── test_project.py  # 单元测试(始终生成)
├── .env.example         # 凭证模板
├── requirements.txt
├── docker-compose.yml   # (仅本地Docker路径)

Security

安全注意事项

NEVER read, open, or display
.env
files. They contain API keys and secrets. Only generate
.env.example
with placeholder values. If the user asks you to debug a connection issue, ask them to verify their
.env
values themselves — do not read the file.
切勿读取、打开或显示
.env
文件。这些文件包含API密钥和机密信息。仅生成带有占位符值的
.env.example
。如果用户要求调试连接问题,请让他们自行验证
.env
中的值——不要读取该文件。

Core Principles

核心原则

These principles matter because they prevent the most common production issues with Kafka Python clients:
  1. Reuse the producer instance. Creating a new producer per message is expensive — each one opens new TCP connections, does SASL handshakes, and fetches metadata. Create one producer and reuse it for all messages. The produce function should accept the producer as a parameter, not instantiate one.
  2. Always use Schema Registry with JSON Schema. Schema Registry enforces a contract between producers and consumers. Without it, schema changes silently break downstream consumers. This skill uses JSON Schema by default. Schema Registry supports Avro, Protobuf, and JSON Schema — JSON Schema is chosen because: (1) Python has first-class JSON support with no code generation step, (2)
    confluent-kafka-python
    provides
    JSONSerializer
    /
    JSONDeserializer
    out of the box, (3) it is the most approachable format for Python developers already working with JSON/dict data.
    WarpStream Schema Registry exception: WarpStream's built-in schema registry only supports Avro and Protobuf — its
    GET /schemas/types
    endpoint returns
    ["AVRO","PROTOBUF"]
    . JSON Schema is not available when using WarpStream's SR. If the user is running WarpStream and using its built-in SR, use Avro by default (or Protobuf if the user prefers). Use
    AvroSerializer
    /
    AvroDeserializer
    from
    confluent_kafka.schema_registry.avro
    (or
    ProtobufSerializer
    /
    ProtobufDeserializer
    from
    confluent_kafka.schema_registry.protobuf
    ). For async producers, use
    AsyncAvroSerializer
    from
    confluent_kafka.schema_registry._async.avro
    (or
    AsyncProtobufSerializer
    from
    confluent_kafka.schema_registry._async.protobuf
    ). Generate an Avro schema file at
    schemas/value.avsc
    (or
    schemas/value.proto
    for Protobuf) instead of
    schemas/value.schema.json
    — use
    references/value.avsc
    as the starting point and adapt to the user's domain. When generating producer/consumer code on the Avro path, copy from
    references/producer_avro.py
    (async),
    references/producer_avro_sync.py
    (sync), and
    references/consumer_avro.py
    (async) — not the JSON reference files — because the constructor signatures differ (see the Avro constructor warning below). If the user is running WarpStream but pointing at a different SR (e.g., Confluent Cloud Schema Registry), JSON Schema works fine — use the default path.
    Register schemas as a separate explicit step before creating the serializer. Use a dedicated
    register_schema()
    function that calls
    sr_client.register_schema()
    and lets errors (auth failures, network errors, permission denials) propagate immediately — never wrap registration in a bare
    try/except
    . Then configure the serializer with
    auto.register.schemas=False
    and
    use.latest.version=True
    . This ensures the serializer never silently auto-registers and aligns with production practice where CI/CD registers schemas, not application startup.
    Use the appropriate serializer for the chosen producer style and schema format:
    • JSON Schema (default):
      AsyncJSONSerializer
      /
      AsyncJSONDeserializer
      from
      confluent_kafka.schema_registry._async.json_schema
      for async, or
      JSONSerializer
      /
      JSONDeserializer
      from
      confluent_kafka.schema_registry.json_schema
      for synchronous.
    • Avro (default when using WarpStream SR):
      AsyncAvroSerializer
      /
      AsyncAvroDeserializer
      from
      confluent_kafka.schema_registry._async.avro
      for async, or
      AvroSerializer
      /
      AvroDeserializer
      from
      confluent_kafka.schema_registry.avro
      for synchronous.
    • Protobuf (alternative when using WarpStream SR):
      AsyncProtobufSerializer
      /
      AsyncProtobufDeserializer
      from
      confluent_kafka.schema_registry._async.protobuf
      for async, or
      ProtobufSerializer
      /
      ProtobufDeserializer
      from
      confluent_kafka.schema_registry.protobuf
      for synchronous.
  3. Choose the right producer style. The
    confluent-kafka-python
    library offers two producer APIs:
    • AsyncIO Producer (
      AIOProducer
      from
      confluent_kafka.aio
      ): Non-blocking, integrates with
      asyncio
      event loops. Use with
      AsyncJSONSerializer
      from
      confluent_kafka.schema_registry._async.json_schema
      and
      AsyncSchemaRegistryClient
      . Best for applications already running an event loop (FastAPI, aiohttp, Sanic, asyncio workers).
    • Synchronous Producer (
      Producer
      from
      confluent_kafka
      ): Blocking calls with delivery callbacks. Use with
      JSONSerializer
      from
      confluent_kafka.schema_registry.json_schema
      and
      SchemaRegistryClient
      . Best for scripts, batch jobs, and highest-throughput pipelines where the user controls threads/processes and can call
      poll()
      /
      flush()
      directly. Always ask the user which style fits their use case. The consumer always uses
      AIOConsumer
      (async) — long-running poll loops benefit from non-blocking I/O, and mixing sync/async consumer styles adds complexity with little benefit.
  4. Graceful shutdown. Async producers must
    flush()
    and
    close()
    (both awaited) before exiting. Synchronous producers must call
    flush()
    before exiting — otherwise buffered messages are lost. Consumers must
    unsubscribe()
    then
    close()
    to leave the consumer group cleanly (avoiding unnecessary rebalances). Use
    try/finally
    blocks and handle
    KeyboardInterrupt
    / signals.
  5. Support Confluent Cloud, local Docker, and WarpStream. When targeting Confluent Cloud, configure
    SASL_SSL
    with
    PLAIN
    mechanism and load API keys from
    .env
    . When targeting local Docker, use
    PLAINTEXT
    with no authentication. When targeting WarpStream, use
    SASL_SSL
    or
    PLAINTEXT
    depending on the user's WarpStream deployment, and apply the librdkafka overrides from
    references/warpstream-optimization.md
    (large batches, disabled idempotence, large fetches, zone-aware
    client.id
    ). The
    KAFKA_ENV
    environment variable (
    cloud
    ,
    local
    , or
    warpstream
    ) controls which path is used. Load all settings from environment variables via
    .env
    .
  6. Verify connectivity before running. Use
    AdminClient.list_topics()
    to verify the broker is reachable and the topic exists before producing or consuming. Verify Schema Registry connectivity with an HTTP health check.
  7. Always set a message key for domain events. Pass
    key=<entity_id>.encode("utf-8")
    to
    producer.produce()
    for any message that represents an entity or event stream (order events, user actions, device telemetry, transactions). Kafka partitions by key, so messages with the same key land on the same partition and preserve ordering — critical for event streams like
    OrderCreated → OrderUpdated → OrderCancelled
    where consumers must see events in order. The
    produce()
    helper in every reference file accepts a
    key_field
    parameter naming the field to use as the key (e.g.,
    key_field="order_id"
    ,
    key_field="transaction_id"
    ). Ask the user which field identifies the entity and pass it to
    produce()
    . Only leave
    key_field=None
    if the user explicitly states ordering does not matter (e.g., stateless metrics where any partition is fine).
    WarpStream exception: On WarpStream, null keys enable sticky partitioning, which builds larger batches and significantly improves throughput and cost. When the user's use case does not require per-entity ordering (e.g., independent telemetry readings, stateless metrics, logs), recommend
    key_field=None
    and explain the throughput benefit. When per-entity ordering is required (e.g.,
    OrderCreated → OrderUpdated → OrderCancelled
    for the same order), still set a message key — correctness takes priority over batching efficiency. Ask the user whether their events need per-key ordering to decide.
这些原则至关重要,因为它们可以避免Kafka Python客户端在生产环境中最常见的问题:
  1. 重用生产者实例。为每条消息创建新的生产者实例成本很高——每个实例都会打开新的TCP连接、执行SASL握手并获取元数据。创建一个生产者实例并在所有消息中重用它。produce函数应接受生产者作为参数,而不是实例化新的生产者。
  2. 始终结合Schema Registry使用JSON Schema。Schema Registry在生产者和消费者之间强制执行契约。没有它,Schema更改会无声地破坏下游消费者。本技能默认使用JSON Schema。Schema Registry支持Avro、Protobuf和JSON Schema——选择JSON Schema的原因是:(1) Python对JSON有原生支持,无需代码生成步骤;(2)
    confluent-kafka-python
    提供开箱即用的
    JSONSerializer
    /
    JSONDeserializer
    ;(3) 对于已经使用JSON/dict数据的Python开发者来说,这是最易上手的格式。
    WarpStream Schema Registry例外情况:WarpStream内置的Schema Registry仅支持Avro和Protobuf——其
    GET /schemas/types
    端点返回
    ["AVRO","PROTOBUF"]
    。当使用WarpStream的SR时,JSON Schema不可用。如果用户运行WarpStream并使用其内置SR,默认使用Avro(或用户偏好的Protobuf)。使用
    confluent_kafka.schema_registry.avro
    中的
    AvroSerializer
    /
    AvroDeserializer
    (或
    confluent_kafka.schema_registry.protobuf
    中的
    ProtobufSerializer
    /
    ProtobufDeserializer
    )。对于异步生产者,使用
    confluent_kafka.schema_registry._async.avro
    中的
    AsyncAvroSerializer
    (或
    confluent_kafka.schema_registry._async.protobuf
    中的
    AsyncProtobufSerializer
    )。生成Avro Schema文件到
    schemas/value.avsc
    (或Protobuf的
    schemas/value.proto
    ),而非
    schemas/value.schema.json
    ——以
    references/value.avsc
    为起点,根据用户的业务场景调整。在Avro路径上生成生产者/消费者代码时,复制
    references/producer_avro.py
    (异步)、
    references/producer_avro_sync.py
    (同步)和
    references/consumer_avro.py
    (异步)中的代码——不要使用JSON参考文件——因为构造函数签名不同(请参阅下面的Avro构造函数警告)。如果用户运行WarpStream但指向其他SR(例如Confluent Cloud Schema Registry),JSON Schema仍然适用——使用默认路径。
    将Schema注册作为单独的显式步骤,然后再创建序列化器。使用专用的
    register_schema()
    函数调用
    sr_client.register_schema()
    ,并让错误(认证失败、网络错误、权限拒绝)直接传播——切勿将注册过程包装在空的
    try/except
    块中。然后配置序列化器时设置
    auto.register.schemas=False
    use.latest.version=True
    。这确保序列化器永远不会自动注册Schema,符合生产实践中由CI/CD注册Schema而非应用启动时注册的模式。
    根据选择的生产者类型和Schema格式使用相应的序列化器:
    • JSON Schema(默认):异步场景使用
      confluent_kafka.schema_registry._async.json_schema
      中的
      AsyncJSONSerializer
      /
      AsyncJSONDeserializer
      ,同步场景使用
      confluent_kafka.schema_registry.json_schema
      中的
      JSONSerializer
      /
      JSONDeserializer
    • Avro(使用WarpStream SR时默认):异步场景使用
      confluent_kafka.schema_registry._async.avro
      中的
      AsyncAvroSerializer
      /
      AsyncAvroDeserializer
      ,同步场景使用
      confluent_kafka.schema_registry.avro
      中的
      AvroSerializer
      /
      AvroDeserializer
    • Protobuf(使用WarpStream SR时可选):异步场景使用
      confluent_kafka.schema_registry._async.protobuf
      中的
      AsyncProtobufSerializer
      /
      AsyncProtobufDeserializer
      ,同步场景使用
      confluent_kafka.schema_registry.protobuf
      中的
      ProtobufSerializer
      /
      ProtobufDeserializer
  3. 选择正确的生产者类型
    confluent-kafka-python
    库提供两种生产者API:
    • AsyncIO生产者
      confluent_kafka.aio
      中的
      AIOProducer
      ):非阻塞,与
      asyncio
      事件循环集成。结合
      confluent_kafka.schema_registry._async.json_schema
      中的
      AsyncJSONSerializer
      AsyncSchemaRegistryClient
      使用。最适合已经运行事件循环的应用(FastAPI、aiohttp、Sanic、asyncio工作进程)。
    • 同步生产者
      confluent_kafka
      中的
      Producer
      ):阻塞调用,带有交付回调。结合
      confluent_kafka.schema_registry.json_schema
      中的
      JSONSerializer
      SchemaRegistryClient
      使用。最适合脚本、批处理作业和高吞吐量管道,用户可直接控制线程/进程并调用
      poll()
      /
      flush()
      。 始终询问用户哪种类型适合他们的用例。消费者始终使用
      AIOConsumer
      (异步)——长时间运行的轮询循环受益于非阻塞I/O,混合同步/异步消费者类型会增加复杂性且几乎没有好处。
  4. 优雅关闭。异步生产者必须在退出前
    flush()
    close()
    (均需await)。同步生产者必须在退出前调用
    flush()
    ——否则缓冲的消息会丢失。消费者必须
    unsubscribe()
    然后
    close()
    ,以干净地离开消费者组(避免不必要的重新平衡)。使用
    try/finally
    块并处理
    KeyboardInterrupt
    / 信号。
  5. 支持Confluent Cloud、本地Docker和WarpStream。针对Confluent Cloud时,配置
    SASL_SSL
    PLAIN
    机制,并从
    .env
    加载API密钥。针对本地Docker时,使用
    PLAINTEXT
    且无需认证。针对WarpStream时,根据用户的WarpStream部署使用
    SASL_SSL
    PLAINTEXT
    ,并应用
    references/warpstream-optimization.md
    中的librdkafka配置覆盖项(大批次、禁用幂等性、大获取量、区域感知的
    client.id
    )。环境变量
    KAFKA_ENV
    cloud
    local
    warpstream
    )控制使用哪种路径。所有设置均通过
    .env
    从环境变量加载。
  6. 运行前验证连接。使用
    AdminClient.list_topics()
    验证代理是否可达且Topic存在,然后再进行生产或消费。通过HTTP健康检查验证Schema Registry的连接性。
  7. 为领域事件始终设置消息键。对于代表实体或事件流的消息(订单事件、用户操作、设备遥测、交易),在
    producer.produce()
    中传递
    key=<entity_id>.encode("utf-8")
    。Kafka按键分区,因此具有相同键的消息会落在同一个分区上并保持顺序——这对于
    OrderCreated → OrderUpdated → OrderCancelled
    之类的事件流至关重要,消费者必须按顺序查看这些事件。每个参考文件中的
    produce()
    助手函数都接受
    key_field
    参数,指定用作键的字段(例如
    key_field="order_id"
    key_field="transaction_id"
    )。询问用户哪个字段标识实体,并将其传递给
    produce()
    。仅当用户明确表示顺序无关紧要时(例如无状态指标,任何分区都可以),才将
    key_field
    设为
    None
    WarpStream例外情况:在WarpStream上,空键启用粘性分区,这会构建更大的批次并显著提高吞吐量和降低成本。当用户的用例不需要基于实体的顺序时(例如独立的遥测读数、无状态指标、日志),建议设置
    key_field=None
    并解释吞吐量优势。当需要基于实体的顺序时(例如同一订单的
    OrderCreated → OrderUpdated → OrderCancelled
    ),仍需设置消息键——正确性优先于批处理效率。询问用户他们的事件是否需要按键排序来决定。

common.py

common.py

This module handles configuration loading and connectivity verification. Use
references/common.py
as the template.
此模块处理配置加载和连接性验证。以
references/common.py
为模板。

producer.py Pattern (AsyncIO)

producer.py模式(AsyncIO)

When the user chooses the AsyncIO producer, use
references/producer.py
as the template.
Key points:
  • produce()
    takes a producer instance as a parameter — it never creates one
  • The producer is created once in
    main()
    and can be passed to multiple
    produce()
    calls
  • The async serializer (
    AsyncJSONSerializer
    ) must be
    await
    ed when calling it on a message
  • AIOProducer.produce()
    is async and returns an
    asyncio.Future
    . You must
    await
    the method to get the Future, then
    await
    the Future to get the delivered
    Message
    :
    future = await producer.produce(...); result = await future
  • AIOProducer.flush()
    and
    close()
    are coroutines — they must be
    await
    ed in the
    finally
    block
  • Signal handlers set a shutdown event for graceful termination
  • Schema registration and serializer creation are separate steps.
    register_schema()
    explicitly registers the schema and returns the schema ID — errors propagate immediately.
    create_json_serializer()
    (or
    create_avro_serializer()
    /
    create_protobuf_serializer()
    when using Avro/Protobuf) creates the serializer with
    conf={'auto.register.schemas': False, 'use.latest.version': True}
    .
  • Constructor argument order differs across formats — always pass
    schema_registry_client
    and
    schema_str
    as keyword arguments, never positionally.
    The JSON, Avro, and Protobuf serializer/deserializer classes in
    confluent-kafka-python
    do not share the same positional signature:
    JSONSerializer
    /
    JSONDeserializer
    take
    schema_str
    first, while
    AvroSerializer
    /
    AvroDeserializer
    and
    ProtobufSerializer
    /
    ProtobufDeserializer
    take
    schema_registry_client
    first. Mixing positional and keyword forms across formats produces
    TypeError: ... got multiple values for argument 'schema_registry_client'
    . Use kwargs everywhere so the pattern is identical:
    • await AsyncJSONSerializer(schema_str=schema_str, schema_registry_client=sr_client, conf=conf)
    • await AsyncAvroSerializer(schema_str=schema_str, schema_registry_client=sr_client, conf=conf)
    • await AsyncProtobufSerializer(msg_type=MsgType, schema_registry_client=sr_client, conf=conf)
      (Protobuf takes a generated message class instead of a schema string) See
      references/producer.py
      (JSON) and
      references/producer_avro.py
      (Avro) for the canonical templates.
  • Headers are NOT supported with
    AIOProducer
    batch mode.
    Do not pass
    headers=
    to
    AIOProducer.produce()
    — it will raise
    NotImplementedError
    . Schema identification is handled automatically by the serializer's wire format prefix (applies to JSON Schema, Avro, and Protobuf serializers). See "Schema ID in Headers vs Wire Format" below for details
当用户选择AsyncIO生产者时,以
references/producer.py
为模板。
关键点:
  • produce()
    接受生产者实例作为参数——从不创建新实例
  • 生产者在
    main()
    中创建一次,可传递给多个
    produce()
    调用
  • 异步序列化器(
    AsyncJSONSerializer
    )在处理消息时必须被
    await
  • AIOProducer.produce()
    是异步的,返回
    asyncio.Future
    。必须
    await
    该方法以获取Future,然后
    await
    Future以获取已交付的
    Message
    future = await producer.produce(...); result = await future
  • AIOProducer.flush()
    close()
    是协程——必须在
    finally
    块中
    await
  • 信号处理程序设置关闭事件以实现优雅终止
  • Schema注册和序列化器创建是分开的步骤。
    register_schema()
    显式注册Schema并返回Schema ID——错误直接传播。
    create_json_serializer()
    (使用Avro/Protobuf时为
    create_avro_serializer()
    /
    create_protobuf_serializer()
    )创建序列化器时设置
    conf={'auto.register.schemas': False, 'use.latest.version': True}
  • 不同格式的构造函数参数顺序不同——始终以关键字参数传递
    schema_registry_client
    schema_str
    ,切勿使用位置参数
    confluent-kafka-python
    中的JSON、Avro和Protobuf序列化器/反序列化器类共享相同的位置签名:
    JSONSerializer
    /
    JSONDeserializer
    首先接受
    schema_str
    ,而
    AvroSerializer
    /
    AvroDeserializer
    ProtobufSerializer
    /
    ProtobufDeserializer
    首先接受
    schema_registry_client
    。跨格式混合使用位置参数和关键字参数会导致
    TypeError: ... got multiple values for argument 'schema_registry_client'
    。始终使用关键字参数,确保模式一致:
    • await AsyncJSONSerializer(schema_str=schema_str, schema_registry_client=sr_client, conf=conf)
    • await AsyncAvroSerializer(schema_str=schema_str, schema_registry_client=sr_client, conf=conf)
    • await AsyncProtobufSerializer(msg_type=MsgType, schema_registry_client=sr_client, conf=conf)
      (Protobuf接受生成的消息类而非Schema字符串) 请参阅
      references/producer.py
      (JSON)和
      references/producer_avro.py
      (Avro)获取规范模板。
  • AIOProducer批处理模式不支持Headers。请勿向
    AIOProducer.produce()
    传递
    headers=
    ——会抛出
    NotImplementedError
    。Schema识别由序列化器的线格式前缀自动处理(适用于JSON Schema、Avro和Protobuf序列化器)。有关详细信息,请参阅下文的"Headers中的Schema ID vs线格式"。

producer.py Pattern (Synchronous)

producer.py模式(同步)

When the user chooses the synchronous producer, use
references/producer_sync.py
as the template.
Key points:
  • produce()
    takes a producer instance as a parameter — it never creates one
  • The producer is created once in
    main()
    and can be passed to multiple
    produce()
    calls
  • Uses
    JSONSerializer
    (synchronous) from
    confluent_kafka.schema_registry.json_schema
    and
    SchemaRegistryClient
    from
    confluent_kafka.schema_registry
    . When using Avro/Protobuf (e.g., with WarpStream SR), use
    AvroSerializer
    from
    confluent_kafka.schema_registry.avro
    or
    ProtobufSerializer
    from
    confluent_kafka.schema_registry.protobuf
  • Producer.produce()
    is non-blocking — it enqueues the message. Call
    producer.poll(0)
    after each produce to serve delivery callbacks and keep the internal queue from filling up
  • Call
    producer.flush()
    after a batch to block until all in-flight messages are delivered
  • Use a
    delivery_callback(err, msg)
    function to handle per-message delivery reports
  • Signal handlers set a flag for graceful termination
  • flush()
    in the
    finally
    block ensures no buffered messages are lost
  • Schema registration and serializer creation are separate steps, same as the async pattern.
    register_schema()
    explicitly registers the schema.
    create_json_serializer()
    (or
    create_avro_serializer()
    /
    create_protobuf_serializer()
    when using Avro/Protobuf) creates the serializer with
    conf={'auto.register.schemas': False, 'use.latest.version': True}
    . Both return the schema ID
  • The same kwargs-only rule applies to the sync variants: call
    JSONSerializer(schema_str=..., schema_registry_client=..., conf=...)
    ,
    AvroSerializer(schema_str=..., schema_registry_client=..., conf=...)
    , or
    ProtobufSerializer(msg_type=..., schema_registry_client=..., conf=...)
    . The positional argument order is not consistent across formats — passing positionally will raise
    TypeError: ... got multiple values for argument 'schema_registry_client'
    . See
    references/producer_sync.py
    (JSON) and
    references/producer_avro_sync.py
    (Avro) for templates
  • The schema ID is passed as a Kafka record header (
    confluent.value.schemaId
    ) on every produced message — this is the header-based schema identification pattern. It keeps the JSON payload clean and readable by non-Confluent consumers. See "Schema ID in Headers vs Wire Format" below for details
当用户选择同步生产者时,以
references/producer_sync.py
为模板。
关键点:
  • produce()
    接受生产者实例作为参数——从不创建新实例
  • 生产者在
    main()
    中创建一次,可传递给多个
    produce()
    调用
  • 使用
    confluent_kafka.schema_registry.json_schema
    中的
    JSONSerializer
    (同步)和
    confluent_kafka.schema_registry
    中的
    SchemaRegistryClient
    。使用Avro/Protobuf时(例如WarpStream SR),使用
    confluent_kafka.schema_registry.avro
    中的
    AvroSerializer
    confluent_kafka.schema_registry.protobuf
    中的
    ProtobufSerializer
  • Producer.produce()
    是非阻塞的——它将消息加入队列。每次produce后调用
    producer.poll(0)
    以处理交付回调并防止内部队列填满
  • 在批处理后调用
    producer.flush()
    以阻塞,直到所有在途消息都已交付
  • 使用
    delivery_callback(err, msg)
    函数处理每条消息的交付报告
  • 信号处理程序设置标志以实现优雅终止
  • finally
    块中调用
    flush()
    确保没有缓冲消息丢失
  • Schema注册和序列化器创建是分开的步骤,与异步模式相同。
    register_schema()
    显式注册Schema。
    create_json_serializer()
    (使用Avro/Protobuf时为
    create_avro_serializer()
    /
    create_protobuf_serializer()
    )创建序列化器时设置
    conf={'auto.register.schemas': False, 'use.latest.version': True}
    。两者均返回Schema ID
  • 同步变体也适用相同的仅关键字参数规则:调用
    JSONSerializer(schema_str=..., schema_registry_client=..., conf=...)
    AvroSerializer(schema_str=..., schema_registry_client=..., conf=...)
    ProtobufSerializer(msg_type=..., schema_registry_client=..., conf=...)
    。不同格式的位置参数顺序不一致——使用位置参数会抛出
    TypeError: ... got multiple values for argument 'schema_registry_client'
    。请参阅
    references/producer_sync.py
    (JSON)和
    references/producer_avro_sync.py
    (Avro)获取模板。
  • Schema ID作为Kafka记录头(
    confluent.value.schemaId
    )传递给每条生产的消息——这是基于header的Schema识别模式。它保持JSON负载干净可读,非Confluent消费者也可读取。这是同步生产者的推荐方法。

consumer.py Pattern

consumer.py模式

Use
references/consumer.py
as the template.
Key points in the consumer:
  • Signal-based graceful shutdown —
    unsubscribe()
    then
    close()
    to leave the consumer group cleanly
  • Deserialization via Schema Registry using
    AsyncJSONDeserializer
    (or
    AsyncAvroDeserializer
    /
    AsyncProtobufDeserializer
    when using Avro/Protobuf) — no fallback to raw parsing, Schema Registry is required. Construct the deserializer with keyword arguments only — e.g.,
    await AsyncAvroDeserializer(schema_str=schema_str, schema_registry_client=sr_client)
    . The positional signature differs across formats (Avro/Protobuf take
    schema_registry_client
    first; JSON takes
    schema_str
    first), and mixing positional and keyword forms produces
    TypeError: ... got multiple values for argument 'schema_registry_client'
    . See
    references/consumer.py
    (JSON) and
    references/consumer_avro.py
    (Avro) for templates
  • Continuous polling loop until shutdown signal
references/consumer.py
为模板。
消费者的关键点:
  • 基于信号的优雅关闭——
    unsubscribe()
    然后
    close()
    以干净地离开消费者组
  • 通过Schema Registry进行反序列化,使用
    AsyncJSONDeserializer
    (使用Avro/Protobuf时为
    AsyncAvroDeserializer
    /
    AsyncProtobufDeserializer
    )——不回退到原始解析,必须使用Schema Registry。仅以关键字参数构造反序列化器——例如
    await AsyncAvroDeserializer(schema_str=schema_str, schema_registry_client=sr_client)
    。不同格式的位置签名不同(Avro/Protobuf首先接受
    schema_registry_client
    ;JSON首先接受
    schema_str
    ),混合使用位置参数和关键字参数会导致
    TypeError: ... got multiple values for argument 'schema_registry_client'
    。请参阅
    references/consumer.py
    (JSON)和
    references/consumer_avro.py
    (Avro)获取模板。
  • 持续轮询循环直到收到关闭信号

Schema ID in Headers vs Wire Format

Headers中的Schema ID vs线格式

Synchronous producer: The reference code passes the Schema ID as a Kafka record header (
confluent.value.schemaId
) on every message. This keeps the JSON payload clean (no magic byte prefix), making it readable by non-Confluent consumers and debuggable with tools like
kcat
. This is the recommended approach for synchronous producers.
Async producer (AIOProducer): The
AIOProducer
does not support custom headers in batch mode (
produce()
raises
NotImplementedError
if
headers=
is passed). Schema identification relies on the JSON Schema serializer's wire format prefix (magic byte + schema ID prepended to the payload). This is a known limitation — do not attempt to add headers to async-produced messages.
When to use which: If downstream consumers are all Confluent-aware (using Schema Registry deserializers), both approaches work transparently. If downstream consumers are non-Confluent (plain JSON consumers), the sync producer with header-based schema ID is preferable because the message value remains clean JSON. Document this tradeoff in the generated README when producing for mixed consumer ecosystems.
同步生产者:参考代码将Schema ID作为Kafka记录头(
confluent.value.schemaId
)传递给每条消息。这保持JSON负载干净(无魔术字节前缀),使非Confluent消费者也可读取,并可使用
kcat
等工具调试。这是同步生产者的推荐方法。
异步生产者(AIOProducer)
AIOProducer
在批处理模式下支持自定义headers(如果传递
headers=
produce()
会抛出
NotImplementedError
)。Schema识别依赖于JSON Schema序列化器的线格式前缀(魔术字节+Schema ID前置到负载)。这是已知限制——请勿尝试为异步生产的消息添加headers。
何时使用哪种方式:如果下游消费者均支持Confluent(使用Schema Registry反序列化器),两种方式都可透明工作。如果下游消费者不支持Confluent(普通JSON消费者),带有基于header的Schema ID的同步生产者更可取,因为消息值保持为干净的JSON。当为混合消费者生态系统生产消息时,在生成的README中记录此权衡。

schemas/

schemas/

Generate a schema file matching the user's data domain in the appropriate format for the chosen schema registry.
JSON Schema (default): Generate a JSON Schema file at
schemas/value.schema.json
.
For example, if the user is producing financial transactions:
json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Transaction",
  "description": "A financial transaction event produced to Kafka.",
  "type": "object",
  "properties": {
    "transaction_id": {
      "type": "string",
      "description": "Unique identifier for this transaction."
    },
    "amount": {
      "type": "number",
      "description": "Transaction amount in the specified currency.",
      "default": 0
    },
    "currency": {
      "type": "string",
      "description": "ISO 4217 currency code.",
      "default": ""
    },
    "timestamp": {
      "type": "string",
      "format": "date-time",
      "description": "Time the transaction occurred, in ISO 8601 format."
    },
    "status": {
      "description": "Current state of the transaction.",
      "enum": ["pending", "completed", "failed", "refunded"],
      "default": "pending"
    },
    "metadata": {
      "oneOf": [{"type": "null"}, {"type": "object"}],
      "description": "Optional metadata associated with the transaction.",
      "default": null
    }
  },
  "required": ["transaction_id", "amount", "currency", "timestamp", "status"]
}
Avro (when using WarpStream SR): Generate an Avro schema file at
schemas/value.avsc
(or
schemas/value.proto
for Protobuf). For the same financial transactions example in Avro:
json
{
  "type": "record",
  "name": "Transaction",
  "namespace": "com.example.kafka",
  "doc": "A financial transaction event produced to Kafka.",
  "fields": [
    {"name": "transaction_id", "type": "string", "doc": "Unique identifier for this transaction."},
    {"name": "amount", "type": "double", "default": 0, "doc": "Transaction amount in the specified currency."},
    {"name": "currency", "type": "string", "default": "", "doc": "ISO 4217 currency code."},
    {"name": "timestamp", "type": "string", "doc": "Time the transaction occurred, in ISO 8601 format."},
    {"name": "status", "type": {"type": "enum", "name": "Status", "symbols": ["pending", "completed", "failed", "refunded"]}, "default": "pending", "doc": "Current state of the transaction."},
    {"name": "metadata", "type": ["null", "string"], "default": null, "doc": "Optional metadata associated with the transaction."}
  ]
}
根据用户的数据领域生成匹配的Schema文件,格式与选择的Schema Registry一致。
JSON Schema(默认):在
schemas/value.schema.json
生成JSON Schema文件。
例如,如果用户生产金融交易数据:
json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Transaction",
  "description": "生产到Kafka的金融交易事件。",
  "type": "object",
  "properties": {
    "transaction_id": {
      "type": "string",
      "description": "此交易的唯一标识符。"
    },
    "amount": {
      "type": "number",
      "description": "指定货币的交易金额。",
      "default": 0
    },
    "currency": {
      "type": "string",
      "description": "ISO 4217货币代码。",
      "default": ""
    },
    "timestamp": {
      "type": "string",
      "format": "date-time",
      "description": "交易发生的时间,采用ISO 8601格式。"
    },
    "status": {
      "description": "交易的当前状态。",
      "enum": ["pending", "completed", "failed", "refunded"],
      "default": "pending"
    },
    "metadata": {
      "oneOf": [{"type": "null"}, {"type": "object"}],
      "description": "与交易关联的可选元数据。",
      "default": null
    }
  },
  "required": ["transaction_id", "amount", "currency", "timestamp", "status"]
}
Avro(使用WarpStream SR时):在
schemas/value.avsc
生成Avro Schema文件(或Protobuf的
schemas/value.proto
)。对于上述金融交易示例,Avro格式如下:
json
{
  "type": "record",
  "name": "Transaction",
  "namespace": "com.example.kafka",
  "doc": "生产到Kafka的金融交易事件。",
  "fields": [
    {"name": "transaction_id", "type": "string", "doc": "此交易的唯一标识符。"},
    {"name": "amount", "type": "double", "default": 0, "doc": "指定货币的交易金额。"},
    {"name": "currency", "type": "string", "default": "", "doc": "ISO 4217货币代码。"},
    {"name": "timestamp", "type": "string", "doc": "交易发生的时间,采用ISO 8601格式。"},
    {"name": "status", "type": {"type": "enum", "name": "Status", "symbols": ["pending", "completed", "failed", "refunded"]}, "default": "pending", "doc": "交易的当前状态。"},
    {"name": "metadata", "type": ["null", "string"], "default": null, "doc": "与交易关联的可选元数据。"}
  ]
}

Schema Generation Rules

Schema生成规则

Follow the rules in
references/schema-generation-rules.md
strictly when generating or adapting schemas to the user's domain.
在生成或调整Schema以适应用户领域时,严格遵循
references/schema-generation-rules.md
中的规则。

Multi-Event Topics (Advanced)

多事件Topic(高级)

When the user describes multiple event types on a single topic, follow
references/multi-event-guide.md
. Only suggest multi-event union schemas when the user explicitly describes multiple event types on one topic.
当用户描述单个Topic上有多个事件类型时,遵循
references/multi-event-guide.md
。仅当用户明确描述单个Topic上有多个事件类型时,才建议使用多事件联合Schema。

docker-compose.yml (Local Docker Path Only)

docker-compose.yml(仅本地Docker路径)

When the user chooses local Docker, you MUST generate a
docker-compose.yml
using
references/docker-compose.yml
as the template. This starts a single-node Kafka broker (using
confluentinc/confluent-local
) and a Confluent Schema Registry. The user just runs
docker compose up -d
to get a working Kafka environment.
IMPORTANT: The
confluentinc/confluent-local
image uses KRaft mode and has built-in listener names:
PLAINTEXT
(internal, port 29092),
PLAINTEXT_HOST
(external, port 9092), and
CONTROLLER
(port 29093). Do NOT invent custom listener names — this will conflict with the image's internal configuration and cause boot loops. Only override
KAFKA_ADVERTISED_LISTENERS
and
KAFKA_LISTENERS
using these exact listener names. The internal
PLAINTEXT
listener must advertise the
kafka
hostname (not
localhost
) so Schema Registry can reach the broker from within the Docker network.
当用户选择本地Docker时,必须以
references/docker-compose.yml
为模板生成
docker-compose.yml
。这会启动单节点Kafka代理(使用
confluentinc/confluent-local
)和Confluent Schema Registry。用户只需运行
docker compose up -d
即可获得可用的Kafka环境。
重要提示
confluentinc/confluent-local
镜像使用KRaft模式,具有内置的监听器名称:
PLAINTEXT
(内部,端口29092)、
PLAINTEXT_HOST
(外部,端口9092)和
CONTROLLER
(端口29093)。请勿自定义监听器名称——这会与镜像的内部配置冲突并导致启动循环。仅使用这些确切的监听器名称覆盖
KAFKA_ADVERTISED_LISTENERS
KAFKA_LISTENERS
。内部
PLAINTEXT
监听器必须公布
kafka
主机名(而非
localhost
),以便Schema Registry可从Docker网络内部访问代理。

.env.example

.env.example

Generate the appropriate
.env.example
based on the target environment:
Confluent Cloud:
KAFKA_ENV=cloud
BOOTSTRAP_SERVER=pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
API_KEY=your-api-key
API_SECRET=your-api-secret
TOPIC=demo-topic
SCHEMA_REGISTRY_URL=https://psrc-xxxxx.us-east-2.aws.confluent.cloud
SR_API_KEY=your-sr-api-key
SR_API_SECRET=your-sr-api-secret
CLIENT_ID=python-client
GROUP_ID=python-consumer-group
Local Docker:
KAFKA_ENV=local
BOOTSTRAP_SERVER=localhost:9092
TOPIC=demo-topic
SCHEMA_REGISTRY_URL=http://localhost:8081
CLIENT_ID=python-client
GROUP_ID=python-consumer-group
WarpStream:
KAFKA_ENV=warpstream
BOOTSTRAP_SERVER=your-warpstream-bootstrap-url:9092
TOPIC=demo-topic
SCHEMA_REGISTRY_URL=http://your-schema-registry:8081
CLIENT_ID=python-client,ws_az=us-east-1a
GROUP_ID=python-consumer-group
根据目标环境生成相应的
.env.example
Confluent Cloud:
KAFKA_ENV=cloud
BOOTSTRAP_SERVER=pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
API_KEY=your-api-key
API_SECRET=your-api-secret
TOPIC=demo-topic
SCHEMA_REGISTRY_URL=https://psrc-xxxxx.us-east-2.aws.confluent.cloud
SR_API_KEY=your-sr-api-key
SR_API_SECRET=your-sr-api-secret
CLIENT_ID=python-client
GROUP_ID=python-consumer-group
Local Docker:
KAFKA_ENV=local
BOOTSTRAP_SERVER=localhost:9092
TOPIC=demo-topic
SCHEMA_REGISTRY_URL=http://localhost:8081
CLIENT_ID=python-client
GROUP_ID=python-consumer-group
WarpStream:
KAFKA_ENV=warpstream
BOOTSTRAP_SERVER=your-warpstream-bootstrap-url:9092
TOPIC=demo-topic
SCHEMA_REGISTRY_URL=http://your-schema-registry:8081
CLIENT_ID=python-client,ws_az=us-east-1a
GROUP_ID=python-consumer-group

If your WarpStream deployment requires SASL auth, uncomment:

如果你的WarpStream部署需要SASL认证,请取消注释:

API_KEY=your-api-key

API_KEY=your-api-key

API_SECRET=your-api-secret

API_SECRET=your-api-secret

If using Confluent Cloud Schema Registry, uncomment:

如果使用Confluent Cloud Schema Registry,请取消注释:

SR_API_KEY=your-sr-api-key

SR_API_KEY=your-sr-api-key

SR_API_SECRET=your-sr-api-secret

SR_API_SECRET=your-sr-api-secret

undefined
undefined

requirements.txt

requirements.txt

JSON Schema (default):
confluent-kafka[json,schema_registry]>=2.13.2
jsonschema
python-dotenv
requests>=2.25.0
httpx
authlib
cachetools
attrs
typing_extensions
pytest
pytest-asyncio
Avro (e.g., when using WarpStream SR):
confluent-kafka[avro,schema_registry]>=2.13.2
python-dotenv
requests>=2.25.0
httpx
authlib
cachetools
attrs
typing_extensions
pytest
pytest-asyncio
Protobuf (e.g., when using WarpStream SR):
confluent-kafka[protobuf,schema_registry]>=2.13.2
python-dotenv
requests>=2.25.0
httpx
authlib
cachetools
attrs
typing_extensions
pytest
pytest-asyncio
Every third-party package imported anywhere in the generated code (producer.py, consumer.py, common.py) must have a corresponding entry in requirements.txt. If the code does
from confluent_kafka import ...
, then
confluent-kafka
must be in requirements.txt. If it does
from dotenv import load_dotenv
, then
python-dotenv
must be listed. This includes transitive dependencies that aren't automatically installed — for example, the async Schema Registry client imports
httpx
and
authlib
at runtime, so both must be explicitly listed even though they aren't declared as dependencies of
confluent-kafka
. The user should be able to
pip install -r requirements.txt
and run the code with zero
ModuleNotFoundError
s.
Always include
pytest
. Include
pytest-asyncio
if the project uses the async producer or consumer. Only include
Faker
if the producer generates sample data with it.
JSON Schema(默认)
confluent-kafka[json,schema_registry]>=2.13.2
jsonschema
python-dotenv
requests>=2.25.0
httpx
authlib
cachetools
attrs
typing_extensions
pytest
pytest-asyncio
Avro(例如使用WarpStream SR时)
confluent-kafka[avro,schema_registry]>=2.13.2
python-dotenv
requests>=2.25.0
httpx
authlib
cachetools
attrs
typing_extensions
pytest
pytest-asyncio
Protobuf(例如使用WarpStream SR时)
confluent-kafka[protobuf,schema_registry]>=2.13.2
python-dotenv
requests>=2.25.0
httpx
authlib
cachetools
attrs
typing_extensions
pytest
pytest-asyncio
生成代码中导入的每个第三方包(producer.py、consumer.py、common.py)都必须在requirements.txt中有对应的条目。如果代码中使用
from confluent_kafka import ...
,则
confluent-kafka
必须在requirements.txt中。如果使用
from dotenv import load_dotenv
,则必须列出
python-dotenv
。这包括不会自动安装的传递依赖——例如异步Schema Registry客户端在运行时导入
httpx
authlib
,因此即使它们不是
confluent-kafka
的依赖项,也必须显式列出。用户应能够运行
pip install -r requirements.txt
并零
ModuleNotFoundError
地运行代码。
始终包含
pytest
。如果项目使用异步生产者或消费者,包含
pytest-asyncio
。仅当生产者使用
Faker
生成示例数据时,才包含
Faker

README.md

README.md

Generate a README following
references/readme-template.md
. Adapt to match what was actually generated — omit producer sections if only a consumer was requested, omit Docker sections for Confluent Cloud projects.
根据
references/readme-template.md
生成README。根据实际生成的内容调整——如果仅生成了消费者,省略生产者部分;如果是Confluent Cloud项目,省略Docker部分。

tests/test_project.py

tests/test_project.py

Always generate unit tests. Use
references/test_project.py
as the template. The tests must run without a live Kafka cluster or Schema Registry — mock all external dependencies so tests pass in CI and eval environments.
The tests should verify these properties of the generated code:
  1. common.py:
    load_config()
    returns all required keys and uses correct defaults.
    get_kafka_config()
    produces a config with
    SASL_SSL
    and
    PLAIN
    when
    KAFKA_ENV=cloud
    , or
    PLAINTEXT
    with no SASL when
    KAFKA_ENV=local
    .
    verify_kafka_setup()
    and
    verify_schema_registry()
    return the right booleans when mocked to succeed or fail.
  2. producer.py (if generated):
    produce()
    accepts a producer instance and a
    schema_id
    as parameters (never creates a producer). The producer class (
    AIOProducer
    for async,
    Producer
    for sync) is instantiated exactly once in the module. Messages are passed through the serializer before producing. The schema ID is included as a
    confluent.value.schemaId
    Kafka record header on every produced message. For synchronous producers, verify
    flush()
    is called after producing.
  3. consumer.py (if generated): Uses a Schema Registry deserializer (
    JSONDeserializer
    /
    AsyncJSONDeserializer
    ,
    AvroDeserializer
    /
    AsyncAvroDeserializer
    , or
    ProtobufDeserializer
    /
    AsyncProtobufDeserializer
    depending on the chosen format) — no raw
    json.loads
    fallback. Calls
    unsubscribe()
    before
    close()
    for graceful shutdown.
  4. Schema file: The test template includes three schema classes —
    TestJsonSchema
    ,
    TestAvroSchema
    ,
    TestProtobufSchema
    — each guarded by a
    pytest.mark.skipif
    that checks for the corresponding file (
    value.schema.json
    ,
    value.avsc
    ,
    value.proto
    ). The class for the chosen format runs; the others are skipped. JSON Schema checks:
    type: object
    ,
    title
    ,
    properties
    with at least one property, descriptions on all properties, and
    format: date-time
    on timestamp-like fields. Avro checks:
    type: record
    ,
    name
    ,
    fields
    with at least one field,
    doc
    on the record and every field. Protobuf checks:
    proto3
    syntax and at least one
    message
    definition.
  5. Project structure:
    requirements.txt
    exists and contains
    confluent-kafka
    ,
    python-dotenv
    , and
    requests
    .
    .env.example
    exists.
Adapt the tests to the user's specific schema and data domain — if they have fields like
device_id
and
temperature
, the schema tests can check for those specific field names.
After generating all files, run
pytest tests/
to verify the tests pass. If any test fails, fix the generated code (not the tests) until they pass.
始终生成单元测试。以
references/test_project.py
为模板。测试必须在没有实时Kafka集群或Schema Registry的情况下运行——模拟所有外部依赖,以便测试在CI和评估环境中通过。
测试应验证生成代码的以下特性:
  1. common.py
    load_config()
    返回所有必需的键并使用正确的默认值。当
    KAFKA_ENV=cloud
    时,
    get_kafka_config()
    生成带有
    SASL_SSL
    PLAIN
    的配置;当
    KAFKA_ENV=local
    时,生成带有
    PLAINTEXT
    且无SASL的配置。当模拟成功或失败时,
    verify_kafka_setup()
    verify_schema_registry()
    返回正确的布尔值。
  2. producer.py(如果生成):
    produce()
    接受生产者实例和
    schema_id
    作为参数(从不创建生产者)。生产者类(异步为
    AIOProducer
    ,同步为
    Producer
    )在模块中仅实例化一次。消息在生产前通过序列化器处理。Schema ID作为
    confluent.value.schemaId
    Kafka记录头包含在每条生产的消息中。对于同步生产者,验证生产后调用了
    flush()
  3. consumer.py(如果生成):使用Schema Registry反序列化器(根据选择的格式为
    JSONDeserializer
    /
    AsyncJSONDeserializer
    AvroDeserializer
    /
    AsyncAvroDeserializer
    ProtobufDeserializer
    /
    AsyncProtobufDeserializer
    )——不回退到原始
    json.loads
    解析。在
    close()
    之前调用
    unsubscribe()
    以实现优雅关闭。
  4. Schema文件:测试模板包含三个Schema类——
    TestJsonSchema
    TestAvroSchema
    TestProtobufSchema
    ——每个类都由
    pytest.mark.skipif
    保护,检查对应的文件(
    value.schema.json
    value.avsc
    value.proto
    )。选择的格式对应的类会运行;其他类会被跳过。JSON Schema检查:
    type: object
    title
    、至少一个
    properties
    、所有属性都有描述、时间戳类字段有
    format: date-time
    。Avro检查:
    type: record
    name
    、至少一个
    fields
    、记录和每个字段都有
    doc
    。Protobuf检查:
    proto3
    语法和至少一个
    message
    定义。
  5. 项目结构
    requirements.txt
    存在且包含
    confluent-kafka
    python-dotenv
    requests
    .env.example
    存在。
根据用户的特定Schema和数据领域调整测试——如果他们有
device_id
temperature
等字段,Schema测试可以检查这些特定字段名称。
生成所有文件后,运行
pytest tests/
以验证测试通过。如果任何测试失败,修复生成的代码(而非测试)直到测试通过。

Step 3: Guide the User

步骤3:指导用户

After generating the files, give the user instructions based on their target environment. Adapt these to match what was actually generated: omit producer steps if only a consumer was requested, omit consumer steps if only a producer was requested.
Confluent Cloud:
  1. Copy
    .env.example
    to
    .env
    and fill in their Confluent Cloud credentials
  2. Create a virtualenv and install dependencies:
    pip install -r requirements.txt
  3. If a producer was generated: the schema will be registered explicitly when the producer runs for the first time via the
    register_schema()
    function (
    auto.register.schemas
    is set to
    False
    ). If only a consumer was generated: register the schema manually by pasting the contents of
    schemas/value.schema.json
    into the Confluent Cloud Console under Schema Registry > Schemas for the topic's value subject.
  4. Run the producer (if generated):
    python producer.py
  5. Run the consumer (if generated):
    python consumer.py
Remind them that they can find their bootstrap server, API keys, and Schema Registry URL in the Confluent Cloud Console under their cluster and environment settings.
Local Docker:
  1. Start Kafka and Schema Registry:
    docker compose up -d
  2. Copy
    .env.example
    to
    .env
    (defaults are pre-filled for local Docker, no edits needed)
  3. Create a virtualenv and install dependencies:
    pip install -r requirements.txt
  4. Create the topic (if auto-creation is disabled):
    docker compose exec kafka kafka-topics --create --topic demo-topic --bootstrap-server localhost:29092
  5. Run the producer (if generated):
    python producer.py
  6. Run the consumer (if generated):
    python consumer.py
  7. When done, stop the containers:
    docker compose down
    (add
    -v
    to also remove stored data)
WarpStream:
  1. Copy
    .env.example
    to
    .env
    and fill in the WarpStream bootstrap server, Schema Registry URL, and (if applicable) credentials. Set
    CLIENT_ID
    to include
    ws_az=<availability-zone>
    for zone-aware routing (e.g.,
    python-client,ws_az=us-east-1a
    )
  2. Create a virtualenv and install dependencies:
    pip install -r requirements.txt
  3. Create the topic if it doesn't exist:
    kafka-topics.sh --create --topic demo-topic --bootstrap-server <warpstream-agent>:9092
  4. Run the producer (if generated):
    python producer.py
  5. Run the consumer (if generated):
    python consumer.py
Remind them that WarpStream has higher produce latency (~250ms p50) than standard Kafka — this is expected. If throughput is lower than expected, verify that the WarpStream-specific config overrides from
references/warpstream-optimization.md
are applied (especially
enable.idempotence=false
and large batch/fetch sizes).
生成文件后,根据用户的目标环境提供指导说明。根据实际生成的内容调整——如果仅生成了消费者,省略生产者步骤;如果仅生成了生产者,省略消费者步骤。
Confluent Cloud:
  1. .env.example
    复制为
    .env
    并填写Confluent Cloud凭证
  2. 创建虚拟环境并安装依赖:
    pip install -r requirements.txt
  3. 如果生成了生产者:首次运行生产者时,
    register_schema()
    函数会显式注册Schema(
    auto.register.schemas
    设置为
    False
    )。如果仅生成了消费者:手动注册Schema,将
    schemas/value.schema.json
    的内容粘贴到Confluent Cloud控制台的Schema Registry > Schemas中,对应Topic的value主题。
  4. 运行生产者(如果生成):
    python producer.py
  5. 运行消费者(如果生成):
    python consumer.py
提醒用户可在Confluent Cloud控制台的集群和环境设置中找到引导服务器、API密钥和Schema Registry URL。
Local Docker:
  1. 启动Kafka和Schema Registry:
    docker compose up -d
  2. .env.example
    复制为
    .env
    (本地Docker的默认值已预先填充,无需编辑)
  3. 创建虚拟环境并安装依赖:
    pip install -r requirements.txt
  4. 创建Topic(如果自动创建已禁用):
    docker compose exec kafka kafka-topics --create --topic demo-topic --bootstrap-server localhost:29092
  5. 运行生产者(如果生成):
    python producer.py
  6. 运行消费者(如果生成):
    python consumer.py
  7. 使用完成后,停止容器:
    docker compose down
    (添加
    -v
    可同时删除存储的数据)
WarpStream:
  1. .env.example
    复制为
    .env
    并填写WarpStream引导服务器、Schema Registry URL和(如适用)凭证。设置
    CLIENT_ID
    以包含
    ws_az=<availability-zone>
    以实现区域感知路由(例如
    python-client,ws_az=us-east-1a
  2. 创建虚拟环境并安装依赖:
    pip install -r requirements.txt
  3. 如果Topic不存在,创建Topic:
    kafka-topics.sh --create --topic demo-topic --bootstrap-server <warpstream-agent>:9092
  4. 运行生产者(如果生成):
    python producer.py
  5. 运行消费者(如果生成):
    python consumer.py
提醒用户WarpStream的生产延迟(~250ms p50)高于标准Kafka——这是正常现象。如果吞吐量低于预期,验证是否应用了
references/warpstream-optimization.md
中的WarpStream特定配置覆盖项(尤其是
enable.idempotence=false
和大批次/获取大小)。