Connection Pooling Patterns


Database and HTTP connection pooling for high-performance async Python applications.

Overview


  • Configuring asyncpg/SQLAlchemy connection pools
  • Setting up aiohttp ClientSession for HTTP requests
  • Diagnosing connection exhaustion or leaks
  • Optimizing pool sizes for workload
  • Implementing health checks and connection validation

Quick Reference


SQLAlchemy Async Pool Configuration


python
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",

    # Pool sizing
    pool_size=20,           # Steady-state connections
    max_overflow=10,        # Burst capacity (total max = 30)

    # Connection health
    pool_pre_ping=True,     # Validate before use (adds ~1ms latency)
    pool_recycle=3600,      # Recreate connections after 1 hour

    # Timeouts
    pool_timeout=30,        # Wait for connection from pool
    connect_args={
        "command_timeout": 60,      # Query timeout
        "server_settings": {
            "statement_timeout": "60000",  # 60s query timeout
        },
    },
)
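`pool_pre_ping` validates each connection with a lightweight round trip before handing it out. A minimal stdlib-only sketch of the idea; the classes here are illustrative stand-ins, not SQLAlchemy internals:

```python
class DeadConnection(Exception):
    pass

class Conn:
    """Hypothetical connection object; `alive` models server-side state."""
    def __init__(self, alive=True):
        self.alive = alive

    def ping(self):
        """Stand-in for the ~1ms 'SELECT 1' round trip."""
        if not self.alive:
            raise DeadConnection

def checkout(idle, connect):
    """Pre-ping: validate a pooled connection, silently replacing stale ones."""
    while idle:
        conn = idle.pop()
        try:
            conn.ping()
            return conn          # healthy connection handed to the caller
        except DeadConnection:
            continue             # stale: discard and try the next one
    return connect()             # pool drained: open a fresh connection

idle = [Conn(alive=True), Conn(alive=False)]
conn = checkout(idle, connect=Conn)
print(conn.alive)  # → True (the stale connection was discarded, not returned)
```

This is why pre-ping trades a small per-checkout latency for never handing a dead connection to application code.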

Direct asyncpg Pool


python
import asyncpg

pool = await asyncpg.create_pool(
    "postgresql://user:pass@localhost/db",

    # Pool sizing
    min_size=10,            # Minimum connections kept open
    max_size=20,            # Maximum connections

    # Connection lifecycle
    max_inactive_connection_lifetime=300,  # Close idle after 5 min

    # Timeouts
    command_timeout=60,     # Query timeout
    timeout=30,             # Connection timeout

    # Setup for each connection
    setup=setup_connection,
)

async def setup_connection(conn):
    """Run on each new connection."""
    await conn.execute("SET timezone TO 'UTC'")
    await conn.execute("SET statement_timeout TO '60s'")

aiohttp Session Pool


python
import aiohttp
from aiohttp import TCPConnector

connector = TCPConnector(
    # Connection limits
    limit=100,              # Total connections
    limit_per_host=20,      # Per-host limit

    # Timeouts
    keepalive_timeout=30,   # Keep-alive duration

    # SSL
    ssl=False,              # Or ssl.SSLContext for HTTPS

    # DNS
    ttl_dns_cache=300,      # DNS cache TTL
)

session = aiohttp.ClientSession(
    connector=connector,
    timeout=aiohttp.ClientTimeout(
        total=30,           # Total request timeout
        connect=10,         # Connection timeout
        sock_read=20,       # Read timeout
    ),
)
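The connector's `limit` behaves like a semaphore gating in-flight requests. A stdlib-only sketch of that back-pressure idea (no real HTTP involved; the sleep stands in for the round trip):

```python
import asyncio

async def limited_fetch(sem: asyncio.Semaphore, in_flight: list, peak: list) -> None:
    async with sem:                        # at most `limit` tasks pass this point
        in_flight[0] += 1
        peak[0] = max(peak[0], in_flight[0])
        await asyncio.sleep(0.01)          # stand-in for the HTTP round trip
        in_flight[0] -= 1

async def main() -> int:
    sem = asyncio.Semaphore(3)             # like TCPConnector(limit=3)
    in_flight, peak = [0], [0]
    await asyncio.gather(*(limited_fetch(sem, in_flight, peak) for _ in range(10)))
    return peak[0]

print(asyncio.run(main()))  # → 3: concurrency never exceeds the limit
```

Excess requests queue at the semaphore instead of opening new sockets, which is why `timeout.total` must cover possible queueing time under load.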

IMPORTANT: Reuse session across requests

Create once at startup, close at shutdown

FastAPI Lifespan with Pools


python
from contextlib import asynccontextmanager

import aiohttp
import asyncpg
from aiohttp import TCPConnector
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create pools
    app.state.db_pool = await asyncpg.create_pool(DATABASE_URL)
    app.state.http_session = aiohttp.ClientSession(
        connector=TCPConnector(limit=100)
    )

    yield

    # Shutdown: close pools
    await app.state.db_pool.close()
    await app.state.http_session.close()

app = FastAPI(lifespan=lifespan)
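The lifespan pattern is plain `asynccontextmanager` sequencing: resources open before `yield`, the app serves, resources close after. A stdlib-only sketch of that ordering (`FakePool` is a hypothetical stand-in for the real pools):

```python
import asyncio
from contextlib import asynccontextmanager

events = []

class FakePool:
    async def close(self):
        events.append("pool closed")

@asynccontextmanager
async def lifespan(state: dict):
    state["pool"] = FakePool()             # startup: create once
    events.append("pool created")
    yield
    await state["pool"].close()            # shutdown: close once

async def main():
    async with lifespan({}):
        events.append("serving requests")  # the app runs between the two phases

asyncio.run(main())
print(events)  # → ['pool created', 'serving requests', 'pool closed']
```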

Pool Monitoring


python
import asyncpg
from prometheus_client import Gauge

# Metrics
pool_size = Gauge("db_pool_size", "Current pool size")
pool_available = Gauge("db_pool_available", "Available connections")
pool_waiting = Gauge("db_pool_waiting", "Requests waiting for connection")

async def collect_pool_metrics(pool: asyncpg.Pool):
    """Collect pool metrics periodically."""
    pool_size.set(pool.get_size())
    pool_available.set(pool.get_idle_size())
    # For waiting, need custom tracking
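The collector still needs to be scheduled. A minimal stdlib-only sketch of a periodic sampler; `sample` stands in for a call to the collector above:

```python
import asyncio

async def run_periodically(sample, interval, iterations):
    """Call `sample()` every `interval` seconds for `iterations` rounds.
    In a real service this would run forever as a background task."""
    count = 0
    while count < iterations:
        sample()
        count += 1
        await asyncio.sleep(interval)

samples = []
asyncio.run(run_periodically(lambda: samples.append("tick"), 0.01, iterations=3))
print(len(samples))  # → 3
```

In FastAPI this loop is typically started with `asyncio.create_task` inside the lifespan and cancelled on shutdown.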

Key Decisions


Parameter        Small Service   Medium Service   High Load
pool_size        5-10            20-50            50-100
max_overflow     5               10-20            20-50
pool_pre_ping    True            True             Consider False*
pool_recycle     3600            1800             900
pool_timeout     30              15               5

*For very high load, pre_ping adds latency; use shorter recycle instead.

Sizing Formula


pool_size = (concurrent_requests / avg_queries_per_request) * 1.5

Example:
- 100 concurrent requests
- 3 queries per request average
- pool_size = (100 / 3) * 1.5 = 50
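The formula above can be wrapped in a small helper; the function name and `headroom` parameter are illustrative, with 1.5 as the safety factor the formula uses:

```python
import math

def recommended_pool_size(concurrent_requests: int,
                          avg_queries_per_request: int,
                          headroom: float = 1.5) -> int:
    """Apply the sizing formula: (concurrent / queries-per-request) * headroom."""
    return math.ceil(concurrent_requests / avg_queries_per_request * headroom)

print(recommended_pool_size(100, 3))  # → 50, matching the worked example
```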

Anti-Patterns (FORBIDDEN)

NEVER create engine/pool per request

python
async def get_data():
    engine = create_async_engine(url)  # WRONG - pool per request!
    async with engine.connect() as conn:
        return await conn.execute(...)

NEVER create ClientSession per request

python
async def fetch():
    async with aiohttp.ClientSession() as session:  # WRONG!
        return await session.get(url)

NEVER forget to close pools on shutdown

python
app = FastAPI()
engine = create_async_engine(url)
# WRONG - engine never closed!

NEVER use pool_pre_ping=False without short pool_recycle

python
engine = create_async_engine(url, pool_pre_ping=False)  # Stale connections!

NEVER set pool_size too high

python
engine = create_async_engine(url, pool_size=500)  # Exhausts DB connections!

Troubleshooting

Connection Exhaustion

Symptom: "QueuePool limit reached" or timeouts

Diagnosis:

python
from sqlalchemy import event

@event.listens_for(engine.sync_engine, "checkout")
def log_checkout(dbapi_conn, conn_record, conn_proxy):
    print(f"Connection checked out: {id(dbapi_conn)}")

@event.listens_for(engine.sync_engine, "checkin")
def log_checkin(dbapi_conn, conn_record):
    print(f"Connection returned: {id(dbapi_conn)}")

Fix: Ensure connections are returned

python
async with session.begin():
    # ... work ...
    pass  # Connection returned here
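The checkout/checkin events above are just counters at heart: a steadily growing gap between checkouts and checkins points at a leak. A stdlib sketch of that bookkeeping (names are illustrative):

```python
class LeakTracker:
    """Count checkouts/checkins; a growing `outstanding` value indicates a leak."""
    def __init__(self):
        self.checked_out = 0
        self.returned = 0

    def checkout(self):
        self.checked_out += 1

    def checkin(self):
        self.returned += 1

    @property
    def outstanding(self) -> int:
        return self.checked_out - self.returned

tracker = LeakTracker()
for _ in range(5):
    tracker.checkout()
for _ in range(3):
    tracker.checkin()
print(tracker.outstanding)  # → 2 connections never returned
```

Exporting `outstanding` as a gauge and alerting when it stays near `pool_size + max_overflow` catches leaks before requests start timing out.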

Stale Connections

Symptom: "connection closed" errors

Fix 1: Enable pool_pre_ping

python
engine = create_async_engine(url, pool_pre_ping=True)

Fix 2: Reduce pool_recycle

python
engine = create_async_engine(url, pool_recycle=900)

Fix 3: Handle in application

python
from sqlalchemy.exc import DBAPIError

async def with_retry(session, operation, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await operation(session)
        except DBAPIError:
            if attempt == max_retries - 1:
                raise
            await session.rollback()
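The retry wrapper's behavior can be exercised with stdlib stand-ins; `FakeSession` and `flaky` are hypothetical substitutes for the SQLAlchemy session and operation, and `ConnectionError` stands in for `DBAPIError`:

```python
import asyncio

class FakeSession:
    async def rollback(self):
        pass

async def with_retry(session, operation, max_retries=3):
    # Same shape as the SQLAlchemy version above, with a generic exception
    for attempt in range(max_retries):
        try:
            return await operation(session)
        except ConnectionError:
            if attempt == max_retries - 1:
                raise
            await session.rollback()

calls = {"n": 0}

async def flaky(session):
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("connection closed")
    return "ok"

print(asyncio.run(with_retry(FakeSession(), flaky)))  # → ok, after two retries
```

Note the rollback between attempts: retrying on a session left in a failed transaction would fail again for an unrelated reason.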

Related Skills


  • sqlalchemy-2-async
    - SQLAlchemy async session patterns
  • asyncio-advanced
    - Async concurrency patterns
  • observability-monitoring
    - Metrics and alerting
  • caching-strategies
    - Redis connection pooling

Capability Details


database-pool


Keywords: pool_size, max_overflow, asyncpg, pool_pre_ping, connection pool

Solves:
  • How do I size a database connection pool?
  • Configure asyncpg/SQLAlchemy pool
  • Prevent connection exhaustion

http-session


Keywords: aiohttp, ClientSession, TCPConnector, http pool, connection limit

Solves:
  • How do I configure an aiohttp session?
  • Reuse HTTP connections properly
  • Set timeouts for HTTP requests

pool-monitoring


Keywords: pool metrics, connection leak, pool exhaustion, monitoring

Solves:
  • How do I monitor connection pool health?
  • Detect connection leaks
  • Troubleshoot pool exhaustion