fastapi-async-patterns
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseFastAPI Async Patterns
FastAPI 异步模式
Master async patterns in FastAPI for building high-performance,
concurrent APIs with optimal resource usage.
掌握FastAPI中的异步模式,以最优资源利用率构建高性能、高并发的API。
Basic Async Route Handlers
基础异步路由处理器
Understanding async vs sync endpoints in FastAPI.
python
from fastapi import FastAPI
import time
import asyncio
app = FastAPI()了解FastAPI中的异步与同步端点差异。
python
from fastapi import FastAPI
import time
import asyncio
app = FastAPI()Sync endpoint (blocks the event loop)
Sync endpoint (blocks the event loop)
@app.get('/sync')
def sync_endpoint():
time.sleep(1) # Blocks the entire server
return {'message': 'Completed after 1 second'}
@app.get('/sync')
def sync_endpoint():
time.sleep(1) # Blocks the entire server
return {'message': 'Completed after 1 second'}
Async endpoint (non-blocking)
Async endpoint (non-blocking)
@app.get('/async')
async def async_endpoint():
await asyncio.sleep(1) # Other requests can be handled
return {'message': 'Completed after 1 second'}
@app.get('/async')
async def async_endpoint():
await asyncio.sleep(1) # Other requests can be handled
return {'message': 'Completed after 1 second'}
CPU-bound work (use sync)
CPU-bound work (use sync)
@app.get('/cpu-intensive')
def cpu_intensive():
result = sum(i * i for i in range(10000000))
return {'result': result}
@app.get('/cpu-intensive')
def cpu_intensive():
result = sum(i * i for i in range(10000000))
return {'result': result}
I/O-bound work (use async)
I/O-bound work (use async)
@app.get('/io-intensive')
async def io_intensive():
async with httpx.AsyncClient() as client:
response = await client.get('https://api.example.com/data')
return response.json()
undefined@app.get('/io-intensive')
async def io_intensive():
async with httpx.AsyncClient() as client:
response = await client.get('https://api.example.com/data')
return response.json()
undefinedAsync Database Operations
异步数据库操作
Async database patterns with popular ORMs and libraries.
python
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
import asyncpg
from motor.motor_asyncio import AsyncIOMotorClient
from tortoise import Tortoise
from tortoise.contrib.fastapi import register_tortoise
app = FastAPI()结合主流ORM与库的异步数据库模式。
python
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
import asyncpg
from motor.motor_asyncio import AsyncIOMotorClient
from tortoise import Tortoise
from tortoise.contrib.fastapi import register_tortoise
app = FastAPI()SQLAlchemy async setup
SQLAlchemy async setup
DATABASE_URL = 'postgresql+asyncpg://user:pass@localhost/db'
engine = create_async_engine(DATABASE_URL, echo=True, future=True)
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def get_db() -> AsyncSession:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
@app.get('/users/{user_id}')
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail='User not found')
return user
DATABASE_URL = 'postgresql+asyncpg://user:pass@localhost/db'
engine = create_async_engine(DATABASE_URL, echo=True, future=True)
AsyncSessionLocal = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
async def get_db() -> AsyncSession:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
@app.get('/users/{user_id}')
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail='User not found')
return user
Direct asyncpg (lower level, faster)
Direct asyncpg (lower level, faster)
async def get_asyncpg_pool():
pool = await asyncpg.create_pool(
'postgresql://user:pass@localhost/db',
min_size=10,
max_size=20
)
try:
yield pool
finally:
await pool.close()
@app.get('/users-fast/{user_id}')
async def get_user_fast(user_id: int, pool = Depends(get_asyncpg_pool)):
async with pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT * FROM users WHERE id = $1', user_id
)
if not row:
raise HTTPException(status_code=404, detail='User not found')
return dict(row)
async def get_asyncpg_pool():
pool = await asyncpg.create_pool(
'postgresql://user:pass@localhost/db',
min_size=10,
max_size=20
)
try:
yield pool
finally:
await pool.close()
@app.get('/users-fast/{user_id}')
async def get_user_fast(user_id: int, pool = Depends(get_asyncpg_pool)):
async with pool.acquire() as conn:
row = await conn.fetchrow(
'SELECT * FROM users WHERE id = $1', user_id
)
if not row:
raise HTTPException(status_code=404, detail='User not found')
return dict(row)
MongoDB with Motor
MongoDB with Motor
mongo_client = AsyncIOMotorClient('mongodb://localhost:27017')
db = mongo_client.mydatabase
@app.get('/documents/{doc_id}')
async def get_document(doc_id: str):
document = await db.collection.find_one({'_id': doc_id})
if not document:
raise HTTPException(status_code=404, detail='Document not found')
return document
@app.post('/documents')
async def create_document(data: dict):
result = await db.collection.insert_one(data)
return {'id': str(result.inserted_id)}
mongo_client = AsyncIOMotorClient('mongodb://localhost:27017')
db = mongo_client.mydatabase
@app.get('/documents/{doc_id}')
async def get_document(doc_id: str):
document = await db.collection.find_one({'_id': doc_id})
if not document:
raise HTTPException(status_code=404, detail='Document not found')
return document
@app.post('/documents')
async def create_document(data: dict):
result = await db.collection.insert_one(data)
return {'id': str(result.inserted_id)}
Tortoise ORM async
Tortoise ORM async
register_tortoise(
app,
db_url='postgres://user:pass@localhost/db',
modules={'models': ['app.models']},
generate_schemas=True,
add_exception_handlers=True,
)
from tortoise.models import Model
from tortoise import fields
class UserModel(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=255)
email = fields.CharField(max_length=255)
@app.get('/tortoise-users/{user_id}')
async def get_tortoise_user(user_id: int):
user = await UserModel.get_or_none(id=user_id)
if not user:
raise HTTPException(status_code=404, detail='User not found')
return user
undefinedregister_tortoise(
app,
db_url='postgres://user:pass@localhost/db',
modules={'models': ['app.models']},
generate_schemas=True,
add_exception_handlers=True,
)
from tortoise.models import Model
from tortoise import fields
class UserModel(Model):
id = fields.IntField(pk=True)
name = fields.CharField(max_length=255)
email = fields.CharField(max_length=255)
@app.get('/tortoise-users/{user_id}')
async def get_tortoise_user(user_id: int):
user = await UserModel.get_or_none(id=user_id)
if not user:
raise HTTPException(status_code=404, detail='User not found')
return user
undefinedBackground Tasks
后台任务
Fire-and-forget tasks without blocking the response.
python
from fastapi import BackgroundTasks, FastAPI
import asyncio
from datetime import datetime
app = FastAPI()无需阻塞响应的“即发即忘”任务。
python
from fastapi import BackgroundTasks, FastAPI
import asyncio
from datetime import datetime
app = FastAPI()Simple background task
Simple background task
async def send_email(email: str, message: str):
await asyncio.sleep(2) # Simulate email sending
print(f'Email sent to {email}: {message}')
@app.post('/send-email')
async def send_email_endpoint(
email: str,
message: str,
background_tasks: BackgroundTasks
):
background_tasks.add_task(send_email, email, message)
return {'status': 'Email will be sent in background'}
async def send_email(email: str, message: str):
await asyncio.sleep(2) # Simulate email sending
print(f'Email sent to {email}: {message}')
@app.post('/send-email')
async def send_email_endpoint(
email: str,
message: str,
background_tasks: BackgroundTasks
):
background_tasks.add_task(send_email, email, message)
return {'status': 'Email will be sent in background'}
Multiple background tasks
Multiple background tasks
async def log_activity(user_id: int, action: str):
await asyncio.sleep(0.5)
print(f'[{datetime.now()}] User {user_id} performed: {action}')
async def update_analytics(action: str):
await asyncio.sleep(1)
print(f'Analytics updated for action: {action}')
@app.post('/users/{user_id}/action')
async def perform_action(
user_id: int,
action: str,
background_tasks: BackgroundTasks
):
# Add multiple tasks
background_tasks.add_task(log_activity, user_id, action)
background_tasks.add_task(update_analytics, action)
return {'status': 'Action logged'}
async def log_activity(user_id: int, action: str):
await asyncio.sleep(0.5)
print(f'[{datetime.now()}] User {user_id} performed: {action}')
async def update_analytics(action: str):
await asyncio.sleep(1)
print(f'Analytics updated for action: {action}')
@app.post('/users/{user_id}/action')
async def perform_action(
user_id: int,
action: str,
background_tasks: BackgroundTasks
):
# Add multiple tasks
background_tasks.add_task(log_activity, user_id, action)
background_tasks.add_task(update_analytics, action)
return {'status': 'Action logged'}
Background cleanup
Background cleanup
async def cleanup_temp_files(file_path: str):
await asyncio.sleep(60) # Wait before cleanup
import os
if os.path.exists(file_path):
os.remove(file_path)
print(f'Cleaned up: {file_path}')
@app.post('/upload')
async def upload_file(
file: UploadFile,
background_tasks: BackgroundTasks
):
temp_path = f'/tmp/{file.filename}'
with open(temp_path, 'wb') as f:
content = await file.read()
f.write(content)
# Schedule cleanup
background_tasks.add_task(cleanup_temp_files, temp_path)
return {'filename': file.filename, 'path': temp_path}undefinedasync def cleanup_temp_files(file_path: str):
await asyncio.sleep(60) # Wait before cleanup
import os
if os.path.exists(file_path):
os.remove(file_path)
print(f'Cleaned up: {file_path}')
@app.post('/upload')
async def upload_file(
file: UploadFile,
background_tasks: BackgroundTasks
):
temp_path = f'/tmp/{file.filename}'
with open(temp_path, 'wb') as f:
content = await file.read()
f.write(content)
# Schedule cleanup
background_tasks.add_task(cleanup_temp_files, temp_path)
return {'filename': file.filename, 'path': temp_path}undefinedWebSocket Handling
WebSocket 处理
Real-time bidirectional communication patterns.
python
from fastapi import WebSocket, WebSocketDisconnect, Depends
from typing import List
import json
app = FastAPI()实时双向通信模式。
python
from fastapi import WebSocket, WebSocketDisconnect, Depends
from typing import List
import json
app = FastAPI()Simple WebSocket
Simple WebSocket
@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f'Echo: {data}')
except WebSocketDisconnect:
print('Client disconnected')
@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f'Echo: {data}')
except WebSocketDisconnect:
print('Client disconnected')
WebSocket with authentication
WebSocket with authentication
async def get_current_user_ws(websocket: WebSocket):
token = websocket.query_params.get('token')
if not token or not verify_token(token):
await websocket.close(code=1008) # Policy violation
raise HTTPException(status_code=401, detail='Unauthorized')
return decode_token(token)
@app.websocket('/ws/authenticated')
async def authenticated_websocket(
websocket: WebSocket,
user = Depends(get_current_user_ws)
):
await websocket.accept()
try:
await websocket.send_text(f'Welcome {user["name"]}')
while True:
data = await websocket.receive_text()
await websocket.send_text(f'{user["name"]}: {data}')
except WebSocketDisconnect:
print(f'User {user["name"]} disconnected')
async def get_current_user_ws(websocket: WebSocket):
token = websocket.query_params.get('token')
if not token or not verify_token(token):
await websocket.close(code=1008) # Policy violation
raise HTTPException(status_code=401, detail='Unauthorized')
return decode_token(token)
@app.websocket('/ws/authenticated')
async def authenticated_websocket(
websocket: WebSocket,
user = Depends(get_current_user_ws)
):
await websocket.accept()
try:
await websocket.send_text(f'Welcome {user["name"]}')
while True:
data = await websocket.receive_text()
await websocket.send_text(f'{user["name"]}: {data}')
except WebSocketDisconnect:
print(f'User {user["name"]} disconnected')
Broadcasting to multiple connections
Broadcasting to multiple connections
class ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)manager = ConnectionManager()
@app.websocket('/ws/chat/{client_id}')
async def chat_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket)
await manager.broadcast(f'Client {client_id} joined the chat')
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f'Client {client_id}: {data}')
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f'Client {client_id} left the chat')
class ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)manager = ConnectionManager()
@app.websocket('/ws/chat/{client_id}')
async def chat_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket)
await manager.broadcast(f'Client {client_id} joined the chat')
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f'Client {client_id}: {data}')
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f'Client {client_id} left the chat')
WebSocket with JSON messages
WebSocket with JSON messages
@app.websocket('/ws/json')
async def json_websocket(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
message_type = data.get('type')
if message_type == 'ping':
await websocket.send_json({'type': 'pong'})
elif message_type == 'message':
await websocket.send_json({
'type': 'response',
'data': f'Received: {data.get("content")}'
})
except WebSocketDisconnect:
print('Client disconnected')undefined@app.websocket('/ws/json')
async def json_websocket(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
message_type = data.get('type')
if message_type == 'ping':
await websocket.send_json({'type': 'pong'})
elif message_type == 'message':
await websocket.send_json({
'type': 'response',
'data': f'Received: {data.get("content")}'
})
except WebSocketDisconnect:
print('Client disconnected')undefinedServer-Sent Events (SSE)
服务器发送事件(SSE)
One-way streaming from server to client.
python
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
app = FastAPI()
@app.get('/sse')
async def sse_endpoint():
async def event_generator():
for i in range(10):
await asyncio.sleep(1)
yield {
'event': 'message',
'data': f'Message {i}'
}
return EventSourceResponse(event_generator())从服务器到客户端的单向流传输。
python
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
app = FastAPI()
@app.get('/sse')
async def sse_endpoint():
async def event_generator():
for i in range(10):
await asyncio.sleep(1)
yield {
'event': 'message',
'data': f'Message {i}'
}
return EventSourceResponse(event_generator())SSE with real-time updates
SSE with real-time updates
@app.get('/sse/updates')
async def sse_updates():
async def update_generator():
while True:
# Simulate fetching updates
await asyncio.sleep(2)
update = await fetch_latest_update()
yield {
'event': 'update',
'data': json.dumps(update)
}
return EventSourceResponse(update_generator())@app.get('/sse/updates')
async def sse_updates():
async def update_generator():
while True:
# Simulate fetching updates
await asyncio.sleep(2)
update = await fetch_latest_update()
yield {
'event': 'update',
'data': json.dumps(update)
}
return EventSourceResponse(update_generator())SSE with heartbeat
SSE with heartbeat
@app.get('/sse/heartbeat')
async def sse_heartbeat():
async def heartbeat_generator():
try:
while True:
await asyncio.sleep(30)
yield {
'event': 'heartbeat',
'data': datetime.now().isoformat()
}
except asyncio.CancelledError:
print('SSE connection closed')
return EventSourceResponse(heartbeat_generator())undefined@app.get('/sse/heartbeat')
async def sse_heartbeat():
async def heartbeat_generator():
try:
while True:
await asyncio.sleep(30)
yield {
'event': 'heartbeat',
'data': datetime.now().isoformat()
}
except asyncio.CancelledError:
print('SSE connection closed')
return EventSourceResponse(heartbeat_generator())undefinedStreaming Responses
流式响应
Stream large files or generated content.
python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import io
import csv
app = FastAPI()流式传输大文件或生成的内容。
python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import io
import csv
app = FastAPI()Stream large file
Stream large file
@app.get('/download/{filename}')
async def download_file(filename: str):
async def file_stream():
with open(f'/data/{filename}', 'rb') as f:
while chunk := f.read(8192):
yield chunk
return StreamingResponse(
file_stream(),
media_type='application/octet-stream',
headers={'Content-Disposition': f'attachment; filename={filename}'}
)@app.get('/download/{filename}')
async def download_file(filename: str):
async def file_stream():
with open(f'/data/{filename}', 'rb') as f:
while chunk := f.read(8192):
yield chunk
return StreamingResponse(
file_stream(),
media_type='application/octet-stream',
headers={'Content-Disposition': f'attachment; filename={filename}'}
)Stream generated CSV
Stream generated CSV
@app.get('/export/users')
async def export_users():
async def csv_stream():
output = io.StringIO()
writer = csv.writer(output)
# Write header
writer.writerow(['ID', 'Name', 'Email'])
yield output.getvalue()
output.truncate(0)
output.seek(0)
# Stream users in batches
offset = 0
batch_size = 100
while True:
users = await fetch_users_batch(offset, batch_size)
if not users:
break
for user in users:
writer.writerow([user.id, user.name, user.email])
yield output.getvalue()
output.truncate(0)
output.seek(0)
offset += batch_size
return StreamingResponse(
csv_stream(),
media_type='text/csv',
headers={'Content-Disposition': 'attachment; filename=users.csv'}
)@app.get('/export/users')
async def export_users():
async def csv_stream():
output = io.StringIO()
writer = csv.writer(output)
# Write header
writer.writerow(['ID', 'Name', 'Email'])
yield output.getvalue()
output.truncate(0)
output.seek(0)
# Stream users in batches
offset = 0
batch_size = 100
while True:
users = await fetch_users_batch(offset, batch_size)
if not users:
break
for user in users:
writer.writerow([user.id, user.name, user.email])
yield output.getvalue()
output.truncate(0)
output.seek(0)
offset += batch_size
return StreamingResponse(
csv_stream(),
media_type='text/csv',
headers={'Content-Disposition': 'attachment; filename=users.csv'}
)Stream generated content
Stream generated content
@app.get('/generate/report')
async def generate_report():
async def report_stream():
yield b'<html><body><h1>Report</h1>'
for section in ['users', 'orders', 'analytics']:
await asyncio.sleep(0.5) # Simulate processing
data = await fetch_section_data(section)
yield f'<h2>{section.title()}</h2>'.encode()
yield f'<pre>{data}</pre>'.encode()
yield b'</body></html>'
return StreamingResponse(report_stream(), media_type='text/html')undefined@app.get('/generate/report')
async def generate_report():
async def report_stream():
yield b'<html><body><h1>Report</h1>'
for section in ['users', 'orders', 'analytics']:
await asyncio.sleep(0.5) # Simulate processing
data = await fetch_section_data(section)
yield f'<h2>{section.title()}</h2>'.encode()
yield f'<pre>{data}</pre>'.encode()
yield b'</body></html>'
return StreamingResponse(report_stream(), media_type='text/html')undefinedConcurrent Request Handling
并发请求处理
Parallel processing patterns for multiple operations.
python
from fastapi import FastAPI
import asyncio
import httpx
app = FastAPI()多操作的并行处理模式。
python
from fastapi import FastAPI
import asyncio
import httpx
app = FastAPI()Parallel API calls
Parallel API calls
@app.get('/aggregate/user/{user_id}')
async def aggregate_user_data(user_id: int):
async with httpx.AsyncClient() as client:
# Fetch from multiple sources in parallel
profile_task = client.get(f'https://api.example.com/users/{user_id}')
posts_task = client.get(f'https://api.example.com/users/{user_id}/posts')
comments_task = client.get(f'https://api.example.com/users/{user_id}/comments')
profile, posts, comments = await asyncio.gather(
profile_task,
posts_task,
comments_task
)
return {
'profile': profile.json(),
'posts': posts.json(),
'comments': comments.json()
}@app.get('/aggregate/user/{user_id}')
async def aggregate_user_data(user_id: int):
async with httpx.AsyncClient() as client:
# Fetch from multiple sources in parallel
profile_task = client.get(f'https://api.example.com/users/{user_id}')
posts_task = client.get(f'https://api.example.com/users/{user_id}/posts')
comments_task = client.get(f'https://api.example.com/users/{user_id}/comments')
profile, posts, comments = await asyncio.gather(
profile_task,
posts_task,
comments_task
)
return {
'profile': profile.json(),
'posts': posts.json(),
'comments': comments.json()
}Parallel database queries
Parallel database queries
@app.get('/dashboard')
async def get_dashboard(db: AsyncSession = Depends(get_db)):
# Execute multiple queries in parallel
users_query = db.execute(select(User).limit(10))
orders_query = db.execute(select(Order).limit(10))
stats_query = db.execute(select(func.count(User.id)))
users, orders, stats = await asyncio.gather(
users_query,
orders_query,
stats_query
)
return {
'users': users.scalars().all(),
'orders': orders.scalars().all(),
'total_users': stats.scalar()
}@app.get('/dashboard')
async def get_dashboard(db: AsyncSession = Depends(get_db)):
# Execute multiple queries in parallel
users_query = db.execute(select(User).limit(10))
orders_query = db.execute(select(Order).limit(10))
stats_query = db.execute(select(func.count(User.id)))
users, orders, stats = await asyncio.gather(
users_query,
orders_query,
stats_query
)
return {
'users': users.scalars().all(),
'orders': orders.scalars().all(),
'total_users': stats.scalar()
}Race condition (first to complete wins)
Race condition (first to complete wins)
@app.get('/fastest-price/{product_id}')
async def get_fastest_price(product_id: str):
async with httpx.AsyncClient() as client:
tasks = [
client.get(f'https://store1.com/price/{product_id}'),
client.get(f'https://store2.com/price/{product_id}'),
client.get(f'https://store3.com/price/{product_id}')
]
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# Cancel pending requests
for task in pending:
task.cancel()
result = done.pop().result()
return result.json()undefined@app.get('/fastest-price/{product_id}')
async def get_fastest_price(product_id: str):
async with httpx.AsyncClient() as client:
tasks = [
client.get(f'https://store1.com/price/{product_id}'),
client.get(f'https://store2.com/price/{product_id}'),
client.get(f'https://store3.com/price/{product_id}')
]
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# Cancel pending requests
for task in pending:
task.cancel()
result = done.pop().result()
return result.json()undefinedAsync Context Managers
异步上下文管理器
Resource management with async context managers.
python
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio使用异步上下文管理器进行资源管理。
python
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncioAsync context manager for lifespan events
Async context manager for lifespan events
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
print('Starting up...')
db_pool = await create_db_pool()
redis_client = await create_redis_client()
# Store in app state
app.state.db_pool = db_pool
app.state.redis = redis_client
yield
# Shutdown
print('Shutting down...')
await db_pool.close()
await redis_client.close()app = FastAPI(lifespan=lifespan)
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
print('Starting up...')
db_pool = await create_db_pool()
redis_client = await create_redis_client()
# Store in app state
app.state.db_pool = db_pool
app.state.redis = redis_client
yield
# Shutdown
print('Shutting down...')
await db_pool.close()
await redis_client.close()app = FastAPI(lifespan=lifespan)
Custom async context manager
Custom async context manager
class AsyncDatabaseSession:
def init(self, pool):
self.pool = pool
self.conn = None
async def __aenter__(self):
self.conn = await self.pool.acquire()
return self.conn
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.pool.release(self.conn)
if exc_type is not None:
# Handle exception
await self.conn.rollback()
return False@app.get('/data')
async def get_data():
async with AsyncDatabaseSession(app.state.db_pool) as conn:
result = await conn.fetch('SELECT * FROM data')
return result
undefinedclass AsyncDatabaseSession:
def init(self, pool):
self.pool = pool
self.conn = None
async def __aenter__(self):
self.conn = await self.pool.acquire()
return self.conn
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.pool.release(self.conn)
if exc_type is not None:
# Handle exception
await self.conn.rollback()
return False@app.get('/data')
async def get_data():
async with AsyncDatabaseSession(app.state.db_pool) as conn:
result = await conn.fetch('SELECT * FROM data')
return result
undefinedConnection Pooling
连接池
Efficient connection management for databases and HTTP clients.
python
from fastapi import FastAPI, Depends
import asyncpg
import httpx
from typing import AsyncGenerator
app = FastAPI()数据库与HTTP客户端的高效连接管理。
python
from fastapi import FastAPI, Depends
import asyncpg
import httpx
from typing import AsyncGenerator
app = FastAPI()Database connection pool
Database connection pool
class DatabasePool:
def init(self):
self.pool = None
async def create_pool(self):
self.pool = await asyncpg.create_pool(
'postgresql://user:pass@localhost/db',
min_size=10,
max_size=20,
command_timeout=60,
max_queries=50000,
max_inactive_connection_lifetime=300
)
async def close_pool(self):
await self.pool.close()
async def get_connection(self):
async with self.pool.acquire() as connection:
yield connectiondb_pool = DatabasePool()
@app.on_event('startup')
async def startup():
await db_pool.create_pool()
@app.on_event('shutdown')
async def shutdown():
await db_pool.close_pool()
@app.get('/users')
async def get_users(conn = Depends(db_pool.get_connection)):
rows = await conn.fetch('SELECT * FROM users')
return [dict(row) for row in rows]
class DatabasePool:
def init(self):
self.pool = None
async def create_pool(self):
self.pool = await asyncpg.create_pool(
'postgresql://user:pass@localhost/db',
min_size=10,
max_size=20,
command_timeout=60,
max_queries=50000,
max_inactive_connection_lifetime=300
)
async def close_pool(self):
await self.pool.close()
async def get_connection(self):
async with self.pool.acquire() as connection:
yield connectiondb_pool = DatabasePool()
@app.on_event('startup')
async def startup():
await db_pool.create_pool()
@app.on_event('shutdown')
async def shutdown():
await db_pool.close_pool()
@app.get('/users')
async def get_users(conn = Depends(db_pool.get_connection)):
rows = await conn.fetch('SELECT * FROM users')
return [dict(row) for row in rows]
HTTP client pool
HTTP client pool
class HTTPClientPool:
def init(self):
self.client = None
async def get_client(self) -> AsyncGenerator[httpx.AsyncClient, None]:
if self.client is None:
self.client = httpx.AsyncClient(
limits=httpx.Limits(max_keepalive_connections=20, max_connections=100),
timeout=httpx.Timeout(10.0)
)
yield self.client
async def close(self):
if self.client:
await self.client.aclose()http_pool = HTTPClientPool()
@app.get('/external-api')
async def call_external_api(client: httpx.AsyncClient = Depends(http_pool.get_client)):
response = await client.get('https://api.example.com/data')
return response.json()
undefinedclass HTTPClientPool:
def init(self):
self.client = None
async def get_client(self) -> AsyncGenerator[httpx.AsyncClient, None]:
if self.client is None:
self.client = httpx.AsyncClient(
limits=httpx.Limits(max_keepalive_connections=20, max_connections=100),
timeout=httpx.Timeout(10.0)
)
yield self.client
async def close(self):
if self.client:
await self.client.aclose()http_pool = HTTPClientPool()
@app.get('/external-api')
async def call_external_api(client: httpx.AsyncClient = Depends(http_pool.get_client)):
response = await client.get('https://api.example.com/data')
return response.json()
undefinedPerformance Optimization
性能优化
Async patterns for optimal performance.
python
from fastapi import FastAPI
import asyncio
from functools import lru_cache
app = FastAPI()实现最优性能的异步模式。
python
from fastapi import FastAPI
import asyncio
from functools import lru_cache
app = FastAPI()Cache expensive async operations
Cache expensive async operations
from aiocache import Cache
from aiocache.serializers import JsonSerializer
cache = Cache(Cache.MEMORY, serializer=JsonSerializer())
@app.get('/expensive-data/{key}')
async def get_expensive_data(key: str):
# Check cache first
cached = await cache.get(key)
if cached:
return {'data': cached, 'cached': True}
# Expensive operation
await asyncio.sleep(2)
data = compute_expensive_result(key)
# Store in cache
await cache.set(key, data, ttl=300)
return {'data': data, 'cached': False}from aiocache import Cache
from aiocache.serializers import JsonSerializer
cache = Cache(Cache.MEMORY, serializer=JsonSerializer())
@app.get('/expensive-data/{key}')
async def get_expensive_data(key: str):
# Check cache first
cached = await cache.get(key)
if cached:
return {'data': cached, 'cached': True}
# Expensive operation
await asyncio.sleep(2)
data = compute_expensive_result(key)
# Store in cache
await cache.set(key, data, ttl=300)
return {'data': data, 'cached': False}Batch operations
Batch operations
@app.post('/users/batch')
async def create_users_batch(users: List[UserCreate], db = Depends(get_db)):
# Create users in batch (more efficient than one-by-one)
user_objects = [User(**user.dict()) for user in users]
db.add_all(user_objects)
await db.flush()
return user_objects
@app.post('/users/batch')
async def create_users_batch(users: List[UserCreate], db = Depends(get_db)):
# Create users in batch (more efficient than one-by-one)
user_objects = [User(**user.dict()) for user in users]
db.add_all(user_objects)
await db.flush()
return user_objects
Debouncing with asyncio
Debouncing with asyncio
class Debouncer:
def init(self, delay: float):
self.delay = delay
self.task = None
async def debounce(self, coro):
if self.task:
self.task.cancel()
async def delayed():
await asyncio.sleep(self.delay)
await coro
self.task = asyncio.create_task(delayed())
await self.taskdebouncer = Debouncer(delay=1.0)
class Debouncer:
def init(self, delay: float):
self.delay = delay
self.task = None
async def debounce(self, coro):
if self.task:
self.task.cancel()
async def delayed():
await asyncio.sleep(self.delay)
await coro
self.task = asyncio.create_task(delayed())
await self.taskdebouncer = Debouncer(delay=1.0)
Prefetching related data
Prefetching related data
@app.get('/posts/{post_id}')
async def get_post_with_relations(post_id: int, db = Depends(get_db)):
# Fetch post and related data in parallel
post_task = db.get(Post, post_id)
comments_task = db.execute(
select(Comment).where(Comment.post_id == post_id)
)
author_task = db.execute(
select(User).where(User.id == Post.author_id)
)
post, comments_result, author_result = await asyncio.gather(
post_task, comments_task, author_task
)
return {
'post': post,
'comments': comments_result.scalars().all(),
'author': author_result.scalar_one()
}undefined@app.get('/posts/{post_id}')
async def get_post_with_relations(post_id: int, db = Depends(get_db)):
# Fetch post and related data in parallel
post_task = db.get(Post, post_id)
comments_task = db.execute(
select(Comment).where(Comment.post_id == post_id)
)
author_task = db.execute(
select(User).where(User.id == Post.author_id)
)
post, comments_result, author_result = await asyncio.gather(
post_task, comments_task, author_task
)
return {
'post': post,
'comments': comments_result.scalars().all(),
'author': author_result.scalar_one()
}undefinedWhen to Use This Skill
何时使用该技能
Use fastapi-async-patterns when:
- Building high-throughput APIs that handle many concurrent requests
- Working with I/O-bound operations (database, external APIs, file operations)
- Implementing real-time features (WebSockets, SSE)
- Processing multiple operations in parallel
- Streaming large datasets or files
- Building microservices that communicate with other services
- Optimizing API response times and resource usage
- Handling background tasks without blocking responses
在以下场景中使用fastapi-async-patterns:
- 构建高吞吐量、需处理大量并发请求的API
- 处理I/O密集型操作(数据库、外部API、文件操作)
- 实现实时功能(WebSocket、SSE)
- 并行处理多个操作
- 流式传输大型数据集或文件
- 构建与其他服务通信的微服务
- 优化API响应时间与资源利用率
- 处理无需阻塞响应的后台任务
FastAPI Async Best Practices
FastAPI 异步最佳实践
- Use async for I/O - Always use async for database, HTTP requests, and file operations
- Avoid blocking calls - Never use blocking calls in async functions (time.sleep, requests library)
- Connection pooling - Use connection pools for databases and HTTP clients
- Proper cleanup - Always clean up resources with try/finally or async context managers
- Concurrent operations - Use asyncio.gather for parallel operations when possible
- Background tasks - Use BackgroundTasks for fire-and-forget operations
- Stream large data - Use StreamingResponse for large files or generated content
- Timeout handling - Set timeouts on all external calls to prevent hanging
- Error propagation - Handle exceptions properly in async code
- Monitor performance - Use tools like aiomonitor to debug async issues
- I/O操作使用异步 - 数据库、HTTP请求和文件操作始终使用异步
- 避免阻塞调用 - 异步函数中绝不使用阻塞调用(如time.sleep、requests库)
- 使用连接池 - 为数据库和HTTP客户端使用连接池
- 正确清理资源 - 始终使用try/finally或异步上下文管理器清理资源
- 并发操作 - 尽可能使用asyncio.gather实现并行操作
- 后台任务 - 使用BackgroundTasks处理即发即忘的操作
- 流式传输大数据 - 使用StreamingResponse处理大文件或生成的内容
- 超时处理 - 为所有外部调用设置超时,防止服务挂起
- 错误传播 - 在异步代码中正确处理异常
- 性能监控 - 使用aiomonitor等工具调试异步问题
FastAPI Async Common Pitfalls
FastAPI 异步常见陷阱
- Blocking the event loop - Using synchronous I/O in async functions kills performance
- Missing await - Forgetting await on async functions causes coroutine warnings
- Creating too many tasks - Spawning unlimited tasks can exhaust resources
- Not closing connections - Resource leaks from unclosed database/HTTP connections
- Mixing sync and async - Incorrect mixing causes event loop issues
- Race conditions - Shared state in async code without proper locking
- Timeout issues - No timeouts on external calls can hang the server
- Memory leaks - Background tasks that never complete accumulate
- Error swallowing - Silent failures in background tasks and event handlers
- Deadlocks - Circular waits in async dependencies or locks
- 阻塞事件循环 - 异步函数中使用同步I/O会严重影响性能
- 遗漏await - 忘记在异步函数前加await会导致协程警告
- 创建过多任务 - 无限制创建任务会耗尽资源
- 未关闭连接 - 未关闭的数据库/HTTP连接会导致资源泄漏
- 混合同步与异步 - 错误混合同步与异步代码会引发事件循环问题
- 竞态条件 - 异步代码中的共享状态未正确加锁
- 超时问题 - 外部调用未设置超时会导致服务器挂起
- 内存泄漏 - 未完成的后台任务会不断累积
- 错误吞灭 - 后台任务和事件处理程序中的静默失败
- 死锁 - 异步依赖或锁中的循环等待