Loading...
Loading...
Expert-level Apache Kafka, event streaming, Kafka Streams, and distributed messaging
npx skill4agent add personamanagmentlayer/pcl kafka-expert
from kafka import KafkaProducer
import json
# Producer example: publish one JSON-encoded event to the 'user-events'
# topic and wait for the broker to acknowledge it.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # Values are serialized to UTF-8 encoded JSON before being sent.
    value_serializer=lambda payload: json.dumps(payload).encode('utf-8'),
    acks='all',  # Wait for all replicas
    retries=3,
)

# The event to publish.
login_event = {
    'user_id': '123',
    'event': 'login',
    'timestamp': '2024-01-01T00:00:00Z',
}

# send() hands back a future for the pending record.
future = producer.send('user-events', login_event)

# Wait for the acknowledgment (bounded by timeout=10), then report where
# the record was written.
record_metadata = future.get(timeout=10)
print(f"Topic: {record_metadata.topic}, Partition: {record_metadata.partition}")

# Flush any records that are still pending before the producer is closed.
producer.flush()
producer.close()
from kafka import KafkaConsumer
# Consumer example: read JSON events from the 'user-events' topic as part of
# consumer group 'my-group', with auto-commit disabled so that offsets are
# committed manually.
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    # Incoming bytes are decoded as UTF-8 and parsed as JSON.
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

# The loop body must be indented under the `for`; without the indentation
# the snippet is not valid Python.
for message in consumer:
    print(f"Received: {message.value}")
    # Process message
    process_event(message.value)
    # Manual commit
    consumer.commit()
Properties props = new Properties();
// Kafka Streams example: read "input-topic", keep only values longer than
// 10 characters, upper-case them, and write the result to "output-topic".
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// The topology below is typed KStream<String, String> but passes no serdes
// to stream()/to(), so String serdes have to be configured as the defaults.
// The class is fully qualified so that no additional import is required.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
    org.apache.kafka.common.serialization.Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
    org.apache.kafka.common.serialization.Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");

// Transform and filter. The null check comes first so that a record with a
// null value is dropped instead of throwing on value.length().
KStream<String, String> transformed = source
    .filter((key, value) -> value != null && value.length() > 10)
    .mapValues(value -> value.toUpperCase());
transformed.to("output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
// Close the streams instance when the JVM shuts down.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.start();