microservices-expert

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Microservices Expert

微服务专家指南

Expert guidance for microservices architecture, design patterns, service communication, and distributed system challenges.
为微服务架构、设计模式、服务通信及分布式系统难题提供专业指导。

Core Concepts

核心概念

Microservices Principles

微服务原则

  • Single responsibility per service
  • Independently deployable
  • Decentralized data management
  • Infrastructure automation
  • Design for failure
  • Evolutionary design
  • 每个服务单一职责
  • 可独立部署
  • 去中心化数据管理
  • 基础设施自动化
  • 为故障设计
  • 演进式设计

Architecture Patterns

架构模式

  • API Gateway
  • Service Discovery
  • Circuit Breaker
  • Saga Pattern
  • Event Sourcing
  • CQRS
  • API Gateway
  • Service Discovery
  • Circuit Breaker
  • Saga Pattern
  • Event Sourcing
  • CQRS

Communication

通信方式

  • Synchronous (HTTP/REST, gRPC)
  • Asynchronous (Message queues, Events)
  • Service mesh
  • API composition
  • Backend for Frontend (BFF)
  • 同步通信(HTTP/REST、gRPC)
  • 异步通信(消息队列、事件)
  • Service Mesh
  • API组合
  • Backend for Frontend (BFF)

Service Design

服务设计

python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx
from typing import List, Optional
from circuitbreaker import circuit
import asyncio
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import httpx
from typing import List, Optional
from circuitbreaker import circuit
import asyncio

Individual Microservice

Individual Microservice

app = FastAPI(title="Order Service", version="1.0.0")
class Order(BaseModel): id: str user_id: str items: List[dict] total: float status: str
class OrderService: def init(self, inventory_url: str, payment_url: str): self.inventory_url = inventory_url self.payment_url = payment_url self.client = httpx.AsyncClient()
@circuit(failure_threshold=5, recovery_timeout=60)
async def check_inventory(self, items: List[dict]) -> bool:
    """Check inventory availability with circuit breaker"""
    try:
        response = await self.client.post(
            f"{self.inventory_url}/check",
            json={"items": items},
            timeout=5.0
        )
        return response.json()["available"]
    except Exception as e:
        print(f"Inventory service error: {e}")
        raise

@circuit(failure_threshold=5, recovery_timeout=60)
async def process_payment(self, user_id: str, amount: float) -> dict:
    """Process payment with circuit breaker"""
    try:
        response = await self.client.post(
            f"{self.payment_url}/charge",
            json={"user_id": user_id, "amount": amount},
            timeout=10.0
        )
        return response.json()
    except Exception as e:
        print(f"Payment service error: {e}")
        raise

async def create_order(self, order: Order) -> Order:
    """Create order with coordination"""
    # 1. Check inventory
    inventory_available = await self.check_inventory(order.items)
    if not inventory_available:
        raise HTTPException(400, "Items not available")

    # 2. Process payment
    payment = await self.process_payment(order.user_id, order.total)
    if payment["status"] != "success":
        raise HTTPException(400, "Payment failed")

    # 3. Reserve inventory
    await self.reserve_inventory(order.items)

    # 4. Create order record
    order.status = "confirmed"
    await self.save_order(order)

    return order
@app.post("/orders", response_model=Order) async def create_order(order: Order): service = OrderService( inventory_url="http://inventory-service", payment_url="http://payment-service" ) return await service.create_order(order)
undefined
app = FastAPI(title="Order Service", version="1.0.0")
class Order(BaseModel): id: str user_id: str items: List[dict] total: float status: str
class OrderService: def init(self, inventory_url: str, payment_url: str): self.inventory_url = inventory_url self.payment_url = payment_url self.client = httpx.AsyncClient()
@circuit(failure_threshold=5, recovery_timeout=60)
async def check_inventory(self, items: List[dict]) -> bool:
    """Check inventory availability with circuit breaker"""
    try:
        response = await self.client.post(
            f"{self.inventory_url}/check",
            json={"items": items},
            timeout=5.0
        )
        return response.json()["available"]
    except Exception as e:
        print(f"Inventory service error: {e}")
        raise

@circuit(failure_threshold=5, recovery_timeout=60)
async def process_payment(self, user_id: str, amount: float) -> dict:
    """Process payment with circuit breaker"""
    try:
        response = await self.client.post(
            f"{self.payment_url}/charge",
            json={"user_id": user_id, "amount": amount},
            timeout=10.0
        )
        return response.json()
    except Exception as e:
        print(f"Payment service error: {e}")
        raise

async def create_order(self, order: Order) -> Order:
    """Create order with coordination"""
    # 1. Check inventory
    inventory_available = await self.check_inventory(order.items)
    if not inventory_available:
        raise HTTPException(400, "Items not available")

    # 2. Process payment
    payment = await self.process_payment(order.user_id, order.total)
    if payment["status"] != "success":
        raise HTTPException(400, "Payment failed")

    # 3. Reserve inventory
    await self.reserve_inventory(order.items)

    # 4. Create order record
    order.status = "confirmed"
    await self.save_order(order)

    return order
@app.post("/orders", response_model=Order) async def create_order(order: Order): service = OrderService( inventory_url="http://inventory-service", payment_url="http://payment-service" ) return await service.create_order(order)
undefined

Saga Pattern (Distributed Transactions)

Saga模式(分布式事务)

python
from enum import Enum
from typing import List, Callable
import asyncio

class SagaStep:
    def __init__(self, action: Callable, compensation: Callable):
        self.action = action
        self.compensation = compensation

class SagaOrchestrator:
    """Orchestrate distributed transactions using Saga pattern"""

    def __init__(self):
        self.steps: List[SagaStep] = []
        self.completed_steps: List[SagaStep] = []

    def add_step(self, action: Callable, compensation: Callable):
        """Add a step to the saga"""
        self.steps.append(SagaStep(action, compensation))

    async def execute(self) -> bool:
        """Execute saga with compensation on failure"""
        try:
            # Execute all steps
            for step in self.steps:
                result = await step.action()
                self.completed_steps.append(step)

                if not result:
                    await self.compensate()
                    return False

            return True

        except Exception as e:
            print(f"Saga failed: {e}")
            await self.compensate()
            return False

    async def compensate(self):
        """Rollback completed steps"""
        # Execute compensations in reverse order
        for step in reversed(self.completed_steps):
            try:
                await step.compensation()
            except Exception as e:
                print(f"Compensation failed: {e}")
python
from enum import Enum
from typing import List, Callable
import asyncio

class SagaStep:
    def __init__(self, action: Callable, compensation: Callable):
        self.action = action
        self.compensation = compensation

class SagaOrchestrator:
    """Orchestrate distributed transactions using Saga pattern"""

    def __init__(self):
        self.steps: List[SagaStep] = []
        self.completed_steps: List[SagaStep] = []

    def add_step(self, action: Callable, compensation: Callable):
        """Add a step to the saga"""
        self.steps.append(SagaStep(action, compensation))

    async def execute(self) -> bool:
        """Execute saga with compensation on failure"""
        try:
            # Execute all steps
            for step in self.steps:
                result = await step.action()
                self.completed_steps.append(step)

                if not result:
                    await self.compensate()
                    return False

            return True

        except Exception as e:
            print(f"Saga failed: {e}")
            await self.compensate()
            return False

    async def compensate(self):
        """Rollback completed steps"""
        # Execute compensations in reverse order
        for step in reversed(self.completed_steps):
            try:
                await step.compensation()
            except Exception as e:
                print(f"Compensation failed: {e}")

Example: Order Saga

Example: Order Saga

class OrderSaga: async def create_order_with_saga(self, order_data: dict): saga = SagaOrchestrator()
    # Step 1: Reserve inventory
    saga.add_step(
        action=lambda: self.reserve_inventory(order_data["items"]),
        compensation=lambda: self.release_inventory(order_data["items"])
    )

    # Step 2: Process payment
    saga.add_step(
        action=lambda: self.charge_payment(order_data["user_id"], order_data["total"]),
        compensation=lambda: self.refund_payment(order_data["user_id"], order_data["total"])
    )

    # Step 3: Create order
    saga.add_step(
        action=lambda: self.create_order_record(order_data),
        compensation=lambda: self.delete_order_record(order_data["id"])
    )

    # Execute saga
    success = await saga.execute()

    if success:
        await self.send_confirmation(order_data["user_id"])
        return {"status": "success", "order_id": order_data["id"]}
    else:
        return {"status": "failed", "message": "Order creation failed"}
undefined
class OrderSaga: async def create_order_with_saga(self, order_data: dict): saga = SagaOrchestrator()
    # Step 1: Reserve inventory
    saga.add_step(
        action=lambda: self.reserve_inventory(order_data["items"]),
        compensation=lambda: self.release_inventory(order_data["items"])
    )

    # Step 2: Process payment
    saga.add_step(
        action=lambda: self.charge_payment(order_data["user_id"], order_data["total"]),
        compensation=lambda: self.refund_payment(order_data["user_id"], order_data["total"])
    )

    # Step 3: Create order
    saga.add_step(
        action=lambda: self.create_order_record(order_data),
        compensation=lambda: self.delete_order_record(order_data["id"])
    )

    # Execute saga
    success = await saga.execute()

    if success:
        await self.send_confirmation(order_data["user_id"])
        return {"status": "success", "order_id": order_data["id"]}
    else:
        return {"status": "failed", "message": "Order creation failed"}
undefined

Service Discovery

服务发现

python
import consul
from typing import List, Optional
import random

class ServiceRegistry:
    """Service discovery using Consul"""

    def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
        self.consul = consul.Consul(host=consul_host, port=consul_port)

    def register_service(self, service_name: str, service_id: str,
                        host: str, port: int, tags: List[str] = None):
        """Register service with Consul"""
        self.consul.agent.service.register(
            name=service_name,
            service_id=service_id,
            address=host,
            port=port,
            tags=tags or [],
            check=consul.Check.http(
                f"http://{host}:{port}/health",
                interval="10s",
                timeout="5s"
            )
        )

    def deregister_service(self, service_id: str):
        """Deregister service"""
        self.consul.agent.service.deregister(service_id)

    def discover_service(self, service_name: str) -> Optional[dict]:
        """Discover healthy service instance"""
        _, services = self.consul.health.service(service_name, passing=True)

        if not services:
            return None

        # Load balance: random selection
        service = random.choice(services)

        return {
            "id": service["Service"]["ID"],
            "address": service["Service"]["Address"],
            "port": service["Service"]["Port"],
            "tags": service["Service"]["Tags"]
        }

    def get_all_services(self, service_name: str) -> List[dict]:
        """Get all healthy instances of a service"""
        _, services = self.consul.health.service(service_name, passing=True)

        return [
            {
                "id": s["Service"]["ID"],
                "address": s["Service"]["Address"],
                "port": s["Service"]["Port"]
            }
            for s in services
        ]
python
import consul
from typing import List, Optional
import random

class ServiceRegistry:
    """Service discovery using Consul"""

    def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
        self.consul = consul.Consul(host=consul_host, port=consul_port)

    def register_service(self, service_name: str, service_id: str,
                        host: str, port: int, tags: List[str] = None):
        """Register service with Consul"""
        self.consul.agent.service.register(
            name=service_name,
            service_id=service_id,
            address=host,
            port=port,
            tags=tags or [],
            check=consul.Check.http(
                f"http://{host}:{port}/health",
                interval="10s",
                timeout="5s"
            )
        )

    def deregister_service(self, service_id: str):
        """Deregister service"""
        self.consul.agent.service.deregister(service_id)

    def discover_service(self, service_name: str) -> Optional[dict]:
        """Discover healthy service instance"""
        _, services = self.consul.health.service(service_name, passing=True)

        if not services:
            return None

        # Load balance: random selection
        service = random.choice(services)

        return {
            "id": service["Service"]["ID"],
            "address": service["Service"]["Address"],
            "port": service["Service"]["Port"],
            "tags": service["Service"]["Tags"]
        }

    def get_all_services(self, service_name: str) -> List[dict]:
        """Get all healthy instances of a service"""
        _, services = self.consul.health.service(service_name, passing=True)

        return [
            {
                "id": s["Service"]["ID"],
                "address": s["Service"]["Address"],
                "port": s["Service"]["Port"]
            }
            for s in services
        ]

API Gateway

API网关

python
from fastapi import FastAPI, Request, Response
import httpx
from typing import Dict
import jwt

app = FastAPI(title="API Gateway")

class APIGateway:
    """API Gateway for routing and cross-cutting concerns"""

    def __init__(self):
        self.service_registry = ServiceRegistry()
        self.client = httpx.AsyncClient()

    async def route_request(self, service: str, path: str,
                           method: str, **kwargs) -> Response:
        """Route request to appropriate microservice"""
        # Discover service
        service_info = self.service_registry.discover_service(service)

        if not service_info:
            return Response(
                content={"error": "Service unavailable"},
                status_code=503
            )

        # Build URL
        url = f"http://{service_info['address']}:{service_info['port']}{path}"

        # Forward request
        response = await self.client.request(method, url, **kwargs)

        return Response(
            content=response.content,
            status_code=response.status_code,
            headers=dict(response.headers)
        )

    async def authenticate(self, token: str) -> Optional[dict]:
        """Centralized authentication"""
        try:
            payload = jwt.decode(token, "secret", algorithms=["HS256"])
            return payload
        except jwt.JWTError:
            return None

    async def rate_limit(self, client_id: str) -> bool:
        """Centralized rate limiting"""
        # Implementation using Redis
        pass
python
from fastapi import FastAPI, Request, Response
import httpx
from typing import Dict
import jwt

app = FastAPI(title="API Gateway")

class APIGateway:
    """API Gateway for routing and cross-cutting concerns"""

    def __init__(self):
        self.service_registry = ServiceRegistry()
        self.client = httpx.AsyncClient()

    async def route_request(self, service: str, path: str,
                           method: str, **kwargs) -> Response:
        """Route request to appropriate microservice"""
        # Discover service
        service_info = self.service_registry.discover_service(service)

        if not service_info:
            return Response(
                content={"error": "Service unavailable"},
                status_code=503
            )

        # Build URL
        url = f"http://{service_info['address']}:{service_info['port']}{path}"

        # Forward request
        response = await self.client.request(method, url, **kwargs)

        return Response(
            content=response.content,
            status_code=response.status_code,
            headers=dict(response.headers)
        )

    async def authenticate(self, token: str) -> Optional[dict]:
        """Centralized authentication"""
        try:
            payload = jwt.decode(token, "secret", algorithms=["HS256"])
            return payload
        except jwt.JWTError:
            return None

    async def rate_limit(self, client_id: str) -> bool:
        """Centralized rate limiting"""
        # Implementation using Redis
        pass

Gateway endpoints

Gateway endpoints

@app.api_route("/{service}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) async def gateway_route(service: str, path: str, request: Request): gateway = APIGateway()
# Authentication
token = request.headers.get("Authorization", "").replace("Bearer ", "")
user = await gateway.authenticate(token)

if not user:
    return Response(content={"error": "Unauthorized"}, status_code=401)

# Rate limiting
if not await gateway.rate_limit(user["id"]):
    return Response(content={"error": "Rate limit exceeded"}, status_code=429)

# Route request
return await gateway.route_request(
    service=service,
    path=f"/{path}",
    method=request.method,
    headers=dict(request.headers),
    content=await request.body()
)
undefined
@app.api_route("/{service}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) async def gateway_route(service: str, path: str, request: Request): gateway = APIGateway()
# Authentication
token = request.headers.get("Authorization", "").replace("Bearer ", "")
user = await gateway.authenticate(token)

if not user:
    return Response(content={"error": "Unauthorized"}, status_code=401)

# Rate limiting
if not await gateway.rate_limit(user["id"]):
    return Response(content={"error": "Rate limit exceeded"}, status_code=429)

# Route request
return await gateway.route_request(
    service=service,
    path=f"/{path}",
    method=request.method,
    headers=dict(request.headers),
    content=await request.body()
)
undefined

Event-Driven Architecture

事件驱动架构

python
import pika
import json
from typing import Callable, Dict
import asyncio

class EventBus:
    """Message broker for event-driven communication"""

    def __init__(self, rabbitmq_url: str):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.handlers: Dict[str, Callable] = {}

    def publish_event(self, event_type: str, data: dict):
        """Publish event to all subscribers"""
        self.channel.exchange_declare(
            exchange='events',
            exchange_type='topic',
            durable=True
        )

        message = json.dumps({
            "event_type": event_type,
            "data": data,
            "timestamp": datetime.now().isoformat()
        })

        self.channel.basic_publish(
            exchange='events',
            routing_key=event_type,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2  # persistent
            )
        )

    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe to event type"""
        self.handlers[event_type] = handler

        # Declare queue
        queue_name = f"{event_type}_queue"
        self.channel.queue_declare(queue=queue_name, durable=True)

        # Bind queue to exchange
        self.channel.queue_bind(
            queue=queue_name,
            exchange='events',
            routing_key=event_type
        )

        # Start consuming
        def callback(ch, method, properties, body):
            message = json.loads(body)
            handler(message["data"])
            ch.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback
        )

    def start_consuming(self):
        """Start consuming events"""
        self.channel.start_consuming()
python
import pika
import json
from typing import Callable, Dict
import asyncio

class EventBus:
    """Message broker for event-driven communication"""

    def __init__(self, rabbitmq_url: str):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.handlers: Dict[str, Callable] = {}

    def publish_event(self, event_type: str, data: dict):
        """Publish event to all subscribers"""
        self.channel.exchange_declare(
            exchange='events',
            exchange_type='topic',
            durable=True
        )

        message = json.dumps({
            "event_type": event_type,
            "data": data,
            "timestamp": datetime.now().isoformat()
        })

        self.channel.basic_publish(
            exchange='events',
            routing_key=event_type,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2  # persistent
            )
        )

    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe to event type"""
        self.handlers[event_type] = handler

        # Declare queue
        queue_name = f"{event_type}_queue"
        self.channel.queue_declare(queue=queue_name, durable=True)

        # Bind queue to exchange
        self.channel.queue_bind(
            queue=queue_name,
            exchange='events',
            routing_key=event_type
        )

        # Start consuming
        def callback(ch, method, properties, body):
            message = json.loads(body)
            handler(message["data"])
            ch.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback
        )

    def start_consuming(self):
        """Start consuming events"""
        self.channel.start_consuming()

Usage Example

Usage Example

event_bus = EventBus("amqp://localhost")
event_bus = EventBus("amqp://localhost")

Service A publishes event

Service A publishes event

event_bus.publish_event("order.created", { "order_id": "12345", "user_id": "user_1", "total": 99.99 })
event_bus.publish_event("order.created", { "order_id": "12345", "user_id": "user_1", "total": 99.99 })

Service B subscribes to event

Service B subscribes to event

def handle_order_created(data): print(f"Processing order: {data['order_id']}") # Send email, update inventory, etc.
event_bus.subscribe("order.created", handle_order_created)
undefined
def handle_order_created(data): print(f"Processing order: {data['order_id']}") # Send email, update inventory, etc.
event_bus.subscribe("order.created", handle_order_created)
undefined

Best Practices

最佳实践

Design

设计原则

  • Keep services small and focused
  • Design for failure (circuit breakers, retries)
  • Use asynchronous communication when possible
  • Implement proper service boundaries
  • Avoid distributed monoliths
  • Use API versioning
  • Implement health checks
  • 保持服务小巧且聚焦单一职责
  • 为故障设计(断路器、重试机制)
  • 尽可能使用异步通信
  • 实现合理的服务边界
  • 避免分布式单体
  • 使用API版本控制
  • 实现健康检查

Data Management

数据管理

  • Database per service
  • Use eventual consistency
  • Implement saga pattern for distributed transactions
  • Use event sourcing for audit trails
  • Cache aggressively
  • Avoid distributed joins
  • 每个服务独立数据库
  • 使用最终一致性
  • 采用Saga模式处理分布式事务
  • 使用事件溯源实现审计追踪
  • 积极使用缓存
  • 避免分布式关联查询

Operations

运维管理

  • Implement distributed tracing
  • Centralized logging
  • Monitor service health
  • Automate deployments
  • Use service mesh for cross-cutting concerns
  • Implement feature flags
  • Practice chaos engineering
  • 实现分布式链路追踪
  • 集中式日志管理
  • 监控服务健康状态
  • 自动化部署
  • 使用Service Mesh处理横切关注点
  • 实现功能开关
  • 践行混沌工程

Anti-Patterns

反模式

❌ Distributed monolith ❌ Shared database between services ❌ Synchronous communication everywhere ❌ No service versioning ❌ Tight coupling between services ❌ No circuit breakers ❌ Missing distributed tracing
❌ 分布式单体 ❌ 服务间共享数据库 ❌ 全同步通信 ❌ 无服务版本控制 ❌ 服务间紧耦合 ❌ 未实现断路器 ❌ 缺失分布式链路追踪

Resources

参考资源