microservices-expert
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseMicroservices Expert
微服务专家指南
Expert guidance for microservices architecture, design patterns, service communication, and distributed system challenges.
为微服务架构、设计模式、服务通信及分布式系统难题提供专业指导。
Core Concepts
核心概念
Microservices Principles
微服务原则
- Single responsibility per service
- Independently deployable
- Decentralized data management
- Infrastructure automation
- Design for failure
- Evolutionary design
- 每个服务单一职责
- 可独立部署
- 去中心化数据管理
- 基础设施自动化
- 为故障设计
- 演进式设计
Architecture Patterns
架构模式
- API Gateway
- Service Discovery
- Circuit Breaker
- Saga Pattern
- Event Sourcing
- CQRS
- API Gateway
- Service Discovery
- Circuit Breaker
- Saga Pattern
- Event Sourcing
- CQRS
Communication
通信方式
- Synchronous (HTTP/REST, gRPC)
- Asynchronous (Message queues, Events)
- Service mesh
- API composition
- Backend for Frontend (BFF)
- 同步通信(HTTP/REST、gRPC)
- 异步通信(消息队列、事件)
- Service Mesh
- API组合
- Backend for Frontend (BFF)
Service Design
服务设计
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx
from typing import List, Optional
from circuitbreaker import circuit
import asynciopython
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx
from typing import List, Optional
from circuitbreaker import circuit
import asyncioIndividual Microservice
Individual Microservice
app = FastAPI(title="Order Service", version="1.0.0")
class Order(BaseModel):
id: str
user_id: str
items: List[dict]
total: float
status: str
class OrderService:
def init(self, inventory_url: str, payment_url: str):
self.inventory_url = inventory_url
self.payment_url = payment_url
self.client = httpx.AsyncClient()
@circuit(failure_threshold=5, recovery_timeout=60)
async def check_inventory(self, items: List[dict]) -> bool:
"""Check inventory availability with circuit breaker"""
try:
response = await self.client.post(
f"{self.inventory_url}/check",
json={"items": items},
timeout=5.0
)
return response.json()["available"]
except Exception as e:
print(f"Inventory service error: {e}")
raise
@circuit(failure_threshold=5, recovery_timeout=60)
async def process_payment(self, user_id: str, amount: float) -> dict:
"""Process payment with circuit breaker"""
try:
response = await self.client.post(
f"{self.payment_url}/charge",
json={"user_id": user_id, "amount": amount},
timeout=10.0
)
return response.json()
except Exception as e:
print(f"Payment service error: {e}")
raise
async def create_order(self, order: Order) -> Order:
"""Create order with coordination"""
# 1. Check inventory
inventory_available = await self.check_inventory(order.items)
if not inventory_available:
raise HTTPException(400, "Items not available")
# 2. Process payment
payment = await self.process_payment(order.user_id, order.total)
if payment["status"] != "success":
raise HTTPException(400, "Payment failed")
# 3. Reserve inventory
await self.reserve_inventory(order.items)
# 4. Create order record
order.status = "confirmed"
await self.save_order(order)
return order@app.post("/orders", response_model=Order)
async def create_order(order: Order):
service = OrderService(
inventory_url="http://inventory-service",
payment_url="http://payment-service"
)
return await service.create_order(order)
undefinedapp = FastAPI(title="Order Service", version="1.0.0")
class Order(BaseModel):
id: str
user_id: str
items: List[dict]
total: float
status: str
class OrderService:
def init(self, inventory_url: str, payment_url: str):
self.inventory_url = inventory_url
self.payment_url = payment_url
self.client = httpx.AsyncClient()
@circuit(failure_threshold=5, recovery_timeout=60)
async def check_inventory(self, items: List[dict]) -> bool:
"""Check inventory availability with circuit breaker"""
try:
response = await self.client.post(
f"{self.inventory_url}/check",
json={"items": items},
timeout=5.0
)
return response.json()["available"]
except Exception as e:
print(f"Inventory service error: {e}")
raise
@circuit(failure_threshold=5, recovery_timeout=60)
async def process_payment(self, user_id: str, amount: float) -> dict:
"""Process payment with circuit breaker"""
try:
response = await self.client.post(
f"{self.payment_url}/charge",
json={"user_id": user_id, "amount": amount},
timeout=10.0
)
return response.json()
except Exception as e:
print(f"Payment service error: {e}")
raise
async def create_order(self, order: Order) -> Order:
"""Create order with coordination"""
# 1. Check inventory
inventory_available = await self.check_inventory(order.items)
if not inventory_available:
raise HTTPException(400, "Items not available")
# 2. Process payment
payment = await self.process_payment(order.user_id, order.total)
if payment["status"] != "success":
raise HTTPException(400, "Payment failed")
# 3. Reserve inventory
await self.reserve_inventory(order.items)
# 4. Create order record
order.status = "confirmed"
await self.save_order(order)
return order@app.post("/orders", response_model=Order)
async def create_order(order: Order):
service = OrderService(
inventory_url="http://inventory-service",
payment_url="http://payment-service"
)
return await service.create_order(order)
undefinedSaga Pattern (Distributed Transactions)
Saga模式(分布式事务)
python
from enum import Enum
from typing import List, Callable
import asyncio
class SagaStep:
def __init__(self, action: Callable, compensation: Callable):
self.action = action
self.compensation = compensation
class SagaOrchestrator:
"""Orchestrate distributed transactions using Saga pattern"""
def __init__(self):
self.steps: List[SagaStep] = []
self.completed_steps: List[SagaStep] = []
def add_step(self, action: Callable, compensation: Callable):
"""Add a step to the saga"""
self.steps.append(SagaStep(action, compensation))
async def execute(self) -> bool:
"""Execute saga with compensation on failure"""
try:
# Execute all steps
for step in self.steps:
result = await step.action()
self.completed_steps.append(step)
if not result:
await self.compensate()
return False
return True
except Exception as e:
print(f"Saga failed: {e}")
await self.compensate()
return False
async def compensate(self):
"""Rollback completed steps"""
# Execute compensations in reverse order
for step in reversed(self.completed_steps):
try:
await step.compensation()
except Exception as e:
print(f"Compensation failed: {e}")python
from enum import Enum
from typing import List, Callable
import asyncio
class SagaStep:
def __init__(self, action: Callable, compensation: Callable):
self.action = action
self.compensation = compensation
class SagaOrchestrator:
"""Orchestrate distributed transactions using Saga pattern"""
def __init__(self):
self.steps: List[SagaStep] = []
self.completed_steps: List[SagaStep] = []
def add_step(self, action: Callable, compensation: Callable):
"""Add a step to the saga"""
self.steps.append(SagaStep(action, compensation))
async def execute(self) -> bool:
"""Execute saga with compensation on failure"""
try:
# Execute all steps
for step in self.steps:
result = await step.action()
self.completed_steps.append(step)
if not result:
await self.compensate()
return False
return True
except Exception as e:
print(f"Saga failed: {e}")
await self.compensate()
return False
async def compensate(self):
"""Rollback completed steps"""
# Execute compensations in reverse order
for step in reversed(self.completed_steps):
try:
await step.compensation()
except Exception as e:
print(f"Compensation failed: {e}")Example: Order Saga
Example: Order Saga
class OrderSaga:
async def create_order_with_saga(self, order_data: dict):
saga = SagaOrchestrator()
# Step 1: Reserve inventory
saga.add_step(
action=lambda: self.reserve_inventory(order_data["items"]),
compensation=lambda: self.release_inventory(order_data["items"])
)
# Step 2: Process payment
saga.add_step(
action=lambda: self.charge_payment(order_data["user_id"], order_data["total"]),
compensation=lambda: self.refund_payment(order_data["user_id"], order_data["total"])
)
# Step 3: Create order
saga.add_step(
action=lambda: self.create_order_record(order_data),
compensation=lambda: self.delete_order_record(order_data["id"])
)
# Execute saga
success = await saga.execute()
if success:
await self.send_confirmation(order_data["user_id"])
return {"status": "success", "order_id": order_data["id"]}
else:
return {"status": "failed", "message": "Order creation failed"}undefinedclass OrderSaga:
async def create_order_with_saga(self, order_data: dict):
saga = SagaOrchestrator()
# Step 1: Reserve inventory
saga.add_step(
action=lambda: self.reserve_inventory(order_data["items"]),
compensation=lambda: self.release_inventory(order_data["items"])
)
# Step 2: Process payment
saga.add_step(
action=lambda: self.charge_payment(order_data["user_id"], order_data["total"]),
compensation=lambda: self.refund_payment(order_data["user_id"], order_data["total"])
)
# Step 3: Create order
saga.add_step(
action=lambda: self.create_order_record(order_data),
compensation=lambda: self.delete_order_record(order_data["id"])
)
# Execute saga
success = await saga.execute()
if success:
await self.send_confirmation(order_data["user_id"])
return {"status": "success", "order_id": order_data["id"]}
else:
return {"status": "failed", "message": "Order creation failed"}undefinedService Discovery
服务发现
python
import consul
from typing import List, Optional
import random
class ServiceRegistry:
"""Service discovery using Consul"""
def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
self.consul = consul.Consul(host=consul_host, port=consul_port)
def register_service(self, service_name: str, service_id: str,
host: str, port: int, tags: List[str] = None):
"""Register service with Consul"""
self.consul.agent.service.register(
name=service_name,
service_id=service_id,
address=host,
port=port,
tags=tags or [],
check=consul.Check.http(
f"http://{host}:{port}/health",
interval="10s",
timeout="5s"
)
)
def deregister_service(self, service_id: str):
"""Deregister service"""
self.consul.agent.service.deregister(service_id)
def discover_service(self, service_name: str) -> Optional[dict]:
"""Discover healthy service instance"""
_, services = self.consul.health.service(service_name, passing=True)
if not services:
return None
# Load balance: random selection
service = random.choice(services)
return {
"id": service["Service"]["ID"],
"address": service["Service"]["Address"],
"port": service["Service"]["Port"],
"tags": service["Service"]["Tags"]
}
def get_all_services(self, service_name: str) -> List[dict]:
"""Get all healthy instances of a service"""
_, services = self.consul.health.service(service_name, passing=True)
return [
{
"id": s["Service"]["ID"],
"address": s["Service"]["Address"],
"port": s["Service"]["Port"]
}
for s in services
]python
import consul
from typing import List, Optional
import random
class ServiceRegistry:
"""Service discovery using Consul"""
def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
self.consul = consul.Consul(host=consul_host, port=consul_port)
def register_service(self, service_name: str, service_id: str,
host: str, port: int, tags: List[str] = None):
"""Register service with Consul"""
self.consul.agent.service.register(
name=service_name,
service_id=service_id,
address=host,
port=port,
tags=tags or [],
check=consul.Check.http(
f"http://{host}:{port}/health",
interval="10s",
timeout="5s"
)
)
def deregister_service(self, service_id: str):
"""Deregister service"""
self.consul.agent.service.deregister(service_id)
def discover_service(self, service_name: str) -> Optional[dict]:
"""Discover healthy service instance"""
_, services = self.consul.health.service(service_name, passing=True)
if not services:
return None
# Load balance: random selection
service = random.choice(services)
return {
"id": service["Service"]["ID"],
"address": service["Service"]["Address"],
"port": service["Service"]["Port"],
"tags": service["Service"]["Tags"]
}
def get_all_services(self, service_name: str) -> List[dict]:
"""Get all healthy instances of a service"""
_, services = self.consul.health.service(service_name, passing=True)
return [
{
"id": s["Service"]["ID"],
"address": s["Service"]["Address"],
"port": s["Service"]["Port"]
}
for s in services
]API Gateway
API网关
python
from fastapi import FastAPI, Request, Response
import httpx
from typing import Dict
import jwt
app = FastAPI(title="API Gateway")
class APIGateway:
"""API Gateway for routing and cross-cutting concerns"""
def __init__(self):
self.service_registry = ServiceRegistry()
self.client = httpx.AsyncClient()
async def route_request(self, service: str, path: str,
method: str, **kwargs) -> Response:
"""Route request to appropriate microservice"""
# Discover service
service_info = self.service_registry.discover_service(service)
if not service_info:
return Response(
content={"error": "Service unavailable"},
status_code=503
)
# Build URL
url = f"http://{service_info['address']}:{service_info['port']}{path}"
# Forward request
response = await self.client.request(method, url, **kwargs)
return Response(
content=response.content,
status_code=response.status_code,
headers=dict(response.headers)
)
async def authenticate(self, token: str) -> Optional[dict]:
"""Centralized authentication"""
try:
payload = jwt.decode(token, "secret", algorithms=["HS256"])
return payload
except jwt.JWTError:
return None
async def rate_limit(self, client_id: str) -> bool:
"""Centralized rate limiting"""
# Implementation using Redis
passpython
from fastapi import FastAPI, Request, Response
import httpx
from typing import Dict
import jwt
app = FastAPI(title="API Gateway")
class APIGateway:
"""API Gateway for routing and cross-cutting concerns"""
def __init__(self):
self.service_registry = ServiceRegistry()
self.client = httpx.AsyncClient()
async def route_request(self, service: str, path: str,
method: str, **kwargs) -> Response:
"""Route request to appropriate microservice"""
# Discover service
service_info = self.service_registry.discover_service(service)
if not service_info:
return Response(
content={"error": "Service unavailable"},
status_code=503
)
# Build URL
url = f"http://{service_info['address']}:{service_info['port']}{path}"
# Forward request
response = await self.client.request(method, url, **kwargs)
return Response(
content=response.content,
status_code=response.status_code,
headers=dict(response.headers)
)
async def authenticate(self, token: str) -> Optional[dict]:
"""Centralized authentication"""
try:
payload = jwt.decode(token, "secret", algorithms=["HS256"])
return payload
except jwt.JWTError:
return None
async def rate_limit(self, client_id: str) -> bool:
"""Centralized rate limiting"""
# Implementation using Redis
passGateway endpoints
Gateway endpoints
@app.api_route("/{service}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def gateway_route(service: str, path: str, request: Request):
gateway = APIGateway()
# Authentication
token = request.headers.get("Authorization", "").replace("Bearer ", "")
user = await gateway.authenticate(token)
if not user:
return Response(content={"error": "Unauthorized"}, status_code=401)
# Rate limiting
if not await gateway.rate_limit(user["id"]):
return Response(content={"error": "Rate limit exceeded"}, status_code=429)
# Route request
return await gateway.route_request(
service=service,
path=f"/{path}",
method=request.method,
headers=dict(request.headers),
content=await request.body()
)undefined@app.api_route("/{service}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def gateway_route(service: str, path: str, request: Request):
gateway = APIGateway()
# Authentication
token = request.headers.get("Authorization", "").replace("Bearer ", "")
user = await gateway.authenticate(token)
if not user:
return Response(content={"error": "Unauthorized"}, status_code=401)
# Rate limiting
if not await gateway.rate_limit(user["id"]):
return Response(content={"error": "Rate limit exceeded"}, status_code=429)
# Route request
return await gateway.route_request(
service=service,
path=f"/{path}",
method=request.method,
headers=dict(request.headers),
content=await request.body()
)undefinedEvent-Driven Architecture
事件驱动架构
python
import pika
import json
from typing import Callable, Dict
import asyncio
class EventBus:
"""Message broker for event-driven communication"""
def __init__(self, rabbitmq_url: str):
self.connection = pika.BlockingConnection(
pika.URLParameters(rabbitmq_url)
)
self.channel = self.connection.channel()
self.handlers: Dict[str, Callable] = {}
def publish_event(self, event_type: str, data: dict):
"""Publish event to all subscribers"""
self.channel.exchange_declare(
exchange='events',
exchange_type='topic',
durable=True
)
message = json.dumps({
"event_type": event_type,
"data": data,
"timestamp": datetime.now().isoformat()
})
self.channel.basic_publish(
exchange='events',
routing_key=event_type,
body=message,
properties=pika.BasicProperties(
delivery_mode=2 # persistent
)
)
def subscribe(self, event_type: str, handler: Callable):
"""Subscribe to event type"""
self.handlers[event_type] = handler
# Declare queue
queue_name = f"{event_type}_queue"
self.channel.queue_declare(queue=queue_name, durable=True)
# Bind queue to exchange
self.channel.queue_bind(
queue=queue_name,
exchange='events',
routing_key=event_type
)
# Start consuming
def callback(ch, method, properties, body):
message = json.loads(body)
handler(message["data"])
ch.basic_ack(delivery_tag=method.delivery_tag)
self.channel.basic_consume(
queue=queue_name,
on_message_callback=callback
)
def start_consuming(self):
"""Start consuming events"""
self.channel.start_consuming()python
import pika
import json
from typing import Callable, Dict
import asyncio
class EventBus:
"""Message broker for event-driven communication"""
def __init__(self, rabbitmq_url: str):
self.connection = pika.BlockingConnection(
pika.URLParameters(rabbitmq_url)
)
self.channel = self.connection.channel()
self.handlers: Dict[str, Callable] = {}
def publish_event(self, event_type: str, data: dict):
"""Publish event to all subscribers"""
self.channel.exchange_declare(
exchange='events',
exchange_type='topic',
durable=True
)
message = json.dumps({
"event_type": event_type,
"data": data,
"timestamp": datetime.now().isoformat()
})
self.channel.basic_publish(
exchange='events',
routing_key=event_type,
body=message,
properties=pika.BasicProperties(
delivery_mode=2 # persistent
)
)
def subscribe(self, event_type: str, handler: Callable):
"""Subscribe to event type"""
self.handlers[event_type] = handler
# Declare queue
queue_name = f"{event_type}_queue"
self.channel.queue_declare(queue=queue_name, durable=True)
# Bind queue to exchange
self.channel.queue_bind(
queue=queue_name,
exchange='events',
routing_key=event_type
)
# Start consuming
def callback(ch, method, properties, body):
message = json.loads(body)
handler(message["data"])
ch.basic_ack(delivery_tag=method.delivery_tag)
self.channel.basic_consume(
queue=queue_name,
on_message_callback=callback
)
def start_consuming(self):
"""Start consuming events"""
self.channel.start_consuming()Usage Example
Usage Example
event_bus = EventBus("amqp://localhost")
event_bus = EventBus("amqp://localhost")
Service A publishes event
Service A publishes event
event_bus.publish_event("order.created", {
"order_id": "12345",
"user_id": "user_1",
"total": 99.99
})
event_bus.publish_event("order.created", {
"order_id": "12345",
"user_id": "user_1",
"total": 99.99
})
Service B subscribes to event
Service B subscribes to event
def handle_order_created(data):
print(f"Processing order: {data['order_id']}")
# Send email, update inventory, etc.
event_bus.subscribe("order.created", handle_order_created)
undefineddef handle_order_created(data):
print(f"Processing order: {data['order_id']}")
# Send email, update inventory, etc.
event_bus.subscribe("order.created", handle_order_created)
undefinedBest Practices
最佳实践
Design
设计原则
- Keep services small and focused
- Design for failure (circuit breakers, retries)
- Use asynchronous communication when possible
- Implement proper service boundaries
- Avoid distributed monoliths
- Use API versioning
- Implement health checks
- 保持服务小巧且聚焦单一职责
- 为故障设计(断路器、重试机制)
- 尽可能使用异步通信
- 实现合理的服务边界
- 避免分布式单体
- 使用API版本控制
- 实现健康检查
Data Management
数据管理
- Database per service
- Use eventual consistency
- Implement saga pattern for distributed transactions
- Use event sourcing for audit trails
- Cache aggressively
- Avoid distributed joins
- 每个服务独立数据库
- 使用最终一致性
- 采用Saga模式处理分布式事务
- 使用事件溯源实现审计追踪
- 积极使用缓存
- 避免分布式关联查询
Operations
运维管理
- Implement distributed tracing
- Centralized logging
- Monitor service health
- Automate deployments
- Use service mesh for cross-cutting concerns
- Implement feature flags
- Practice chaos engineering
- 实现分布式链路追踪
- 集中式日志管理
- 监控服务健康状态
- 自动化部署
- 使用Service Mesh处理横切关注点
- 实现功能开关
- 践行混沌工程
Anti-Patterns
反模式
❌ Distributed monolith
❌ Shared database between services
❌ Synchronous communication everywhere
❌ No service versioning
❌ Tight coupling between services
❌ No circuit breakers
❌ Missing distributed tracing
❌ 分布式单体
❌ 服务间共享数据库
❌ 全同步通信
❌ 无服务版本控制
❌ 服务间紧耦合
❌ 未实现断路器
❌ 缺失分布式链路追踪
Resources
参考资源
- Microservices Patterns: https://microservices.io/patterns/
- Building Microservices (book)
- Service Mesh: https://istio.io/
- Consul: https://www.consul.io/
- RabbitMQ: https://www.rabbitmq.com/
- 微服务模式:https://microservices.io/patterns/
- 《构建微服务》(书籍)
- Service Mesh:https://istio.io/
- Consul:https://www.consul.io/
- RabbitMQ:https://www.rabbitmq.com/