gcp-pubsub
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseGoogle Cloud Pub/Sub
Google Cloud Pub/Sub
Table of Contents
目录
Purpose
用途
Build robust, production-ready event-driven systems using Google Cloud Pub/Sub with Python. Covers setup, publishing, subscribing, error handling, dead letter queues, and local development with the emulator.
使用Python结合Google Cloud Pub/Sub构建稳健、可投入生产的事件驱动系统。内容涵盖环境搭建、消息发布、消息订阅、错误处理、死信队列配置以及基于模拟器的本地开发。
When to Use
适用场景
Use this skill when you need to:
- Build event-driven architectures with message-based communication
- Implement reliable message queuing between services
- Handle at-least-once message delivery guarantees
- Manage high-throughput message systems (1000+ msgs/sec)
- Configure local development with Pub/Sub emulator
- Implement dead letter queues for failed message handling
当你需要以下功能时,可使用本技能:
- 构建基于消息通信的事件驱动架构
- 在服务间实现可靠的消息队列
- 满足至少一次消息投递的保障要求
- 管理高吞吐量消息系统(1000+ 消息/秒)
- 配置Pub/Sub模拟器进行本地开发
- 为失败消息处理配置死信队列
Quick Start
快速开始
Install and authenticate:
bash
pip install google-cloud-pubsub
gcloud auth application-default login
python -c "from google.cloud import pubsub_v1; print('Ready')"Publish a message:
python
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")安装与认证:
bash
pip install google-cloud-pubsub
gcloud auth application-default login
python -c "from google.cloud import pubsub_v1; print('Ready')"发布消息:
python
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")Create topic
创建主题
try:
publisher.create_topic(request={"name": topic_path})
except Exception as e:
if "ALREADY_EXISTS" not in str(e):
raise
try:
publisher.create_topic(request={"name": topic_path})
except Exception as e:
if "ALREADY_EXISTS" not in str(e):
raise
Publish
发布消息
future = publisher.publish(topic_path, b"Hello, World!")
print(f"Published: {future.result()}")
**Subscribe to messages:**
```python
from google.cloud import pubsub_v1
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
def callback(message):
print(f"Received: {message.data.decode()}")
message.ack()
future = subscriber.subscribe(subscription_path, callback=callback)
try:
future.result(timeout=30)
except Exception:
future.cancel()future = publisher.publish(topic_path, b"Hello, World!")
print(f"Published: {future.result()}")
**订阅消息:**
```python
from google.cloud import pubsub_v1
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
def callback(message):
print(f"Received: {message.data.decode()}")
message.ack()
future = subscriber.subscribe(subscription_path, callback=callback)
try:
future.result(timeout=30)
except Exception:
future.cancel()Instructions
操作步骤
Step 1: Set Up Development Environment
步骤1:搭建开发环境
Install dependencies and configure authentication:
bash
pip install google-cloud-pubsub
gcloud auth application-default loginFor production, use service account:
bash
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"安装依赖并配置认证:
bash
pip install google-cloud-pubsub
gcloud auth application-default login生产环境使用服务账号:
bash
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"Step 2: Create Topics and Subscriptions
步骤2:创建主题与订阅
python
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()python
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()Create topic
创建主题
topic_path = publisher.topic_path("my-project", "my-topic")
publisher.create_topic(request={"name": topic_path})
topic_path = publisher.topic_path("my-project", "my-topic")
publisher.create_topic(request={"name": topic_path})
Create subscription
创建订阅
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
subscription_config = {
"name": subscription_path,
"topic": topic_path,
"ack_deadline_seconds": 60,
}
subscriber.create_subscription(request=subscription_config)
See [references/detailed-guide.md](./references/detailed-guide.md) for advanced configuration options.subscription_path = subscriber.subscription_path("my-project", "my-subscription")
subscription_config = {
"name": subscription_path,
"topic": topic_path,
"ack_deadline_seconds": 60,
}
subscriber.create_subscription(request=subscription_config)
高级配置选项请参考 [references/detailed-guide.md](./references/detailed-guide.md)。Step 3: Publish Messages
步骤3:发布消息
Simple publishing:
python
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")
future = publisher.publish(topic_path, b"Message data")
message_id = future.result()Publish with attributes:
python
import json
data = json.dumps({"event": "user.created", "user_id": "123"}).encode()
future = publisher.publish(
topic_path,
data,
event_type="user.created",
timestamp="2024-01-15T10:30:00Z"
)See references/detailed-guide.md for production-ready publisher with batching and error handling.
简单发布:
python
from google.cloud import pubsub_v1
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")
future = publisher.publish(topic_path, b"Message data")
message_id = future.result()带属性的发布:
python
import json
data = json.dumps({"event": "user.created", "user_id": "123"}).encode()
future = publisher.publish(
topic_path,
data,
event_type="user.created",
timestamp="2024-01-15T10:30:00Z"
)具备批处理和错误处理的生产级发布器请参考 references/detailed-guide.md。
Step 4: Subscribe to Messages
步骤4:订阅消息
Basic subscriber:
python
from google.cloud import pubsub_v1
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
def callback(message):
try:
print(f"Received: {message.data.decode()}")
# Process message
message.ack()
except Exception as e:
print(f"Error: {e}")
message.nack() # Will be redelivered
future = subscriber.subscribe(subscription_path, callback=callback)
try:
future.result() # Block indefinitely
except KeyboardInterrupt:
future.cancel()With flow control:
python
future = subscriber.subscribe(
subscription_path,
callback=callback,
flow_control=pubsub_v1.types.FlowControl(
max_messages=100,
max_bytes=100 * 1024 * 1024, # 100 MB
),
)See references/detailed-guide.md for production subscriber with monitoring.
基础订阅器:
python
from google.cloud import pubsub_v1
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
def callback(message):
try:
print(f"Received: {message.data.decode()}")
# 处理消息
message.ack()
except Exception as e:
print(f"Error: {e}")
message.nack() # 将被重新投递
future = subscriber.subscribe(subscription_path, callback=callback)
try:
future.result() # 无限期阻塞
except KeyboardInterrupt:
future.cancel()带流量控制:
python
future = subscriber.subscribe(
subscription_path,
callback=callback,
flow_control=pubsub_v1.types.FlowControl(
max_messages=100,
max_bytes=100 * 1024 * 1024, # 100 MB
),
)具备监控功能的生产级订阅器请参考 references/detailed-guide.md。
Step 5: Configure Dead Letter Queue
步骤5:配置死信队列
python
from google.cloud import pubsub_v1
from google.protobuf.duration_pb2 import Duration
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()python
from google.cloud import pubsub_v1
from google.protobuf.duration_pb2 import Duration
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()Create dead letter topic
创建死信主题
dlq_topic_path = publisher.topic_path("my-project", "my-topic-dlq")
publisher.create_topic(request={"name": dlq_topic_path})
dlq_topic_path = publisher.topic_path("my-project", "my-topic-dlq")
publisher.create_topic(request={"name": dlq_topic_path})
Create subscription with DLQ
创建带DLQ的订阅
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
subscription = pubsub_v1.types.Subscription(
name=subscription_path,
topic=publisher.topic_path("my-project", "my-topic"),
dead_letter_policy=pubsub_v1.types.DeadLetterPolicy(
dead_letter_topic=dlq_topic_path,
max_delivery_attempts=5,
),
retry_policy=pubsub_v1.types.RetryPolicy(
minimum_backoff=Duration(seconds=10),
maximum_backoff=Duration(seconds=600),
),
)
subscriber.create_subscription(request=subscription)
See [references/detailed-guide.md](./references/detailed-guide.md) for complete DLQ setup with monitoring.subscription_path = subscriber.subscription_path("my-project", "my-subscription")
subscription = pubsub_v1.types.Subscription(
name=subscription_path,
topic=publisher.topic_path("my-project", "my-topic"),
dead_letter_policy=pubsub_v1.types.DeadLetterPolicy(
dead_letter_topic=dlq_topic_path,
max_delivery_attempts=5,
),
retry_policy=pubsub_v1.types.RetryPolicy(
minimum_backoff=Duration(seconds=10),
maximum_backoff=Duration(seconds=600),
),
)
subscriber.create_subscription(request=subscription)
完整的DLQ搭建及监控请参考 [references/detailed-guide.md](./references/detailed-guide.md)。Step 6: Implement Idempotency
步骤6:实现幂等性
Track processed messages to avoid duplicate processing:
python
class IdempotentProcessor:
def __init__(self):
self.processed_ids = set()
def process(self, message):
msg_id = message.message_id
if msg_id in self.processed_ids:
print(f"Already processed: {msg_id}")
message.ack()
return
try:
# Process message
print(f"Processing: {message.data.decode()}")
self.processed_ids.add(msg_id)
message.ack()
except Exception as e:
print(f"Failed: {e}")
message.nack()See references/detailed-guide.md for production-ready idempotency patterns.
跟踪已处理消息以避免重复处理:
python
class IdempotentProcessor:
def __init__(self):
self.processed_ids = set()
def process(self, message):
msg_id = message.message_id
if msg_id in self.processed_ids:
print(f"已处理过: {msg_id}")
message.ack()
return
try:
# 处理消息
print(f"处理中: {message.data.decode()}")
self.processed_ids.add(msg_id)
message.ack()
except Exception as e:
print(f"处理失败: {e}")
message.nack()生产级幂等性实现模式请参考 references/detailed-guide.md。
Step 7: Local Development with Emulator
步骤7:基于模拟器的本地开发
bash
undefinedbash
undefinedInstall and start emulator
安装并启动模拟器
gcloud components install pubsub-emulator
gcloud beta emulators pubsub start
gcloud components install pubsub-emulator
gcloud beta emulators pubsub start
In another terminal
在另一个终端中执行
export PUBSUB_EMULATOR_HOST=localhost:8085
python your_script.py # Uses emulator automatically
See [references/detailed-guide.md](./references/detailed-guide.md) for emulator configuration patterns.export PUBSUB_EMULATOR_HOST=localhost:8085
python your_script.py # 自动使用模拟器
模拟器配置模式请参考 [references/detailed-guide.md](./references/detailed-guide.md)。Step 8: Monitor Operations
步骤8:监控操作
Enable logging and track metrics:
python
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)启用日志并跟踪指标:
python
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)Query subscription stats
查询订阅统计信息
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
subscription = subscriber.get_subscription(request={"subscription": subscription_path})
print(f"Topic: {subscription.topic}")
print(f"Ack deadline: {subscription.ack_deadline_seconds}s")
See [references/detailed-guide.md](./references/detailed-guide.md) for comprehensive monitoring patterns.subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")
subscription = subscriber.get_subscription(request={"subscription": subscription_path})
print(f"主题: {subscription.topic}")
print(f"确认超时: {subscription.ack_deadline_seconds}秒")
全面的监控模式请参考 [references/detailed-guide.md](./references/detailed-guide.md)。Requirements
环境要求
- Python: 3.7+
- Dependencies:
bash
pip install google-cloud-pubsub>=2.18.0 - GCP Project: Active project with Pub/Sub API enabled
- Authentication: Application Default Credentials or service account key
- IAM Permissions:
- - Publish messages
roles/pubsub.publisher - - Subscribe to messages
roles/pubsub.subscriber - - Create/delete topics and subscriptions
roles/pubsub.admin
- For Local Development:
bash
gcloud components install pubsub-emulator
- Python版本: 3.7+
- 依赖包:
bash
pip install google-cloud-pubsub>=2.18.0 - GCP项目: 已启用Pub/Sub API的活跃项目
- 认证方式: 应用默认凭据或服务账号密钥
- IAM权限:
- - 发布消息权限
roles/pubsub.publisher - - 订阅消息权限
roles/pubsub.subscriber - - 创建/删除主题和订阅权限
roles/pubsub.admin
- 本地开发额外要求:
bash
gcloud components install pubsub-emulator
See Also
相关链接
- references/detailed-guide.md - Comprehensive implementation guide with production patterns
- examples/examples.md - Working code examples including async patterns and testing
- scripts/setup_emulator.sh - Emulator setup utility
- scripts/pubsub_utils.py - Helper utilities for common operations
- references/detailed-guide.md - 包含生产级实现模式的完整指南
- examples/examples.md - 包含异步模式和测试的可运行代码示例
- scripts/setup_emulator.sh - 模拟器搭建工具脚本
- scripts/pubsub_utils.py - 通用操作辅助工具类