microservices-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Microservices Patterns

微服务模式

Master microservices architecture patterns including service boundaries, inter-service communication, data management, and resilience patterns for building distributed systems.
掌握用于构建分布式系统的微服务架构模式,包括服务边界、服务间通信、数据管理和弹性模式。

When to Use This Skill

何时使用该技能

  • Decomposing monoliths into microservices
  • Designing service boundaries and contracts
  • Implementing inter-service communication
  • Managing distributed data and transactions
  • Building resilient distributed systems
  • Implementing service discovery and load balancing
  • Designing event-driven architectures
  • 将单体应用拆分为微服务
  • 设计服务边界和契约
  • 实现服务间通信
  • 管理分布式数据与事务
  • 构建高弹性分布式系统
  • 实现服务发现与负载均衡
  • 设计事件驱动架构

Core Concepts

核心概念

1. Service Decomposition Strategies

1. 服务拆分策略

By Business Capability
  • Organize services around business functions
  • Each service owns its domain
  • Example: OrderService, PaymentService, InventoryService
By Subdomain (DDD)
  • Core domain, supporting subdomains
  • Bounded contexts map to services
  • Clear ownership and responsibility
Strangler Fig Pattern
  • Gradually extract from monolith
  • New functionality as microservices
  • Proxy routes to old/new systems
按业务能力拆分
  • 围绕业务功能组织服务
  • 每个服务独立负责其领域
  • 示例:OrderService、PaymentService、InventoryService
按子领域(DDD)拆分
  • 核心领域、支撑子领域
  • 限界上下文映射到对应服务
  • 明确的所有权与职责划分
绞杀者模式(Strangler Fig Pattern)
  • 逐步从单体应用中提取功能
  • 新功能以微服务形式实现
  • 通过代理路由请求到新旧系统

2. Communication Patterns

2. 通信模式

Synchronous (Request/Response)
  • REST APIs
  • gRPC
  • GraphQL
Asynchronous (Events/Messages)
  • Event streaming (Kafka)
  • Message queues (RabbitMQ, SQS)
  • Pub/Sub patterns
同步(请求/响应)
  • REST APIs
  • gRPC
  • GraphQL
异步(事件/消息)
  • 事件流(Kafka)
  • 消息队列(RabbitMQ、SQS)
  • 发布/订阅模式

3. Data Management

3. 数据管理

Database Per Service
  • Each service owns its data
  • No shared databases
  • Loose coupling
Saga Pattern
  • Distributed transactions
  • Compensating actions
  • Eventual consistency
服务专属数据库(Database Per Service)
  • 每个服务独立管理自身数据
  • 不共享数据库
  • 实现松耦合
Saga模式
  • 分布式事务处理
  • 补偿操作机制
  • 最终一致性

4. Resilience Patterns

4. 弹性模式

Circuit Breaker
  • Fail fast on repeated errors
  • Prevent cascade failures
Retry with Backoff
  • Transient fault handling
  • Exponential backoff
Bulkhead
  • Isolate resources
  • Limit impact of failures
断路器模式(Circuit Breaker)
  • 重复错误时快速失败
  • 防止级联故障
退避重试(Retry with Backoff)
  • 临时故障处理
  • 指数退避策略
舱壁模式(Bulkhead)
  • 资源隔离
  • 限制故障影响范围

Service Decomposition Patterns

服务拆分模式

Pattern 1: By Business Capability

模式1:按业务能力拆分

python
undefined
python
undefined

E-commerce example

E-commerce example

Order Service

Order Service

class OrderService: """Handles order lifecycle."""
async def create_order(self, order_data: dict) -> Order:
    order = Order.create(order_data)

    # Publish event for other services
    await self.event_bus.publish(
        OrderCreatedEvent(
            order_id=order.id,
            customer_id=order.customer_id,
            items=order.items,
            total=order.total
        )
    )

    return order
class OrderService: """Handles order lifecycle."""
async def create_order(self, order_data: dict) -> Order:
    order = Order.create(order_data)

    # Publish event for other services
    await self.event_bus.publish(
        OrderCreatedEvent(
            order_id=order.id,
            customer_id=order.customer_id,
            items=order.items,
            total=order.total
        )
    )

    return order

Payment Service (separate service)

Payment Service (separate service)

class PaymentService: """Handles payment processing."""
async def process_payment(self, payment_request: PaymentRequest) -> PaymentResult:
    # Process payment
    result = await self.payment_gateway.charge(
        amount=payment_request.amount,
        customer=payment_request.customer_id
    )

    if result.success:
        await self.event_bus.publish(
            PaymentCompletedEvent(
                order_id=payment_request.order_id,
                transaction_id=result.transaction_id
            )
        )

    return result
class PaymentService: """Handles payment processing."""
async def process_payment(self, payment_request: PaymentRequest) -> PaymentResult:
    # Process payment
    result = await self.payment_gateway.charge(
        amount=payment_request.amount,
        customer=payment_request.customer_id
    )

    if result.success:
        await self.event_bus.publish(
            PaymentCompletedEvent(
                order_id=payment_request.order_id,
                transaction_id=result.transaction_id
            )
        )

    return result

Inventory Service (separate service)

Inventory Service (separate service)

class InventoryService: """Handles inventory management."""
async def reserve_items(self, order_id: str, items: List[OrderItem]) -> ReservationResult:
    # Check availability
    for item in items:
        available = await self.inventory_repo.get_available(item.product_id)
        if available < item.quantity:
            return ReservationResult(
                success=False,
                error=f"Insufficient inventory for {item.product_id}"
            )

    # Reserve items
    reservation = await self.create_reservation(order_id, items)

    await self.event_bus.publish(
        InventoryReservedEvent(
            order_id=order_id,
            reservation_id=reservation.id
        )
    )

    return ReservationResult(success=True, reservation=reservation)
undefined
class InventoryService: """Handles inventory management."""
async def reserve_items(self, order_id: str, items: List[OrderItem]) -> ReservationResult:
    # Check availability
    for item in items:
        available = await self.inventory_repo.get_available(item.product_id)
        if available < item.quantity:
            return ReservationResult(
                success=False,
                error=f"Insufficient inventory for {item.product_id}"
            )

    # Reserve items
    reservation = await self.create_reservation(order_id, items)

    await self.event_bus.publish(
        InventoryReservedEvent(
            order_id=order_id,
            reservation_id=reservation.id
        )
    )

    return ReservationResult(success=True, reservation=reservation)
undefined

Pattern 2: API Gateway

模式2:API网关

python
from fastapi import FastAPI, HTTPException, Depends
import httpx
from circuitbreaker import circuit

app = FastAPI()

class APIGateway:
    """Central entry point for all client requests."""

    def __init__(self):
        self.order_service_url = "http://order-service:8000"
        self.payment_service_url = "http://payment-service:8001"
        self.inventory_service_url = "http://inventory-service:8002"
        self.http_client = httpx.AsyncClient(timeout=5.0)

    @circuit(failure_threshold=5, recovery_timeout=30)
    async def call_order_service(self, path: str, method: str = "GET", **kwargs):
        """Call order service with circuit breaker."""
        response = await self.http_client.request(
            method,
            f"{self.order_service_url}{path}",
            **kwargs
        )
        response.raise_for_status()
        return response.json()

    async def create_order_aggregate(self, order_id: str) -> dict:
        """Aggregate data from multiple services."""
        # Parallel requests
        order, payment, inventory = await asyncio.gather(
            self.call_order_service(f"/orders/{order_id}"),
            self.call_payment_service(f"/payments/order/{order_id}"),
            self.call_inventory_service(f"/reservations/order/{order_id}"),
            return_exceptions=True
        )

        # Handle partial failures
        result = {"order": order}
        if not isinstance(payment, Exception):
            result["payment"] = payment
        if not isinstance(inventory, Exception):
            result["inventory"] = inventory

        return result

@app.post("/api/orders")
async def create_order(
    order_data: dict,
    gateway: APIGateway = Depends()
):
    """API Gateway endpoint."""
    try:
        # Route to order service
        order = await gateway.call_order_service(
            "/orders",
            method="POST",
            json=order_data
        )
        return {"order": order}
    except httpx.HTTPError as e:
        raise HTTPException(status_code=503, detail="Order service unavailable")
python
from fastapi import FastAPI, HTTPException, Depends
import httpx
from circuitbreaker import circuit

app = FastAPI()

class APIGateway:
    """Central entry point for all client requests."""

    def __init__(self):
        self.order_service_url = "http://order-service:8000"
        self.payment_service_url = "http://payment-service:8001"
        self.inventory_service_url = "http://inventory-service:8002"
        self.http_client = httpx.AsyncClient(timeout=5.0)

    @circuit(failure_threshold=5, recovery_timeout=30)
    async def call_order_service(self, path: str, method: str = "GET", **kwargs):
        """Call order service with circuit breaker."""
        response = await self.http_client.request(
            method,
            f"{self.order_service_url}{path}",
            **kwargs
        )
        response.raise_for_status()
        return response.json()

    async def create_order_aggregate(self, order_id: str) -> dict:
        """Aggregate data from multiple services."""
        # Parallel requests
        order, payment, inventory = await asyncio.gather(
            self.call_order_service(f"/orders/{order_id}"),
            self.call_payment_service(f"/payments/order/{order_id}"),
            self.call_inventory_service(f"/reservations/order/{order_id}"),
            return_exceptions=True
        )

        # Handle partial failures
        result = {"order": order}
        if not isinstance(payment, Exception):
            result["payment"] = payment
        if not isinstance(inventory, Exception):
            result["inventory"] = inventory

        return result

@app.post("/api/orders")
async def create_order(
    order_data: dict,
    gateway: APIGateway = Depends()
):
    """API Gateway endpoint."""
    try:
        # Route to order service
        order = await gateway.call_order_service(
            "/orders",
            method="POST",
            json=order_data
        )
        return {"order": order}
    except httpx.HTTPError as e:
        raise HTTPException(status_code=503, detail="Order service unavailable")

Communication Patterns

通信模式

Pattern 1: Synchronous REST Communication

模式1:同步REST通信

python
undefined
python
undefined

Service A calls Service B

Service A calls Service B

import httpx from tenacity import retry, stop_after_attempt, wait_exponential
class ServiceClient: """HTTP client with retries and timeout."""
def __init__(self, base_url: str):
    self.base_url = base_url
    self.client = httpx.AsyncClient(
        timeout=httpx.Timeout(5.0, connect=2.0),
        limits=httpx.Limits(max_keepalive_connections=20)
    )

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
async def get(self, path: str, **kwargs):
    """GET with automatic retries."""
    response = await self.client.get(f"{self.base_url}{path}", **kwargs)
    response.raise_for_status()
    return response.json()

async def post(self, path: str, **kwargs):
    """POST request."""
    response = await self.client.post(f"{self.base_url}{path}", **kwargs)
    response.raise_for_status()
    return response.json()
import httpx from tenacity import retry, stop_after_attempt, wait_exponential
class ServiceClient: """HTTP client with retries and timeout."""
def __init__(self, base_url: str):
    self.base_url = base_url
    self.client = httpx.AsyncClient(
        timeout=httpx.Timeout(5.0, connect=2.0),
        limits=httpx.Limits(max_keepalive_connections=20)
    )

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
async def get(self, path: str, **kwargs):
    """GET with automatic retries."""
    response = await self.client.get(f"{self.base_url}{path}", **kwargs)
    response.raise_for_status()
    return response.json()

async def post(self, path: str, **kwargs):
    """POST request."""
    response = await self.client.post(f"{self.base_url}{path}", **kwargs)
    response.raise_for_status()
    return response.json()

Usage

Usage

payment_client = ServiceClient("http://payment-service:8001") result = await payment_client.post("/payments", json=payment_data)
undefined
payment_client = ServiceClient("http://payment-service:8001") result = await payment_client.post("/payments", json=payment_data)
undefined

Pattern 2: Asynchronous Event-Driven

模式2:异步事件驱动通信

python
undefined
python
undefined

Event-driven communication with Kafka

Event-driven communication with Kafka

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer import json from dataclasses import dataclass, asdict from datetime import datetime
@dataclass class DomainEvent: event_id: str event_type: str aggregate_id: str occurred_at: datetime data: dict
class EventBus: """Event publishing and subscription."""
def __init__(self, bootstrap_servers: List[str]):
    self.bootstrap_servers = bootstrap_servers
    self.producer = None

async def start(self):
    self.producer = AIOKafkaProducer(
        bootstrap_servers=self.bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode()
    )
    await self.producer.start()

async def publish(self, event: DomainEvent):
    """Publish event to Kafka topic."""
    topic = event.event_type
    await self.producer.send_and_wait(
        topic,
        value=asdict(event),
        key=event.aggregate_id.encode()
    )

async def subscribe(self, topic: str, handler: callable):
    """Subscribe to events."""
    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=self.bootstrap_servers,
        value_deserializer=lambda v: json.loads(v.decode()),
        group_id="my-service"
    )
    await consumer.start()

    try:
        async for message in consumer:
            event_data = message.value
            await handler(event_data)
    finally:
        await consumer.stop()
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer import json from dataclasses import dataclass, asdict from datetime import datetime
@dataclass class DomainEvent: event_id: str event_type: str aggregate_id: str occurred_at: datetime data: dict
class EventBus: """Event publishing and subscription."""
def __init__(self, bootstrap_servers: List[str]):
    self.bootstrap_servers = bootstrap_servers
    self.producer = None

async def start(self):
    self.producer = AIOKafkaProducer(
        bootstrap_servers=self.bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode()
    )
    await self.producer.start()

async def publish(self, event: DomainEvent):
    """Publish event to Kafka topic."""
    topic = event.event_type
    await self.producer.send_and_wait(
        topic,
        value=asdict(event),
        key=event.aggregate_id.encode()
    )

async def subscribe(self, topic: str, handler: callable):
    """Subscribe to events."""
    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=self.bootstrap_servers,
        value_deserializer=lambda v: json.loads(v.decode()),
        group_id="my-service"
    )
    await consumer.start()

    try:
        async for message in consumer:
            event_data = message.value
            await handler(event_data)
    finally:
        await consumer.stop()

Order Service publishes event

Order Service publishes event

async def create_order(order_data: dict): order = await save_order(order_data)
event = DomainEvent(
    event_id=str(uuid.uuid4()),
    event_type="OrderCreated",
    aggregate_id=order.id,
    occurred_at=datetime.now(),
    data={
        "order_id": order.id,
        "customer_id": order.customer_id,
        "total": order.total
    }
)

await event_bus.publish(event)
async def create_order(order_data: dict): order = await save_order(order_data)
event = DomainEvent(
    event_id=str(uuid.uuid4()),
    event_type="OrderCreated",
    aggregate_id=order.id,
    occurred_at=datetime.now(),
    data={
        "order_id": order.id,
        "customer_id": order.customer_id,
        "total": order.total
    }
)

await event_bus.publish(event)

Inventory Service listens for OrderCreated

Inventory Service listens for OrderCreated

async def handle_order_created(event_data: dict): """React to order creation.""" order_id = event_data["data"]["order_id"] items = event_data["data"]["items"]
# Reserve inventory
await reserve_inventory(order_id, items)
undefined
async def handle_order_created(event_data: dict): """React to order creation.""" order_id = event_data["data"]["order_id"] items = event_data["data"]["items"]
# Reserve inventory
await reserve_inventory(order_id, items)
undefined

Pattern 3: Saga Pattern (Distributed Transactions)

模式3:Saga模式(分布式事务)

python
undefined
python
undefined

Saga orchestration for order fulfillment

Saga orchestration for order fulfillment

from enum import Enum from typing import List, Callable
class SagaStep: """Single step in saga."""
def __init__(
    self,
    name: str,
    action: Callable,
    compensation: Callable
):
    self.name = name
    self.action = action
    self.compensation = compensation
class SagaStatus(Enum): PENDING = "pending" COMPLETED = "completed" COMPENSATING = "compensating" FAILED = "failed"
class OrderFulfillmentSaga: """Orchestrated saga for order fulfillment."""
def __init__(self):
    self.steps: List[SagaStep] = [
        SagaStep(
            "create_order",
            action=self.create_order,
            compensation=self.cancel_order
        ),
        SagaStep(
            "reserve_inventory",
            action=self.reserve_inventory,
            compensation=self.release_inventory
        ),
        SagaStep(
            "process_payment",
            action=self.process_payment,
            compensation=self.refund_payment
        ),
        SagaStep(
            "confirm_order",
            action=self.confirm_order,
            compensation=self.cancel_order_confirmation
        )
    ]

async def execute(self, order_data: dict) -> SagaResult:
    """Execute saga steps."""
    completed_steps = []
    context = {"order_data": order_data}

    try:
        for step in self.steps:
            # Execute step
            result = await step.action(context)
            if not result.success:
                # Compensate
                await self.compensate(completed_steps, context)
                return SagaResult(
                    status=SagaStatus.FAILED,
                    error=result.error
                )

            completed_steps.append(step)
            context.update(result.data)

        return SagaResult(status=SagaStatus.COMPLETED, data=context)

    except Exception as e:
        # Compensate on error
        await self.compensate(completed_steps, context)
        return SagaResult(status=SagaStatus.FAILED, error=str(e))

async def compensate(self, completed_steps: List[SagaStep], context: dict):
    """Execute compensating actions in reverse order."""
    for step in reversed(completed_steps):
        try:
            await step.compensation(context)
        except Exception as e:
            # Log compensation failure
            print(f"Compensation failed for {step.name}: {e}")

# Step implementations
async def create_order(self, context: dict) -> StepResult:
    order = await order_service.create(context["order_data"])
    return StepResult(success=True, data={"order_id": order.id})

async def cancel_order(self, context: dict):
    await order_service.cancel(context["order_id"])

async def reserve_inventory(self, context: dict) -> StepResult:
    result = await inventory_service.reserve(
        context["order_id"],
        context["order_data"]["items"]
    )
    return StepResult(
        success=result.success,
        data={"reservation_id": result.reservation_id}
    )

async def release_inventory(self, context: dict):
    await inventory_service.release(context["reservation_id"])

async def process_payment(self, context: dict) -> StepResult:
    result = await payment_service.charge(
        context["order_id"],
        context["order_data"]["total"]
    )
    return StepResult(
        success=result.success,
        data={"transaction_id": result.transaction_id},
        error=result.error
    )

async def refund_payment(self, context: dict):
    await payment_service.refund(context["transaction_id"])
undefined
from enum import Enum from typing import List, Callable
class SagaStep: """Single step in saga."""
def __init__(
    self,
    name: str,
    action: Callable,
    compensation: Callable
):
    self.name = name
    self.action = action
    self.compensation = compensation
class SagaStatus(Enum): PENDING = "pending" COMPLETED = "completed" COMPENSATING = "compensating" FAILED = "failed"
class OrderFulfillmentSaga: """Orchestrated saga for order fulfillment."""
def __init__(self):
    self.steps: List[SagaStep] = [
        SagaStep(
            "create_order",
            action=self.create_order,
            compensation=self.cancel_order
        ),
        SagaStep(
            "reserve_inventory",
            action=self.reserve_inventory,
            compensation=self.release_inventory
        ),
        SagaStep(
            "process_payment",
            action=self.process_payment,
            compensation=self.refund_payment
        ),
        SagaStep(
            "confirm_order",
            action=self.confirm_order,
            compensation=self.cancel_order_confirmation
        )
    ]

async def execute(self, order_data: dict) -> SagaResult:
    """Execute saga steps."""
    completed_steps = []
    context = {"order_data": order_data}

    try:
        for step in self.steps:
            # Execute step
            result = await step.action(context)
            if not result.success:
                # Compensate
                await self.compensate(completed_steps, context)
                return SagaResult(
                    status=SagaStatus.FAILED,
                    error=result.error
                )

            completed_steps.append(step)
            context.update(result.data)

        return SagaResult(status=SagaStatus.COMPLETED, data=context)

    except Exception as e:
        # Compensate on error
        await self.compensate(completed_steps, context)
        return SagaResult(status=SagaStatus.FAILED, error=str(e))

async def compensate(self, completed_steps: List[SagaStep], context: dict):
    """Execute compensating actions in reverse order."""
    for step in reversed(completed_steps):
        try:
            await step.compensation(context)
        except Exception as e:
            # Log compensation failure
            print(f"Compensation failed for {step.name}: {e}")

# Step implementations
async def create_order(self, context: dict) -> StepResult:
    order = await order_service.create(context["order_data"])
    return StepResult(success=True, data={"order_id": order.id})

async def cancel_order(self, context: dict):
    await order_service.cancel(context["order_id"])

async def reserve_inventory(self, context: dict) -> StepResult:
    result = await inventory_service.reserve(
        context["order_id"],
        context["order_data"]["items"]
    )
    return StepResult(
        success=result.success,
        data={"reservation_id": result.reservation_id}
    )

async def release_inventory(self, context: dict):
    await inventory_service.release(context["reservation_id"])

async def process_payment(self, context: dict) -> StepResult:
    result = await payment_service.charge(
        context["order_id"],
        context["order_data"]["total"]
    )
    return StepResult(
        success=result.success,
        data={"transaction_id": result.transaction_id},
        error=result.error
    )

async def refund_payment(self, context: dict):
    await payment_service.refund(context["transaction_id"])
undefined

Resilience Patterns

弹性模式

Circuit Breaker Pattern

断路器模式

python
from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"      # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    """Circuit breaker for service calls."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold

        self.failure_count = 0
        self.success_count = 0
        self.state = CircuitState.CLOSED
        self.opened_at = None

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker."""

        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit breaker is open")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result

        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        """Handle successful call."""
        self.failure_count = 0

        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.success_count = 0

    def _on_failure(self):
        """Handle failed call."""
        self.failure_count += 1

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()

        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()

    def _should_attempt_reset(self) -> bool:
        """Check if enough time passed to try again."""
        return (
            datetime.now() - self.opened_at
            > timedelta(seconds=self.recovery_timeout)
        )
python
from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"      # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    """Circuit breaker for service calls."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold

        self.failure_count = 0
        self.success_count = 0
        self.state = CircuitState.CLOSED
        self.opened_at = None

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with circuit breaker."""

        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit breaker is open")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result

        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        """Handle successful call."""
        self.failure_count = 0

        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.success_count = 0

    def _on_failure(self):
        """Handle failed call."""
        self.failure_count += 1

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()

        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()

    def _should_attempt_reset(self) -> bool:
        """Check if enough time passed to try again."""
        return (
            datetime.now() - self.opened_at
            > timedelta(seconds=self.recovery_timeout)
        )

Usage

Usage

breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
async def call_payment_service(payment_data: dict): return await breaker.call( payment_client.process_payment, payment_data )
undefined
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
async def call_payment_service(payment_data: dict): return await breaker.call( payment_client.process_payment, payment_data )
undefined

Resources

资源

  • references/service-decomposition-guide.md: Breaking down monoliths
  • references/communication-patterns.md: Sync vs async patterns
  • references/saga-implementation.md: Distributed transactions
  • assets/circuit-breaker.py: Production circuit breaker
  • assets/event-bus-template.py: Kafka event bus implementation
  • assets/api-gateway-template.py: Complete API gateway
  • references/service-decomposition-guide.md: 单体应用拆分指南
  • references/communication-patterns.md: 同步与异步模式对比
  • references/saga-implementation.md: 分布式事务实现
  • assets/circuit-breaker.py: 生产环境断路器实现
  • assets/event-bus-template.py: Kafka事件总线实现模板
  • assets/api-gateway-template.py: 完整API网关模板

Best Practices

最佳实践

  1. Service Boundaries: Align with business capabilities
  2. Database Per Service: No shared databases
  3. API Contracts: Versioned, backward compatible
  4. Async When Possible: Events over direct calls
  5. Circuit Breakers: Fail fast on service failures
  6. Distributed Tracing: Track requests across services
  7. Service Registry: Dynamic service discovery
  8. Health Checks: Liveness and readiness probes
  1. 服务边界:与业务能力对齐
  2. 服务专属数据库:不共享数据库
  3. API契约:版本化、向后兼容
  4. 优先异步:采用事件而非直接调用
  5. 断路器:服务故障时快速失败
  6. 分布式追踪:跨服务追踪请求
  7. 服务注册中心:动态服务发现
  8. 健康检查:存活与就绪探针

Common Pitfalls

常见误区

  • Distributed Monolith: Tightly coupled services
  • Chatty Services: Too many inter-service calls
  • Shared Databases: Tight coupling through data
  • No Circuit Breakers: Cascade failures
  • Synchronous Everything: Tight coupling, poor resilience
  • Premature Microservices: Starting with microservices
  • Ignoring Network Failures: Assuming reliable network
  • No Compensation Logic: Can't undo failed transactions
  • 分布式单体:服务间紧耦合
  • 服务通信频繁:过多的服务间调用
  • 共享数据库:通过数据实现紧耦合
  • 未使用断路器:引发级联故障
  • 全同步通信:紧耦合、低弹性
  • 过早微服务化:初始阶段就采用微服务
  • 忽略网络故障:假设网络绝对可靠
  • 无补偿逻辑:无法回滚失败的事务