python-resilience

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Python Resilience Patterns

Python韧性模式

Build fault-tolerant Python applications that gracefully handle transient failures, network issues, and service outages. Resilience patterns keep systems running when dependencies are unreliable.
构建可优雅处理瞬时故障、网络问题和服务中断的容错Python应用。当依赖服务不可靠时,韧性模式可保障系统持续运行。

When to Use This Skill

适用场景

  • Adding retry logic to external service calls
  • Implementing timeouts for network operations
  • Building fault-tolerant microservices
  • Handling rate limiting and backpressure
  • Creating infrastructure decorators
  • Designing circuit breakers
  • 为外部服务调用添加重试逻辑
  • 为网络操作实现超时机制
  • 构建容错微服务
  • 处理限流与背压
  • 创建基础设施装饰器
  • 设计断路器

Core Concepts

核心概念

1. Transient vs Permanent Failures

1. 瞬时故障 vs 永久故障

Retry transient errors (network timeouts, temporary service issues). Don't retry permanent errors (invalid credentials, bad requests).
对瞬时错误(网络超时、临时服务问题)进行重试,不要对永久错误(无效凭证、错误请求)进行重试。

2. Exponential Backoff

2. 指数退避

Increase wait time between retries to avoid overwhelming recovering services.
增加重试间隔时间,避免给正在恢复的服务造成过大压力。

3. Jitter

3. 抖动机制

Add randomness to backoff to prevent thundering herd when many clients retry simultaneously.
在退避间隔中加入随机性,防止多个客户端同时重试时出现“惊群效应”。

4. Bounded Retries

4. 有限重试

Cap both attempt count and total duration to prevent infinite retry loops.
限制重试次数和总时长,避免无限重试循环。

Quick Start

快速入门

python
from tenacity import retry, stop_after_attempt, wait_exponential_jitter

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=1, max=10),
)
def call_external_service(request: dict) -> dict:
    return httpx.post("https://api.example.com", json=request).json()
python
from tenacity import retry, stop_after_attempt, wait_exponential_jitter

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=1, max=10),
)
def call_external_service(request: dict) -> dict:
    return httpx.post("https://api.example.com", json=request).json()

Fundamental Patterns

基础模式

Pattern 1: Basic Retry with Tenacity

模式1:基于Tenacity的基础重试

Use the
tenacity
library for production-grade retry logic. For simpler cases, consider built-in retry functionality or a lightweight custom implementation.
python
from tenacity import (
    retry,
    stop_after_attempt,
    stop_after_delay,
    wait_exponential_jitter,
    retry_if_exception_type,
)

TRANSIENT_ERRORS = (ConnectionError, TimeoutError, OSError)

@retry(
    retry=retry_if_exception_type(TRANSIENT_ERRORS),
    stop=stop_after_attempt(5) | stop_after_delay(60),
    wait=wait_exponential_jitter(initial=1, max=30),
)
def fetch_data(url: str) -> dict:
    """Fetch data with automatic retry on transient failures."""
    response = httpx.get(url, timeout=30)
    response.raise_for_status()
    return response.json()
使用
tenacity
库实现生产级别的重试逻辑。对于简单场景,可考虑使用内置重试功能或轻量级自定义实现。
python
from tenacity import (
    retry,
    stop_after_attempt,
    stop_after_delay,
    wait_exponential_jitter,
    retry_if_exception_type,
)

TRANSIENT_ERRORS = (ConnectionError, TimeoutError, OSError)

@retry(
    retry=retry_if_exception_type(TRANSIENT_ERRORS),
    stop=stop_after_attempt(5) | stop_after_delay(60),
    wait=wait_exponential_jitter(initial=1, max=30),
)
def fetch_data(url: str) -> dict:
    """Fetch data with automatic retry on transient failures."""
    response = httpx.get(url, timeout=30)
    response.raise_for_status()
    return response.json()

Pattern 2: Retry Only Appropriate Errors

模式2:仅对合适的错误进行重试

Whitelist specific transient exceptions. Never retry:
  • ValueError
    ,
    TypeError
    - These are bugs, not transient issues
  • AuthenticationError
    - Invalid credentials won't become valid
  • HTTP 4xx errors (except 429) - Client errors are permanent
python
from tenacity import retry, retry_if_exception_type
import httpx
白名单指定的瞬时异常。绝对不要重试:
  • ValueError
    TypeError
    - 这些是程序错误,而非瞬时问题
  • AuthenticationError
    - 无效凭证不会自动变为有效
  • HTTP 4xx状态码(除429外)- 客户端错误属于永久错误
python
from tenacity import retry, retry_if_exception_type
import httpx

Define what's retryable

Define what's retryable

RETRYABLE_EXCEPTIONS = ( ConnectionError, TimeoutError, httpx.ConnectTimeout, httpx.ReadTimeout, )
@retry( retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS), stop=stop_after_attempt(3), wait=wait_exponential_jitter(initial=1, max=10), ) def resilient_api_call(endpoint: str) -> dict: """Make API call with retry on network issues.""" return httpx.get(endpoint, timeout=10).json()
undefined
RETRYABLE_EXCEPTIONS = ( ConnectionError, TimeoutError, httpx.ConnectTimeout, httpx.ReadTimeout, )
@retry( retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS), stop=stop_after_attempt(3), wait=wait_exponential_jitter(initial=1, max=10), ) def resilient_api_call(endpoint: str) -> dict: """Make API call with retry on network issues.""" return httpx.get(endpoint, timeout=10).json()
undefined

Pattern 3: HTTP Status Code Retries

模式3:基于HTTP状态码的重试

Retry specific HTTP status codes that indicate transient issues.
python
from tenacity import retry, retry_if_result, stop_after_attempt
import httpx

RETRY_STATUS_CODES = {429, 502, 503, 504}

def should_retry_response(response: httpx.Response) -> bool:
    """Check if response indicates a retryable error."""
    return response.status_code in RETRY_STATUS_CODES

@retry(
    retry=retry_if_result(should_retry_response),
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=1, max=10),
)
def http_request(method: str, url: str, **kwargs) -> httpx.Response:
    """Make HTTP request with retry on transient status codes."""
    return httpx.request(method, url, timeout=30, **kwargs)
对指示瞬时问题的特定HTTP状态码进行重试。
python
from tenacity import retry, retry_if_result, stop_after_attempt
import httpx

RETRY_STATUS_CODES = {429, 502, 503, 504}

def should_retry_response(response: httpx.Response) -> bool:
    """Check if response indicates a retryable error."""
    return response.status_code in RETRY_STATUS_CODES

@retry(
    retry=retry_if_result(should_retry_response),
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=1, max=10),
)
def http_request(method: str, url: str, **kwargs) -> httpx.Response:
    """Make HTTP request with retry on transient status codes."""
    return httpx.request(method, url, timeout=30, **kwargs)

Pattern 4: Combined Exception and Status Retry

模式4:结合异常与状态码的重试

Handle both network exceptions and HTTP status codes.
python
from tenacity import (
    retry,
    retry_if_exception_type,
    retry_if_result,
    stop_after_attempt,
    wait_exponential_jitter,
    before_sleep_log,
)
import logging
import httpx

logger = logging.getLogger(__name__)

TRANSIENT_EXCEPTIONS = (
    ConnectionError,
    TimeoutError,
    httpx.ConnectError,
    httpx.ReadTimeout,
)
RETRY_STATUS_CODES = {429, 500, 502, 503, 504}

def is_retryable_response(response: httpx.Response) -> bool:
    return response.status_code in RETRY_STATUS_CODES

@retry(
    retry=(
        retry_if_exception_type(TRANSIENT_EXCEPTIONS) |
        retry_if_result(is_retryable_response)
    ),
    stop=stop_after_attempt(5),
    wait=wait_exponential_jitter(initial=1, max=30),
    before_sleep=before_sleep_log(logger, logging.WARNING),
)
def robust_http_call(
    method: str,
    url: str,
    **kwargs,
) -> httpx.Response:
    """HTTP call with comprehensive retry handling."""
    return httpx.request(method, url, timeout=30, **kwargs)
同时处理网络异常和HTTP状态码。
python
from tenacity import (
    retry,
    retry_if_exception_type,
    retry_if_result,
    stop_after_attempt,
    wait_exponential_jitter,
    before_sleep_log,
)
import logging
import httpx

logger = logging.getLogger(__name__)

TRANSIENT_EXCEPTIONS = (
    ConnectionError,
    TimeoutError,
    httpx.ConnectError,
    httpx.ReadTimeout,
)
RETRY_STATUS_CODES = {429, 500, 502, 503, 504}

def is_retryable_response(response: httpx.Response) -> bool:
    return response.status_code in RETRY_STATUS_CODES

@retry(
    retry=(
        retry_if_exception_type(TRANSIENT_EXCEPTIONS) |
        retry_if_result(is_retryable_response)
    ),
    stop=stop_after_attempt(5),
    wait=wait_exponential_jitter(initial=1, max=30),
    before_sleep=before_sleep_log(logger, logging.WARNING),
)
def robust_http_call(
    method: str,
    url: str,
    **kwargs,
) -> httpx.Response:
    """HTTP call with comprehensive retry handling."""
    return httpx.request(method, url, timeout=30, **kwargs)

Advanced Patterns

进阶模式

Pattern 5: Logging Retry Attempts

模式5:记录重试尝试

Track retry behavior for debugging and alerting.
python
from tenacity import retry, stop_after_attempt, wait_exponential
import structlog

logger = structlog.get_logger()

def log_retry_attempt(retry_state):
    """Log detailed retry information."""
    exception = retry_state.outcome.exception()
    logger.warning(
        "Retrying operation",
        attempt=retry_state.attempt_number,
        exception_type=type(exception).__name__,
        exception_message=str(exception),
        next_wait_seconds=retry_state.next_action.sleep if retry_state.next_action else None,
    )

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, max=10),
    before_sleep=log_retry_attempt,
)
def call_with_logging(request: dict) -> dict:
    """External call with retry logging."""
    ...
跟踪重试行为以用于调试和告警。
python
from tenacity import retry, stop_after_attempt, wait_exponential
import structlog

logger = structlog.get_logger()

def log_retry_attempt(retry_state):
    """Log detailed retry information."""
    exception = retry_state.outcome.exception()
    logger.warning(
        "Retrying operation",
        attempt=retry_state.attempt_number,
        exception_type=type(exception).__name__,
        exception_message=str(exception),
        next_wait_seconds=retry_state.next_action.sleep if retry_state.next_action else None,
    )

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, max=10),
    before_sleep=log_retry_attempt,
)
def call_with_logging(request: dict) -> dict:
    """External call with retry logging."""
    ...

Pattern 6: Timeout Decorator

模式6:超时装饰器

Create reusable timeout decorators for consistent timeout handling.
python
import asyncio
from functools import wraps
from typing import TypeVar, Callable

T = TypeVar("T")

def with_timeout(seconds: float):
    """Decorator to add timeout to async functions."""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            return await asyncio.wait_for(
                func(*args, **kwargs),
                timeout=seconds,
            )
        return wrapper
    return decorator

@with_timeout(30)
async def fetch_with_timeout(url: str) -> dict:
    """Fetch URL with 30 second timeout."""
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()
创建可复用的超时装饰器,实现一致的超时处理。
python
import asyncio
from functools import wraps
from typing import TypeVar, Callable

T = TypeVar("T")

def with_timeout(seconds: float):
    """Decorator to add timeout to async functions."""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            return await asyncio.wait_for(
                func(*args, **kwargs),
                timeout=seconds,
            )
        return wrapper
    return decorator

@with_timeout(30)
async def fetch_with_timeout(url: str) -> dict:
    """Fetch URL with 30 second timeout."""
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()

Pattern 7: Cross-Cutting Concerns via Decorators

模式7:通过装饰器实现横切关注点

Stack decorators to separate infrastructure from business logic.
python
from functools import wraps
from typing import TypeVar, Callable
import structlog

logger = structlog.get_logger()
T = TypeVar("T")

def traced(name: str | None = None):
    """Add tracing to function calls."""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        span_name = name or func.__name__

        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            logger.info("Operation started", operation=span_name)
            try:
                result = await func(*args, **kwargs)
                logger.info("Operation completed", operation=span_name)
                return result
            except Exception as e:
                logger.error("Operation failed", operation=span_name, error=str(e))
                raise
        return wrapper
    return decorator
堆叠装饰器,将基础设施逻辑与业务逻辑分离。
python
from functools import wraps
from typing import TypeVar, Callable
import structlog

logger = structlog.get_logger()
T = TypeVar("T")

def traced(name: str | None = None):
    """Add tracing to function calls."""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        span_name = name or func.__name__

        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            logger.info("Operation started", operation=span_name)
            try:
                result = await func(*args, **kwargs)
                logger.info("Operation completed", operation=span_name)
                return result
            except Exception as e:
                logger.error("Operation failed", operation=span_name, error=str(e))
                raise
        return wrapper
    return decorator

Stack multiple concerns

Stack multiple concerns

@traced("fetch_user_data") @with_timeout(30) @retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter()) async def fetch_user_data(user_id: str) -> dict: """Fetch user with tracing, timeout, and retry.""" ...
undefined
@traced("fetch_user_data") @with_timeout(30) @retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter()) async def fetch_user_data(user_id: str) -> dict: """Fetch user with tracing, timeout, and retry.""" ...
undefined

Pattern 8: Dependency Injection for Testability

模式8:依赖注入以提升可测试性

Pass infrastructure components through constructors for easy testing.
python
from dataclasses import dataclass
from typing import Protocol

class Logger(Protocol):
    def info(self, msg: str, **kwargs) -> None: ...
    def error(self, msg: str, **kwargs) -> None: ...

class MetricsClient(Protocol):
    def increment(self, metric: str, tags: dict | None = None) -> None: ...
    def timing(self, metric: str, value: float) -> None: ...

@dataclass
class UserService:
    """Service with injected infrastructure."""

    repository: UserRepository
    logger: Logger
    metrics: MetricsClient

    async def get_user(self, user_id: str) -> User:
        self.logger.info("Fetching user", user_id=user_id)
        start = time.perf_counter()

        try:
            user = await self.repository.get(user_id)
            self.metrics.increment("user.fetch.success")
            return user
        except Exception as e:
            self.metrics.increment("user.fetch.error")
            self.logger.error("Failed to fetch user", user_id=user_id, error=str(e))
            raise
        finally:
            elapsed = time.perf_counter() - start
            self.metrics.timing("user.fetch.duration", elapsed)
通过构造函数传递基础设施组件,便于测试。
python
from dataclasses import dataclass
from typing import Protocol

class Logger(Protocol):
    def info(self, msg: str, **kwargs) -> None: ...
    def error(self, msg: str, **kwargs) -> None: ...

class MetricsClient(Protocol):
    def increment(self, metric: str, tags: dict | None = None) -> None: ...
    def timing(self, metric: str, value: float) -> None: ...

@dataclass
class UserService:
    """Service with injected infrastructure."""

    repository: UserRepository
    logger: Logger
    metrics: MetricsClient

    async def get_user(self, user_id: str) -> User:
        self.logger.info("Fetching user", user_id=user_id)
        start = time.perf_counter()

        try:
            user = await self.repository.get(user_id)
            self.metrics.increment("user.fetch.success")
            return user
        except Exception as e:
            self.metrics.increment("user.fetch.error")
            self.logger.error("Failed to fetch user", user_id=user_id, error=str(e))
            raise
        finally:
            elapsed = time.perf_counter() - start
            self.metrics.timing("user.fetch.duration", elapsed)

Easy to test with fakes

Easy to test with fakes

service = UserService( repository=FakeRepository(), logger=FakeLogger(), metrics=FakeMetrics(), )
undefined
service = UserService( repository=FakeRepository(), logger=FakeLogger(), metrics=FakeMetrics(), )
undefined

Pattern 9: Fail-Safe Defaults

模式9:故障安全默认值

Degrade gracefully when non-critical operations fail.
python
from typing import TypeVar
from collections.abc import Callable

T = TypeVar("T")

def fail_safe(default: T, log_failure: bool = True):
    """Return default value on failure instead of raising."""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                if log_failure:
                    logger.warning(
                        "Operation failed, using default",
                        function=func.__name__,
                        error=str(e),
                    )
                return default
        return wrapper
    return decorator

@fail_safe(default=[])
async def get_recommendations(user_id: str) -> list[str]:
    """Get recommendations, return empty list on failure."""
    ...
当非关键操作失败时,优雅降级。
python
from typing import TypeVar
from collections.abc import Callable

T = TypeVar("T")

def fail_safe(default: T, log_failure: bool = True):
    """Return default value on failure instead of raising."""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                if log_failure:
                    logger.warning(
                        "Operation failed, using default",
                        function=func.__name__,
                        error=str(e),
                    )
                return default
        return wrapper
    return decorator

@fail_safe(default=[])
async def get_recommendations(user_id: str) -> list[str]:
    """Get recommendations, return empty list on failure."""
    ...

Best Practices Summary

最佳实践总结

  1. Retry only transient errors - Don't retry bugs or authentication failures
  2. Use exponential backoff - Give services time to recover
  3. Add jitter - Prevent thundering herd from synchronized retries
  4. Cap total duration -
    stop_after_attempt(5) | stop_after_delay(60)
  5. Log every retry - Silent retries hide systemic problems
  6. Use decorators - Keep retry logic separate from business logic
  7. Inject dependencies - Make infrastructure testable
  8. Set timeouts everywhere - Every network call needs a timeout
  9. Fail gracefully - Return cached/default values for non-critical paths
  10. Monitor retry rates - High retry rates indicate underlying issues
  1. 仅重试瞬时错误 - 不要重试程序错误或认证失败
  2. 使用指数退避 - 给服务留出恢复时间
  3. 添加抖动机制 - 避免同步重试导致的惊群效应
  4. 限制总时长 - 使用
    stop_after_attempt(5) | stop_after_delay(60)
  5. 记录每次重试 - 静默重试会隐藏系统性问题
  6. 使用装饰器 - 将重试逻辑与业务逻辑分离
  7. 注入依赖 - 让基础设施逻辑可测试
  8. 处处设置超时 - 每个网络调用都需要超时机制
  9. 优雅降级 - 为非关键路径返回缓存/默认值
  10. 监控重试率 - 高重试率表明存在潜在问题