gcp-pubsub

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Google Cloud Pub/Sub

Google Cloud Pub/Sub

Table of Contents

目录

Purpose

用途

Build robust, production-ready event-driven systems using Google Cloud Pub/Sub with Python. Covers setup, publishing, subscribing, error handling, dead letter queues, and local development with the emulator.
使用Python结合Google Cloud Pub/Sub构建稳健、可投入生产的事件驱动系统。内容涵盖环境搭建、消息发布、消息订阅、错误处理、死信队列配置以及基于模拟器的本地开发。

When to Use

适用场景

Use this skill when you need to:
  • Build event-driven architectures with message-based communication
  • Implement reliable message queuing between services
  • Handle at-least-once message delivery guarantees
  • Manage high-throughput message systems (1000+ msgs/sec)
  • Configure local development with Pub/Sub emulator
  • Implement dead letter queues for failed message handling
当你需要以下功能时,可使用本技能:
  • 构建基于消息通信的事件驱动架构
  • 在服务间实现可靠的消息队列
  • 满足至少一次消息投递的保障要求
  • 管理高吞吐量消息系统(1000+ 消息/秒)
  • 配置Pub/Sub模拟器进行本地开发
  • 为失败消息处理配置死信队列

Quick Start

快速开始

Install and authenticate:
bash
pip install google-cloud-pubsub
gcloud auth application-default login
python -c "from google.cloud import pubsub_v1; print('Ready')"
Publish a message:
python
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")
安装与认证:
bash
pip install google-cloud-pubsub
gcloud auth application-default login
python -c "from google.cloud import pubsub_v1; print('Ready')"
发布消息:
python
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

Create topic

创建主题

try: publisher.create_topic(request={"name": topic_path}) except Exception as e: if "ALREADY_EXISTS" not in str(e): raise
try: publisher.create_topic(request={"name": topic_path}) except Exception as e: if "ALREADY_EXISTS" not in str(e): raise

Publish

发布消息

future = publisher.publish(topic_path, b"Hello, World!") print(f"Published: {future.result()}")

**Subscribe to messages:**

```python
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    print(f"Received: {message.data.decode()}")
    message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)

try:
    future.result(timeout=30)
except Exception:
    future.cancel()
future = publisher.publish(topic_path, b"Hello, World!") print(f"Published: {future.result()}")

**订阅消息:**

```python
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    print(f"Received: {message.data.decode()}")
    message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)

try:
    future.result(timeout=30)
except Exception:
    future.cancel()

Instructions

操作步骤

Step 1: Set Up Development Environment

步骤1:搭建开发环境

Install dependencies and configure authentication:
bash
pip install google-cloud-pubsub
gcloud auth application-default login
For production, use service account:
bash
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"
安装依赖并配置认证:
bash
pip install google-cloud-pubsub
gcloud auth application-default login
生产环境使用服务账号:
bash
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"

Step 2: Create Topics and Subscriptions

步骤2:创建主题与订阅

python
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
python
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

Create topic

创建主题

topic_path = publisher.topic_path("my-project", "my-topic") publisher.create_topic(request={"name": topic_path})
topic_path = publisher.topic_path("my-project", "my-topic") publisher.create_topic(request={"name": topic_path})

Create subscription

创建订阅

subscription_path = subscriber.subscription_path("my-project", "my-subscription") subscription_config = { "name": subscription_path, "topic": topic_path, "ack_deadline_seconds": 60, } subscriber.create_subscription(request=subscription_config)

See [references/detailed-guide.md](./references/detailed-guide.md) for advanced configuration options.
subscription_path = subscriber.subscription_path("my-project", "my-subscription") subscription_config = { "name": subscription_path, "topic": topic_path, "ack_deadline_seconds": 60, } subscriber.create_subscription(request=subscription_config)

高级配置选项请参考 [references/detailed-guide.md](./references/detailed-guide.md)。

Step 3: Publish Messages

步骤3:发布消息

Simple publishing:
python
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

future = publisher.publish(topic_path, b"Message data")
message_id = future.result()
Publish with attributes:
python
import json

data = json.dumps({"event": "user.created", "user_id": "123"}).encode()
future = publisher.publish(
    topic_path,
    data,
    event_type="user.created",
    timestamp="2024-01-15T10:30:00Z"
)
See references/detailed-guide.md for production-ready publisher with batching and error handling.
简单发布:
python
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "my-topic")

future = publisher.publish(topic_path, b"Message data")
message_id = future.result()
带属性的发布:
python
import json

data = json.dumps({"event": "user.created", "user_id": "123"}).encode()
future = publisher.publish(
    topic_path,
    data,
    event_type="user.created",
    timestamp="2024-01-15T10:30:00Z"
)
具备批处理和错误处理的生产级发布器请参考 references/detailed-guide.md

Step 4: Subscribe to Messages

步骤4:订阅消息

Basic subscriber:
python
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    try:
        print(f"Received: {message.data.decode()}")
        # Process message
        message.ack()
    except Exception as e:
        print(f"Error: {e}")
        message.nack()  # Will be redelivered

future = subscriber.subscribe(subscription_path, callback=callback)

try:
    future.result()  # Block indefinitely
except KeyboardInterrupt:
    future.cancel()
With flow control:
python
future = subscriber.subscribe(
    subscription_path,
    callback=callback,
    flow_control=pubsub_v1.types.FlowControl(
        max_messages=100,
        max_bytes=100 * 1024 * 1024,  # 100 MB
    ),
)
See references/detailed-guide.md for production subscriber with monitoring.
基础订阅器:
python
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")

def callback(message):
    try:
        print(f"Received: {message.data.decode()}")
        # 处理消息
        message.ack()
    except Exception as e:
        print(f"Error: {e}")
        message.nack()  # 将被重新投递

future = subscriber.subscribe(subscription_path, callback=callback)

try:
    future.result()  # 无限期阻塞
except KeyboardInterrupt:
    future.cancel()
带流量控制:
python
future = subscriber.subscribe(
    subscription_path,
    callback=callback,
    flow_control=pubsub_v1.types.FlowControl(
        max_messages=100,
        max_bytes=100 * 1024 * 1024,  # 100 MB
    ),
)
具备监控功能的生产级订阅器请参考 references/detailed-guide.md

Step 5: Configure Dead Letter Queue

步骤5:配置死信队列

python
from google.cloud import pubsub_v1
from google.protobuf.duration_pb2 import Duration

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
python
from google.cloud import pubsub_v1
from google.protobuf.duration_pb2 import Duration

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

Create dead letter topic

创建死信主题

dlq_topic_path = publisher.topic_path("my-project", "my-topic-dlq") publisher.create_topic(request={"name": dlq_topic_path})
dlq_topic_path = publisher.topic_path("my-project", "my-topic-dlq") publisher.create_topic(request={"name": dlq_topic_path})

Create subscription with DLQ

创建带DLQ的订阅

subscription_path = subscriber.subscription_path("my-project", "my-subscription") subscription = pubsub_v1.types.Subscription( name=subscription_path, topic=publisher.topic_path("my-project", "my-topic"), dead_letter_policy=pubsub_v1.types.DeadLetterPolicy( dead_letter_topic=dlq_topic_path, max_delivery_attempts=5, ), retry_policy=pubsub_v1.types.RetryPolicy( minimum_backoff=Duration(seconds=10), maximum_backoff=Duration(seconds=600), ), ) subscriber.create_subscription(request=subscription)

See [references/detailed-guide.md](./references/detailed-guide.md) for complete DLQ setup with monitoring.
subscription_path = subscriber.subscription_path("my-project", "my-subscription") subscription = pubsub_v1.types.Subscription( name=subscription_path, topic=publisher.topic_path("my-project", "my-topic"), dead_letter_policy=pubsub_v1.types.DeadLetterPolicy( dead_letter_topic=dlq_topic_path, max_delivery_attempts=5, ), retry_policy=pubsub_v1.types.RetryPolicy( minimum_backoff=Duration(seconds=10), maximum_backoff=Duration(seconds=600), ), ) subscriber.create_subscription(request=subscription)

完整的DLQ搭建及监控请参考 [references/detailed-guide.md](./references/detailed-guide.md)。

Step 6: Implement Idempotency

步骤6:实现幂等性

Track processed messages to avoid duplicate processing:
python
class IdempotentProcessor:
    def __init__(self):
        self.processed_ids = set()

    def process(self, message):
        msg_id = message.message_id

        if msg_id in self.processed_ids:
            print(f"Already processed: {msg_id}")
            message.ack()
            return

        try:
            # Process message
            print(f"Processing: {message.data.decode()}")
            self.processed_ids.add(msg_id)
            message.ack()
        except Exception as e:
            print(f"Failed: {e}")
            message.nack()
See references/detailed-guide.md for production-ready idempotency patterns.
跟踪已处理消息以避免重复处理:
python
class IdempotentProcessor:
    def __init__(self):
        self.processed_ids = set()

    def process(self, message):
        msg_id = message.message_id

        if msg_id in self.processed_ids:
            print(f"已处理过: {msg_id}")
            message.ack()
            return

        try:
            # 处理消息
            print(f"处理中: {message.data.decode()}")
            self.processed_ids.add(msg_id)
            message.ack()
        except Exception as e:
            print(f"处理失败: {e}")
            message.nack()
生产级幂等性实现模式请参考 references/detailed-guide.md

Step 7: Local Development with Emulator

步骤7:基于模拟器的本地开发

bash
undefined
bash
undefined

Install and start emulator

安装并启动模拟器

gcloud components install pubsub-emulator gcloud beta emulators pubsub start
gcloud components install pubsub-emulator gcloud beta emulators pubsub start

In another terminal

在另一个终端中执行

export PUBSUB_EMULATOR_HOST=localhost:8085 python your_script.py # Uses emulator automatically

See [references/detailed-guide.md](./references/detailed-guide.md) for emulator configuration patterns.
export PUBSUB_EMULATOR_HOST=localhost:8085 python your_script.py # 自动使用模拟器

模拟器配置模式请参考 [references/detailed-guide.md](./references/detailed-guide.md)。

Step 8: Monitor Operations

步骤8:监控操作

Enable logging and track metrics:
python
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
启用日志并跟踪指标:
python
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

Query subscription stats

查询订阅统计信息

subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path("my-project", "my-subscription") subscription = subscriber.get_subscription(request={"subscription": subscription_path})
print(f"Topic: {subscription.topic}") print(f"Ack deadline: {subscription.ack_deadline_seconds}s")

See [references/detailed-guide.md](./references/detailed-guide.md) for comprehensive monitoring patterns.
subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path("my-project", "my-subscription") subscription = subscriber.get_subscription(request={"subscription": subscription_path})
print(f"主题: {subscription.topic}") print(f"确认超时: {subscription.ack_deadline_seconds}秒")

全面的监控模式请参考 [references/detailed-guide.md](./references/detailed-guide.md)。

Requirements

环境要求

  • Python: 3.7+
  • Dependencies:
    bash
    pip install google-cloud-pubsub>=2.18.0
  • GCP Project: Active project with Pub/Sub API enabled
  • Authentication: Application Default Credentials or service account key
  • IAM Permissions:
    • roles/pubsub.publisher
      - Publish messages
    • roles/pubsub.subscriber
      - Subscribe to messages
    • roles/pubsub.admin
      - Create/delete topics and subscriptions
  • For Local Development:
    bash
    gcloud components install pubsub-emulator
  • Python版本: 3.7+
  • 依赖包:
    bash
    pip install google-cloud-pubsub>=2.18.0
  • GCP项目: 已启用Pub/Sub API的活跃项目
  • 认证方式: 应用默认凭据或服务账号密钥
  • IAM权限:
    • roles/pubsub.publisher
      - 发布消息权限
    • roles/pubsub.subscriber
      - 订阅消息权限
    • roles/pubsub.admin
      - 创建/删除主题和订阅权限
  • 本地开发额外要求:
    bash
    gcloud components install pubsub-emulator

See Also

相关链接

  • references/detailed-guide.md - Comprehensive implementation guide with production patterns
  • examples/examples.md - Working code examples including async patterns and testing
  • scripts/setup_emulator.sh - Emulator setup utility
  • scripts/pubsub_utils.py - Helper utilities for common operations
  • references/detailed-guide.md - 包含生产级实现模式的完整指南
  • examples/examples.md - 包含异步模式和测试的可运行代码示例
  • scripts/setup_emulator.sh - 模拟器搭建工具脚本
  • scripts/pubsub_utils.py - 通用操作辅助工具类