connection-pooling
Connection Pooling Patterns
Database and HTTP connection pooling for high-performance async Python applications.
Overview
- Configuring asyncpg/SQLAlchemy connection pools
- Setting up aiohttp ClientSession for HTTP requests
- Diagnosing connection exhaustion or leaks
- Optimizing pool sizes for workload
- Implementing health checks and connection validation
Quick Reference
SQLAlchemy Async Pool Configuration
```python
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    # Pool sizing
    pool_size=20,        # Steady-state connections
    max_overflow=10,     # Burst capacity (total max = 30)
    # Connection health
    pool_pre_ping=True,  # Validate before use (adds ~1ms latency)
    pool_recycle=3600,   # Recreate connections after 1 hour
    # Timeouts
    pool_timeout=30,     # Wait for a connection from the pool
    connect_args={
        "command_timeout": 60,  # Query timeout
        "server_settings": {
            "statement_timeout": "60000",  # 60s query timeout
        },
    },
)
```
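Checking out a connection from this pool is just a context-manager scope; a minimal sketch, assuming the `engine` above (the `count_users` helper and `users` table are illustrative, not part of the configuration):

```python
from sqlalchemy import text  # SQLAlchemy 2.x requires text() around raw SQL

async def count_users(engine):
    # Borrow a pooled connection; it returns to the pool when the block exits.
    async with engine.connect() as conn:
        result = await conn.execute(text("SELECT count(*) FROM users"))
        return result.scalar_one()
```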
Direct asyncpg Pool
```python
import asyncpg

async def setup_connection(conn):
    """Run each time a connection is checked out of the pool."""
    await conn.execute("SET timezone TO 'UTC'")
    await conn.execute("SET statement_timeout TO '60s'")

pool = await asyncpg.create_pool(
    "postgresql://user:pass@localhost/db",
    # Pool sizing
    min_size=10,  # Minimum connections kept open
    max_size=20,  # Maximum connections
    # Connection lifecycle
    max_inactive_connection_lifetime=300,  # Close idle after 5 min
    # Timeouts
    command_timeout=60,  # Query timeout
    timeout=30,          # Connection timeout
    # Per-connection setup
    setup=setup_connection,
)
```
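Acquiring from the pool follows the same scoped pattern; a minimal sketch assuming the `pool` above (`get_user` and the `users` table are illustrative):

```python
# Assumes `pool` was created with asyncpg.create_pool as above.
async def get_user(pool, user_id: int):
    # The connection is returned to the pool when the block exits,
    # even if the query raises.
    async with pool.acquire() as conn:
        return await conn.fetchrow(
            "SELECT id, name FROM users WHERE id = $1", user_id
        )
```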
aiohttp Session Pool
```python
import aiohttp
from aiohttp import TCPConnector

connector = TCPConnector(
    # Connection limits
    limit=100,          # Total connections
    limit_per_host=20,  # Per-host limit
    # Keep-alive
    keepalive_timeout=30,  # How long idle connections stay open
    # SSL
    ssl=False,  # Or an ssl.SSLContext for HTTPS
    # DNS
    ttl_dns_cache=300,  # DNS cache TTL
)

session = aiohttp.ClientSession(
    connector=connector,
    timeout=aiohttp.ClientTimeout(
        total=30,      # Total request timeout
        connect=10,    # Connection timeout
        sock_read=20,  # Read timeout
    ),
)
```

IMPORTANT: Reuse the session across requests: create it once at startup, close it at shutdown.
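With the shared session, each request borrows a pooled connection and keep-alive returns it for reuse; a sketch assuming the `session` above (`fetch_json` is an illustrative name):

```python
# Assumes `session` is the shared aiohttp.ClientSession created above.
async def fetch_json(session, url: str):
    # Borrows a pooled connection for the duration of the request.
    async with session.get(url) as resp:
        resp.raise_for_status()
        return await resp.json()
```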
FastAPI Lifespan with Pools
```python
from contextlib import asynccontextmanager

import aiohttp
import asyncpg
from aiohttp import TCPConnector
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create pools
    app.state.db_pool = await asyncpg.create_pool(DATABASE_URL)
    app.state.http_session = aiohttp.ClientSession(
        connector=TCPConnector(limit=100)
    )
    yield
    # Shutdown: close pools
    await app.state.db_pool.close()
    await app.state.http_session.close()

app = FastAPI(lifespan=lifespan)
```
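Handlers then reach the pools through `request.app.state`; a sketch under the lifespan wiring above (the route path and query are illustrative):

```python
# Hypothetical route body; register it with @app.get("/users/{user_id}").
# `db_pool` matches the attribute set in the lifespan above.
async def read_user(user_id, request):
    pool = request.app.state.db_pool
    async with pool.acquire() as conn:
        return await conn.fetchrow(
            "SELECT id, name FROM users WHERE id = $1", user_id
        )
```

The shared HTTP session is reached the same way, via `request.app.state.http_session`.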
Pool Monitoring
```python
import asyncpg
from prometheus_client import Gauge

# Metrics
pool_size = Gauge("db_pool_size", "Current pool size")
pool_available = Gauge("db_pool_available", "Available connections")
pool_waiting = Gauge("db_pool_waiting", "Requests waiting for connection")

async def collect_pool_metrics(pool: asyncpg.Pool):
    """Collect pool metrics periodically."""
    pool_size.set(pool.get_size())
    pool_available.set(pool.get_idle_size())
    # Waiting requests need custom tracking; asyncpg does not expose them
```
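One way to drive the collector "periodically" is a background task; a dependency-free sketch (`poll_pool_metrics` and the 15-second default are assumptions, not part of the snippet above):

```python
import asyncio

async def poll_pool_metrics(collect, pool, interval: float = 15.0):
    # Run the collector until the task is cancelled (e.g. at shutdown).
    while True:
        await collect(pool)
        await asyncio.sleep(interval)

# Typical wiring at startup:
#   task = asyncio.create_task(poll_pool_metrics(collect_pool_metrics, pool))
# and task.cancel() during shutdown.
```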
Key Decisions
| Parameter | Small Service | Medium Service | High Load |
|---|---|---|---|
| pool_size | 5-10 | 20-50 | 50-100 |
| max_overflow | 5 | 10-20 | 20-50 |
| pool_pre_ping | True | True | Consider False* |
| pool_recycle | 3600 | 1800 | 900 |
| pool_timeout | 30 | 15 | 5 |
*For very high load, pre_ping adds latency; use shorter recycle instead.
Sizing Formula
pool_size = (concurrent_requests / avg_queries_per_request) * 1.5
Example:
- 100 concurrent requests
- 3 queries per request average
- pool_size = (100 / 3) * 1.5 = 50
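The formula translates directly to code; a small helper (the function name is illustrative):

```python
def recommended_pool_size(concurrent_requests, avg_queries_per_request):
    # Rule of thumb from above, rounded to a whole connection count.
    return round((concurrent_requests / avg_queries_per_request) * 1.5)

# recommended_pool_size(100, 3) gives 50, matching the worked example.
```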
Anti-Patterns (FORBIDDEN)
```python
# NEVER create an engine/pool per request
async def get_data():
    engine = create_async_engine(url)  # WRONG - pool per request!
    async with engine.connect() as conn:
        return await conn.execute(...)

# NEVER create a ClientSession per request
async def fetch():
    async with aiohttp.ClientSession() as session:  # WRONG!
        return await session.get(url)

# NEVER forget to close pools on shutdown
app = FastAPI()
engine = create_async_engine(url)
# WRONG - engine never closed!

# NEVER use pool_pre_ping=False without a short pool_recycle
engine = create_async_engine(url, pool_pre_ping=False)  # Stale connections!

# NEVER set pool_size too high
engine = create_async_engine(url, pool_size=500)  # Exhausts DB connections!
```
Troubleshooting
Connection Exhaustion
Symptom: "QueuePool limit reached" or timeouts
Diagnosis
```python
from sqlalchemy import event

@event.listens_for(engine.sync_engine, "checkout")
def log_checkout(dbapi_conn, conn_record, conn_proxy):
    print(f"Connection checked out: {id(dbapi_conn)}")

@event.listens_for(engine.sync_engine, "checkin")
def log_checkin(dbapi_conn, conn_record):
    print(f"Connection returned: {id(dbapi_conn)}")
```
Fix: Ensure connections are returned
```python
async with session.begin():
    # ... work ...
    pass  # Connection returned here
```
Stale Connections
Symptom: "connection closed" errors
Fix 1: Enable pool_pre_ping
```python
engine = create_async_engine(url, pool_pre_ping=True)
```
Fix 2: Reduce pool_recycle
```python
engine = create_async_engine(url, pool_recycle=900)
```
Fix 3: Handle retries in the application
```python
from sqlalchemy.exc import DBAPIError

async def with_retry(session, operation, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await operation(session)
        except DBAPIError:
            if attempt == max_retries - 1:
                raise
            await session.rollback()
```
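To see the retry loop in action without a database, the same pattern can be exercised against stubs; everything below is illustrative, and a generic Exception stands in for DBAPIError so the sketch runs on its own:

```python
import asyncio

# Same retry loop as above, with Exception in place of DBAPIError.
async def with_retry(session, operation, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await operation(session)
        except Exception:
            if attempt == max_retries - 1:
                raise
            await session.rollback()

class FlakySession:
    """Stub session: counts rollbacks issued by the retry helper."""
    def __init__(self):
        self.calls = 0
        self.rollbacks = 0
    async def rollback(self):
        self.rollbacks += 1

async def flaky_query(session):
    # Fails twice (simulating stale connections), then succeeds.
    session.calls += 1
    if session.calls < 3:
        raise Exception("connection closed")
    return "rows"

session = FlakySession()
result = asyncio.run(with_retry(session, flaky_query))
# result == "rows" after two rollbacks and a third, successful attempt
```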
Related Skills
- sqlalchemy-2-async - SQLAlchemy async session patterns
- asyncio-advanced - Async concurrency patterns
- observability-monitoring - Metrics and alerting
- caching-strategies - Redis connection pooling
Capability Details
能力详情
database-pool
Keywords: pool_size, max_overflow, asyncpg, pool_pre_ping, connection pool
Solves:
- How do I size a database connection pool?
- Configure asyncpg/SQLAlchemy pool
- Prevent connection exhaustion
http-session
Keywords: aiohttp, ClientSession, TCPConnector, http pool, connection limit
Solves:
- How do I configure aiohttp session?
- Reuse HTTP connections properly
- Set timeouts for HTTP requests
pool-monitoring
Keywords: pool metrics, connection leak, pool exhaustion, monitoring
Solves:
- How do I monitor connection pool health?
- Detect connection leaks
- Troubleshoot pool exhaustion