Complete guide for Apache Kafka stream processing including producers, consumers, Kafka Streams, connectors, schema registry, and production deployment
npx skill4agent add manutej/luxor-claude-marketplace kafka-stream-processing

Kafka Design Philosophy:
- High Throughput: Millions of messages per second
- Low Latency: Single-digit millisecond latency
- Durability: Replicated, persistent storage
- Scalability: Horizontal scaling via partitions
- Fault Tolerance: Automatic failover and recovery
- Message Delivery Semantics: At-least-once, exactly-once support

# Create a topic with 20 partitions and replication factor 3
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my_topic_name \
--partitions 20 --replication-factor 3 --config x=y

# Increase partition count (cannot decrease)
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my_topic_name \
--partitions 40

<h3>Stream Partitions and Tasks</h3>
<p> The messaging layer of Kafka partitions data for storing and transporting it. Kafka Streams partitions data for processing it. In both cases, this partitioning is what enables data locality, elasticity, scalability, high performance, and fault tolerance. Kafka Streams uses the concepts of <b>partitions</b> and <b>tasks</b> as logical units of its parallelism model based on Kafka topic partitions. There are close links between Kafka Streams and Kafka in the context of parallelism: </p>
<ul>
<li>Each <b>stream partition</b> is a totally ordered sequence of data records and maps to a Kafka <b>topic partition</b>.</li>
<li>A <b>data record</b> in the stream maps to a Kafka <b>message</b> from that topic.</li>
<li>The <b>keys</b> of data records determine the partitioning of data in both Kafka and Kafka Streams, i.e., how data is routed to specific partitions within topics.</li>
</ul>
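The key-to-partition mapping can be sketched as below. This is an illustration only: the actual Java client hashes the serialized key with murmur2, and `KeyPartitioner` is a hypothetical name.

```java
// Simplified sketch of key-based partitioning (illustration only:
// the real Kafka producer hashes serialized keys with murmur2).
public class KeyPartitioner {
    // Map a record key to a partition; the same key always lands on the
    // same partition, which is what preserves per-key ordering.
    public static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Note that the mapping depends on the partition count, which is one reason adding partitions to an existing topic breaks per-key ordering for previously written data.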
<p> An application's processor topology is scaled by breaking it into multiple tasks. More specifically, Kafka Streams creates a fixed number of tasks based on the input stream partitions for the application, with each task assigned a list of partitions from the input streams (i.e., Kafka topics). The assignment of partitions to tasks never changes, so each task is a fixed unit of parallelism of the application. Tasks can then instantiate their own processor topology based on the assigned partitions; each task also maintains a buffer for each of its assigned partitions and processes messages one at a time from these record buffers. As a result, stream tasks can be processed independently and in parallel without manual intervention. </p>
<p> Slightly simplified, the maximum parallelism at which your application may run is bounded by the maximum number of stream tasks, which itself is determined by the maximum number of partitions of the input topic(s) the application is reading from. For example, if your input topic has 5 partitions, then you can run up to 5 application instances. These instances will collaboratively process the topic's data. If you run a larger number of app instances than partitions of the input topic, the "excess" app instances will launch but remain idle; however, if one of the busy instances goes down, one of the idle instances will resume the former's work. </p>
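A minimal sketch of this arithmetic (class and method names are illustrative):

```java
// Sketch: active vs. idle instances for a Kafka Streams application.
// Task count is bounded by the maximum partition count across input topics.
public class StreamsParallelism {
    public static int numTasks(int... inputTopicPartitions) {
        int max = 0;
        for (int partitions : inputTopicPartitions) {
            max = Math.max(max, partitions);
        }
        return max;
    }

    // Instances beyond the task count launch but stay idle.
    public static int idleInstances(int instances, int tasks) {
        return Math.max(0, instances - tasks);
    }
}
```

With a 5-partition input topic, running 7 instances leaves 2 idle; running 3 leaves none idle but spreads the 5 tasks unevenly.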
<p> It is important to understand that Kafka Streams is not a resource manager, but a library that "runs" anywhere its stream processing application runs. Multiple instances of the application are executed either on the same machine or spread across multiple machines, and tasks can be distributed automatically by the library to those running application instances. The assignment of partitions to tasks never changes; if an application instance fails, all its assigned tasks will be automatically restarted on other instances and continue to consume from the same stream partitions. </p>

Design:
- The Producer: Design considerations.
- The Consumer: Design considerations.
- Message Delivery Semantics: At-least-once, at-most-once, exactly-once.
- Using Transactions for atomic operations.

ProducerClient:
publish(topic: str, message: bytes, partition_key: Optional[str] = None)
topic: The topic to publish the message to.
message: The message payload to send.
partition_key: Optional key to determine the partition. If None, random partitioning is used.
get_metadata(topic: str) -> dict
topic: The topic to get metadata for.
Returns: A dictionary containing broker information and partition leader details.

Producer API:
- send(record): Sends a record to a Kafka topic.
- Parameters:
- record: The record to send, including topic, key, and value.
- Returns: A Future representing the result of the send operation.
- flush(): Forces any buffered records to be sent.
- close(): Closes the producer, releasing any resources.
- metrics(): Returns metrics about the producer.
Configuration:
- bootstrap.servers: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
- key.serializer: The serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface.
- value.serializer: The serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface.
- acks: The number of acknowledgments the producer requires the leader to have received before considering a request complete.
- linger.ms: The producer groups together any records that arrive in between request transmissions into a single batched request.
- batch.size: The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key1", "Hello Kafka!");
// Async send with callback
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Error producing: " + exception);
} else {
System.out.printf("Sent to partition %d, offset %d%n",
metadata.partition(), metadata.offset());
}
});
}
}
}

# List consumer groups
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Describe consumer group members with partition assignments
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 topic1(0), topic2(0)
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 topic3(2)
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 topic2(1), topic3(0,1)
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 -

Parameter Name: max.poll.records
Corresponding Client: Consumer
Streams Default: 100
Parameter Name: client.id
Corresponding Client: -
Streams Default: <application.id>-<random-UUID>
Parameter Name: enable.auto.commit
Description: Controls whether the consumer automatically commits offsets. When true, the consumer will automatically commit offsets periodically based on the poll interval.
Default Value: true

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class SimpleConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.partition(), record.offset(), record.key(), record.value());
// Process record
processRecord(record);
}
// Manual commit after processing batch
consumer.commitSync();
}
}
}
private static void processRecord(ConsumerRecord<String, String> record) {
// Business logic here
}
}

// Async commit with callback
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
System.err.println("Commit failed: " + exception);
}
});
// Sync commit for reliability
try {
consumer.commitSync();
} catch (CommitFailedException e) {
System.err.println("Commit failed: " + e);
}

There are two special processors in the topology:
<ul>
<li><b>Source Processor</b>: A special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its down-stream processors.</li>
<li><b>Sink Processor</b>: A special type of stream processor that does not have down-stream processors. It sends any received records from its up-stream processors to a specified Kafka topic.</li>
</ul>
Note that in normal processor nodes other remote systems can also be accessed while processing the current record. Therefore the processed results can either be streamed back into Kafka or written to an external system.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> wordCounts = builder.stream(
"word-counts-input-topic", /* input topic */
Consumed.with(
Serdes.String(), /* key serde */
Serdes.Long() /* value serde */
)
);

import org.apache.kafka.streams.StreamsBuilder;
StreamsBuilder builder = new StreamsBuilder();
builder.table("input-topic");

KTable: Each application instance gets data from only 1 partition.
GlobalKTable: Each application instance gets data from all partitions.

KStream<String, Long> stream = ...;
// Write the stream to the output topic, using the configured default key
// and value serdes.
stream.to("my-stream-output-topic");
// Write the stream to the output topic, using explicit key and value serdes,
// (thus overriding the defaults in the config properties).
stream.to("my-stream-output-topic", Produced.with(Serdes.String(), Serdes.Long()));

KStream<byte[], String> stream = ... ;
KStream<byte[], String> repartitionedStream = stream.repartition(Repartitioned.numberOfPartitions(10));

Join Co-partitioning Requirements:
For equi-joins in Kafka Streams, input data must be co-partitioned. This ensures that records with the same key from both sides of the join are delivered to the same stream task.
Requirements for data co-partitioning:
1. Input topics (left and right sides) must have the same number of partitions.
2. All applications writing to the input topics must use the same partitioning strategy to ensure records with the same key are delivered to the same partition number.
- This applies to producer settings like `partitioner.class` (e.g., `ProducerConfig.PARTITIONER_CLASS_CONFIG`) and Kafka Streams `StreamPartitioner` for operations like `KStream#to()`.
- Using default partitioner settings across all applications generally satisfies this requirement.
Why co-partitioning is required:
- KStream-KStream, KTable-KTable, and KStream-KTable joins are performed based on record keys (e.g., `leftRecord.key == rightRecord.key`). Co-partitioning by key ensures these records meet.
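Why these two requirements suffice can be sketched with a toy partitioner (the hash here is illustrative; the real default partitioner uses murmur2 over the serialized key):

```java
// Sketch: with equal partition counts and the same partitioning function,
// a given key resolves to the same partition number on both join sides,
// so the left and right records reach the same stream task.
public class CoPartitionCheck {
    static int partition(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static boolean keysMeet(String key, int leftPartitions, int rightPartitions) {
        return leftPartitions == rightPartitions
            && partition(key, leftPartitions) == partition(key, rightPartitions);
    }
}
```

If the partition counts differ, the same key can land on different partition numbers on each side, which is why Kafka Streams requires a repartition step before such a join.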
Exceptions where co-partitioning is NOT required:
1. KStream-GlobalKTable joins:
- All partitions of the GlobalKTable's underlying changelog stream are available to each KafkaStreams instance.
- A `KeyValueMapper` allows non-key based joins from KStream to GlobalKTable.
2. KTable-KTable Foreign-Key joins:
- Kafka Streams internally ensures co-partitioning for these joins.

Internal Topic Configuration:
- message.timestamp.type: 'CreateTime' for all internal topics.
- Internal Repartition Topics:
- cleanup.policy: 'delete'
- retention.time: -1 (infinite)
- Internal Changelog Topics for Key-Value Stores:
- cleanup.policy: 'compact'
- Internal Changelog Topics for Windowed Key-Value Stores:
- cleanup.policy: 'delete,compact'
- retention.time: 24 hours + windowed store setting
- Internal Changelog Topics for Versioned State Stores:
- cleanup.policy: 'compact'
- min.compaction.lag.ms: 24 hours + store's historyRetentionMs

The parallelism of a Kafka Streams application is primarily determined by how many partitions the input topics have. For example, if your application reads from a single topic that has ten partitions, then you can run up to ten instances of your application. You can run further instances, but these will be idle.
The number of topic partitions is the upper limit for the parallelism of your Kafka Streams application and for the number of running instances of your application.
To achieve balanced workload processing across application instances and to prevent processing hotspots, you should distribute data and processing workloads:
Data should be equally distributed across topic partitions. For example, if two topic partitions each have 1 million messages, this is better than a single partition with 2 million messages and none in the other.
Processing workload should be equally distributed across topic partitions. For example, if the time to process messages varies widely, then it is better to spread the processing-intensive messages across partitions rather than storing these messages within the same partition.

Properties streamsSettings = new Properties();
// same value for consumer, producer, and admin client
streamsSettings.put("PARAMETER_NAME", "value");
// different values for consumer and producer
streamsSettings.put("consumer.PARAMETER_NAME", "consumer-value");
streamsSettings.put("producer.PARAMETER_NAME", "producer-value");
streamsSettings.put("admin.PARAMETER_NAME", "admin-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.consumerPrefix("PARAMETER_NAME"), "consumer-value");
streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");
streamsSettings.put(StreamsConfig.adminClientPrefix("PARAMETER_NAME"), "admin-value");

Properties streamsSettings = new Properties();
// same config value for all consumer types
streamsSettings.put("consumer.PARAMETER_NAME", "general-consumer-value");
// set a different restore consumer config. This would make restore consumer take restore-consumer-value,
// while main consumer and global consumer stay with general-consumer-value
streamsSettings.put("restore.consumer.PARAMETER_NAME", "restore-consumer-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.restoreConsumerPrefix("PARAMETER_NAME"), "restore-consumer-value");

Properties streamsSettings = new Properties();
// Override default for both changelog and repartition topics
streamsSettings.put("topic.PARAMETER_NAME", "topic-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");

Producer Client ID Naming Schema:
- at-least-once (default):
`[client.Id]-StreamThread-[sequence-number]`
- exactly-once (EOS version 1):
`[client.Id]-StreamThread-[sequence-number]-[taskId]`
- exactly-once-beta (EOS version 2):
`[client.Id]-StreamThread-[sequence-number]`
Where `[client.Id]` is either set via Streams configuration parameter `client.id` or defaults to `[application.id]-[processId]` (`[processId]` is a random UUID).

Parameter Name: isolation.level
Corresponding Client: Consumer
Streams Default: READ_COMMITTED
Parameter Name: enable.idempotence
Corresponding Client: Producer
Streams Default: true

Parameter Name: transaction.timeout.ms
Corresponding Client: Producer
Streams Default: 10000
Parameter Name: delivery.timeout.ms
Corresponding Client: Producer
Streams Default: Integer.MAX_VALUE

Topologies:
  Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input]) --> KSTREAM-FILTER-0000000001
Processor: KSTREAM-FILTER-0000000001 (stores: []) --> KSTREAM-MAPVALUES-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MAPVALUES-0000000002 (stores: []) --> KSTREAM-SINK-0000000003
<-- KSTREAM-FILTER-0000000001
Sink: KSTREAM-SINK-0000000003 (topic: output)
<-- KSTREAM-MAPVALUES-0000000002

Kafka Streams Topology Naming:
- Aggregation repartition topics: Grouped
- KStream-KTable Join repartition topic: Joined
- KStream-KStream Join repartition topics: StreamJoined
- KStream-KTable Join state stores: Joined
- KStream-KStream Join state stores: StreamJoined
- State Stores (for aggregations and KTable-KTable joins): Materialized
- Stream/Table non-stateful operations: Named

Properties props = new Properties();
props.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);

topology.optimization: "all" | "none"

$ mvn clean package
$ mvn exec:java -Dexec.mainClass=myapps.WordCount
Sub-topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
Sink: KSTREAM-SINK-0000000004(topic: counts-store-repartition) <-- KSTREAM-FILTER-0000000005
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000006(topics: counts-store-repartition) --> KSTREAM-AGGREGATE-0000000003
Processor: KSTREAM-AGGREGATE-0000000003(stores: [counts-store]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
Global Stores: none

Data Contracts with Schema Registry:
- Purpose: Ensure events written to Kafka can be read properly and prevent malformed events.
- Implementation: Deploy a schema registry alongside the Kafka cluster.
- Functionality: Manages event schemas and maps them to topics, guiding producers on correct event formats.
- Note: Kafka does not include a schema registry; third-party implementations are available.

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
String userSchema = "{"
+ "\"type\":\"record\","
+ "\"name\":\"User\","
+ "\"fields\":["
+ " {\"name\":\"name\",\"type\":\"string\"},"
+ " {\"name\":\"age\",\"type\":\"int\"}"
+ "]}";
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);
GenericRecord user = new GenericData.Record(schema);
user.put("name", "John Doe");
user.put("age", 30);
ProducerRecord<String, GenericRecord> record =
    new ProducerRecord<>("users", "user1", user);
try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
    producer.send(record);
}

Kafka Connect Sink Connector Input Topics
Configuration options for sink connectors to specify input topics using a comma-separated list or a regular expression.
topics
topics.regex

{
"name": "jdbc-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://localhost:5432/mydb",
"connection.user": "postgres",
"connection.password": "password",
"table.whitelist": "users,orders",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "postgres-"
}
}

{
"name": "elasticsearch-sink-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "user-events,order-events",
"connection.url": "http://localhost:9200",
"type.name": "_doc",
"key.ignore": "false"
}
}

Kafka Streams Topic Management:
User Topics:
- Input Topics: Specified via source processors (e.g., StreamsBuilder#stream(), StreamsBuilder#table(), Topology#addSource()).
- Output Topics: Specified via sink processors (e.g., KStream#to(), KTable.to(), Topology#addSink()).
- Management: Must be created and managed manually ahead of time (e.g., via topic tools).
- Sharing: If shared, users must coordinate topic management.
- Auto-creation: Discouraged due to potential cluster configuration and default topic settings (e.g., replication factor).
Internal Topics:
- Purpose: Used internally by the application for state stores (e.g., changelog topics).
- Creation: Created by the application itself.
- Usage: Only used by the specific stream application.
- Permissions: Requires underlying clients to have admin permissions on Kafka brokers if security is enabled.
- Naming Convention: Typically follows '<application.id>-<operatorName>-<suffix>', but not guaranteed for future releases.

DESCRIBE_PRODUCERS:
- Action: Read
- Resource: Topic
DESCRIBE_TOPIC_PARTITIONS:
- Action: Describe
- Resource: Topic

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
>all streams lead to kafka
>hello kafka streams
>join kafka summit

Metric Name: outgoing-byte-rate
Description: The average number of outgoing bytes sent per second for a node.
Mbean Name Pattern: kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
Metric Name: outgoing-byte-total
Description: The total number of outgoing bytes sent for a node.
Mbean Name Pattern: kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
Metric Name: request-rate
Description: The average number of requests sent per second for a node.
Mbean Name Pattern: kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
Metric Name: request-total
Description: The total number of requests sent for a node.
Mbean Name Pattern: kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
Metric Name: request-size-avg
Description: The average size of all requests in the window for a node.
Mbean Name Pattern: kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
Metric Name: request-size-max
Description: The maximum size of any request sent in the window for a node.
Mbean Name Pattern: kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)
Metric Name: incoming-byte-rate
Description: The average number of incoming bytes received per second for a node.
Mbean Name Pattern: kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)

# Broker ID
broker.id=1
# Listeners
listeners=PLAINTEXT://broker1:9092,SSL://broker1:9093
# Log directories (use multiple disks)
log.dirs=/data/kafka-logs-1,/data/kafka-logs-2
# Replication
default.replication.factor=3
min.insync.replicas=2
# Leader election
unclean.leader.election.enable=false
auto.leader.rebalance.enable=true
# Log retention
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

API: DescribeTopicPartitions
Purpose: Fetches detailed information about topic partitions, including Eligible Leader Replicas (ELR).
Usage:
- Via Admin Client: The admin client can fetch ELR info by describing topics.
- Direct API Call: Use the DescribeTopicPartitions API endpoint.
ELR Selection Logic:
- If ELR is not empty, select a replica that is not fenced.
- Select the last known leader if it is unfenced, mimicking pre-4.0 behavior when all replicas are offline.
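The selection rules above can be sketched as follows; the types and the `ElrLeaderSelection` name are illustrative, not the broker's actual implementation:

```java
import java.util.List;
import java.util.Optional;

// Illustrative sketch of the ELR leader-selection rules described above.
public class ElrLeaderSelection {
    public static Optional<Integer> selectLeader(List<Integer> elr,
                                                 List<Integer> fencedReplicas,
                                                 Integer lastKnownLeader) {
        // Rule 1: if the ELR is non-empty, pick a replica that is not fenced.
        for (Integer replica : elr) {
            if (!fencedReplicas.contains(replica)) {
                return Optional.of(replica);
            }
        }
        // Rule 2: fall back to the last known leader if it is unfenced,
        // mimicking pre-4.0 behavior when all replicas are offline.
        if (lastKnownLeader != null && !fencedReplicas.contains(lastKnownLeader)) {
            return Optional.of(lastKnownLeader);
        }
        return Optional.empty();
    }
}
```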
Dependencies/Side Effects:
- Updating `min.insync.replicas` for a topic will clean the ELR field for that topic.
- Updating the cluster default `min.insync.replicas` will clean ELR fields for all topics.
Return Values:
- ELR status and related replica information for partitions.

# SSL configuration
listeners=SSL://broker:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.client.auth=required

# SASL/PLAIN configuration
listeners=SASL_SSL://broker:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# JAAS configuration
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" \
user_admin="admin-secret" \
user_alice="alice-secret";

# Network threads
num.network.threads=8
# I/O threads
num.io.threads=16
# Socket buffer sizes
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
# Replication
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
# Log flush (rely on OS page cache)
log.flush.interval.messages=9223372036854775807
log.flush.interval.ms=null

acks=1
linger.ms=100
batch.size=65536
compression.type=lz4
buffer.memory=67108864
max.in.flight.requests.per.connection=5

fetch.min.bytes=1
fetch.max.wait.ms=500
max.partition.fetch.bytes=1048576
max.poll.records=500
session.timeout.ms=30000
heartbeat.interval.ms=3000

// Order events
OrderCreated -> OrderPaid -> OrderShipped -> OrderDelivered
// Event store as Kafka topic
Topic: order-events
Compaction: None (keep full history)
Retention: Infinite or very long

// Write side: Commands produce events
commands -> producers -> events-topic
// Read side: Consumers build projections
events-topic -> streams -> materialized-view (KTable)

// Order saga
order-requested -> payment-requested -> payment-completed ->
inventory-reserved -> order-confirmed
// Compensating transactions on failure
payment-failed -> order-cancelled

// Database transaction writes to outbox table
BEGIN TRANSACTION;
INSERT INTO orders VALUES (...);
INSERT INTO outbox VALUES (event_data);
COMMIT;
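The atomic write above can be simulated in-memory as a sketch (all names here are illustrative; a real implementation uses a database transaction plus a CDC connector such as Debezium):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// In-memory sketch of the transactional outbox pattern: the order row and
// its event are written together, and a relay step (standing in for a CDC
// connector) later publishes the buffered events to the topic.
public class OutboxSketch {
    public final Queue<String> orders = new ArrayDeque<>();
    public final Queue<String> outbox = new ArrayDeque<>();
    public final Queue<String> topic  = new ArrayDeque<>();

    // One "transaction": the order and its event become visible together.
    public synchronized void placeOrder(String orderId) {
        orders.add(orderId);
        outbox.add("OrderCreated:" + orderId);
    }

    // Relay step: drain the outbox into the topic, as a CDC connector would.
    public synchronized void relay() {
        String event;
        while ((event = outbox.poll()) != null) {
            topic.add(event);
        }
    }
}
```

The point of the pattern is that the event is never published without the database write having committed, and vice versa.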
// CDC connector reads outbox and publishes to Kafka
Debezium -> outbox-topic -> downstream consumers

// Single topic, multiple consumer groups
user-events topic
-> email-service (consumer group: email)
-> analytics-service (consumer group: analytics)
-> notification-service (consumer group: notifications)

try {
processRecord(record);
} catch (RetriableException e) {
// Retry
retry(record);
} catch (NonRetriableException e) {
// Send to DLQ
sendToDLQ(record, e);
}

KStream<String, PageView> views = ...;
// Tumbling window: non-overlapping fixed windows
views.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
// Hopping window: overlapping windows
views.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5))
.advanceBy(Duration.ofMinutes(1)))
.count();
// Session window: activity-based windows
views.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)))
.count();

kstream.repartition(...);
// or for user-managed topics:
kstream.to("user-topic");
streamsBuilder.stream("user-topic");

Properties props = new Properties();
props.put(StreamsConfig.topicPrefix("my-prefix.") + "replication.factor", "3");
KafkaStreams streams = new KafkaStreams(topology, props);