<HARD-GATE>
Do NOT generate any code, scaffold any project, or modify any file until you have
explicitly asked and received answers for questions #1 (existing app or greenfield),
#2 (target environment), and #3 (producer, consumer, or both). If the user's prompt
partially answers some questions, still confirm your understanding before generating.
This applies to EVERY prompt regardless of how specific it appears.
</HARD-GATE>
Begin by announcing: "Using the Confluent Kafka Python Client skill to guide this project."
Confluent Kafka Python Client Creation
Generate a production-ready Python project for producing to and/or consuming from Kafka using . It supports:
- Three target environments: Confluent Cloud (managed), local Docker (open-source Kafka), and WarpStream (Kafka-compatible, object-storage-backed).
- Two producer styles: AsyncIO (non-blocking) and synchronous (blocking).

The generated code follows Confluent's best practices.
Step 1: Gather Requirements
Before generating any code, work through the questions below. Skip any question the user has already answered explicitly in their prompt — do not re-ask just for form's sake. For example, "build a producer and consumer on Confluent Cloud with an async producer" already answers #2, #3, and #4; only #1, #5, #6, #7, and #8 remain.
Mandatory confirmation gate — do not skip, even if the user answered every question. Before writing any file, you MUST send one message that:
- Recaps the answers you extracted as a short bulleted list (e.g., "Target: Confluent Cloud · Components: producer + consumer · Producer style: async · From scratch: yes").
- Asks any remaining open questions inline.
- Explicitly asks the user to confirm or correct before you proceed.
Then STOP and wait for the user's reply. Do not generate files in the same turn as the recap, and do not proceed on the assumption that a fully-specified prompt implies consent to generate immediately — the recap catches misinterpretations of the prompt and is required even when questions #1–#8 are all pre-answered. The only way to skip the gate is if the user has already confirmed the recap earlier in this conversation.
Do not assume defaults for #1, #2, or #3 — if any of these are not answered by the prompt, you must ask.
1. Are you adding Kafka to an existing application, or starting from scratch?
   - If the user has existing Python code (mentions an existing project, has a , uses Flask/FastAPI/Django, etc.), do not scaffold a new project. Instead: (a) identify their existing producer or data-sending code, (b) ask whether they already have schemas registered in Schema Registry, (c) add Schema Registry integration to their existing code following the patterns in the reference files. Generate only the files they are missing (e.g., , `schemas/value.schema.json`) and modify their existing code inline.
   - If the user already produces to Kafka without Schema Registry (schemaless), help them migrate: (1) generate a JSON Schema from their existing message structure, (2) register it, and (3) replace their raw calls with serializer-backed calls. Do not discard their existing code.
   - If starting from scratch, proceed with the full scaffold below.
2. Target environment? — Confluent Cloud, local Kafka (Docker), or WarpStream. Always prompt for this, even if the user didn't mention it. If they mention "open source", "local", "docker", "self-hosted", or just want to try Kafka without a cloud account, choose local Docker. If they mention "Confluent Cloud", "CC", or have existing cloud credentials, choose Confluent Cloud. If they mention "WarpStream", choose WarpStream. Default to Confluent Cloud if they confirm they don't have a preference, but always ask first.
   - If WarpStream: Read `references/warpstream-optimization.md` and apply the librdkafka overrides from that reference. Key changes: disable idempotence, dramatically increase batch sizes and in-flight requests, set large fetch sizes, add to for zone-aware routing. Prefer null message keys for sticky partitioning unless entity-based ordering is required.
3. Producer, consumer, or both?
4. Async or synchronous producer? (Only if producer is requested.) Help the user choose:
   - AsyncIO Producer (): Use when code runs under an event loop — FastAPI/Starlette, aiohttp, Sanic, asyncio workers — and must not block.
   - Synchronous Producer (): Use for scripts, batch jobs, and highest-throughput pipelines where the user controls threads/processes and can call / directly.

   If the user mentions an async framework (FastAPI, aiohttp, Sanic) or uses , default to AsyncIO. If they mention scripts, batch, ETL, or don't have a preference, default to Synchronous.
5. Do you have an existing schema you'd like to use? If yes, ask the user to paste it or provide the file path, then use it as `schemas/value.schema.json` instead of generating one. If no, proceed to ask about their data fields.
6. What kind of data are you producing? (Only if the user doesn't have an existing schema. Get field names and types so you can generate a matching JSON Schema and sample data.)
7. Topic name? (Default: )
8. Consumer group ID? (Only if consumer; default: )

Don't ask about Schema Registry — always include it. For Confluent Cloud and local Docker, always use JSON Schema. If the target is WarpStream, ask which Schema Registry implementation they are using — WarpStream's built-in schema registry only supports Avro and Protobuf ( returns ), so if they are using it, ask whether they prefer Avro or Protobuf (default to Avro). If they are using a different SR (e.g., Confluent Cloud Schema Registry), JSON Schema is fine.
Common Agent Mistakes
| Thought | Reality |
|---|---|
| "The user mentioned FastAPI, so I know it's async — skip the questions" | Still confirm. They might want a sync background worker alongside FastAPI. |
| "I'll use Avro since it's more widely used" | This skill uses JSON Schema by default. Exception: WarpStream's built-in schema registry only supports Avro and Protobuf — if the user is using WarpStream SR, use Avro by default. If they're using a different SR (e.g., Confluent Cloud SR), JSON Schema is fine regardless of the Kafka environment. |
| "I'll skip Schema Registry to keep it simple" | Schema Registry is non-negotiable. Every project includes it. |
| "I'll use `auto.register.schemas=True` for convenience" | Always . Explicit registration is a core principle. |
| "I'll create a producer in — it's cleaner" | One producer instance, created in , passed as a parameter. Always. |
| "The user wants sync, so the consumer should be sync too" | Consumer is always async (). This is a deliberate design decision. |
| "I'll add to the AIOProducer for schema ID" | raises on headers. Only sync producers use headers. |
| "I'll swap for and keep the call site the same" | takes first; takes first. Calling positionally across formats raises `TypeError: ... got multiple values for argument 'schema_registry_client'`. Always pass both as kwargs. |
| "I'll set `message.max.bytes=64000000` on the producer config and on the consumer — they're independent" | Not in librdkafka. is a client-global config (unlike Java's per-role ), so when is shared between producer and consumer, the consumer inherits it. librdkafka enforces `fetch.max.bytes >= message.max.bytes` at consumer construction and raises `KafkaError{_INVALID_ARG, ... "fetch.max.bytes must be >= message.max.bytes"}`. Use the WarpStream librdkafka consumer values in `references/warpstream-optimization.md` () — they're sized to satisfy this constraint. |
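The shared-config trap in the last row only surfaces when the consumer is constructed. A pre-flight check can catch it earlier. The following is a minimal sketch. `check_consumer_sizes` is a hypothetical helper, not part of confluent-kafka. The default values are assumed librdkafka defaults (1,000,000 for `message.max.bytes`, 52,428,800 for `fetch.max.bytes`), so confirm them against the librdkafka configuration reference for your version.

```python
"""Pre-flight check for the fetch.max.bytes >= message.max.bytes constraint."""

# Assumed librdkafka defaults; verify against your librdkafka version.
LIBRDKAFKA_DEFAULTS = {
    "message.max.bytes": 1_000_000,
    "fetch.max.bytes": 52_428_800,
}


def check_consumer_sizes(conf: dict) -> None:
    """Raise ValueError if a consumer built from `conf` would violate the constraint.

    Both keys are read from the same dict on purpose: message.max.bytes is
    client-global in librdkafka, so a value set for the producer in a shared
    config also applies to a consumer built from that config.
    """
    message_max = int(conf.get("message.max.bytes", LIBRDKAFKA_DEFAULTS["message.max.bytes"]))
    fetch_max = int(conf.get("fetch.max.bytes", LIBRDKAFKA_DEFAULTS["fetch.max.bytes"]))
    if fetch_max < message_max:
        raise ValueError(
            f"fetch.max.bytes ({fetch_max}) must be >= message.max.bytes ({message_max})"
        )
```

Suppose a shared config sets only `message.max.bytes=64000000`. The assumed default `fetch.max.bytes` is smaller, so the check raises before librdkafka does. Raising `fetch.max.bytes` to at least the same value makes it pass.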
Step 1b: Confirm Understanding
After gathering all answers, present a confirmation summary before generating any code:
Before I generate the project, let me confirm:
- Project type: [Greenfield scaffold / Migration of existing code]
- Environment: [Confluent Cloud (SASL_SSL) / Local Docker (PLAINTEXT) / WarpStream]
- Schema format: [JSON Schema / Avro / Protobuf] (Avro or Protobuf if using WarpStream's built-in SR)
- Components: [Producer only / Consumer only / Both]
- Producer style: [AsyncIO (AIOProducer) / Synchronous (Producer)] (if applicable)
- Schema: [brief description of user's data fields]
- Topic: [topic name]
- Consumer group: [group ID] (if consumer)
Does this look right?
Wait for user confirmation before proceeding to Step 2. If the user corrects anything, update your understanding and re-confirm.
Step 2: Generate the Project
Decision Flowchart
```dot
digraph decisions {
  "Q1: Existing app?" -> "Migration path:\nmodify existing code" [label="yes"];
  "Q1: Existing app?" -> "Q2: Environment?" [label="no / greenfield"];
  "Q2: Environment?" -> "Cloud config\n(SASL_SSL)" [label="Confluent Cloud"];
  "Q2: Environment?" -> "Local Docker config\n(PLAINTEXT) + docker-compose.yml" [label="local / docker / OSS"];
  "Q2: Environment?" -> "WarpStream config\n(apply overrides from\nreferences/warpstream-optimization.md)" [label="WarpStream"];
  "Cloud config\n(SASL_SSL)" -> "Q3: Components?";
  "Local Docker config\n(PLAINTEXT) + docker-compose.yml" -> "Q3: Components?";
  "WarpStream config\n(apply overrides from\nreferences/warpstream-optimization.md)" -> "Which SR?\n(WarpStream SR → Avro/Protobuf;\nother SR → JSON Schema)";
  "Which SR?\n(WarpStream SR → Avro/Protobuf;\nother SR → JSON Schema)" -> "Q3: Components?";
  "Q3: Components?" -> "Q4: Async or sync?" [label="producer requested"];
  "Q3: Components?" -> "Generate consumer\n(always async AIOConsumer)" [label="consumer only"];
  "Q4: Async or sync?" -> "AIOProducer path\nAsyncJSONSerializer\n(no headers support)" [label="async / event-loop"];
  "Q4: Async or sync?" -> "Producer path\nJSONSerializer\n(header-based schema ID)" [label="sync / batch / ETL"];
}
```
Create this file structure in the user's chosen directory:
```
<project-dir>/
├── producer.py            # (if requested)
├── consumer.py            # (if requested)
├── common.py              # shared config loading + verification helpers
├── schemas/
│   └── value.schema.json  # JSON Schema (or value.avsc / value.proto when using WarpStream SR)
├── tests/
│   └── test_project.py    # unit tests (always generated)
├── .env.example           # template for credentials
├── requirements.txt
├── README.md
└── docker-compose.yml     # (local Docker path only)
```
Security
NEVER read, open, or display
files. They contain API keys and secrets. Only generate
with placeholder values. If the user asks you to debug a connection issue, ask them to verify their
values themselves — do not read the file.
Core Principles
These principles matter because they prevent the most common production issues with Kafka Python clients:
- Reuse the producer instance. Creating a new producer per message is expensive — each one opens new TCP connections, does SASL handshakes, and fetches metadata. Create one producer and reuse it for all messages. The produce function should accept the producer as a parameter, not instantiate one.
- Always use Schema Registry with JSON Schema. Schema Registry enforces a contract between producers and consumers. Without it, schema changes silently break downstream consumers. This skill uses JSON Schema by default. Schema Registry supports Avro, Protobuf, and JSON Schema — JSON Schema is chosen because: (1) Python has first-class JSON support with no code generation step, (2) provides / out of the box, (3) it is the most approachable format for Python developers already working with JSON/dict data.

  WarpStream Schema Registry exception: WarpStream's built-in schema registry only supports Avro and Protobuf — its endpoint returns . JSON Schema is not available when using WarpStream's SR. If the user is running WarpStream and using its built-in SR, use Avro by default (or Protobuf if the user prefers). Use / from `confluent_kafka.schema_registry.avro` (or / from `confluent_kafka.schema_registry.protobuf`). For async producers, use from `confluent_kafka.schema_registry._async.avro` (or from `confluent_kafka.schema_registry._async.protobuf`). Generate an Avro schema file at (or for Protobuf) instead of `schemas/value.schema.json` — use as the starting point and adapt to the user's domain. When generating producer/consumer code on the Avro path, copy from `references/producer_avro.py` (async), `references/producer_avro_sync.py` (sync), and `references/consumer_avro.py` (async), not the JSON reference files, because the constructor signatures differ (see the Avro constructor warning below). If the user is running WarpStream but pointing at a different SR (e.g., Confluent Cloud Schema Registry), JSON Schema works fine — use the default path.

  Register schemas as a separate explicit step before creating the serializer. Use a dedicated function that calls `sr_client.register_schema()` and lets errors (auth failures, network errors, permission denials) propagate immediately — never wrap registration in a bare . Then configure the serializer with `auto.register.schemas=False` and . This ensures the serializer never silently auto-registers and aligns with production practice where CI/CD registers schemas, not application startup.

  Use the appropriate serializer for the chosen producer style and schema format:
  - JSON Schema (default): / from `confluent_kafka.schema_registry._async.json_schema` for async, or / from `confluent_kafka.schema_registry.json_schema` for synchronous.
  - Avro (default when using WarpStream SR): / from `confluent_kafka.schema_registry._async.avro` for async, or / from `confluent_kafka.schema_registry.avro` for synchronous.
  - Protobuf (alternative when using WarpStream SR): / `AsyncProtobufDeserializer` from `confluent_kafka.schema_registry._async.protobuf` for async, or / from `confluent_kafka.schema_registry.protobuf` for synchronous.
- Choose the right producer style. The library offers two producer APIs:
  - AsyncIO Producer ( from ): Non-blocking, integrates with event loops. Use with from `confluent_kafka.schema_registry._async.json_schema` and `AsyncSchemaRegistryClient`. Best for applications already running an event loop (FastAPI, aiohttp, Sanic, asyncio workers).
  - Synchronous Producer ( from ): Blocking calls with delivery callbacks. Use with from `confluent_kafka.schema_registry.json_schema` and . Best for scripts, batch jobs, and highest-throughput pipelines where the user controls threads/processes and can call / directly.

  Always ask the user which style fits their use case. The consumer always uses (async) — long-running poll loops benefit from non-blocking I/O, and mixing sync/async consumer styles adds complexity with little benefit.
- Graceful shutdown. Async producers must and (both awaited) before exiting. Synchronous producers must call before exiting — otherwise buffered messages are lost. Consumers must then to leave the consumer group cleanly (avoiding unnecessary rebalances). Use blocks and handle / signals.
- Support Confluent Cloud, local Docker, and WarpStream. When targeting Confluent Cloud, configure with mechanism and load API keys from . When targeting local Docker, use with no authentication. When targeting WarpStream, use or depending on the user's WarpStream deployment, and apply the librdkafka overrides from `references/warpstream-optimization.md` (large batches, disabled idempotence, large fetches, zone-aware ). The environment variable (, , or ) controls which path is used. Load all settings from environment variables via .
- Verify connectivity before running. Use `AdminClient.list_topics()` to verify the broker is reachable and the topic exists before producing or consuming. Verify Schema Registry connectivity with an HTTP health check.
- Always set a message key for domain events. Pass `key=<entity_id>.encode("utf-8")` to for any message that represents an entity or event stream (order events, user actions, device telemetry, transactions). Kafka partitions by key, so messages with the same key land on the same partition and preserve ordering — critical for event streams like `OrderCreated → OrderUpdated → OrderCancelled` where consumers must see events in order. The helper in every reference file accepts a parameter naming the field to use as the key (e.g., , `key_field="transaction_id"`). Ask the user which field identifies the entity and pass it to . Only leave if the user explicitly states ordering does not matter (e.g., stateless metrics where any partition is fine).

  WarpStream exception: On WarpStream, null keys enable sticky partitioning, which builds larger batches and significantly improves throughput and cost. When the user's use case does not require per-entity ordering (e.g., independent telemetry readings, stateless metrics, logs), recommend and explain the throughput benefit. When per-entity ordering is required (e.g., `OrderCreated → OrderUpdated → OrderCancelled` for the same order), still set a message key — correctness takes priority over batching efficiency. Ask the user whether their events need per-key ordering to decide.
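The keying rule can be sketched as a small helper. `message_key` and its arguments are hypothetical names for illustration, and the reference files' own helper may be shaped differently. Passing `key_field=None` yields a null key (the WarpStream sticky-partitioning case). A missing field raises instead of silently producing a null key, because a silent null key would quietly break per-entity ordering.

```python
"""Derive a Kafka message key from a record field (illustrative helper)."""
from typing import Any, Mapping, Optional


def message_key(record: Mapping[str, Any], key_field: Optional[str]) -> Optional[bytes]:
    """Return record[key_field] encoded as UTF-8, or None when key_field is None.

    None lets the client's partitioner choose the partition, which is what
    enables sticky partitioning for unkeyed messages.
    """
    if key_field is None:
        return None
    if key_field not in record:
        # Fail loudly: a silent null key would break per-entity ordering.
        raise KeyError(f"key field {key_field!r} not found in record")
    return str(record[key_field]).encode("utf-8")
```

For example, `message_key({"transaction_id": "tx-1", "amount": 9.5}, "transaction_id")` returns `b"tx-1"`, and non-string IDs such as integers are stringified before encoding.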
common.py
This module handles configuration loading and connectivity verification. Use as the template.
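The following sketch covers the two jobs this module does: building configs from the environment variables used in the `.env.example` files, and checking connectivity. Several details are assumptions rather than parts of the reference template:
- All function names are hypothetical.
- Confluent Cloud uses SASL_SSL with the PLAIN mechanism, and WarpStream uses SASL only when `API_KEY` is set.
- Schema Registry health is probed with `GET {url}/subjects`.
- The WarpStream throughput overrides are deliberately omitted, since they come from `references/warpstream-optimization.md`.

```python
"""common.py-style helpers: config from env vars plus connectivity checks (sketch)."""
import base64
import os
import urllib.request
from typing import Callable, Mapping


def build_kafka_config(env: Mapping[str, str] = os.environ) -> dict:
    """librdkafka config for KAFKA_ENV = cloud | local | warpstream."""
    kafka_env = env["KAFKA_ENV"]
    if kafka_env not in ("cloud", "local", "warpstream"):
        raise ValueError(f"unknown KAFKA_ENV: {kafka_env!r}")
    conf = {
        "bootstrap.servers": env["BOOTSTRAP_SERVER"],
        "client.id": env.get("CLIENT_ID", "python-client"),
    }
    # Assumption: SASL/PLAIN over TLS for Confluent Cloud API keys, and for
    # WarpStream only when API_KEY is set. WarpStream overrides are not applied here.
    if kafka_env == "cloud" or (kafka_env == "warpstream" and env.get("API_KEY")):
        conf.update({
            "security.protocol": "SASL_SSL",
            "sasl.mechanisms": "PLAIN",
            "sasl.username": env["API_KEY"],
            "sasl.password": env["API_SECRET"],
        })
    else:
        conf["security.protocol"] = "PLAINTEXT"
    return conf


def build_sr_config(env: Mapping[str, str] = os.environ) -> dict:
    """SchemaRegistryClient config; basic auth only when SR_API_KEY is set."""
    conf = {"url": env["SCHEMA_REGISTRY_URL"]}
    if env.get("SR_API_KEY"):
        conf["basic.auth.user.info"] = f"{env['SR_API_KEY']}:{env['SR_API_SECRET']}"
    return conf


def topic_exists(admin, topic: str, timeout: float = 10.0) -> bool:
    """`admin` is expected to be a confluent_kafka.admin.AdminClient."""
    try:
        metadata = admin.list_topics(topic=topic, timeout=timeout)
    except Exception:  # broker unreachable / timed out: report, don't crash
        return False
    topic_md = metadata.topics.get(topic)
    return topic_md is not None and topic_md.error is None


def schema_registry_reachable(sr_conf: dict, timeout: float = 5.0,
                              opener: Callable = urllib.request.urlopen) -> bool:
    """HTTP health probe against GET {url}/subjects (assumed endpoint choice)."""
    req = urllib.request.Request(sr_conf["url"].rstrip("/") + "/subjects")
    user_info = sr_conf.get("basic.auth.user.info")
    if user_info:
        token = base64.b64encode(user_info.encode("utf-8")).decode("ascii")
        req.add_header("Authorization", f"Basic {token}")
    try:
        with opener(req, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:  # URLError and HTTPError both subclass OSError
        return False
```

The `opener` parameter exists so tests can inject a fake instead of hitting the network.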
producer.py Pattern (AsyncIO)
When the user chooses the AsyncIO producer, use as the template.
Key points:
- takes a producer instance as a parameter — it never creates one
- The producer is created once in and can be passed to multiple calls
- The async serializer () must be ed when calling it on a message
- is async and returns an . You must the method to get the Future, then the Future to get the delivered : `future = await producer.produce(...); result = await future`
- and are coroutines — they must be ed in the block
- Signal handlers set a shutdown event for graceful termination
- Schema registration and serializer creation are separate steps. explicitly registers the schema and returns the schema ID — errors propagate immediately. (or / `create_protobuf_serializer()` when using Avro/Protobuf) creates the serializer with `conf={'auto.register.schemas': False, 'use.latest.version': True}`.
- Constructor argument order differs across formats — always pass and as keyword arguments, never positionally. The JSON, Avro, and Protobuf serializer/deserializer classes in do not share the same positional signature: / take first, while / and / take first. Mixing positional and keyword forms produces `TypeError: ... got multiple values for argument 'schema_registry_client'`. Use kwargs everywhere so the pattern is identical:
  ```python
  await AsyncJSONSerializer(schema_str=schema_str, schema_registry_client=sr_client, conf=conf)
  await AsyncAvroSerializer(schema_str=schema_str, schema_registry_client=sr_client, conf=conf)
  # Protobuf takes a generated message class instead of a schema string
  await AsyncProtobufSerializer(msg_type=MsgType, schema_registry_client=sr_client, conf=conf)
  ```
  See (JSON) and `references/producer_avro.py` (Avro) for the canonical templates.
- Headers are NOT supported with batch mode. Do not pass to — it will raise . Schema identification is handled automatically by the serializer's wire format prefix (applies to JSON Schema, Avro, and Protobuf serializers). See "Schema ID in Headers vs Wire Format" below for details.
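The key points above can be exercised without a broker using a stand-in producer. In this runnable sketch, `FakeAIOProducer`, `produce_message`, and `main` are hypothetical. The sketch assumes two things: the real `produce()` has the two-step await described above, and `flush()` and `close()` are the coroutines awaited at shutdown. The serializer is modeled as a one-argument coroutine for brevity. A real `AsyncJSONSerializer` is called as `await serializer(record, SerializationContext(topic, MessageField.VALUE))`.

```python
"""AIOProducer calling pattern, runnable with a fake producer (sketch)."""
import asyncio
import json
from typing import Any, Awaitable, Callable, Mapping, Optional

AsyncSerialize = Callable[[Mapping[str, Any]], Awaitable[bytes]]  # simplified shape


async def produce_message(producer, topic: str, record: Mapping[str, Any],
                          serialize: AsyncSerialize, key: Optional[bytes] = None):
    """Serialize and produce one record with a producer the caller owns."""
    value = await serialize(record)
    # No headers= argument: AIOProducer rejects headers in batch mode.
    future = await producer.produce(topic, key=key, value=value)  # 1st await -> Future
    return await future                                           # 2nd await -> delivery result


class FakeAIOProducer:
    """Stand-in that mimics the two-step await of AIOProducer.produce()."""
    instances = 0

    def __init__(self, conf: dict):
        FakeAIOProducer.instances += 1

    async def produce(self, topic, key=None, value=None):
        future = asyncio.get_running_loop().create_future()
        future.set_result((topic, key, value))  # pretend the broker acked at once
        return future

    async def flush(self):
        return 0

    async def close(self):
        pass


async def main() -> list:
    producer = FakeAIOProducer({"bootstrap.servers": "localhost:9092"})  # created once

    async def serialize(record):
        return json.dumps(record).encode("utf-8")

    try:
        return [
            await produce_message(producer, "demo-topic", r, serialize, key=r["id"].encode("utf-8"))
            for r in ({"id": "a"}, {"id": "b"})
        ]
    finally:
        await producer.flush()  # both awaited before exiting
        await producer.close()
```

`asyncio.run(main())` constructs the producer exactly once and reuses it for both records. This is the property the generated tests check.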
producer.py Pattern (Synchronous)
When the user chooses the synchronous producer, use `references/producer_sync.py` as the template.
Key points:
- takes a producer instance as a parameter — it never creates one
- The producer is created once in and can be passed to multiple calls
- Uses (synchronous) from `confluent_kafka.schema_registry.json_schema` and from `confluent_kafka.schema_registry`. When using Avro/Protobuf (e.g., with WarpStream SR), use from `confluent_kafka.schema_registry.avro` or from `confluent_kafka.schema_registry.protobuf`
- is non-blocking — it enqueues the message. Call after each produce to serve delivery callbacks and keep the internal queue from filling up
- Call after a batch to block until all in-flight messages are delivered
- Use a `delivery_callback(err, msg)` function to handle per-message delivery reports
- Signal handlers set a flag for graceful termination
- in the block ensures no buffered messages are lost
- Schema registration and serializer creation are separate steps, same as the async pattern. explicitly registers the schema. (or / `create_protobuf_serializer()` when using Avro/Protobuf) creates the serializer with `conf={'auto.register.schemas': False, 'use.latest.version': True}`. Both return the schema ID
- The same kwargs-only rule applies to the sync variants: call `JSONSerializer(schema_str=..., schema_registry_client=..., conf=...)`, `AvroSerializer(schema_str=..., schema_registry_client=..., conf=...)`, or `ProtobufSerializer(msg_type=..., schema_registry_client=..., conf=...)`. The positional argument order is not consistent across formats — passing positionally will raise `TypeError: ... got multiple values for argument 'schema_registry_client'`. See `references/producer_sync.py` (JSON) and `references/producer_avro_sync.py` (Avro) for templates
- The schema ID is passed as a Kafka record header () on every produced message — this is the header-based schema identification pattern. It keeps the JSON payload clean and readable by non-Confluent consumers. See "Schema ID in Headers vs Wire Format" below for details
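The synchronous flow (explicit registration, `poll(0)` after each produce, `flush()` at the end) can be sketched as follows. Several names are hypothetical: `register_value_schema`, `produce_records`, and the `"schema_id"` header name and its decimal-string encoding. The reference file's actual header is not reproduced here. The sketch assumes `sr_client` is a `SchemaRegistryClient` (whose `register_schema(subject_name, schema)` returns the schema ID) and `producer` is a `confluent_kafka.Producer`. The subject `<topic>-value` follows the default TopicNameStrategy.

```python
"""Synchronous producer flow: register explicitly, produce, poll, flush (sketch)."""
from typing import Any, Callable, Iterable, Mapping, Optional

# Serializer conf from the key points above: never auto-register.
SERIALIZER_CONF = {"auto.register.schemas": False, "use.latest.version": True}


def register_value_schema(sr_client, topic: str, schema) -> int:
    """Register under '<topic>-value'. No try/except: failures must surface at startup."""
    return sr_client.register_schema(f"{topic}-value", schema)


def delivery_callback(err, msg) -> None:
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")


def produce_records(producer, topic: str, records: Iterable[Mapping[str, Any]],
                    serialize: Callable[[Mapping[str, Any]], bytes], schema_id: int,
                    key_field: Optional[str] = None) -> int:
    """Produce every record with a caller-owned producer; returns the count produced."""
    count = 0
    for record in records:
        key = str(record[key_field]).encode("utf-8") if key_field else None
        producer.produce(
            topic,
            key=key,
            value=serialize(record),
            headers=[("schema_id", str(schema_id).encode("utf-8"))],  # hypothetical header
            on_delivery=delivery_callback,
        )
        producer.poll(0)  # serve delivery callbacks; keeps the local queue draining
        count += 1
    remaining = producer.flush()  # blocks until in-flight messages are delivered
    if remaining:
        raise RuntimeError(f"{remaining} message(s) were not delivered")
    return count
```

Letting `register_value_schema` raise is deliberate. An auth or permission failure then stops the process at startup, instead of surfacing later as serializer errors on every message.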
consumer.py Pattern
Key points in the consumer:
- Signal-based graceful shutdown — then to leave the consumer group cleanly
- Deserialization via Schema Registry using (or / `AsyncProtobufDeserializer` when using Avro/Protobuf) — no fallback to raw parsing, Schema Registry is required. Construct the deserializer with keyword arguments only — e.g., `await AsyncAvroDeserializer(schema_str=schema_str, schema_registry_client=sr_client)`. The positional signature differs across formats (Avro/Protobuf take first; JSON takes first), and mixing positional and keyword forms produces `TypeError: ... got multiple values for argument 'schema_registry_client'`. See (JSON) and `references/consumer_avro.py` (Avro) for templates
- Continuous polling loop until shutdown signal
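The consumer key points can be sketched with a fake consumer so the loop runs without a broker. This sketch makes several assumptions:
- `consume_until`, `FakeAIOConsumer`, and `demo` are hypothetical names.
- The real AIOConsumer's `subscribe`, `poll`, `unsubscribe`, and `close` are awaitable.
- The shutdown order is `unsubscribe()` followed by `close()`.
- `deserialize` stands in for a call to an `AsyncJSONDeserializer`.

```python
"""Async consume loop with Schema Registry deserialization and clean shutdown (sketch)."""
import asyncio
import json
from typing import Any, Awaitable, Callable, List


async def consume_until(consumer, topics: List[str], stop: asyncio.Event,
                        deserialize: Callable[[bytes], Awaitable[Any]],
                        handle: Callable[[Any], None]) -> None:
    await consumer.subscribe(topics)
    try:
        while not stop.is_set():
            msg = await consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            # Schema Registry deserializer only; no raw-parsing fallback.
            handle(await deserialize(msg.value()))
    finally:
        await consumer.unsubscribe()  # assumed shutdown order: unsubscribe, then close
        await consumer.close()


class FakeMsg:
    def __init__(self, value: bytes):
        self._value = value

    def error(self):
        return None

    def value(self) -> bytes:
        return self._value


class FakeAIOConsumer:
    """Stand-in consumer that sets `stop` once its backlog is drained."""

    def __init__(self, payloads: List[bytes], stop: asyncio.Event):
        self._payloads = list(payloads)
        self._stop = stop
        self.calls: List[str] = []

    async def subscribe(self, topics):
        self.calls.append("subscribe")

    async def poll(self, timeout):
        if not self._payloads:
            self._stop.set()  # simulates the signal handler firing
            return None
        return FakeMsg(self._payloads.pop(0))

    async def unsubscribe(self):
        self.calls.append("unsubscribe")

    async def close(self):
        self.calls.append("close")


async def demo():
    stop = asyncio.Event()
    consumer = FakeAIOConsumer([b'{"id": 1}', b'{"id": 2}'], stop)
    seen: List[Any] = []

    async def deserialize(raw: bytes):  # stand-in for the SR deserializer call
        return json.loads(raw)

    await consume_until(consumer, ["demo-topic"], stop, deserialize, seen.append)
    return consumer.calls, seen
```

In a real `consumer.py` on Unix, `stop` would typically be set from `asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, stop.set)`, and likewise for SIGINT.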
Schema ID in Headers vs Wire Format
Synchronous producer: The reference code passes the Schema ID as a Kafka record header () on every message. This keeps the JSON payload clean (no magic byte prefix), making it readable by non-Confluent consumers and debuggable with tools like . This is the recommended approach for synchronous producers.
Async producer (AIOProducer): The does not support custom headers in batch mode ( raises if is passed). Schema identification relies on the serializer's wire format prefix (magic byte + schema ID prepended to the payload), which the JSON Schema, Avro, and Protobuf serializers all write. This is a known limitation — do not attempt to add headers to async-produced messages.
When to use which: If downstream consumers are all Confluent-aware (using Schema Registry deserializers), both approaches work transparently. If downstream consumers are non-Confluent (plain JSON consumers), the sync producer with header-based schema ID is preferable because the message value remains clean JSON. Document this tradeoff in the generated README when producing for mixed consumer ecosystems.
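The tradeoff comes down to a few framing bytes. This sketch shows the Confluent wire format the Schema Registry serializers prepend to the value: one magic byte (0), then the schema ID as a 4-byte big-endian integer, then the encoded payload. `frame` and `unframe` are hypothetical helpers, since real serializers do this internally. Protobuf additionally writes message indexes after the schema ID, which this sketch ignores.

```python
"""Confluent wire-format framing: magic byte 0 + 4-byte big-endian schema ID."""
import struct
from typing import Tuple

MAGIC_BYTE = 0
_HEADER = struct.Struct(">bI")  # signed byte (magic) + unsigned 32-bit schema ID; 5 bytes


def frame(schema_id: int, payload: bytes) -> bytes:
    """Prepend the 5-byte prefix that async-produced values carry."""
    return _HEADER.pack(MAGIC_BYTE, schema_id) + payload


def unframe(data: bytes) -> Tuple[int, bytes]:
    """Split a framed value into (schema_id, payload); reject unframed data."""
    if len(data) < _HEADER.size:
        raise ValueError("value too short for Confluent wire format")
    magic, schema_id = _HEADER.unpack_from(data)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; value is not SR-framed")
    return schema_id, data[_HEADER.size:]
```

A plain JSON consumer that hands a framed value straight to `json.loads()` fails on the five leading bytes. This is why header-based schema IDs suit mixed consumer ecosystems better.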
schemas/
Generate a schema file matching the user's data domain in the appropriate format for the chosen schema registry.
JSON Schema (default): Generate a JSON Schema file at `schemas/value.schema.json`.
For example, if the user is producing financial transactions:
```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Transaction",
  "description": "A financial transaction event produced to Kafka.",
  "type": "object",
  "properties": {
    "transaction_id": {
      "type": "string",
      "description": "Unique identifier for this transaction."
    },
    "amount": {
      "type": "number",
      "description": "Transaction amount in the specified currency.",
      "default": 0
    },
    "currency": {
      "type": "string",
      "description": "ISO 4217 currency code.",
      "default": ""
    },
    "timestamp": {
      "type": "string",
      "format": "date-time",
      "description": "Time the transaction occurred, in ISO 8601 format."
    },
    "status": {
      "description": "Current state of the transaction.",
      "enum": ["pending", "completed", "failed", "refunded"],
      "default": "pending"
    },
    "metadata": {
      "oneOf": [{"type": "null"}, {"type": "object"}],
      "description": "Optional metadata associated with the transaction.",
      "default": null
    }
  },
  "required": ["transaction_id", "amount", "currency", "timestamp", "status"]
}
```
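When generating sample data to go with a schema like the one above, a quick stdlib-only sanity check can catch obvious mismatches before anything is produced. This sketch is not a JSON Schema validator: it checks only `required` fields and `enum` values, and ignores types, formats, and `oneOf`. The generated project lists `jsonschema` in `requirements.txt` for real validation. `load_schema` and `quick_check` are hypothetical names.

```python
"""Quick, partial sanity check of a sample record against a JSON Schema file."""
import json
from pathlib import Path
from typing import Any, Dict, List


def load_schema(path: str = "schemas/value.schema.json") -> Dict[str, Any]:
    return json.loads(Path(path).read_text(encoding="utf-8"))


def quick_check(schema: Dict[str, Any], record: Dict[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means these checks passed."""
    problems = [
        f"missing required field {name!r}"
        for name in schema.get("required", [])
        if name not in record
    ]
    for name, spec in schema.get("properties", {}).items():
        if name in record and "enum" in spec and record[name] not in spec["enum"]:
            problems.append(f"{name}={record[name]!r} is not one of {spec['enum']}")
    return problems
```

For the Transaction schema, a record such as `{"transaction_id": "tx-1001", "amount": 42.5, "currency": "USD", "timestamp": "2024-01-01T12:00:00Z", "status": "completed"}` passes. A record with `"status": "lost"` is flagged.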
Avro (when using WarpStream SR): Generate an Avro schema file at (or for Protobuf). For the same financial transactions example in Avro:
```json
{
  "type": "record",
  "name": "Transaction",
  "namespace": "com.example.kafka",
  "doc": "A financial transaction event produced to Kafka.",
  "fields": [
    {"name": "transaction_id", "type": "string", "doc": "Unique identifier for this transaction."},
    {"name": "amount", "type": "double", "default": 0, "doc": "Transaction amount in the specified currency."},
    {"name": "currency", "type": "string", "default": "", "doc": "ISO 4217 currency code."},
    {"name": "timestamp", "type": "string", "doc": "Time the transaction occurred, in ISO 8601 format."},
    {"name": "status", "type": {"type": "enum", "name": "Status", "symbols": ["pending", "completed", "failed", "refunded"]}, "default": "pending", "doc": "Current state of the transaction."},
    {"name": "metadata", "type": ["null", "string"], "default": null, "doc": "Optional metadata associated with the transaction."}
  ]
}
```
Schema Generation Rules
Follow the rules in `references/schema-generation-rules.md` strictly when generating or adapting schemas to the user's domain.
Multi-Event Topics (Advanced)
When the user describes multiple event types on a single topic, follow `references/multi-event-guide.md`. Only suggest multi-event union schemas when the user explicitly describes multiple event types on one topic.
docker-compose.yml (Local Docker Path Only)
When the user chooses local Docker, you MUST generate a using `references/docker-compose.yml` as the template. This starts a single-node Kafka broker (using `confluentinc/confluent-local`) and a Confluent Schema Registry. The user just runs to get a working Kafka environment.
IMPORTANT: The `confluentinc/confluent-local` image uses KRaft mode and has built-in listener names: (internal, port 29092), (external, port 9092), and (port 29093). Do NOT invent custom listener names — this will conflict with the image's internal configuration and cause boot loops. Only override `KAFKA_ADVERTISED_LISTENERS` and using these exact listener names. The internal listener must advertise the hostname (not ) so Schema Registry can reach the broker from within the Docker network.
.env.example
Generate the appropriate based on the target environment:
Confluent Cloud:
```
KAFKA_ENV=cloud
BOOTSTRAP_SERVER=pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
API_KEY=your-api-key
API_SECRET=your-api-secret
TOPIC=demo-topic
SCHEMA_REGISTRY_URL=https://psrc-xxxxx.us-east-2.aws.confluent.cloud
SR_API_KEY=your-sr-api-key
SR_API_SECRET=your-sr-api-secret
CLIENT_ID=python-client
GROUP_ID=python-consumer-group
```
Local Docker:
```
KAFKA_ENV=local
BOOTSTRAP_SERVER=localhost:9092
TOPIC=demo-topic
SCHEMA_REGISTRY_URL=http://localhost:8081
CLIENT_ID=python-client
GROUP_ID=python-consumer-group
```
WarpStream:
```
KAFKA_ENV=warpstream
BOOTSTRAP_SERVER=your-warpstream-bootstrap-url:9092
TOPIC=demo-topic
SCHEMA_REGISTRY_URL=http://your-schema-registry:8081
CLIENT_ID=python-client,ws_az=us-east-1a
GROUP_ID=python-consumer-group
# If your WarpStream deployment requires SASL auth, uncomment:
# API_KEY=your-api-key
# API_SECRET=your-api-secret
# If using Confluent Cloud Schema Registry, uncomment:
# SR_API_KEY=your-sr-api-key
# SR_API_SECRET=your-sr-api-secret
```
requirements.txt
JSON Schema (default):
```
confluent-kafka[json,schema_registry]>=2.13.2
jsonschema
python-dotenv
requests>=2.25.0
httpx
authlib
cachetools
attrs
typing_extensions
pytest
pytest-asyncio
```
Avro (e.g., when using WarpStream SR):
```
confluent-kafka[avro,schema_registry]>=2.13.2
python-dotenv
requests>=2.25.0
httpx
authlib
cachetools
attrs
typing_extensions
pytest
pytest-asyncio
```
Protobuf (e.g., when using WarpStream SR):
```
confluent-kafka[protobuf,schema_registry]>=2.13.2
python-dotenv
requests>=2.25.0
httpx
authlib
cachetools
attrs
typing_extensions
pytest
pytest-asyncio
```
Every third-party package imported anywhere in the generated code (producer.py, consumer.py, common.py) must have a corresponding entry in requirements.txt. If the code does `from confluent_kafka import ...`, then must be in requirements.txt. If it does `from dotenv import load_dotenv`, then must be listed. This includes transitive dependencies that aren't automatically installed — for example, the async Schema Registry client imports and at runtime, so both must be explicitly listed even though they aren't declared as dependencies of . The user should be able to `pip install -r requirements.txt` and run the code with zero s.
Always include . Include if the project uses the async producer or consumer. Only include if the producer generates sample data with it.
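The "zero missing imports" rule can be checked mechanically before handing the project over. This sketch parses the generated sources with `ast`, collects top-level imports, and reports any that have no `requirements.txt` entry. `DIST_FOR_IMPORT` is a hand-maintained assumption that covers the packages named in this skill. Unknown non-stdlib imports are assumed to share their distribution name (this fallback needs Python 3.10+ for `sys.stdlib_module_names`). The local module names are assumed from the file layout above.

```python
"""Report third-party imports that have no matching requirements.txt entry (sketch)."""
import ast
import re
import sys
from typing import Dict, FrozenSet, Iterable, Set

# Import name -> distribution name (assumed mapping; extend as needed).
DIST_FOR_IMPORT: Dict[str, str] = {
    "confluent_kafka": "confluent-kafka", "dotenv": "python-dotenv",
    "requests": "requests", "httpx": "httpx", "authlib": "authlib",
    "cachetools": "cachetools", "attr": "attrs", "attrs": "attrs",
    "typing_extensions": "typing-extensions", "jsonschema": "jsonschema",
    "pytest": "pytest",
}
LOCAL_MODULES: FrozenSet[str] = frozenset({"common", "producer", "consumer"})


def _norm(name: str) -> str:
    return name.lower().replace("_", "-")


def top_level_imports(source: str) -> Set[str]:
    names: Set[str] = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names.update(alias.name.split(".")[0] for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            names.add(node.module.split(".")[0])
    return names


def requirement_names(requirements_text: str) -> Set[str]:
    names: Set[str] = set()
    for line in requirements_text.splitlines():
        line = line.split("#", 1)[0].strip()
        if line:
            # "confluent-kafka[json]>=2.13.2" -> "confluent-kafka"
            names.add(_norm(re.split(r"[\[<>=!~; ]", line, maxsplit=1)[0]))
    return names


def missing_requirements(sources: Iterable[str], requirements_text: str) -> Set[str]:
    declared = requirement_names(requirements_text)
    stdlib = getattr(sys, "stdlib_module_names", None)  # Python 3.10+
    missing: Set[str] = set()
    for source in sources:
        for mod in top_level_imports(source):
            if mod in LOCAL_MODULES:
                continue
            if mod in DIST_FOR_IMPORT:
                dist = DIST_FOR_IMPORT[mod]
            elif stdlib is not None and mod not in stdlib:
                dist = mod  # unknown third-party import; assume dist name == import name
            else:
                continue
            if _norm(dist) not in declared:
                missing.add(dist)
    return missing
```

Running `missing_requirements` over the text of `producer.py`, `consumer.py`, and `common.py` against `requirements.txt` should return an empty set before the project is handed over.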
README.md
Generate a README following `references/readme-template.md`. Adapt to match what was actually generated — omit producer sections if only a consumer was requested, omit Docker sections for Confluent Cloud projects.
tests/test_project.py
Always generate unit tests. Use `references/test_project.py` as the template. The tests must run without a live Kafka cluster or Schema Registry — mock all external dependencies so tests pass in CI and eval environments.
The tests should verify these properties of the generated code:
- common.py: returns all required keys and uses correct defaults. produces a config with and when , or with no SASL when . and return the right booleans when mocked to succeed or fail.
- producer.py (if generated): accepts a producer instance and a as parameters (never creates a producer). The producer class ( for async, for sync) is instantiated exactly once in the module. Messages are passed through the serializer before producing. For synchronous producers, verify that the schema ID is included as a Kafka record header on every produced message and that is called after producing. Do not assert headers for async producers: AIOProducer rejects headers in batch mode, so they rely on the serializer's wire format prefix instead.
- consumer.py (if generated): Uses a Schema Registry deserializer ( / , / , or / `AsyncProtobufDeserializer` depending on the chosen format) — no raw fallback. Calls before for graceful shutdown.
- Schema file: The test template includes three schema classes — , , — each guarded by a that checks for the corresponding file (, , ). The class for the chosen format runs; the others are skipped. JSON Schema checks: , , with at least one property, descriptions on all properties, and on timestamp-like fields. Avro checks: , , with at least one field, on the record and every field. Protobuf checks: syntax and at least one definition.
- Project structure: exists and contains , , and . exists.
Adapt the tests to the user's specific schema and data domain — if they have fields like and , the schema tests can check for those specific field names.
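One way to meet the "mock all external dependencies" requirement is to stub `confluent_kafka` in `sys.modules` before importing the generated module. The module then imports and runs with no broker, and even without confluent-kafka installed. In this sketch, `PRODUCER_SRC` and `gen_producer` are hypothetical: a trimmed-down producer written to a temporary directory only to demonstrate the technique. In pytest, the same steps could go inside a test function, or use `monkeypatch.setitem(sys.modules, "confluent_kafka", fake)`.

```python
"""Import a generated module with confluent_kafka stubbed out (sketch)."""
import importlib
import sys
import tempfile
import textwrap
from pathlib import Path
from unittest import mock

# Hypothetical minimal producer module used only to demonstrate the technique.
PRODUCER_SRC = textwrap.dedent("""
    from confluent_kafka import Producer


    def produce_one(producer, topic, value):
        producer.produce(topic, value=value)
        producer.poll(0)


    def main(conf):
        producer = Producer(conf)  # the only construction site
        produce_one(producer, "demo-topic", b"x")
        producer.flush()
""")


def run_with_stubbed_kafka() -> mock.MagicMock:
    """Write the module, import it against a fake confluent_kafka, run main()."""
    fake_kafka = mock.MagicMock(name="confluent_kafka")
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "gen_producer.py").write_text(PRODUCER_SRC, encoding="utf-8")
        with mock.patch.dict(sys.modules, {"confluent_kafka": fake_kafka}), \
                mock.patch.object(sys, "path", [tmp, *sys.path]):
            sys.modules.pop("gen_producer", None)
            importlib.invalidate_caches()
            module = importlib.import_module("gen_producer")
            module.main({"bootstrap.servers": "unused:9092"})
    return fake_kafka  # patch.dict has restored sys.modules by now
```

The returned mock records every call, so a test can assert that `Producer` was constructed exactly once and that `flush()` ran after producing.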
After generating all files, run to verify the tests pass. If any test fails, fix the generated code (not the tests) until they pass.
Step 3: Guide the User
After generating the files, give the user instructions based on their target environment. Adapt these to match what was actually generated: omit producer steps if only a consumer was requested, omit consumer steps if only a producer was requested.
Confluent Cloud:
- Copy to and fill in their Confluent Cloud credentials
- Create a virtualenv and install dependencies: `pip install -r requirements.txt`
- If a producer was generated: the schema will be registered explicitly when the producer runs for the first time via the function ( is set to ). If only a consumer was generated: register the schema manually by pasting the contents of `schemas/value.schema.json` into the Confluent Cloud Console under Schema Registry > Schemas for the topic's value subject.
- Run the producer (if generated):
- Run the consumer (if generated):
Remind them that they can find their bootstrap server, API keys, and Schema Registry URL in the Confluent Cloud Console under their cluster and environment settings.
Local Docker:
- Start Kafka and Schema Registry:
- Copy to (defaults are pre-filled for local Docker, no edits needed)
- Create a virtualenv and install dependencies: `pip install -r requirements.txt`
- Create the topic (if auto-creation is disabled): `docker compose exec kafka kafka-topics --create --topic demo-topic --bootstrap-server localhost:29092`
- Run the producer (if generated):
- Run the consumer (if generated):
- When done, stop the containers: (add to also remove stored data)
WarpStream:
- Copy to and fill in the WarpStream bootstrap server, Schema Registry URL, and (if applicable) credentials. Set to include `ws_az=<availability-zone>` for zone-aware routing (e.g., `python-client,ws_az=us-east-1a`)
- Create a virtualenv and install dependencies: `pip install -r requirements.txt`
- Create the topic if it doesn't exist: `kafka-topics.sh --create --topic demo-topic --bootstrap-server <warpstream-agent>:9092`
- Run the producer (if generated):
- Run the consumer (if generated):
Remind them that WarpStream has higher produce latency (~250ms p50) than standard Kafka — this is expected. If throughput is lower than expected, verify that the WarpStream-specific config overrides from `references/warpstream-optimization.md` are applied (especially and large batch/fetch sizes).