kafka-stream-processing


Kafka Stream Processing


A comprehensive skill for building event-driven applications with Apache Kafka. Master producers, consumers, Kafka Streams, connectors, schema registry, and production deployment patterns for real-time data processing at scale.

When to Use This Skill


Use this skill when:
  • Building event-driven microservices architectures
  • Processing real-time data streams and event logs
  • Implementing publish-subscribe messaging systems
  • Creating data pipelines for analytics and ETL
  • Building streaming data applications with stateful processing
  • Integrating heterogeneous systems with Kafka Connect
  • Implementing change data capture (CDC) patterns
  • Building real-time dashboards and monitoring systems
  • Processing IoT sensor data at scale
  • Implementing event sourcing and CQRS patterns
  • Building distributed systems requiring guaranteed message delivery
  • Creating real-time recommendation engines
  • Processing financial transactions with exactly-once semantics
  • Building log aggregation and monitoring pipelines

Core Concepts


Apache Kafka Architecture


Kafka is a distributed streaming platform designed for high-throughput, fault-tolerant, real-time data processing.
Key Components:
  1. Topics: Named categories for organizing messages
  2. Partitions: Ordered, immutable sequences of records within topics
  3. Brokers: Kafka servers that store and serve data
  4. Producers: Applications that publish records to topics
  5. Consumers: Applications that read records from topics
  6. Consumer Groups: Coordinated consumers sharing workload
  7. ZooKeeper/KRaft: Cluster coordination (ZooKeeper legacy, KRaft modern)
Design Principles:
```APIDOC
Kafka Design Philosophy:
  - High Throughput: Millions of messages per second
  - Low Latency: Single-digit millisecond latency
  - Durability: Replicated, persistent storage
  - Scalability: Horizontal scaling via partitions
  - Fault Tolerance: Automatic failover and recovery
  - Message Delivery Semantics: At-least-once, exactly-once support
```

Topics and Partitions


Topics are logical channels for data streams. Each topic is divided into partitions for parallelism and scalability.

```bash
# Create a topic with 20 partitions and replication factor 3
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my_topic_name \
    --partitions 20 --replication-factor 3 --config x=y
```

**Partition Characteristics:**

- **Ordered**: Messages within a partition are strictly ordered
- **Immutable**: Records cannot be modified after being written
- **Append-only**: New records are appended to the end of the partition
- **Retention**: Configurable retention by time or size
- **Replication**: Each partition is replicated across brokers

**Adding Partitions:**

```bash
# Increase partition count (cannot decrease)
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my_topic_name \
    --partitions 40
```

**Note**: Adding partitions doesn't redistribute existing data and may affect consumers that rely on custom key-based partitioning.

Stream Partitions and Tasks


The messaging layer of Kafka partitions data for storing and transporting it; Kafka Streams partitions data for processing it. In both cases, this partitioning enables data locality, elasticity, scalability, high performance, and fault tolerance. Kafka Streams uses **partitions** and **tasks** as the logical units of its parallelism model, built on Kafka topic partitions. There are close links between Kafka Streams and Kafka in the context of parallelism:

- Each **stream partition** is a totally ordered sequence of data records and maps to a Kafka **topic partition**.
- A **data record** in the stream maps to a Kafka **message** from that topic.
- The **keys** of data records determine the partitioning of data in both Kafka and Kafka Streams, i.e., how data is routed to specific partitions within topics.

An application's processor topology is scaled by breaking it into multiple tasks. More specifically, Kafka Streams creates a fixed number of tasks based on the input stream partitions, with each task assigned a list of partitions from the input streams (i.e., Kafka topics). The assignment of partitions to tasks never changes, so each task is a fixed unit of parallelism of the application. Tasks instantiate their own processor topology based on their assigned partitions; they also maintain a buffer for each assigned partition and process messages one at a time from these record buffers. As a result, stream tasks can be processed independently and in parallel without manual intervention.

Slightly simplified, the maximum parallelism at which your application may run is bounded by the maximum number of stream tasks, which in turn is determined by the maximum number of partitions of the input topic(s) the application reads from. For example, if your input topic has 5 partitions, you can run up to 5 application instances, which will collaboratively process the topic's data. If you run more app instances than there are input partitions, the "excess" instances launch but remain idle; however, if one of the busy instances goes down, an idle instance resumes its work.

It is important to understand that Kafka Streams is not a resource manager, but a library that "runs" wherever its stream processing application runs. Multiple instances of the application can execute on the same machine or be spread across multiple machines, and the library distributes tasks to the running application instances automatically. If an application instance fails, all of its assigned tasks are automatically restarted on other instances and continue to consume from the same stream partitions.
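The parallelism bound described above is easy to state in code. A minimal sketch (plain Java, no Kafka dependency; the class and topic names are illustrative): the task count, and hence the maximum useful instance count, is the maximum partition count across the input topics.

```java
import java.util.Map;

public class StreamTaskCount {
    // Maximum parallelism of a Kafka Streams app (simplified): the number
    // of tasks equals the maximum partition count among its input topics.
    static int maxTasks(Map<String, Integer> partitionsPerInputTopic) {
        return partitionsPerInputTopic.values().stream()
                .mapToInt(Integer::intValue)
                .max()
                .orElse(0);
    }

    public static void main(String[] args) {
        // Reading from two topics with 5 and 3 partitions -> 5 tasks, so at
        // most 5 instances do useful work; any extra instances stay idle.
        System.out.println(maxTasks(Map.of("orders", 5, "payments", 3)));
    }
}
```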

Message Delivery Semantics


At-Least-Once Delivery:
  • Producer retries until acknowledgment received
  • Consumer commits offset after processing
  • Risk: Duplicate processing on failures
  • Use case: When duplicates are acceptable or idempotent processing
At-Most-Once Delivery:
  • Consumer commits offset before processing
  • No producer retries
  • Risk: Message loss on failures
  • Use case: When data loss acceptable (e.g., metrics)
Exactly-Once Semantics (EOS):
  • Transactional writes with idempotent producers
  • Consumer reads committed messages only
  • Use case: Financial transactions, critical data processing
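At-least-once delivery is usually paired with idempotent processing on the consumer side. A minimal sketch (plain Java, no Kafka dependency; `IdempotentProcessor` is a hypothetical helper) that suppresses duplicate deliveries by remembering which partition/offset pairs have already been applied:

```java
import java.util.HashSet;
import java.util.Set;

public class IdempotentProcessor {
    // Tracks which (partition, offset) pairs were already processed, so a
    // redelivered record (at-least-once) is applied at most once.
    private final Set<String> processed = new HashSet<>();

    /** Returns true if the record was applied, false if it was a duplicate. */
    public boolean processOnce(int partition, long offset, Runnable businessLogic) {
        String id = partition + ":" + offset;
        if (!processed.add(id)) {
            return false; // duplicate delivery, skip
        }
        businessLogic.run();
        return true;
    }
}
```

In production the seen-set would live in a durable store (for example, the same database the business logic writes to, updated in one transaction) so it survives restarts.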

Producer Load Balancing


```APIDOC
ProducerClient:
  publish(topic: str, message: bytes, partition_key: Optional[str] = None)
    topic: The topic to publish the message to.
    message: The message payload to send.
    partition_key: Optional key to determine the partition. If None, random partitioning is used.

  get_metadata(topic: str) -> dict
    topic: The topic to get metadata for.
    Returns: A dictionary containing broker information and partition leader details.
```

The producer sends data directly to the broker that leads the target partition, with no intervening routing tier. Kafka nodes answer metadata requests so producers can direct each request to the correct partition leader. Producers can implement custom partitioning logic or use random distribution.
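Key-based routing can be sketched in a few lines. Note this is a simplification: Kafka's real default partitioner hashes the serialized key bytes with murmur2, so `String.hashCode` below is only a stand-in and the computed partition numbers will differ from Kafka's.

```java
public class KeyPartitioner {
    // Simplified sketch of key-based routing. What matters is determinism:
    // the same key always maps to the same partition, which is what
    // preserves per-key ordering.
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is a valid partition index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Records sent without a key are not routed this way; the producer spreads them across partitions (sticky/random distribution) instead.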

Producers


Kafka producers publish records to topics with configurable reliability and performance characteristics.

Producer API


```APIDOC
Producer API:
  - send(record): Sends a record to a Kafka topic.
    - Parameters:
      - record: The record to send, including topic, key, and value.
    - Returns: A Future representing the result of the send operation.
  - flush(): Forces any buffered records to be sent.
  - close(): Closes the producer, releasing any resources.
  - metrics(): Returns metrics about the producer.

  Configuration:
  - bootstrap.servers: A list of host/port pairs used to establish the initial connection to the Kafka cluster.
  - key.serializer: The serializer class for keys; implements the org.apache.kafka.common.serialization.Serializer interface.
  - value.serializer: The serializer class for values; implements the org.apache.kafka.common.serialization.Serializer interface.
  - acks: The number of acknowledgments the producer requires the leader to have received before considering a request complete.
  - linger.ms: The producer groups records that arrive between request transmissions into a single batched request.
  - batch.size: The producer attempts to batch records into fewer requests whenever multiple records are sent to the same partition.
```

Producer Configuration


Essential Settings:
  1. bootstrap.servers: Kafka cluster connection string
    • Format: host1:9092,host2:9092,host3:9092
    • Use multiple brokers for fault tolerance
  2. key.serializer / value.serializer: Data serialization
    • org.apache.kafka.common.serialization.StringSerializer
    • org.apache.kafka.common.serialization.ByteArraySerializer
    • Custom serializers for complex types
  3. acks: Acknowledgment level
    • 0: No acknowledgment (fire and forget)
    • 1: Leader acknowledgment only
    • all / -1: All in-sync replicas acknowledge (strongest durability)
  4. retries: Retry count for failed sends
    • Default: 2147483647 (Integer.MAX_VALUE)
    • Set to 0 to disable retries
  5. enable.idempotence: Exactly-once semantics
    • true: Enables the idempotent producer, preventing duplicates on retry (the default since Kafka 3.0)
    • false: Disables idempotence (the default before Kafka 3.0)
Performance Tuning:
  1. linger.ms: Batching delay
    • Default: 0 (send immediately)
    • Higher values (5-100ms) increase throughput via batching
    • Trade-off: latency vs throughput
  2. batch.size: Batch size in bytes
    • Default: 16384 (16KB)
    • Larger batches improve throughput
    • Maximum size of a single batch per partition
  3. compression.type: Message compression
    • Options: none, gzip, snappy, lz4, zstd
    • Reduces network bandwidth and storage
    • CPU overhead for compression/decompression
  4. buffer.memory: Total producer buffer memory
    • Default: 33554432 (32MB)
    • Memory for buffering unsent records
    • The producer blocks when the buffer is full
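Taken together, the settings above might look like the following in code. This is a sketch: the string keys are the literal config names (kafka-clients exposes the same names as constants on `ProducerConfig`), and the broker list is a placeholder.

```java
import java.util.Properties;

public class ProducerProps {
    // Producer configuration mirroring the settings discussed above.
    static Properties reliableThroughputConfig() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "host1:9092,host2:9092,host3:9092");
        props.setProperty("acks", "all");                 // strongest durability
        props.setProperty("enable.idempotence", "true");  // no duplicates on retry
        props.setProperty("linger.ms", "10");             // small batching delay
        props.setProperty("batch.size", "32768");         // 32 KB batches
        props.setProperty("compression.type", "lz4");     // modest CPU cost, good ratio
        return props;
    }
}
```

A `Properties` object like this (plus the serializer settings) is what gets handed to `new KafkaProducer<>(props)`.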

Producer Example (Java)


```java
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic", "key1", "Hello Kafka!");

            // Async send with callback
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Error producing: " + exception);
                } else {
                    System.out.printf("Sent to partition %d, offset %d%n",
                        metadata.partition(), metadata.offset());
                }
            });
        }
    }
}
```

Producer Best Practices


  1. Use Batching: Configure linger.ms and batch.size for throughput
  2. Enable Idempotence: Set enable.idempotence=true for reliability
  3. Handle Errors: Implement proper callback error handling
  4. Use Compression: Enable compression for large messages
  5. Partition Keys: Use meaningful keys for ordered processing
  6. Close Producers: Always close producers in finally blocks
  7. Monitor Metrics: Track producer metrics (record-send-rate, compression-rate)
  8. Resource Pools: Reuse producer instances when possible

Consumers


Kafka consumers read records from topics, supporting both individual and group-based consumption.

Consumer Groups


Consumer groups enable parallel processing with automatic load balancing and fault tolerance.
Key Concepts:
  • Group ID: Unique identifier for consumer group
  • Partition Assignment: Each partition consumed by one consumer in group
  • Rebalancing: Automatic reassignment when consumers join/leave
  • Offset Management: Group tracks committed offsets per partition
Consumer Group Monitoring:

```bash
# List consumer groups
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Describe consumer group members with partition assignments
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose

CONSUMER-ID                                     HOST        CLIENT-ID  #PARTITIONS  ASSIGNMENT
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1  /127.0.0.1  consumer1  2            topic1(0), topic2(0)
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0  /127.0.0.1  consumer4  1            topic3(2)
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4  /127.0.0.1  consumer2  3            topic2(1), topic3(0,1)
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee  /127.0.0.1  consumer3  0            -
```
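The "each partition is consumed by exactly one group member" rule can be illustrated with a simplified version of range assignment. This mirrors the idea behind Kafka's `RangeAssignor` for a single topic; the real assignor also handles multiple topics and member ordering.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeAssignor {
    // Simplified range assignment: partitions are split into contiguous
    // ranges, one range per consumer; the first (numPartitions % numConsumers)
    // consumers each receive one extra partition.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        int base = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int i = 0; i < count; i++) parts.add(next++);
            assignment.add(parts);
        }
        return assignment;
    }
}
```

For example, `assign(5, 2)` yields `[[0, 1, 2], [3, 4]]`; with more consumers than partitions, as in `assign(3, 4)`, the last consumer receives nothing and sits idle, exactly the behavior rebalancing produces in a real group.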

Consumer Configuration


Essential Settings:
  1. bootstrap.servers: Kafka cluster connection string
  2. group.id: Consumer group identifier
  3. key.deserializer / value.deserializer: Data deserialization
  4. enable.auto.commit: Automatic offset commits
    • true: Auto-commit offsets periodically
    • false: Manual offset management
  5. auto.offset.reset: Behavior when no committed offset is found
    • earliest: Start from the beginning
    • latest: Start from the end
    • none: Throw an exception
Consumer-Specific Kafka Streams Defaults:

```APIDOC
Parameter Name: max.poll.records
Corresponding Client: Consumer
Streams Default: 100

Parameter Name: client.id
Corresponding Client: -
Streams Default: <application.id>-<random-UUID>

Parameter Name: enable.auto.commit
Description: Controls whether the consumer automatically commits offsets. The plain
  consumer default is true; Kafka Streams sets this to false and manages commits itself.
Consumer Default: true
```

Consumer Example (Java)


```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s%n",
                        record.partition(), record.offset(), record.key(), record.value());

                    // Process record
                    processRecord(record);
                }

                // Manual commit after processing batch
                consumer.commitSync();
            }
        }
    }

    private static void processRecord(ConsumerRecord<String, String> record) {
        // Business logic here
    }
}
```

Consumer Offset Management


Offset Commit Strategies:
  1. Auto-commit (default):
    • Simple but risky for at-least-once delivery
    • May commit before processing completes
  2. Manual Synchronous Commit:
    • Blocks until commit succeeds
    • Guarantees offset committed before continuing
    • Lower throughput
  3. Manual Asynchronous Commit:
    • Non-blocking commit
    • Higher throughput
    • Handle failures in callback
  4. Hybrid Approach:
    • Async commits during processing
    • Sync commit before rebalance/shutdown
```java
// Async commit with callback
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("Commit failed: " + exception);
    }
});

// Sync commit for reliability
try {
    consumer.commitSync();
} catch (CommitFailedException e) {
    System.err.println("Commit failed: " + e);
}
```

Consumer Best Practices


  1. Choose Right Auto-commit: Disable for at-least-once semantics
  2. Handle Rebalancing: Implement ConsumerRebalanceListener
  3. Process Efficiently: Minimize poll() call duration
  4. Graceful Shutdown: Close consumers properly
  5. Monitor Lag: Track consumer lag metrics
  6. Partition Assignment: Understand assignment strategies
  7. Thread Safety: Kafka consumers are NOT thread-safe
  8. Error Handling: Retry logic for transient failures

Kafka Streams


Kafka Streams is a client library for building real-time streaming applications with stateful processing.

Kafka Streams Architecture


Processor Topology:

There are two special processors in the topology:

- **Source Processor**: A stream processor with no upstream processors. It produces an input stream for its topology from one or more Kafka topics by consuming records from those topics and forwarding them to its downstream processors.
- **Sink Processor**: A stream processor with no downstream processors. It sends any records received from its upstream processors to a specified Kafka topic.

Note that normal processor nodes can also access other remote systems while processing the current record, so processed results can either be streamed back into Kafka or written to an external system.

Sub-topologies:
Applications are decomposed into sub-topologies connected by repartition topics. Each sub-topology can scale independently.

KStream vs KTable vs GlobalKTable


KStream: Immutable stream of records
java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> wordCounts = builder.stream(
    "word-counts-input-topic", /* input topic */
    Consumed.with(
        Serdes.String(), /* key serde */
        Serdes.Long() /* value serde */
    )
);
KTable: Changelog stream (latest value per key)
java
import org.apache.kafka.streams.StreamsBuilder;

StreamsBuilder builder = new StreamsBuilder();
builder.table("input-topic");
GlobalKTable: Fully replicated table available to all instances
text
KTable: Each application instance gets data from only 1 partition.
GlobalKTable: Each application instance gets data from all partitions.
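As a sketch of this difference in practice, a KStream can be joined against a GlobalKTable without co-partitioning, because every instance holds all of the table's partitions; a `KeyValueMapper` picks the lookup key. Topic names and the enrichment logic here are illustrative:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Fully replicated lookup table: every instance sees all partitions
GlobalKTable<String, String> userProfiles = builder.globalTable(
    "user-profiles-topic",
    Consumed.with(Serdes.String(), Serdes.String()));

KStream<String, String> clicks = builder.stream(
    "clicks-topic", Consumed.with(Serdes.String(), Serdes.String()));

// Join on a key derived from each stream record; no co-partitioning required
KStream<String, String> enriched = clicks.join(
    userProfiles,
    (clickKey, clickValue) -> clickKey,                    // KeyValueMapper: choose the table key
    (clickValue, profile) -> profile + ":" + clickValue);  // ValueJoiner: enrich the click
```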

Writing Streams to Kafka

java
KStream<String, Long> stream = ...;
// Write the stream to the output topic, using the configured default key
// and value serdes.
stream.to("my-stream-output-topic");

// Write the stream to the output topic, using explicit key and value serdes,
// (thus overriding the defaults in the config properties).
stream.to("my-stream-output-topic", Produced.with(Serdes.String(), Serdes.Long()));
Any streams and tables may be (continuously) written back to a Kafka topic. The output data might be re-partitioned on its way to Kafka, for example if the stream's key was changed upstream.

Repartitioning

Manual repartitioning with specified partition count:
java
KStream<byte[], String> stream = ... ;
KStream<byte[], String> repartitionedStream = stream.repartition(Repartitioned.numberOfPartitions(10));
Kafka Streams manages the generated topic as an internal topic, handling data purging automatically, and the explicit partition count allows downstream sub-topologies to scale independently. This operation is useful when a key-changing operation has been performed beforehand but auto-repartitioning is not triggered.
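A common shape for this pattern, sketched below: change the key first, then repartition explicitly so downstream processing sees the desired partition count. The CSV key extraction and the repartition name are illustrative assumptions:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

KStream<String, String> stream = ...;

// Key-changing operation: records must be redistributed before any stateful work
KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(",")[0]);

// Explicit repartition by the new key, with a stable name and partition count
KStream<String, String> repartitioned = rekeyed.repartition(
    Repartitioned.<String, String>with(Serdes.String(), Serdes.String())
        .withNumberOfPartitions(10)
        .withName("rekeyed-by-user"));
```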

Joins and Co-partitioning

APIDOC
Join Co-partitioning Requirements:

For equi-joins in Kafka Streams, input data must be co-partitioned. This ensures that records with the same key from both sides of the join are delivered to the same stream task.

Requirements for data co-partitioning:
1. Input topics (left and right sides) must have the same number of partitions.
2. All applications writing to the input topics must use the same partitioning strategy to ensure records with the same key are delivered to the same partition number.
   - This applies to producer settings like `partitioner.class` (e.g., `ProducerConfig.PARTITIONER_CLASS_CONFIG`) and Kafka Streams `StreamPartitioner` for operations like `KStream#to()`.
   - Using default partitioner settings across all applications generally satisfies this requirement.

Why co-partitioning is required:
- KStream-KStream, KTable-KTable, and KStream-KTable joins are performed based on record keys (e.g., `leftRecord.key == rightRecord.key`). Co-partitioning by key ensures these records meet.

Exceptions where co-partitioning is NOT required:
1. KStream-GlobalKTable joins:
   - All partitions of the GlobalKTable's underlying changelog stream are available to each KafkaStreams instance.
   - A `KeyValueMapper` allows non-key based joins from KStream to GlobalKTable.
2. KTable-KTable Foreign-Key joins:
   - Kafka Streams internally ensures co-partitioning for these joins.
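A minimal sketch of a co-partitioned KStream-KTable equi-join, assuming the `orders` and `customers` topics have the same partition count and were written with the same (default) partitioning strategy; topic names and the joiner are illustrative:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

// Both input topics must be co-partitioned: same partition count, same partitioner
KStream<String, Long> orders = builder.stream(
    "orders", Consumed.with(Serdes.String(), Serdes.Long()));
KTable<String, String> customers = builder.table(
    "customers", Consumed.with(Serdes.String(), Serdes.String()));

// Equi-join on the record key: co-partitioning guarantees that matching
// records from both sides arrive at the same stream task
KStream<String, String> enriched = orders.join(
    customers,
    (orderAmount, customer) -> customer + " ordered " + orderAmount,
    Joined.with(Serdes.String(), Serdes.Long(), Serdes.String()));
```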

Stateful Operations

Kafka Streams supports stateful operations like aggregations, windowing, and joins using state stores.
State Store Types:
  • Key-Value Stores: For aggregations and joins
  • Window Stores: For time-based operations
  • Session Stores: For session-based aggregations
State Store Configuration:
APIDOC
Internal Topic Configuration:

- message.timestamp.type: 'CreateTime' for all internal topics.

- Internal Repartition Topics:
  - cleanup.policy: 'delete'
  - retention.ms: -1 (infinite; Kafka Streams purges records explicitly once they are processed)

- Internal Changelog Topics for Key-Value Stores:
  - cleanup.policy: 'compact'

- Internal Changelog Topics for Windowed Key-Value Stores:
  - cleanup.policy: 'compact,delete'
  - retention.ms: 24 hours + the windowed store's retention setting

- Internal Changelog Topics for Versioned State Stores:
  - cleanup.policy: 'compact'
  - min.compaction.lag.ms: 24 hours + store's historyRetentionMs
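As a sketch of where these stores come from: a windowed count is backed by a window store, whose changelog is an internal topic using the windowed settings listed above. The input stream, store name, and window size here are illustrative:

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

KStream<String, Long> pageViews = ...;

// Tumbling 5-minute windows, counted into a window store named "page-view-counts";
// the store is changelogged to an internal topic for fault tolerance
KTable<Windowed<String>, Long> counts = pageViews
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count(Materialized.as("page-view-counts"));
```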

Application Parallelism

text
The parallelism of a Kafka Streams application is primarily determined by how many partitions the input topics have. For example, if your application reads from a single topic that has ten partitions, then you can run up to ten instances of your application. You can run further instances, but these will be idle.
The number of topic partitions is therefore the upper limit for the parallelism of your Kafka Streams application and for the number of running instances of your application.
To achieve balanced processing across application instances and to prevent processing hot spots, you should distribute both data and processing workload:
Data should be evenly distributed across topic partitions. For example, two topic partitions with 1 million messages each are better than one partition with 2 million messages and another with none.
Processing workload should be evenly distributed across topic partitions. For example, if the time to process messages varies widely, it is better to spread the processing-intensive messages across partitions rather than concentrating them in a single partition.

Kafka Streams Configuration

Client Prefixes:
java
Properties streamsSettings = new Properties();
// same value for consumer, producer, and admin client
streamsSettings.put("PARAMETER_NAME", "value");
// different values for consumer and producer
streamsSettings.put("consumer.PARAMETER_NAME", "consumer-value");
streamsSettings.put("producer.PARAMETER_NAME", "producer-value");
streamsSettings.put("admin.PARAMETER_NAME", "admin-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.consumerPrefix("PARAMETER_NAME"), "consumer-value");
streamsSettings.put(StreamsConfig.producerPrefix("PARAMETER_NAME"), "producer-value");
streamsSettings.put(StreamsConfig.adminClientPrefix("PARAMETER_NAME"), "admin-value");
Specific Consumer Types:
java
Properties streamsSettings = new Properties();
// same config value for all consumer types
streamsSettings.put("consumer.PARAMETER_NAME", "general-consumer-value");
// set a different restore consumer config. This would make restore consumer take restore-consumer-value,
// while main consumer and global consumer stay with general-consumer-value
streamsSettings.put("restore.consumer.PARAMETER_NAME", "restore-consumer-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.restoreConsumerPrefix("PARAMETER_NAME"), "restore-consumer-value");
Topic Configuration:
java
Properties streamsSettings = new Properties();
// Override default for both changelog and repartition topics
streamsSettings.put("topic.PARAMETER_NAME", "topic-value");
// alternatively, you can use
streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");

Exactly-Once Semantics in Streams

APIDOC
Producer Client ID Naming Schema:

  - at-least-once (default):
    `[client.Id]-StreamThread-[sequence-number]`

  - exactly-once (EOS version 1):
    `[client.Id]-StreamThread-[sequence-number]-[taskId]`

  - exactly-once-beta (EOS version 2):
    `[client.Id]-StreamThread-[sequence-number]`

Where `[client.Id]` is either set via Streams configuration parameter `client.id` or defaults to `[application.id]-[processId]` (`[processId]` is a random UUID).
EOS Configuration:
APIDOC
Parameter Name: isolation.level
Corresponding Client: Consumer
Streams Default: READ_COMMITTED

Parameter Name: enable.idempotence
Corresponding Client: Producer
Streams Default: true
APIDOC
Parameter Name: transaction.timeout.ms
Corresponding Client: Producer
Streams Default: 10000

Parameter Name: delivery.timeout.ms
Corresponding Client: Producer
Streams Default: Integer.MAX_VALUE
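Enabling EOS in an application is a one-line configuration change; the settings above are then applied automatically. A minimal sketch, where the application id and bootstrap servers are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "payments-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Exactly-once version 2 (requires brokers 2.5+); supersedes the deprecated
// "exactly_once" and "exactly_once_beta" values
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
```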

Topology Naming and Stability

Default Topology (Auto-generated names):
text
Topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input]) --> KSTREAM-FILTER-0000000001
    Processor: KSTREAM-FILTER-0000000001 (stores: []) --> KSTREAM-MAPVALUES-0000000002 <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAPVALUES-0000000002 (stores: []) --> KSTREAM-SINK-0000000003 <-- KSTREAM-FILTER-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: output) <-- KSTREAM-MAPVALUES-0000000002
Explicit Naming for Stability:
APIDOC
Kafka Streams Topology Naming:

Operation                                                   Naming Class
----------------------------------------------------------  ------------
Aggregation repartition topics                              Grouped
KStream-KStream Join repartition topics                     StreamJoined
KStream-KTable Join repartition topic                       Joined
KStream-KStream Join state stores                           StreamJoined
KStream-KTable Join state stores                            Joined
State Stores (for aggregations and KTable-KTable joins)     Materialized
Stream/Table non-stateful operations                        Named
Enforce Explicit Naming:
java
Properties props = new Properties();
props.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
This prevents the application from starting with auto-generated names, guaranteeing stability across topology updates.
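A sketch of a fully named topology, using each naming class in its place; all names and topics are illustrative:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("input");

input.filter((key, value) -> value != null, Named.as("filter-nulls"))  // stateless op: Named
     .groupByKey(Grouped.as("group-by-key"))                           // repartition topic: Grouped
     .count(Materialized.as("counts-store"))                           // state store + changelog: Materialized
     .toStream(Named.as("counts-to-stream"))
     .to("output");
```

With stable, explicit names like these, adding or removing operators later does not shift the auto-generated numeric suffixes, so existing state stores and internal topics survive topology updates.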

Topology Optimization

properties
"topology.optimization":"all"
properties
"topology.optimization":"none"
Topology optimization allows reuse of source topics as changelog topics, crucial when migrating from KStreamBuilder to StreamsBuilder.
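Note that optimizations are only applied when the configuration is passed to `StreamsBuilder#build(Properties)`; setting the property on the `KafkaStreams` instance alone is not enough. A minimal sketch:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

Properties props = new Properties();
// StreamsConfig.OPTIMIZE is the constant for "all"
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

StreamsBuilder builder = new StreamsBuilder();
// ... define the topology ...

// The properties must be passed to build() for the optimizer to rewrite the topology
Topology topology = builder.build(props);
```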

WordCount Example with Topology

bash
$ mvn clean package
$ mvn exec:java -Dexec.mainClass=myapps.WordCount
Sub-topologies:
Sub-topology: 0
  Source: KSTREAM-SOURCE-0000000000 (topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
  Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
  Processor: KSTREAM-KEY-SELECT-0000000002 (stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
  Processor: KSTREAM-FILTER-0000000005 (stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
  Sink: KSTREAM-SINK-0000000004 (topic: counts-store-repartition) <-- KSTREAM-FILTER-0000000005
Sub-topology: 1
  Source: KSTREAM-SOURCE-0000000006 (topics: counts-store-repartition) --> KSTREAM-AGGREGATE-0000000003
  Processor: KSTREAM-AGGREGATE-0000000003 (stores: [counts-store]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
  Processor: KTABLE-TOSTREAM-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
  Sink: KSTREAM-SINK-0000000008 (topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
Global Stores: none
This topology shows two disconnected sub-topologies, their sources, processors, sinks, and the repartition topic (counts-store-repartition) used for shuffling data by aggregation key.
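The topology above corresponds to a DSL program along these lines, a sketch of the standard WordCount example; the comments map each operation to its generated processor node:

```java
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();

builder.stream("streams-plaintext-input", Consumed.with(Serdes.String(), Serdes.String()))
    // KSTREAM-FLATMAPVALUES: split each line into lowercase words
    .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
    // KSTREAM-KEY-SELECT: re-key by word, which forces the repartition topic
    .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
    // KSTREAM-AGGREGATE: count per word, materialized as "counts-store"
    .count(Materialized.as("counts-store"))
    // KTABLE-TOSTREAM + KSTREAM-SINK: emit the changelog to the output topic
    .toStream()
    .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
```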

Schema Registry

Schema registries enforce data contracts between producers and consumers, ensuring data integrity and preventing malformed events.
APIDOC
Data Contracts with Schema Registry:
  - Purpose: Ensure events written to Kafka can be read properly and prevent malformed events.
  - Implementation: Deploy a schema registry alongside the Kafka cluster.
  - Functionality: Manages event schemas and maps them to topics, guiding producers on correct event formats.
  - Note: Kafka does not include a schema registry; third-party implementations are available.

Schema Registry Benefits

  1. Schema Evolution: Manage schema changes over time
  2. Compatibility Checking: Enforce backward/forward compatibility
  3. Centralized Management: Single source of truth for schemas
  4. Type Safety: Compile-time type checking
  5. Documentation: Auto-generated schema documentation
  6. Versioning: Track schema versions per subject

Schema Formats

Supported Formats:
  • Avro: Compact binary format with rich schema evolution
  • JSON Schema: Human-readable with schema validation
  • Protobuf: Google's Protocol Buffers

Schema Evolution Compatibility Modes

  1. BACKWARD: New schema can read old data
  2. FORWARD: Old schema can read new data
  3. FULL: Both backward and forward compatible
  4. NONE: No compatibility checking

Avro Producer Example

java
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");

String userSchema = "{"
    + "\"type\":\"record\","
    + "\"name\":\"User\","
    + "\"fields\":["
    + "  {\"name\":\"name\",\"type\":\"string\"},"
    + "  {\"name\":\"age\",\"type\":\"int\"}"
    + "]}";

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(userSchema);

GenericRecord user = new GenericData.Record(schema);
user.put("name", "John Doe");
user.put("age", 30);

KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);

ProducerRecord<String, GenericRecord> record =
    new ProducerRecord<>("users", "user1", user);

producer.send(record);
producer.close();
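A consuming counterpart, sketched with the Confluent `KafkaAvroDeserializer`; the group id and topic name are illustrative:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put("schema.registry.url", "http://localhost:8081");

try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("users"));
    while (true) {
        // The deserializer fetches the writer schema from the registry by id
        ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, GenericRecord> record : records) {
            GenericRecord userRecord = record.value();
            System.out.printf("name=%s age=%s%n", userRecord.get("name"), userRecord.get("age"));
        }
    }
}
```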

Kafka Connect

Kafka Connect is a framework for streaming data between Kafka and external systems.
APIDOC
Kafka Connect Sink Connector Input Topics

Configuration options for sink connectors to specify input topics; exactly one of the two must be set.

topics - comma-separated list of input topic names
topics.regex - Java regular expression matching input topic names

Connector Types

Source Connectors: Import data into Kafka
  • Database CDC (Debezium)
  • File systems
  • Message queues
  • Cloud services (S3, BigQuery)
  • APIs and webhooks
Sink Connectors: Export data from Kafka
  • Databases (JDBC, Elasticsearch)
  • Data warehouses
  • Object storage
  • Search engines
  • Analytics platforms

Connector Configuration

Source Connector Example (JDBC):
json
{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "postgres",
    "connection.password": "password",
    "table.whitelist": "users,orders",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "postgres-"
  }
}
Sink Connector Example (Elasticsearch):
json
{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "user-events,order-events",
    "connection.url": "http://localhost:9200",
    "type.name": "_doc",
    "key.ignore": "false"
  }
}

Change Data Capture (CDC)

CDC captures database changes and streams them to Kafka in real-time.
Benefits:
  • Real-time data synchronization
  • Event sourcing from existing databases
  • Microservices data integration
  • Zero-downtime migrations
Popular CDC Connectors:
  • Debezium (MySQL, PostgreSQL, MongoDB, SQL Server)
  • Oracle GoldenGate
  • Maxwell's Daemon
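For comparison with the JDBC examples above, a Debezium MySQL source connector configuration might look like the following sketch; all connection values are illustrative, and property names follow the Debezium 2.x conventions (`topic.prefix`, `schema.history.internal.*`):
json
{
  "name": "inventory-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "184054",
    "topic.prefix": "inventory",
    "table.include.list": "inventory.customers,inventory.orders",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory"
  }
}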

Topic Management

APIDOC
Kafka Streams Topic Management:

User Topics:
  - Input Topics: Specified via source processors (e.g., StreamsBuilder#stream(), StreamsBuilder#table(), Topology#addSource()).
  - Output Topics: Specified via sink processors (e.g., KStream#to(), KTable.to(), Topology#addSink()).
  - Management: Must be created and managed manually ahead of time (e.g., via topic tools).
  - Sharing: If shared, users must coordinate topic management.
  - Auto-creation: Discouraged due to potential cluster configuration and default topic settings (e.g., replication factor).

Internal Topics:
  - Purpose: Used internally by the application for state stores (e.g., changelog topics).
  - Creation: Created by the application itself.
  - Usage: Only used by the specific stream application.
  - Permissions: Requires underlying clients to have admin permissions on Kafka brokers if security is enabled.
  - Naming Convention: Typically follows '<application.id>-<operatorName>-<suffix>', but not guaranteed for future releases.
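Since auto-creation of user topics is discouraged, they are typically created ahead of time with the AdminClient; a sketch, where the topic name, partition count, and replication factor are illustrative:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (Admin admin = Admin.create(props)) {
    // Create the user topic up front with explicit partitions and replication,
    // rather than relying on broker-side auto-creation defaults
    NewTopic orders = new NewTopic("orders", 10, (short) 3);
    // all().get() blocks until creation completes (throws checked exceptions)
    admin.createTopics(Collections.singletonList(orders)).all().get();
}
```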

Topic Operations

APIDOC
DESCRIBE_PRODUCERS:
  - Action: Read
  - Resource: Topic

DESCRIBE_TOPIC_PARTITIONS:
  - Action: Describe
  - Resource: Topic

Produce Test Messages

bash
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
>all streams lead to kafka
>hello kafka streams
>join kafka summit

Monitoring and Metrics

Common Metrics

APIDOC
Metric Name: outgoing-byte-rate
Description: The average number of outgoing bytes sent per second for a node.
Mbean Name Pattern: kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)

Metric Name: outgoing-byte-total
Description: The total number of outgoing bytes sent for a node.
Mbean Name Pattern: kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)

Metric Name: request-rate
Description: The average number of requests sent per second for a node.
Mbean Name Pattern: kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)

Metric Name: request-total
Description: The total number of requests sent for a node.
Mbean Name Pattern: kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)

Metric Name: request-size-avg
Description: The average size of all requests in the window for a node.
Mbean Name Pattern: kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)

Metric Name: request-size-max
Description: The maximum size of any request sent in the window for a node.
Mbean Name Pattern: kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)

Metric Name: incoming-byte-rate
Description: The average number of incoming bytes received per second for a node.
Mbean Name Pattern: kafka.[producer|consumer|connect]:type=[consumer|producer|connect]-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)

Key Monitoring Areas

  1. Producer Metrics:
    • record-send-rate
    • record-error-rate
    • compression-rate-avg
    • buffer-available-bytes
  2. Consumer Metrics:
    • records-consumed-rate
    • fetch-latency-avg
    • records-lag-max
    • commit-latency-avg
  3. Broker Metrics:
    • UnderReplicatedPartitions
    • OfflinePartitionsCount
    • ActiveControllerCount
    • RequestHandlerAvgIdlePercent
  4. Streams Metrics:
    • process-rate
    • process-latency-avg
    • commit-rate
    • poll-rate
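The lag metric above (records-lag-max) can also be cross-checked outside JMX: per-partition lag is the broker's log-end offset minus the group's committed offset. A minimal plain-Java sketch of that arithmetic (no Kafka client involved; the offset maps stand in for values you would fetch via the admin API):

```java
import java.util.Map;

public class LagCheck {
    // Max lag across partitions: log-end offset minus committed offset, floored at 0.
    static long maxLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        long max = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long consumed = committed.getOrDefault(e.getKey(), 0L);
            max = Math.max(max, Math.max(0L, e.getValue() - consumed));
        }
        return max;
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 1500L, 1, 900L);       // broker log-end offsets
        Map<Integer, Long> committed = Map.of(0, 1200L, 1, 900L); // group committed offsets
        System.out.println(maxLag(end, committed)); // prints 300
    }
}
```

Alerting on this value (e.g., lag above a fixed threshold for several minutes) catches consumers that are falling behind before downstream data goes stale.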

Production Deployment

Cluster Architecture

Multi-Broker Setup:
  1. Brokers: Typically 3+ brokers for fault tolerance
  2. Replication: Replication factor 3 for production
  3. Partitions: More partitions enable more consumer parallelism (one consumer per partition per group), at the cost of extra broker overhead
  4. ZooKeeper/KRaft: 3 or 5 nodes for quorum
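The partition count caps parallelism because each partition is consumed by at most one member of a consumer group, and a record's key determines its partition. A simplified sketch of that key-to-partition mapping (illustrative only: Kafka's real default partitioner murmur2-hashes the serialized key bytes, not `String.hashCode`):

```java
public class PartitionSketch {
    // Simplified stand-in for the default partitioner (which murmur2-hashes key bytes).
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 6; // also the max number of active consumers in one group
        for (String key : new String[]{"user-1", "user-2", "user-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
        // "user-1" maps to the same partition both times, preserving per-key ordering.
    }
}
```

This is also why changing the partition count of an existing topic breaks key-based ordering: the same key can start hashing to a different partition.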

High Availability Configuration

Broker Configuration:
```properties
# Broker ID
broker.id=1

# Listeners
listeners=PLAINTEXT://broker1:9092,SSL://broker1:9093

# Log directories (use multiple disks)
log.dirs=/data/kafka-logs-1,/data/kafka-logs-2

# Replication
default.replication.factor=3
min.insync.replicas=2

# Leader election
unclean.leader.election.enable=false
auto.leader.rebalance.enable=true

# Log retention
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
```

Eligible Leader Replicas (ELR)

APIDOC
API: DescribeTopicPartitions

Purpose: Fetches detailed information about topic partitions, including Eligible Leader Replicas (ELR).

Usage:
- Via Admin Client: The admin client can fetch ELR info by describing topics.
- Direct API Call: Use the DescribeTopicPartitions API endpoint.

ELR Selection Logic:
- If ELR is not empty, select a replica that is not fenced.
- Select the last known leader if it is unfenced, mimicking pre-4.0 behavior when all replicas are offline.

Dependencies/Side Effects:
- Updating `min.insync.replicas` for a topic will clean the ELR field for that topic.
- Updating the cluster default `min.insync.replicas` will clean ELR fields for all topics.

Return Values:
- ELR status and related replica information for partitions.

Security Configuration

SSL/TLS Encryption:
```properties
# SSL configuration
listeners=SSL://broker:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.client.auth=required
```

SASL Authentication:
```properties
# SASL/PLAIN configuration
listeners=SASL_SSL://broker:9093
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# JAAS configuration
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" \
  password="admin-secret" \
  user_admin="admin-secret" \
  user_alice="alice-secret";
```

Performance Tuning

Broker Tuning:
```properties
# Network threads
num.network.threads=8

# I/O threads
num.io.threads=16

# Socket buffer sizes
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600

# Replication
num.replica.fetchers=4
replica.fetch.max.bytes=1048576

# Log flush (rely on OS page cache)
log.flush.interval.messages=9223372036854775807
log.flush.interval.ms=null
```

Producer Tuning for Throughput:
```properties
acks=1
linger.ms=100
batch.size=65536
compression.type=lz4
buffer.memory=67108864
max.in.flight.requests.per.connection=5
```

Consumer Tuning:
```properties
fetch.min.bytes=1
fetch.max.wait.ms=500
max.partition.fetch.bytes=1048576
max.poll.records=500
session.timeout.ms=30000
heartbeat.interval.ms=3000
```

Best Practices

Producer Best Practices

  1. Enable Idempotence: Prevent duplicate messages
  2. Configure Acks Properly: Balance durability and throughput
  3. Use Compression: Reduce network and storage costs
  4. Batch Messages: Configure linger.ms and batch.size
  5. Handle Retries: Implement proper retry logic
  6. Monitor Metrics: Track send rates and error rates
  7. Partition Strategy: Use meaningful keys for ordering
  8. Close Gracefully: Call close() with timeout
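For item 5, a common shape for retry logic is exponential backoff around the send. A hedged plain-Java sketch (the Supplier stands in for a producer send; all names here are illustrative, not a Kafka API):

```java
import java.util.function.Supplier;

public class RetrySketch {
    // Retry an operation with exponential backoff; rethrow after maxAttempts failures.
    static <T> T withRetry(Supplier<T> op, int maxAttempts, long baseBackoffMs) {
        for (int attempt = 1; ; attempt++) {
            try {
                return op.get();
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) throw e; // exhausted: surface the failure
                try {
                    Thread.sleep(baseBackoffMs << (attempt - 1)); // 10ms, 20ms, 40ms, ...
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] failuresLeft = {2}; // simulate two transient broker errors, then success
        String result = withRetry(() -> {
            if (failuresLeft[0]-- > 0) throw new RuntimeException("transient send failure");
            return "sent";
        }, 5, 10);
        System.out.println(result); // prints "sent" after two retries
    }
}
```

Note that the Java producer already retries internally (`retries`, `delivery.timeout.ms`); application-level retry like this is for failures surfaced after the client gives up.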

Consumer Best Practices

  1. Manual Offset Management: For at-least-once semantics
  2. Handle Rebalancing: Implement ConsumerRebalanceListener
  3. Minimize Poll Duration: Process efficiently
  4. Monitor Consumer Lag: Alert on high lag
  5. Thread Safety: One consumer per thread
  6. Graceful Shutdown: Close consumers properly
  7. Error Handling: Retry transient failures, DLQ for permanent
  8. Seek Capability: Use seek() for replay scenarios
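Manual offset management (item 1) gives at-least-once semantics, which means redelivery after a crash or rebalance is possible, so processing must be idempotent. A minimal sketch tracking already-processed record IDs in memory (illustrative; a real deployment would persist the IDs, e.g., keyed by event ID in a database):

```java
import java.util.HashSet;
import java.util.Set;

public class IdempotentConsumerSketch {
    private final Set<String> processedIds = new HashSet<>();
    private long total = 0;

    // Apply the record only if its ID has not been seen; return false for duplicates.
    boolean process(String recordId, long amount) {
        if (!processedIds.add(recordId)) return false; // redelivered record: skip
        total += amount;
        return true;
    }

    long total() { return total; }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        consumer.process("evt-1", 100);
        consumer.process("evt-1", 100); // redelivery after a rebalance: ignored
        consumer.process("evt-2", 50);
        System.out.println(consumer.total()); // prints 150, not 250
    }
}
```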

Kafka Streams Best Practices

  1. Explicit Naming: Use Named, Grouped, Materialized for stability
  2. State Store Management: Configure changelog topics properly
  3. Error Handling: Implement ProductionExceptionHandler
  4. Scaling: Match application instances to input partitions
  5. Testing: Use TopologyTestDriver for unit tests
  6. Monitoring: Track lag, processing rate, error rate
  7. Exactly-Once: Enable for critical applications
  8. Graceful Shutdown: Handle signals properly
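The state-store/changelog relationship in item 2 can be illustrated without Kafka Streams: every update to the local store is mirrored into a changelog record, and replaying the changelog rebuilds the store after a restart or task migration. A simplified sketch (an in-memory map and list stand in for the RocksDB store and the changelog topic):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StateStoreSketch {
    final Map<String, Long> store = new HashMap<>();  // stand-in for the local state store
    final List<String> changelog = new ArrayList<>(); // stand-in for the changelog topic

    // Count events per key, mirroring each update into the changelog.
    void process(String key) {
        long count = store.merge(key, 1L, Long::sum);
        changelog.add(key + "=" + count); // replayed on restart to rebuild the store
    }

    // Rebuild a store purely from changelog records (what Streams does on restore).
    static Map<String, Long> restore(List<String> changelog) {
        Map<String, Long> rebuilt = new HashMap<>();
        for (String entry : changelog) {
            String[] kv = entry.split("=");
            rebuilt.put(kv[0], Long.parseLong(kv[1]));
        }
        return rebuilt;
    }

    public static void main(String[] args) {
        StateStoreSketch app = new StateStoreSketch();
        for (String k : new String[]{"a", "b", "a"}) app.process(k);
        System.out.println(app.store.equals(restore(app.changelog))); // prints true
    }
}
```

Because the changelog is compacted, the broker eventually keeps only the latest value per key, so restore time stays proportional to state size rather than history length.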

Topic Design Best Practices

  1. Partition Count: Based on throughput requirements
  2. Replication Factor: 3 for production topics
  3. Retention: Set based on use case (time or size)
  4. Compaction: Use for changelog and lookup topics
  5. Naming Convention: Consistent naming scheme
  6. Documentation: Document topic purpose and schema
  7. Access Control: Implement proper ACLs
  8. Monitoring: Track partition metrics
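For item 1, a common back-of-the-envelope is to size the partition count to the larger of the producer-side and consumer-side requirements. A sketch with illustrative per-partition throughput numbers (measure your own cluster before sizing; these figures are assumptions):

```java
public class PartitionSizing {
    // partitions >= max(target / producer-per-partition, target / consumer-per-partition)
    static int partitionsFor(double targetMBps, double producerMBps, double consumerMBps) {
        int forProducing = (int) Math.ceil(targetMBps / producerMBps);
        int forConsuming = (int) Math.ceil(targetMBps / consumerMBps);
        return Math.max(forProducing, forConsuming);
    }

    public static void main(String[] args) {
        // 100 MB/s target; measured 10 MB/s produce and 20 MB/s consume per partition.
        System.out.println(partitionsFor(100, 10, 20)); // prints 10
    }
}
```

Add headroom for growth, since increasing partitions later disturbs key-based ordering.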

Operational Best Practices

  1. Monitoring: Comprehensive metrics collection
  2. Alerting: Alert on critical metrics
  3. Capacity Planning: Monitor disk, network, CPU
  4. Backup: Implement disaster recovery strategy
  5. Upgrades: Rolling upgrades with testing
  6. Security: Enable encryption and authentication
  7. Documentation: Maintain runbooks
  8. Testing: Load test before production

Common Patterns

Pattern 1: Event Sourcing

Store all state changes as immutable events:
```java
// Order events
OrderCreated -> OrderPaid -> OrderShipped -> OrderDelivered

// Event store as Kafka topic
Topic: order-events
Compaction: None (keep full history)
Retention: Infinite or very long
```
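The defining property of event sourcing is that current state is a pure fold over the immutable event history. A minimal sketch replaying the order events above (class and status names are illustrative):

```java
import java.util.List;

public class EventSourcingSketch {
    // Current state is derived by folding over the ordered event history.
    static String currentStatus(List<String> events) {
        String status = "NONE";
        for (String event : events) {
            switch (event) {
                case "OrderCreated":   status = "CREATED";   break;
                case "OrderPaid":      status = "PAID";      break;
                case "OrderShipped":   status = "SHIPPED";   break;
                case "OrderDelivered": status = "DELIVERED"; break;
                default: break; // unknown events leave state unchanged
            }
        }
        return status;
    }

    public static void main(String[] args) {
        List<String> history = List.of("OrderCreated", "OrderPaid", "OrderShipped");
        System.out.println(currentStatus(history)); // prints SHIPPED
    }
}
```

Because the history is never mutated, any past state can be reconstructed by replaying a prefix of the topic.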

Pattern 2: CQRS (Command Query Responsibility Segregation)

Separate read and write models:
```java
// Write side: Commands produce events
commands -> producers -> events-topic

// Read side: Consumers build projections
events-topic -> streams -> materialized-view (KTable)
```

Pattern 3: Saga Pattern

Distributed transaction coordination:
```java
// Order saga
order-requested -> payment-requested -> payment-completed ->
inventory-reserved -> order-confirmed

// Compensating transactions on failure
payment-failed -> order-cancelled
```

Pattern 4: Outbox Pattern

Reliably publish database changes:
```java
// Database transaction writes to outbox table
BEGIN TRANSACTION;
  INSERT INTO orders VALUES (...);
  INSERT INTO outbox VALUES (event_data);
COMMIT;

// CDC connector reads outbox and publishes to Kafka
Debezium -> outbox-topic -> downstream consumers
```
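The guarantee the outbox pattern provides is that the business write and the event write share one transaction, so neither can exist without the other. A simplified in-memory sketch of that contract (two lists stand in for the database tables; a real implementation relies on actual DB transactions plus a CDC connector such as Debezium):

```java
import java.util.ArrayList;
import java.util.List;

public class OutboxSketch {
    final List<String> orders = new ArrayList<>(); // stand-in for the orders table
    final List<String> outbox = new ArrayList<>(); // stand-in for the outbox table

    // Both writes happen together, as they would inside one DB transaction.
    void placeOrder(String orderId) {
        orders.add(orderId);
        outbox.add("OrderCreated:" + orderId); // the CDC connector publishes this row
    }

    // Simulates the CDC connector draining the outbox into Kafka.
    List<String> drainOutbox() {
        List<String> events = new ArrayList<>(outbox);
        outbox.clear();
        return events;
    }

    public static void main(String[] args) {
        OutboxSketch db = new OutboxSketch();
        db.placeOrder("order-1");
        System.out.println(db.drainOutbox()); // prints [OrderCreated:order-1]
    }
}
```

This avoids the dual-write problem of publishing to Kafka and committing to the database as two separate, independently failing operations.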

Pattern 5: Fan-out Pattern

Broadcast events to multiple consumers:
```java
// Single topic, multiple consumer groups
user-events topic
  -> email-service (consumer group: email)
  -> analytics-service (consumer group: analytics)
  -> notification-service (consumer group: notifications)
```

Pattern 6: Dead Letter Queue (DLQ)

Handle processing failures:
```java
try {
  processRecord(record);
} catch (RetriableException e) {
  // Retry
  retry(record);
} catch (NonRetriableException e) {
  // Send to DLQ
  sendToDLQ(record, e);
}
```

Pattern 7: Windowed Aggregations

Time-based aggregations:
```java
KStream<String, PageView> views = ...;

// Tumbling window: non-overlapping fixed windows
views.groupByKey()
     .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
     .count();

// Hopping window: overlapping windows
views.groupByKey()
     .windowedBy(TimeWindows.of(Duration.ofMinutes(5))
                            .advanceBy(Duration.ofMinutes(1)))
     .count();

// Session window: activity-based windows
views.groupByKey()
     .windowedBy(SessionWindows.with(Duration.ofMinutes(30)))
     .count();
```
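The tumbling-window count boils down to bucketing each event timestamp by `windowStart = timestamp - (timestamp % windowSize)`. A plain-Java sketch of that assignment (no Kafka Streams; the 5-minute size mirrors the example above):

```java
import java.util.HashMap;
import java.util.Map;

public class TumblingWindowSketch {
    // Assign a timestamp to its non-overlapping window: start = ts - (ts % size).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        Map<Long, Long> countsPerWindow = new HashMap<>();
        long[] eventTimes = {0, 60_000, 310_000}; // two events in [0, 5m), one in [5m, 10m)
        for (long t : eventTimes) {
            countsPerWindow.merge(windowStart(t, fiveMinutes), 1L, Long::sum);
        }
        System.out.println(countsPerWindow.get(0L));       // prints 2
        System.out.println(countsPerWindow.get(300_000L)); // prints 1
    }
}
```

Hopping windows differ only in that each event lands in every window whose range covers it, so a single event can appear in multiple buckets.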

Troubleshooting

Common Issues

Issue: Consumer lag increasing
  • Check consumer processing time
  • Scale consumer group (add instances)
  • Optimize processing logic
  • Increase max.poll.records if appropriate
Issue: Messages not arriving
  • Check producer send() error callbacks
  • Verify topic exists and is accessible
  • Check network connectivity
  • Review broker logs for errors
Issue: Duplicate messages
  • Enable idempotent producer
  • Implement idempotent consumer processing
  • Check offset commit strategy
  • Verify exactly-once configuration
Issue: Rebalancing taking too long
  • Reduce max.poll.interval.ms
  • Increase session.timeout.ms
  • Optimize poll() processing time
  • Check consumer health
Issue: Partition leader unavailable
  • Check broker health and logs
  • Verify replication status
  • Check network between brokers
  • Review ISR (In-Sync Replicas)
Issue: Out of memory errors
  • Reduce batch.size and buffer.memory
  • Tune JVM heap settings
  • Monitor memory usage
  • Check for memory leaks in processing

Migration and Upgrade Strategies

Kafka Streams Migration

KStreamBuilder to StreamsBuilder:
```java
// KStream.through() was removed; repartition() now manages the repartition topic:
kstream.repartition();

// Or, for a user-managed topic, write it out and read it back explicitly:
kstream.to("user-topic");
KStream<String, String> reread = streamsBuilder.stream("user-topic");
```
repartition() replaces KStream.through() for managing topic repartitioning.

Topic Prefix Configuration:
```java
Properties props = new Properties();
// StreamsConfig.topicPrefix() prefixes a topic-level config with "topic."
// so it is applied to the application's internal topics:
props.put(StreamsConfig.topicPrefix("replication.factor"), "3");
KafkaStreams streams = new KafkaStreams(topology, props);
```

Rolling Upgrades

  1. Prepare: Test new version in staging
  2. Upgrade Brokers: One broker at a time
  3. Verify: Check cluster health after each broker
  4. Upgrade Clients: Producers, consumers, streams apps
  5. Monitor: Watch metrics throughout process

Resources and References


Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: Stream Processing, Event-Driven Architecture, Real-Time Data
Compatible With: Apache Kafka 2.x, 3.x, Confluent Platform
