azure-eventhub-py

Azure Event Hubs SDK for Python

Big data streaming platform for high-throughput event ingestion.

Installation

bash
pip install azure-eventhub azure-identity

# For checkpointing with blob storage (sync client, used in the examples below)
pip install azure-eventhub-checkpointstoreblob

# For checkpointing with blob storage (async client)
pip install azure-eventhub-checkpointstoreblob-aio

Environment Variables

bash
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENT_HUB_NAME=my-eventhub
STORAGE_ACCOUNT_URL=https://<account>.blob.core.windows.net
CHECKPOINT_CONTAINER=checkpoints
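
One way to read these variables from Python is sketched below. `EventHubSettings` and `load_settings` are hypothetical names introduced for illustration, not part of the SDK; the sketch only assumes the four variable names listed above.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class EventHubSettings:
    """Hypothetical container for the four variables listed above."""
    fully_qualified_namespace: str
    eventhub_name: str
    storage_account_url: str
    checkpoint_container: str


def load_settings(env: Mapping[str, str] = os.environ) -> EventHubSettings:
    """Read the settings from `env`; raise if any variable is missing or empty."""
    names = [
        "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE",
        "EVENT_HUB_NAME",
        "STORAGE_ACCOUNT_URL",
        "CHECKPOINT_CONTAINER",
    ]
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return EventHubSettings(
        fully_qualified_namespace=env["EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"],
        eventhub_name=env["EVENT_HUB_NAME"],
        storage_account_url=env["STORAGE_ACCOUNT_URL"],
        checkpoint_container=env["CHECKPOINT_CONTAINER"],
    )
```

Failing early with the full list of missing names tends to be easier to debug than a `KeyError` raised later, when a client is first constructed.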

Authentication

python
from azure.identity import DefaultAzureCredential
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
eventhub_name = "my-eventhub"

# Producer
producer = EventHubProducerClient(
    fully_qualified_namespace=namespace,
    eventhub_name=eventhub_name,
    credential=credential,
)

# Consumer
consumer = EventHubConsumerClient(
    fully_qualified_namespace=namespace,
    eventhub_name=eventhub_name,
    consumer_group="$Default",
    credential=credential,
)

Client Types

| Client | Purpose |
|--------|---------|
| EventHubProducerClient | Send events to Event Hub |
| EventHubConsumerClient | Receive events from Event Hub |
| BlobCheckpointStore | Track consumer progress |

Send Events

python
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential

producer = EventHubProducerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    credential=DefaultAzureCredential()
)

with producer:
    # Create batch (handles size limits)
    event_data_batch = producer.create_batch()
    
    for i in range(10):
        try:
            event_data_batch.add(EventData(f"Event {i}"))
        except ValueError:
            # Batch is full, send and create new one
            producer.send_batch(event_data_batch)
            event_data_batch = producer.create_batch()
            event_data_batch.add(EventData(f"Event {i}"))
    
    # Send remaining
    producer.send_batch(event_data_batch)
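
The fill, flush, and retry loop above can be factored into a reusable helper. The sketch below is one possible shape, not an SDK function: `send_in_batches` is a hypothetical name, `producer` is assumed to behave like `EventHubProducerClient` (`create_batch` / `send_batch`), and each batch's `add()` is assumed to raise `ValueError` when an event does not fit, as in the example above.

```python
from typing import Any, Iterable


def send_in_batches(producer: Any, events: Iterable[Any]) -> int:
    """Send `events` in as few batches as they fit into; return the batch count."""
    batch = producer.create_batch()
    pending = 0  # number of events added to the current batch
    sent = 0     # number of batches sent so far

    for event in events:
        try:
            batch.add(event)
        except ValueError:
            if pending == 0:
                # The event does not fit even into an empty batch.
                raise
            # Current batch is full: flush it and start a new one.
            producer.send_batch(batch)
            sent += 1
            batch = producer.create_batch()
            pending = 0
            batch.add(event)  # raises ValueError again if the event alone is too large
        pending += 1

    if pending:
        # Flush whatever is left; skip the call entirely for an empty batch.
        producer.send_batch(batch)
        sent += 1
    return sent
```

With a real client this would be called as `send_in_batches(producer, (EventData(f"Event {i}") for i in range(10)))` inside the `with producer:` block.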

Send to Specific Partition

python
# By partition ID
event_data_batch = producer.create_batch(partition_id="0")

# By partition key (consistent hashing)
event_data_batch = producer.create_batch(partition_key="user-123")
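
Because a batch is created for a single partition key, events for several keys need one batch per key. A minimal sketch of that grouping follows; `send_by_partition_key` is a hypothetical helper, and `producer` is assumed to accept `create_batch(partition_key=...)` as shown above.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple


def send_by_partition_key(
    producer: Any, keyed_events: Iterable[Tuple[str, Any]]
) -> Dict[str, int]:
    """Group (key, event) pairs by key and send one batch per key.

    Returns the number of events sent for each key.
    """
    groups: Dict[str, List[Any]] = defaultdict(list)
    for key, event in keyed_events:
        groups[key].append(event)  # keeps arrival order within each key

    counts: Dict[str, int] = {}
    for key, events in groups.items():
        batch = producer.create_batch(partition_key=key)
        for event in events:
            # A ValueError here means the group is too large for one batch
            # and would need to be split further.
            batch.add(event)
        producer.send_batch(batch)
        counts[key] = len(events)
    return counts
```

Grouping first keeps all events for one key together in send order, which is what makes a partition key useful for per-key ordering.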

Receive Events

Simple Receive

python
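from azure.eventhub import EventHubConsumerClient
from azure.identity import DefaultAzureCredential

def on_event(partition_context, event):
    print(f"Partition: {partition_context.partition_id}")
    print(f"Data: {event.body_as_str()}")
    # Without a checkpoint store this call does not persist anything
    partition_context.update_checkpoint(event)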

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    consumer_group="$Default",
    credential=DefaultAzureCredential()
)

with consumer:
    consumer.receive(
        on_event=on_event,
        starting_position="-1",  # Beginning of stream
    )
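
`receive()` blocks the calling thread, so the example above runs until it is interrupted. One common way to receive for a bounded time is to run it in a worker thread and close the client from the main thread. The sketch below assumes that `close()` causes the blocking `receive()` call to return; `receive_for` is a hypothetical helper name.

```python
import threading
import time
from typing import Any, Callable


def receive_for(
    consumer: Any, on_event: Callable[[Any, Any], None], seconds: float
) -> None:
    """Run consumer.receive() in a worker thread and stop it after `seconds`."""
    worker = threading.Thread(
        target=consumer.receive,
        kwargs={"on_event": on_event, "starting_position": "-1"},
        daemon=True,
    )
    worker.start()
    time.sleep(seconds)
    consumer.close()  # assumed to make the blocking receive() call return
    worker.join()
```

Since the helper closes the client itself, it would be called as `receive_for(consumer, on_event, 30)` in place of the `with consumer:` block.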

With Blob Checkpoint Store (Production)

python
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential

checkpoint_store = BlobCheckpointStore(
    blob_account_url="https://<account>.blob.core.windows.net",
    container_name="checkpoints",
    credential=DefaultAzureCredential()
)

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    consumer_group="$Default",
    credential=DefaultAzureCredential(),
    checkpoint_store=checkpoint_store
)

def on_event(partition_context, event):
    print(f"Received: {event.body_as_str()}")
    # Checkpoint after processing
    partition_context.update_checkpoint(event)

with consumer:
    consumer.receive(on_event=on_event)
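
Checkpointing after every event, as above, writes to blob storage once per event. A common alternative is to checkpoint every N events per partition. The sketch below shows that idea; `checkpoint_every` and `process` are hypothetical names, and the callback only relies on `partition_context.partition_id` and `partition_context.update_checkpoint(event)` from the example above.

```python
from collections import defaultdict
from typing import Any, Callable, Dict


def checkpoint_every(
    n: int, process: Callable[[Any], None]
) -> Callable[[Any, Any], None]:
    """Build an on_event callback that checkpoints once per `n` events per partition."""
    seen: Dict[str, int] = defaultdict(int)  # events processed, keyed by partition ID

    def on_event(partition_context: Any, event: Any) -> None:
        process(event)  # do the real work before recording progress
        partition_id = partition_context.partition_id
        seen[partition_id] += 1
        if seen[partition_id] % n == 0:
            partition_context.update_checkpoint(event)

    return on_event
```

It would be passed as `consumer.receive(on_event=checkpoint_every(100, handle))`, where `handle` is the application's own processing function. The trade-off is that events processed since the last checkpoint may be delivered again after a restart, so `process` should tolerate duplicates.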

Async Client

python
import asyncio

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential

async def send_events():
    credential = DefaultAzureCredential()

    # Async credentials hold network resources, so close them along with the client
    async with credential, EventHubProducerClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        eventhub_name="my-eventhub",
        credential=credential
    ) as producer:
        batch = await producer.create_batch()
        batch.add(EventData("Async event"))
        await producer.send_batch(batch)

async def receive_events():
    async def on_event(partition_context, event):
        print(event.body_as_str())
        await partition_context.update_checkpoint(event)

    credential = DefaultAzureCredential()

    async with credential, EventHubConsumerClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        eventhub_name="my-eventhub",
        consumer_group="$Default",
        credential=credential
    ) as consumer:
        # Runs until cancelled or until the client is closed
        await consumer.receive(on_event=on_event)

asyncio.run(send_events())
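
The async `receive()` coroutine likewise runs until it is stopped. A sketch of receiving for a bounded time is shown below, under the assumption that awaiting `close()` causes the pending `receive()` call to finish; `receive_for_async` is a hypothetical helper name.

```python
import asyncio
from typing import Any


async def receive_for_async(consumer: Any, on_event: Any, seconds: float) -> None:
    """Start consumer.receive() as a task and stop it after `seconds`."""
    task = asyncio.create_task(consumer.receive(on_event=on_event))
    await asyncio.sleep(seconds)
    await consumer.close()  # assumed to make the pending receive() call return
    await task              # surface any exception raised while receiving
```

Awaiting the task at the end matters: without it, an exception raised inside `receive()` would go unnoticed.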

Event Properties

python
from azure.eventhub import EventData

event = EventData("My event body")

# Set properties
event.properties = {"custom_property": "value"}
event.content_type = "application/json"

# Read properties (on receive)
print(event.body_as_str())
print(event.sequence_number)
print(event.offset)
print(event.enqueued_time)
print(event.partition_key)
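
When the body is JSON, the properties above combine naturally with `json.dumps` and `json.loads`. The helpers below are a hypothetical sketch: `make_json_event` and `read_json_event` are not SDK functions, and `event_factory` is a parameter introduced here so that the event class (normally `EventData`) is supplied by the caller.

```python
import json
from typing import Any, Callable, Dict


def make_json_event(
    payload: Dict[str, Any],
    event_factory: Callable[[str], Any],
    **properties: Any,
) -> Any:
    """Serialize `payload` to JSON and wrap it in an event with matching metadata."""
    event = event_factory(json.dumps(payload))
    event.content_type = "application/json"
    event.properties = dict(properties)  # custom application properties
    return event


def read_json_event(event: Any) -> Dict[str, Any]:
    """Decode the JSON body of a received event."""
    return json.loads(event.body_as_str())
```

With the SDK this would be called as `make_json_event({"id": 1}, EventData, source="web")` on the sending side and `read_json_event(event)` inside `on_event`.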

Get Event Hub Info

python
with producer:
    info = producer.get_eventhub_properties()
    print(f"Name: {info['eventhub_name']}")
    print(f"Partitions: {info['partition_ids']}")
    
    for partition_id in info['partition_ids']:
        partition_info = producer.get_partition_properties(partition_id)
        print(f"Partition {partition_id}: {partition_info['last_enqueued_sequence_number']}")
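
The same two calls can be wrapped to return the data instead of printing it, which is handy for monitoring or tests. `last_sequence_numbers` is a hypothetical helper; it uses only the `partition_ids` and `last_enqueued_sequence_number` keys shown above.

```python
from typing import Any, Dict


def last_sequence_numbers(client: Any) -> Dict[str, int]:
    """Map each partition ID to its last enqueued sequence number."""
    info = client.get_eventhub_properties()
    return {
        partition_id: client.get_partition_properties(partition_id)[
            "last_enqueued_sequence_number"
        ]
        for partition_id in info["partition_ids"]
    }
```

Each partition requires its own `get_partition_properties` call, so the cost grows with the partition count.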

Best Practices

  1. Use batches when sending multiple events
  2. Use a checkpoint store in production for reliable processing
  3. Use the async client for high-throughput scenarios
  4. Use partition keys for ordered delivery within a partition
  5. Handle batch size limits: catch ValueError when the batch is full
  6. Use context managers (with / async with) for proper cleanup
  7. Use a separate consumer group for each application that reads the stream

Reference Files

| File | Contents |
|------|----------|
| references/checkpointing.md | Checkpoint store patterns, blob checkpointing, checkpoint strategies |
| references/partitions.md | Partition management, load balancing, starting positions |
| scripts/setup_consumer.py | CLI for Event Hub info, consumer setup, and event sending/receiving |