microservices-patterns
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseMicroservices Patterns
微服务模式
Master microservices architecture patterns including service boundaries, inter-service communication, data management, and resilience patterns for building distributed systems.
掌握用于构建分布式系统的微服务架构模式,包括服务边界、服务间通信、数据管理和弹性模式。
When to Use This Skill
何时使用该技能
- Decomposing monoliths into microservices
- Designing service boundaries and contracts
- Implementing inter-service communication
- Managing distributed data and transactions
- Building resilient distributed systems
- Implementing service discovery and load balancing
- Designing event-driven architectures
- 将单体应用拆分为微服务
- 设计服务边界和契约
- 实现服务间通信
- 管理分布式数据与事务
- 构建高弹性分布式系统
- 实现服务发现与负载均衡
- 设计事件驱动架构
Core Concepts
核心概念
1. Service Decomposition Strategies
1. 服务拆分策略
By Business Capability
- Organize services around business functions
- Each service owns its domain
- Example: OrderService, PaymentService, InventoryService
By Subdomain (DDD)
- Core domain, supporting subdomains
- Bounded contexts map to services
- Clear ownership and responsibility
Strangler Fig Pattern
- Gradually extract from monolith
- New functionality as microservices
- Proxy routes to old/new systems
按业务能力拆分
- 围绕业务功能组织服务
- 每个服务独立负责其领域
- 示例:OrderService、PaymentService、InventoryService
按子领域(DDD)拆分
- 核心领域、支撑子领域
- 限界上下文映射到对应服务
- 明确的所有权与职责划分
绞杀者模式(Strangler Fig Pattern)
- 逐步从单体应用中提取功能
- 新功能以微服务形式实现
- 通过代理路由请求到新旧系统
2. Communication Patterns
2. 通信模式
Synchronous (Request/Response)
- REST APIs
- gRPC
- GraphQL
Asynchronous (Events/Messages)
- Event streaming (Kafka)
- Message queues (RabbitMQ, SQS)
- Pub/Sub patterns
同步(请求/响应)
- REST APIs
- gRPC
- GraphQL
异步(事件/消息)
- 事件流(Kafka)
- 消息队列(RabbitMQ、SQS)
- 发布/订阅模式
3. Data Management
3. 数据管理
Database Per Service
- Each service owns its data
- No shared databases
- Loose coupling
Saga Pattern
- Distributed transactions
- Compensating actions
- Eventual consistency
服务专属数据库(Database Per Service)
- 每个服务独立管理自身数据
- 不共享数据库
- 实现松耦合
Saga模式
- 分布式事务处理
- 补偿操作机制
- 最终一致性
4. Resilience Patterns
4. 弹性模式
Circuit Breaker
- Fail fast on repeated errors
- Prevent cascade failures
Retry with Backoff
- Transient fault handling
- Exponential backoff
Bulkhead
- Isolate resources
- Limit impact of failures
断路器模式(Circuit Breaker)
- 重复错误时快速失败
- 防止级联故障
退避重试(Retry with Backoff)
- 临时故障处理
- 指数退避策略
舱壁模式(Bulkhead)
- 资源隔离
- 限制故障影响范围
Service Decomposition Patterns
服务拆分模式
Pattern 1: By Business Capability
模式1:按业务能力拆分
python
undefinedpython
undefinedE-commerce example
E-commerce example
Order Service
Order Service
class OrderService:
"""Handles order lifecycle."""
async def create_order(self, order_data: dict) -> Order:
order = Order.create(order_data)
# Publish event for other services
await self.event_bus.publish(
OrderCreatedEvent(
order_id=order.id,
customer_id=order.customer_id,
items=order.items,
total=order.total
)
)
return orderclass OrderService:
"""Handles order lifecycle."""
async def create_order(self, order_data: dict) -> Order:
order = Order.create(order_data)
# Publish event for other services
await self.event_bus.publish(
OrderCreatedEvent(
order_id=order.id,
customer_id=order.customer_id,
items=order.items,
total=order.total
)
)
return orderPayment Service (separate service)
Payment Service (separate service)
class PaymentService:
"""Handles payment processing."""
async def process_payment(self, payment_request: PaymentRequest) -> PaymentResult:
# Process payment
result = await self.payment_gateway.charge(
amount=payment_request.amount,
customer=payment_request.customer_id
)
if result.success:
await self.event_bus.publish(
PaymentCompletedEvent(
order_id=payment_request.order_id,
transaction_id=result.transaction_id
)
)
return resultclass PaymentService:
"""Handles payment processing."""
async def process_payment(self, payment_request: PaymentRequest) -> PaymentResult:
# Process payment
result = await self.payment_gateway.charge(
amount=payment_request.amount,
customer=payment_request.customer_id
)
if result.success:
await self.event_bus.publish(
PaymentCompletedEvent(
order_id=payment_request.order_id,
transaction_id=result.transaction_id
)
)
return resultInventory Service (separate service)
Inventory Service (separate service)
class InventoryService:
"""Handles inventory management."""
async def reserve_items(self, order_id: str, items: List[OrderItem]) -> ReservationResult:
# Check availability
for item in items:
available = await self.inventory_repo.get_available(item.product_id)
if available < item.quantity:
return ReservationResult(
success=False,
error=f"Insufficient inventory for {item.product_id}"
)
# Reserve items
reservation = await self.create_reservation(order_id, items)
await self.event_bus.publish(
InventoryReservedEvent(
order_id=order_id,
reservation_id=reservation.id
)
)
return ReservationResult(success=True, reservation=reservation)undefinedclass InventoryService:
"""Handles inventory management."""
async def reserve_items(self, order_id: str, items: List[OrderItem]) -> ReservationResult:
# Check availability
for item in items:
available = await self.inventory_repo.get_available(item.product_id)
if available < item.quantity:
return ReservationResult(
success=False,
error=f"Insufficient inventory for {item.product_id}"
)
# Reserve items
reservation = await self.create_reservation(order_id, items)
await self.event_bus.publish(
InventoryReservedEvent(
order_id=order_id,
reservation_id=reservation.id
)
)
return ReservationResult(success=True, reservation=reservation)undefinedPattern 2: API Gateway
模式2:API网关
python
from fastapi import FastAPI, HTTPException, Depends
import httpx
from circuitbreaker import circuit
app = FastAPI()
class APIGateway:
"""Central entry point for all client requests."""
def __init__(self):
self.order_service_url = "http://order-service:8000"
self.payment_service_url = "http://payment-service:8001"
self.inventory_service_url = "http://inventory-service:8002"
self.http_client = httpx.AsyncClient(timeout=5.0)
@circuit(failure_threshold=5, recovery_timeout=30)
async def call_order_service(self, path: str, method: str = "GET", **kwargs):
"""Call order service with circuit breaker."""
response = await self.http_client.request(
method,
f"{self.order_service_url}{path}",
**kwargs
)
response.raise_for_status()
return response.json()
async def create_order_aggregate(self, order_id: str) -> dict:
"""Aggregate data from multiple services."""
# Parallel requests
order, payment, inventory = await asyncio.gather(
self.call_order_service(f"/orders/{order_id}"),
self.call_payment_service(f"/payments/order/{order_id}"),
self.call_inventory_service(f"/reservations/order/{order_id}"),
return_exceptions=True
)
# Handle partial failures
result = {"order": order}
if not isinstance(payment, Exception):
result["payment"] = payment
if not isinstance(inventory, Exception):
result["inventory"] = inventory
return result
@app.post("/api/orders")
async def create_order(
order_data: dict,
gateway: APIGateway = Depends()
):
"""API Gateway endpoint."""
try:
# Route to order service
order = await gateway.call_order_service(
"/orders",
method="POST",
json=order_data
)
return {"order": order}
except httpx.HTTPError as e:
raise HTTPException(status_code=503, detail="Order service unavailable")python
from fastapi import FastAPI, HTTPException, Depends
import httpx
from circuitbreaker import circuit
app = FastAPI()
class APIGateway:
"""Central entry point for all client requests."""
def __init__(self):
self.order_service_url = "http://order-service:8000"
self.payment_service_url = "http://payment-service:8001"
self.inventory_service_url = "http://inventory-service:8002"
self.http_client = httpx.AsyncClient(timeout=5.0)
@circuit(failure_threshold=5, recovery_timeout=30)
async def call_order_service(self, path: str, method: str = "GET", **kwargs):
"""Call order service with circuit breaker."""
response = await self.http_client.request(
method,
f"{self.order_service_url}{path}",
**kwargs
)
response.raise_for_status()
return response.json()
async def create_order_aggregate(self, order_id: str) -> dict:
"""Aggregate data from multiple services."""
# Parallel requests
order, payment, inventory = await asyncio.gather(
self.call_order_service(f"/orders/{order_id}"),
self.call_payment_service(f"/payments/order/{order_id}"),
self.call_inventory_service(f"/reservations/order/{order_id}"),
return_exceptions=True
)
# Handle partial failures
result = {"order": order}
if not isinstance(payment, Exception):
result["payment"] = payment
if not isinstance(inventory, Exception):
result["inventory"] = inventory
return result
@app.post("/api/orders")
async def create_order(
order_data: dict,
gateway: APIGateway = Depends()
):
"""API Gateway endpoint."""
try:
# Route to order service
order = await gateway.call_order_service(
"/orders",
method="POST",
json=order_data
)
return {"order": order}
except httpx.HTTPError as e:
raise HTTPException(status_code=503, detail="Order service unavailable")Communication Patterns
通信模式
Pattern 1: Synchronous REST Communication
模式1:同步REST通信
python
undefinedpython
undefinedService A calls Service B
Service A calls Service B
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
class ServiceClient:
"""HTTP client with retries and timeout."""
def __init__(self, base_url: str):
self.base_url = base_url
self.client = httpx.AsyncClient(
timeout=httpx.Timeout(5.0, connect=2.0),
limits=httpx.Limits(max_keepalive_connections=20)
)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
async def get(self, path: str, **kwargs):
"""GET with automatic retries."""
response = await self.client.get(f"{self.base_url}{path}", **kwargs)
response.raise_for_status()
return response.json()
async def post(self, path: str, **kwargs):
"""POST request."""
response = await self.client.post(f"{self.base_url}{path}", **kwargs)
response.raise_for_status()
return response.json()import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
class ServiceClient:
"""HTTP client with retries and timeout."""
def __init__(self, base_url: str):
self.base_url = base_url
self.client = httpx.AsyncClient(
timeout=httpx.Timeout(5.0, connect=2.0),
limits=httpx.Limits(max_keepalive_connections=20)
)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10)
)
async def get(self, path: str, **kwargs):
"""GET with automatic retries."""
response = await self.client.get(f"{self.base_url}{path}", **kwargs)
response.raise_for_status()
return response.json()
async def post(self, path: str, **kwargs):
"""POST request."""
response = await self.client.post(f"{self.base_url}{path}", **kwargs)
response.raise_for_status()
return response.json()Usage
Usage
payment_client = ServiceClient("http://payment-service:8001")
result = await payment_client.post("/payments", json=payment_data)
undefinedpayment_client = ServiceClient("http://payment-service:8001")
result = await payment_client.post("/payments", json=payment_data)
undefinedPattern 2: Asynchronous Event-Driven
模式2:异步事件驱动通信
python
undefinedpython
undefinedEvent-driven communication with Kafka
Event-driven communication with Kafka
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class DomainEvent:
event_id: str
event_type: str
aggregate_id: str
occurred_at: datetime
data: dict
class EventBus:
"""Event publishing and subscription."""
def __init__(self, bootstrap_servers: List[str]):
self.bootstrap_servers = bootstrap_servers
self.producer = None
async def start(self):
self.producer = AIOKafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode()
)
await self.producer.start()
async def publish(self, event: DomainEvent):
"""Publish event to Kafka topic."""
topic = event.event_type
await self.producer.send_and_wait(
topic,
value=asdict(event),
key=event.aggregate_id.encode()
)
async def subscribe(self, topic: str, handler: callable):
"""Subscribe to events."""
consumer = AIOKafkaConsumer(
topic,
bootstrap_servers=self.bootstrap_servers,
value_deserializer=lambda v: json.loads(v.decode()),
group_id="my-service"
)
await consumer.start()
try:
async for message in consumer:
event_data = message.value
await handler(event_data)
finally:
await consumer.stop()from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class DomainEvent:
event_id: str
event_type: str
aggregate_id: str
occurred_at: datetime
data: dict
class EventBus:
"""Event publishing and subscription."""
def __init__(self, bootstrap_servers: List[str]):
self.bootstrap_servers = bootstrap_servers
self.producer = None
async def start(self):
self.producer = AIOKafkaProducer(
bootstrap_servers=self.bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode()
)
await self.producer.start()
async def publish(self, event: DomainEvent):
"""Publish event to Kafka topic."""
topic = event.event_type
await self.producer.send_and_wait(
topic,
value=asdict(event),
key=event.aggregate_id.encode()
)
async def subscribe(self, topic: str, handler: callable):
"""Subscribe to events."""
consumer = AIOKafkaConsumer(
topic,
bootstrap_servers=self.bootstrap_servers,
value_deserializer=lambda v: json.loads(v.decode()),
group_id="my-service"
)
await consumer.start()
try:
async for message in consumer:
event_data = message.value
await handler(event_data)
finally:
await consumer.stop()Order Service publishes event
Order Service publishes event
async def create_order(order_data: dict):
order = await save_order(order_data)
event = DomainEvent(
event_id=str(uuid.uuid4()),
event_type="OrderCreated",
aggregate_id=order.id,
occurred_at=datetime.now(),
data={
"order_id": order.id,
"customer_id": order.customer_id,
"total": order.total
}
)
await event_bus.publish(event)async def create_order(order_data: dict):
order = await save_order(order_data)
event = DomainEvent(
event_id=str(uuid.uuid4()),
event_type="OrderCreated",
aggregate_id=order.id,
occurred_at=datetime.now(),
data={
"order_id": order.id,
"customer_id": order.customer_id,
"total": order.total
}
)
await event_bus.publish(event)Inventory Service listens for OrderCreated
Inventory Service listens for OrderCreated
async def handle_order_created(event_data: dict):
"""React to order creation."""
order_id = event_data["data"]["order_id"]
items = event_data["data"]["items"]
# Reserve inventory
await reserve_inventory(order_id, items)undefinedasync def handle_order_created(event_data: dict):
"""React to order creation."""
order_id = event_data["data"]["order_id"]
items = event_data["data"]["items"]
# Reserve inventory
await reserve_inventory(order_id, items)undefinedPattern 3: Saga Pattern (Distributed Transactions)
模式3:Saga模式(分布式事务)
python
undefinedpython
undefinedSaga orchestration for order fulfillment
Saga orchestration for order fulfillment
from enum import Enum
from typing import List, Callable
class SagaStep:
"""Single step in saga."""
def __init__(
self,
name: str,
action: Callable,
compensation: Callable
):
self.name = name
self.action = action
self.compensation = compensationclass SagaStatus(Enum):
PENDING = "pending"
COMPLETED = "completed"
COMPENSATING = "compensating"
FAILED = "failed"
class OrderFulfillmentSaga:
"""Orchestrated saga for order fulfillment."""
def __init__(self):
self.steps: List[SagaStep] = [
SagaStep(
"create_order",
action=self.create_order,
compensation=self.cancel_order
),
SagaStep(
"reserve_inventory",
action=self.reserve_inventory,
compensation=self.release_inventory
),
SagaStep(
"process_payment",
action=self.process_payment,
compensation=self.refund_payment
),
SagaStep(
"confirm_order",
action=self.confirm_order,
compensation=self.cancel_order_confirmation
)
]
async def execute(self, order_data: dict) -> SagaResult:
"""Execute saga steps."""
completed_steps = []
context = {"order_data": order_data}
try:
for step in self.steps:
# Execute step
result = await step.action(context)
if not result.success:
# Compensate
await self.compensate(completed_steps, context)
return SagaResult(
status=SagaStatus.FAILED,
error=result.error
)
completed_steps.append(step)
context.update(result.data)
return SagaResult(status=SagaStatus.COMPLETED, data=context)
except Exception as e:
# Compensate on error
await self.compensate(completed_steps, context)
return SagaResult(status=SagaStatus.FAILED, error=str(e))
async def compensate(self, completed_steps: List[SagaStep], context: dict):
"""Execute compensating actions in reverse order."""
for step in reversed(completed_steps):
try:
await step.compensation(context)
except Exception as e:
# Log compensation failure
print(f"Compensation failed for {step.name}: {e}")
# Step implementations
async def create_order(self, context: dict) -> StepResult:
order = await order_service.create(context["order_data"])
return StepResult(success=True, data={"order_id": order.id})
async def cancel_order(self, context: dict):
await order_service.cancel(context["order_id"])
async def reserve_inventory(self, context: dict) -> StepResult:
result = await inventory_service.reserve(
context["order_id"],
context["order_data"]["items"]
)
return StepResult(
success=result.success,
data={"reservation_id": result.reservation_id}
)
async def release_inventory(self, context: dict):
await inventory_service.release(context["reservation_id"])
async def process_payment(self, context: dict) -> StepResult:
result = await payment_service.charge(
context["order_id"],
context["order_data"]["total"]
)
return StepResult(
success=result.success,
data={"transaction_id": result.transaction_id},
error=result.error
)
async def refund_payment(self, context: dict):
await payment_service.refund(context["transaction_id"])undefinedfrom enum import Enum
from typing import List, Callable
class SagaStep:
"""Single step in saga."""
def __init__(
self,
name: str,
action: Callable,
compensation: Callable
):
self.name = name
self.action = action
self.compensation = compensationclass SagaStatus(Enum):
PENDING = "pending"
COMPLETED = "completed"
COMPENSATING = "compensating"
FAILED = "failed"
class OrderFulfillmentSaga:
"""Orchestrated saga for order fulfillment."""
def __init__(self):
self.steps: List[SagaStep] = [
SagaStep(
"create_order",
action=self.create_order,
compensation=self.cancel_order
),
SagaStep(
"reserve_inventory",
action=self.reserve_inventory,
compensation=self.release_inventory
),
SagaStep(
"process_payment",
action=self.process_payment,
compensation=self.refund_payment
),
SagaStep(
"confirm_order",
action=self.confirm_order,
compensation=self.cancel_order_confirmation
)
]
async def execute(self, order_data: dict) -> SagaResult:
"""Execute saga steps."""
completed_steps = []
context = {"order_data": order_data}
try:
for step in self.steps:
# Execute step
result = await step.action(context)
if not result.success:
# Compensate
await self.compensate(completed_steps, context)
return SagaResult(
status=SagaStatus.FAILED,
error=result.error
)
completed_steps.append(step)
context.update(result.data)
return SagaResult(status=SagaStatus.COMPLETED, data=context)
except Exception as e:
# Compensate on error
await self.compensate(completed_steps, context)
return SagaResult(status=SagaStatus.FAILED, error=str(e))
async def compensate(self, completed_steps: List[SagaStep], context: dict):
"""Execute compensating actions in reverse order."""
for step in reversed(completed_steps):
try:
await step.compensation(context)
except Exception as e:
# Log compensation failure
print(f"Compensation failed for {step.name}: {e}")
# Step implementations
async def create_order(self, context: dict) -> StepResult:
order = await order_service.create(context["order_data"])
return StepResult(success=True, data={"order_id": order.id})
async def cancel_order(self, context: dict):
await order_service.cancel(context["order_id"])
async def reserve_inventory(self, context: dict) -> StepResult:
result = await inventory_service.reserve(
context["order_id"],
context["order_data"]["items"]
)
return StepResult(
success=result.success,
data={"reservation_id": result.reservation_id}
)
async def release_inventory(self, context: dict):
await inventory_service.release(context["reservation_id"])
async def process_payment(self, context: dict) -> StepResult:
result = await payment_service.charge(
context["order_id"],
context["order_data"]["total"]
)
return StepResult(
success=result.success,
data={"transaction_id": result.transaction_id},
error=result.error
)
async def refund_payment(self, context: dict):
await payment_service.refund(context["transaction_id"])undefinedResilience Patterns
弹性模式
Circuit Breaker Pattern
断路器模式
python
from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, Any
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if recovered
class CircuitBreaker:
"""Circuit breaker for service calls."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 30,
success_threshold: int = 2
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.failure_count = 0
self.success_count = 0
self.state = CircuitState.CLOSED
self.opened_at = None
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with circuit breaker."""
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise CircuitBreakerOpenError("Circuit breaker is open")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
"""Handle successful call."""
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.success_count = 0
def _on_failure(self):
"""Handle failed call."""
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
self.opened_at = datetime.now()
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
self.opened_at = datetime.now()
def _should_attempt_reset(self) -> bool:
"""Check if enough time passed to try again."""
return (
datetime.now() - self.opened_at
> timedelta(seconds=self.recovery_timeout)
)python
from enum import Enum
from datetime import datetime, timedelta
from typing import Callable, Any
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if recovered
class CircuitBreaker:
"""Circuit breaker for service calls."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 30,
success_threshold: int = 2
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.failure_count = 0
self.success_count = 0
self.state = CircuitState.CLOSED
self.opened_at = None
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with circuit breaker."""
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise CircuitBreakerOpenError("Circuit breaker is open")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
"""Handle successful call."""
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.success_count = 0
def _on_failure(self):
"""Handle failed call."""
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
self.opened_at = datetime.now()
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
self.opened_at = datetime.now()
def _should_attempt_reset(self) -> bool:
"""Check if enough time passed to try again."""
return (
datetime.now() - self.opened_at
> timedelta(seconds=self.recovery_timeout)
)Usage
Usage
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
async def call_payment_service(payment_data: dict):
return await breaker.call(
payment_client.process_payment,
payment_data
)
undefinedbreaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
async def call_payment_service(payment_data: dict):
return await breaker.call(
payment_client.process_payment,
payment_data
)
undefinedResources
资源
- references/service-decomposition-guide.md: Breaking down monoliths
- references/communication-patterns.md: Sync vs async patterns
- references/saga-implementation.md: Distributed transactions
- assets/circuit-breaker.py: Production circuit breaker
- assets/event-bus-template.py: Kafka event bus implementation
- assets/api-gateway-template.py: Complete API gateway
- references/service-decomposition-guide.md: 单体应用拆分指南
- references/communication-patterns.md: 同步与异步模式对比
- references/saga-implementation.md: 分布式事务实现
- assets/circuit-breaker.py: 生产环境断路器实现
- assets/event-bus-template.py: Kafka事件总线实现模板
- assets/api-gateway-template.py: 完整API网关模板
Best Practices
最佳实践
- Service Boundaries: Align with business capabilities
- Database Per Service: No shared databases
- API Contracts: Versioned, backward compatible
- Async When Possible: Events over direct calls
- Circuit Breakers: Fail fast on service failures
- Distributed Tracing: Track requests across services
- Service Registry: Dynamic service discovery
- Health Checks: Liveness and readiness probes
- 服务边界:与业务能力对齐
- 服务专属数据库:不共享数据库
- API契约:版本化、向后兼容
- 优先异步:采用事件而非直接调用
- 断路器:服务故障时快速失败
- 分布式追踪:跨服务追踪请求
- 服务注册中心:动态服务发现
- 健康检查:存活与就绪探针
Common Pitfalls
常见误区
- Distributed Monolith: Tightly coupled services
- Chatty Services: Too many inter-service calls
- Shared Databases: Tight coupling through data
- No Circuit Breakers: Cascade failures
- Synchronous Everything: Tight coupling, poor resilience
- Premature Microservices: Starting with microservices
- Ignoring Network Failures: Assuming reliable network
- No Compensation Logic: Can't undo failed transactions
- 分布式单体:服务间紧耦合
- 服务通信频繁:过多的服务间调用
- 共享数据库:通过数据实现紧耦合
- 未使用断路器:引发级联故障
- 全同步通信:紧耦合、低弹性
- 过早微服务化:初始阶段就采用微服务
- 忽略网络故障:假设网络绝对可靠
- 无补偿逻辑:无法回滚失败的事务