fastapi-async-patterns


FastAPI Async Patterns


Master async patterns in FastAPI for building high-performance, concurrent APIs with optimal resource usage.

Basic Async Route Handlers


Understanding async vs sync endpoints in FastAPI.

```python
from fastapi import FastAPI
import time
import asyncio
import httpx

app = FastAPI()

# Sync endpoint: FastAPI runs `def` routes in a threadpool, so this
# ties up a worker thread rather than the event loop itself
# (time.sleep inside an `async def` WOULD block the whole event loop)
@app.get('/sync')
def sync_endpoint():
    time.sleep(1)
    return {'message': 'Completed after 1 second'}

# Async endpoint (non-blocking)
@app.get('/async')
async def async_endpoint():
    await asyncio.sleep(1)  # Other requests can be handled meanwhile
    return {'message': 'Completed after 1 second'}

# CPU-bound work (use sync, so it runs in the threadpool)
@app.get('/cpu-intensive')
def cpu_intensive():
    result = sum(i * i for i in range(10_000_000))
    return {'result': result}

# I/O-bound work (use async)
@app.get('/io-intensive')
async def io_intensive():
    async with httpx.AsyncClient() as client:
        response = await client.get('https://api.example.com/data')
        return response.json()
```
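When CPU-bound work has to run from async code anyway, it can be pushed off the event loop with `loop.run_in_executor` (FastAPI also ships `fastapi.concurrency.run_in_threadpool` for the same purpose). A minimal stdlib sketch, with `heavy_sum` as a made-up stand-in for the expensive job:

```python
import asyncio

def heavy_sum(n: int) -> int:
    # CPU-bound: runs in a worker thread so the event loop stays free
    return sum(i * i for i in range(n))

async def main() -> int:
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, heavy_sum, 1000)

print(asyncio.run(main()))
```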

Async Database Operations


Async database patterns with popular ORMs and libraries.

```python
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
import asyncpg
from motor.motor_asyncio import AsyncIOMotorClient
from tortoise.contrib.fastapi import register_tortoise

app = FastAPI()

# SQLAlchemy async setup
DATABASE_URL = 'postgresql+asyncpg://user:pass@localhost/db'
engine = create_async_engine(DATABASE_URL, echo=True, future=True)
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_db() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

@app.get('/users/{user_id}')
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail='User not found')
    return user

# Direct asyncpg (lower level, faster)
# Note: create the pool once at startup in real code; a per-request
# dependency like this rebuilds the pool on every call
async def get_asyncpg_pool():
    pool = await asyncpg.create_pool(
        'postgresql://user:pass@localhost/db',
        min_size=10,
        max_size=20
    )
    try:
        yield pool
    finally:
        await pool.close()

@app.get('/users-fast/{user_id}')
async def get_user_fast(user_id: int, pool=Depends(get_asyncpg_pool)):
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            'SELECT * FROM users WHERE id = $1', user_id
        )
    if not row:
        raise HTTPException(status_code=404, detail='User not found')
    return dict(row)

# MongoDB with Motor
mongo_client = AsyncIOMotorClient('mongodb://localhost:27017')
db = mongo_client.mydatabase

@app.get('/documents/{doc_id}')
async def get_document(doc_id: str):
    document = await db.collection.find_one({'_id': doc_id})
    if not document:
        raise HTTPException(status_code=404, detail='Document not found')
    return document

@app.post('/documents')
async def create_document(data: dict):
    result = await db.collection.insert_one(data)
    return {'id': str(result.inserted_id)}

# Tortoise ORM
register_tortoise(
    app,
    db_url='postgres://user:pass@localhost/db',
    modules={'models': ['app.models']},
    generate_schemas=True,
    add_exception_handlers=True,
)

from tortoise.models import Model
from tortoise import fields

class UserModel(Model):
    id = fields.IntField(pk=True)
    name = fields.CharField(max_length=255)
    email = fields.CharField(max_length=255)

@app.get('/tortoise-users/{user_id}')
async def get_tortoise_user(user_id: int):
    user = await UserModel.get_or_none(id=user_id)
    if not user:
        raise HTTPException(status_code=404, detail='User not found')
    return user
```
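The commit-on-success, rollback-on-error shape of a yield dependency like `get_db` can be exercised without a real database. A sketch with a hypothetical `FakeSession` standing in for `AsyncSession`, driving the generator the way FastAPI's dependency system does:

```python
import asyncio

log = []

class FakeSession:
    # Hypothetical stand-in for an AsyncSession
    async def commit(self):
        log.append('commit')

    async def rollback(self):
        log.append('rollback')

    async def close(self):
        log.append('close')

async def get_db():
    # Same shape as the get_db dependency above
    session = FakeSession()
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()

async def main():
    log.clear()
    gen = get_db()
    session = await gen.__anext__()  # handler receives the session here
    try:
        await gen.__anext__()        # handler finished: commit, then close
    except StopAsyncIteration:
        pass
    return log

print(asyncio.run(main()))
```

If the handler body raised instead, resuming the generator with `athrow` would hit the `except` branch and roll back before closing.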

Background Tasks


Fire-and-forget tasks without blocking the response.

```python
from fastapi import BackgroundTasks, FastAPI, UploadFile
import asyncio
import os
from datetime import datetime

app = FastAPI()

# Simple background task
async def send_email(email: str, message: str):
    await asyncio.sleep(2)  # Simulate email sending
    print(f'Email sent to {email}: {message}')

@app.post('/send-email')
async def send_email_endpoint(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    background_tasks.add_task(send_email, email, message)
    return {'status': 'Email will be sent in background'}

# Multiple background tasks
async def log_activity(user_id: int, action: str):
    await asyncio.sleep(0.5)
    print(f'[{datetime.now()}] User {user_id} performed: {action}')

async def update_analytics(action: str):
    await asyncio.sleep(1)
    print(f'Analytics updated for action: {action}')

@app.post('/users/{user_id}/action')
async def perform_action(
    user_id: int,
    action: str,
    background_tasks: BackgroundTasks
):
    # Tasks run in order after the response is sent
    background_tasks.add_task(log_activity, user_id, action)
    background_tasks.add_task(update_analytics, action)
    return {'status': 'Action logged'}

# Background cleanup
async def cleanup_temp_files(file_path: str):
    await asyncio.sleep(60)  # Wait before cleanup
    if os.path.exists(file_path):
        os.remove(file_path)
        print(f'Cleaned up: {file_path}')

@app.post('/upload')
async def upload_file(
    file: UploadFile,
    background_tasks: BackgroundTasks
):
    temp_path = f'/tmp/{file.filename}'
    with open(temp_path, 'wb') as f:
        content = await file.read()
        f.write(content)

    # Schedule cleanup
    background_tasks.add_task(cleanup_temp_files, temp_path)
    return {'filename': file.filename, 'path': temp_path}
```
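`BackgroundTasks` is the simple path; for work scheduled outside a request, plain `asyncio.create_task` does the same job, provided you keep a strong reference so the task is not garbage-collected mid-flight. A minimal stdlib sketch (the `send_email` coroutine and addresses are illustrative):

```python
import asyncio

background_tasks = set()  # strong references keep tasks alive
sent = []

async def send_email(email: str) -> None:
    await asyncio.sleep(0.01)  # simulate I/O
    sent.append(email)

def fire_and_forget(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the reference once the task finishes
    task.add_done_callback(background_tasks.discard)
    return task

async def main():
    fire_and_forget(send_email('a@example.com'))
    fire_and_forget(send_email('b@example.com'))
    # The "response" would return here; give the tasks time to finish
    await asyncio.sleep(0.05)

asyncio.run(main())
print(sorted(sent))
```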

WebSocket Handling


Real-time bidirectional communication patterns.

```python
from fastapi import (
    FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException
)
from typing import List

app = FastAPI()

# Simple WebSocket echo
@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f'Echo: {data}')
    except WebSocketDisconnect:
        print('Client disconnected')

# WebSocket with authentication
# (verify_token / decode_token are application-specific helpers)
async def get_current_user_ws(websocket: WebSocket):
    token = websocket.query_params.get('token')
    if not token or not verify_token(token):
        await websocket.close(code=1008)  # Policy violation
        raise HTTPException(status_code=401, detail='Unauthorized')
    return decode_token(token)

@app.websocket('/ws/authenticated')
async def authenticated_websocket(
    websocket: WebSocket,
    user=Depends(get_current_user_ws)
):
    await websocket.accept()
    try:
        await websocket.send_text(f'Welcome {user["name"]}')
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f'{user["name"]}: {data}')
    except WebSocketDisconnect:
        print(f'User {user["name"]} disconnected')

# Broadcasting to multiple connections
class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket('/ws/chat/{client_id}')
async def chat_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket)
    await manager.broadcast(f'Client {client_id} joined the chat')
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f'Client {client_id}: {data}')
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f'Client {client_id} left the chat')

# WebSocket with JSON messages
@app.websocket('/ws/json')
async def json_websocket(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            message_type = data.get('type')

            if message_type == 'ping':
                await websocket.send_json({'type': 'pong'})
            elif message_type == 'message':
                await websocket.send_json({
                    'type': 'response',
                    'data': f'Received: {data.get("content")}'
                })
    except WebSocketDisconnect:
        print('Client disconnected')
```
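The fan-out logic inside a broadcast manager can be modeled without sockets at all: give each subscriber its own `asyncio.Queue` and push every message to every queue. A stdlib sketch (the `Broadcaster` class is illustrative, not a FastAPI API):

```python
import asyncio
from typing import List

class Broadcaster:
    # Each subscriber gets its own queue; broadcast fans out to all
    def __init__(self) -> None:
        self.queues: List[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue()
        self.queues.append(q)
        return q

    def unsubscribe(self, q: asyncio.Queue) -> None:
        self.queues.remove(q)

    async def broadcast(self, message: str) -> None:
        for q in self.queues:
            await q.put(message)

async def main():
    hub = Broadcaster()
    a, b = hub.subscribe(), hub.subscribe()
    await hub.broadcast('hello')
    return await a.get(), await b.get()

print(asyncio.run(main()))
```

In a real chat endpoint, one task per connection would drain that connection's queue and call `websocket.send_text`, which avoids a single slow client stalling the broadcast loop.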

Server-Sent Events (SSE)


One-way streaming from server to client.

```python
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
from datetime import datetime
import asyncio
import json

app = FastAPI()

@app.get('/sse')
async def sse_endpoint():
    async def event_generator():
        for i in range(10):
            await asyncio.sleep(1)
            yield {
                'event': 'message',
                'data': f'Message {i}'
            }

    return EventSourceResponse(event_generator())

# SSE with real-time updates
# (fetch_latest_update is an application-specific helper)
@app.get('/sse/updates')
async def sse_updates():
    async def update_generator():
        while True:
            await asyncio.sleep(2)
            update = await fetch_latest_update()
            yield {
                'event': 'update',
                'data': json.dumps(update)
            }

    return EventSourceResponse(update_generator())

# SSE with heartbeat
@app.get('/sse/heartbeat')
async def sse_heartbeat():
    async def heartbeat_generator():
        try:
            while True:
                await asyncio.sleep(30)
                yield {
                    'event': 'heartbeat',
                    'data': datetime.now().isoformat()
                }
        except asyncio.CancelledError:
            print('SSE connection closed')
            raise

    return EventSourceResponse(heartbeat_generator())
```
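`EventSourceResponse` just drains an async generator, so the generator side can be tested on its own with `async for`. A sketch using the same event shape as above (`asyncio.sleep(0)` stands in for the real delay):

```python
import asyncio
from typing import AsyncIterator

async def event_generator(n: int) -> AsyncIterator[dict]:
    # Same dict shape as the generators passed to EventSourceResponse
    for i in range(n):
        await asyncio.sleep(0)  # yield control, as the real sleep would
        yield {'event': 'message', 'data': f'Message {i}'}

async def collect(n: int) -> list:
    return [event async for event in event_generator(n)]

events = asyncio.run(collect(3))
print(events)
```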

Streaming Responses


Stream large files or generated content.

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import io
import csv

app = FastAPI()

# Stream a large file in chunks
# (a plain generator is used here: StreamingResponse runs sync
# generators in a threadpool, so the blocking file reads do not
# stall the event loop)
@app.get('/download/{filename}')
async def download_file(filename: str):
    def file_stream():
        with open(f'/data/{filename}', 'rb') as f:
            while chunk := f.read(8192):
                yield chunk

    return StreamingResponse(
        file_stream(),
        media_type='application/octet-stream',
        headers={'Content-Disposition': f'attachment; filename={filename}'}
    )

# Stream generated CSV
# (fetch_users_batch is an application-specific helper)
@app.get('/export/users')
async def export_users():
    async def csv_stream():
        output = io.StringIO()
        writer = csv.writer(output)

        # Write header
        writer.writerow(['ID', 'Name', 'Email'])
        yield output.getvalue()
        output.truncate(0)
        output.seek(0)

        # Stream users in batches
        offset = 0
        batch_size = 100
        while True:
            users = await fetch_users_batch(offset, batch_size)
            if not users:
                break

            for user in users:
                writer.writerow([user.id, user.name, user.email])
                yield output.getvalue()
                output.truncate(0)
                output.seek(0)

            offset += batch_size

    return StreamingResponse(
        csv_stream(),
        media_type='text/csv',
        headers={'Content-Disposition': 'attachment; filename=users.csv'}
    )

# Stream generated content
# (fetch_section_data is an application-specific helper)
@app.get('/generate/report')
async def generate_report():
    async def report_stream():
        yield b'<html><body><h1>Report</h1>'

        for section in ['users', 'orders', 'analytics']:
            await asyncio.sleep(0.5)  # Simulate processing
            data = await fetch_section_data(section)
            yield f'<h2>{section.title()}</h2>'.encode()
            yield f'<pre>{data}</pre>'.encode()

        yield b'</body></html>'

    return StreamingResponse(report_stream(), media_type='text/html')
```
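The StringIO recycling trick in `csv_stream` (write a row, yield the buffer, truncate, seek) is easy to get wrong; here is the same pattern isolated as a plain generator so it can be verified directly (row data is made up):

```python
import csv
import io
from typing import Iterator, List, Tuple

def csv_chunks(rows: List[Tuple]) -> Iterator[str]:
    # Reuse one StringIO buffer: write a row, yield it, then reset
    output = io.StringIO()
    writer = csv.writer(output)

    writer.writerow(['ID', 'Name'])
    yield output.getvalue()
    output.truncate(0)
    output.seek(0)

    for row in rows:
        writer.writerow(row)
        yield output.getvalue()
        output.truncate(0)
        output.seek(0)

body = ''.join(csv_chunks([(1, 'Ada'), (2, 'Bob')]))
print(body)
```

Forgetting either `truncate(0)` or `seek(0)` re-emits earlier rows, which is the usual bug in this pattern.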

Concurrent Request Handling


Parallel processing patterns for multiple operations.

```python
from fastapi import FastAPI, Depends
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
import asyncio
import httpx

app = FastAPI()

# Parallel API calls
@app.get('/aggregate/user/{user_id}')
async def aggregate_user_data(user_id: int):
    async with httpx.AsyncClient() as client:
        # Fetch from multiple sources concurrently
        profile_task = client.get(f'https://api.example.com/users/{user_id}')
        posts_task = client.get(f'https://api.example.com/users/{user_id}/posts')
        comments_task = client.get(f'https://api.example.com/users/{user_id}/comments')

        profile, posts, comments = await asyncio.gather(
            profile_task,
            posts_task,
            comments_task
        )

        return {
            'profile': profile.json(),
            'posts': posts.json(),
            'comments': comments.json()
        }

# Parallel database queries
# (User, Order, get_db as in the database section)
# Caution: a single SQLAlchemy AsyncSession is not safe for concurrent
# use; gathering like this requires a separate session or connection
# per query in real code
@app.get('/dashboard')
async def get_dashboard(db: AsyncSession = Depends(get_db)):
    users_query = db.execute(select(User).limit(10))
    orders_query = db.execute(select(Order).limit(10))
    stats_query = db.execute(select(func.count(User.id)))

    users, orders, stats = await asyncio.gather(
        users_query,
        orders_query,
        stats_query
    )

    return {
        'users': users.scalars().all(),
        'orders': orders.scalars().all(),
        'total_users': stats.scalar()
    }

# Race: first to complete wins
@app.get('/fastest-price/{product_id}')
async def get_fastest_price(product_id: str):
    async with httpx.AsyncClient() as client:
        # asyncio.wait requires Tasks, not bare coroutines (Python 3.11+)
        tasks = [
            asyncio.create_task(client.get(f'https://store1.com/price/{product_id}')),
            asyncio.create_task(client.get(f'https://store2.com/price/{product_id}')),
            asyncio.create_task(client.get(f'https://store3.com/price/{product_id}'))
        ]

        done, pending = await asyncio.wait(
            tasks,
            return_when=asyncio.FIRST_COMPLETED
        )

        # Cancel the slower requests
        for task in pending:
            task.cancel()

        result = done.pop().result()
        return result.json()
```
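The first-completed race can be verified with stdlib asyncio alone; `fetch_price` below is a made-up stand-in for an HTTP call with varying latency:

```python
import asyncio

async def fetch_price(store: str, delay: float) -> str:
    # Stand-in for an HTTP request with the given latency
    await asyncio.sleep(delay)
    return f'{store}: 9.99'

async def fastest_price() -> str:
    # Wrap coroutines in Tasks: asyncio.wait rejects bare coroutines
    # on Python 3.11+
    tasks = [
        asyncio.create_task(fetch_price('store1', 0.30)),
        asyncio.create_task(fetch_price('store2', 0.01)),
        asyncio.create_task(fetch_price('store3', 0.30)),
    ]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()  # drop the slower calls
    return done.pop().result()

print(asyncio.run(fastest_price()))
```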

Async Context Managers


Resource management with async context managers.

```python
from contextlib import asynccontextmanager
from fastapi import FastAPI

# Lifespan events via an async context manager
# (create_db_pool / create_redis_client are application-specific helpers)
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    print('Starting up...')
    db_pool = await create_db_pool()
    redis_client = await create_redis_client()

    # Store in app state
    app.state.db_pool = db_pool
    app.state.redis = redis_client

    yield

    # Shutdown
    print('Shutting down...')
    await db_pool.close()
    await redis_client.close()

app = FastAPI(lifespan=lifespan)

# Custom async context manager
class AsyncDatabaseSession:
    def __init__(self, pool):
        self.pool = pool
        self.conn = None

    async def __aenter__(self):
        self.conn = await self.pool.acquire()
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # Roll back before handing the connection back to the pool
            await self.conn.rollback()
        await self.pool.release(self.conn)
        return False  # Propagate any exception

@app.get('/data')
async def get_data():
    async with AsyncDatabaseSession(app.state.db_pool) as conn:
        result = await conn.fetch('SELECT * FROM data')
        return result
```
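The acquire-then-release guarantee of an async context manager is easy to demonstrate with `@asynccontextmanager` and a log; the resource name here is illustrative:

```python
import asyncio
from contextlib import asynccontextmanager

log = []

@asynccontextmanager
async def managed_resource(name: str):
    # Acquire on entry; the finally clause releases even if the
    # body raises
    log.append(f'acquire {name}')
    try:
        yield name
    finally:
        log.append(f'release {name}')

async def main():
    async with managed_resource('conn') as res:
        log.append(f'use {res}')

asyncio.run(main())
print(log)
```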

Connection Pooling


Efficient connection management for databases and HTTP clients.

```python
from fastapi import FastAPI, Depends
import asyncpg
import httpx
from typing import AsyncGenerator

app = FastAPI()

# Database connection pool
class DatabasePool:
    def __init__(self):
        self.pool = None

    async def create_pool(self):
        self.pool = await asyncpg.create_pool(
            'postgresql://user:pass@localhost/db',
            min_size=10,
            max_size=20,
            command_timeout=60,
            max_queries=50000,
            max_inactive_connection_lifetime=300
        )

    async def close_pool(self):
        await self.pool.close()

    async def get_connection(self):
        async with self.pool.acquire() as connection:
            yield connection

db_pool = DatabasePool()

# (on_event still works but is deprecated in newer FastAPI releases;
# the lifespan handler shown above is the preferred replacement)
@app.on_event('startup')
async def startup():
    await db_pool.create_pool()

@app.on_event('shutdown')
async def shutdown():
    await db_pool.close_pool()

@app.get('/users')
async def get_users(conn=Depends(db_pool.get_connection)):
    rows = await conn.fetch('SELECT * FROM users')
    return [dict(row) for row in rows]

# HTTP client pool: one shared client reuses keep-alive connections
class HTTPClientPool:
    def __init__(self):
        self.client = None

    async def get_client(self) -> AsyncGenerator[httpx.AsyncClient, None]:
        if self.client is None:  # lazily created on first use
            self.client = httpx.AsyncClient(
                limits=httpx.Limits(
                    max_keepalive_connections=20, max_connections=100
                ),
                timeout=httpx.Timeout(10.0)
            )
        yield self.client

    async def close(self):
        if self.client:
            await self.client.aclose()

http_pool = HTTPClientPool()

@app.get('/external-api')
async def call_external_api(
    client: httpx.AsyncClient = Depends(http_pool.get_client)
):
    response = await client.get('https://api.example.com/data')
    return response.json()
```
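The core behavior of a fixed-size pool (acquire blocks when exhausted, release wakes a waiter) can be sketched with an `asyncio.Queue`; `MiniPool` and the connection names are illustrative, not an asyncpg API:

```python
import asyncio
from typing import List

class MiniPool:
    # Toy fixed-size pool: resources sit in a queue; acquire blocks
    # until a resource is available
    def __init__(self, resources: List[str]) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        for r in resources:
            self._queue.put_nowait(r)

    async def acquire(self) -> str:
        return await self._queue.get()

    def release(self, resource: str) -> None:
        self._queue.put_nowait(resource)

async def worker(pool: MiniPool, results: list) -> None:
    conn = await pool.acquire()
    try:
        await asyncio.sleep(0.01)  # simulate a query
        results.append(conn)
    finally:
        pool.release(conn)

async def main() -> list:
    # Five workers share two connections; none ever sees a third
    pool = MiniPool(['conn1', 'conn2'])
    results: list = []
    await asyncio.gather(*(worker(pool, results) for _ in range(5)))
    return results

print(asyncio.run(main()))
```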

Performance Optimization


Async patterns for optimal performance.
python
from fastapi import FastAPI
import asyncio
from functools import lru_cache

app = FastAPI()

Cache expensive async operations

from aiocache import Cache
from aiocache.serializers import JsonSerializer

cache = Cache(Cache.MEMORY, serializer=JsonSerializer())

@app.get('/expensive-data/{key}')
async def get_expensive_data(key: str):
    # Check cache first
    cached = await cache.get(key)
    if cached:
        return {'data': cached, 'cached': True}

    # Expensive operation
    await asyncio.sleep(2)
    data = compute_expensive_result(key)

    # Store in cache
    await cache.set(key, data, ttl=300)
    return {'data': data, 'cached': False}
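A related optimization is stampede protection: when a cold key receives many simultaneous requests, each one misses the cache and runs the expensive operation. A per-key lock collapses those concurrent misses into a single computation. `SingleFlight` below is an illustrative sketch of the pattern, not an aiocache feature.

```python
import asyncio

class SingleFlight:
    """Collapse concurrent cache misses for one key into a single
    computation; later callers reuse the cached result."""
    def __init__(self):
        self.cache = {}
        self.locks = {}

    async def get(self, key, compute):
        if key in self.cache:
            return self.cache[key]
        lock = self.locks.setdefault(key, asyncio.Lock())
        async with lock:
            if key not in self.cache:        # re-check: a peer may have filled it
                self.cache[key] = await compute(key)
        return self.cache[key]

calls = 0

async def slow_compute(key):
    # stand-in for the expensive operation above
    global calls
    calls += 1
    await asyncio.sleep(0.01)
    return key.upper()

results = []

async def main():
    sf = SingleFlight()
    # five concurrent requests for the same cold key
    results.extend(await asyncio.gather(*(sf.get('a', slow_compute) for _ in range(5))))

asyncio.run(main())
```

All five callers get the value, but the expensive computation runs only once.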

Batch operations

@app.post('/users/batch')
async def create_users_batch(users: List[UserCreate], db = Depends(get_db)):
    # Create users in batch (more efficient than one-by-one)
    user_objects = [User(**user.dict()) for user in users]
    db.add_all(user_objects)
    await db.flush()
    return user_objects
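For very large payloads, flushing everything in one go can blow up memory and transaction size; splitting the input into fixed-size chunks bounds both. A minimal sketch, where `insert_batch` is a hypothetical stand-in for the `db.add_all` + `flush` step:

```python
import asyncio

def chunked(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

async def insert_batch(batch):
    # stand-in for db.add_all(batch) + await db.flush()
    await asyncio.sleep(0)
    return len(batch)

counts = []

async def main():
    users = list(range(2500))        # pretend these are UserCreate payloads
    for batch in chunked(users, 1000):
        counts.append(await insert_batch(batch))

asyncio.run(main())
```

2500 items flush as three bounded batches instead of one oversized one.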

Debouncing with asyncio

class Debouncer:
    def __init__(self, delay: float):
        self.delay = delay
        self.task = None

    async def debounce(self, coro):
        if self.task:
            self.task.cancel()

        async def delayed():
            await asyncio.sleep(self.delay)
            await coro

        self.task = asyncio.create_task(delayed())
        await self.task

debouncer = Debouncer(delay=1.0)
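The debouncer can be exercised end to end. One caveat: when a newer call cancels the pending task, the superseded caller's `await self.task` raises `CancelledError`, so this runnable variant suppresses it (and closes the abandoned coroutine to avoid "never awaited" warnings) while keeping the same shape as the class above.

```python
import asyncio
import contextlib

class Debouncer:
    def __init__(self, delay: float):
        self.delay = delay
        self.task = None

    async def debounce(self, coro):
        if self.task:
            self.task.cancel()

        async def delayed():
            try:
                await asyncio.sleep(self.delay)
                await coro
            except asyncio.CancelledError:
                coro.close()                 # discard the superseded coroutine cleanly
                raise

        self.task = asyncio.create_task(delayed())
        with contextlib.suppress(asyncio.CancelledError):
            await self.task

calls = []

async def record(tag):
    calls.append(tag)

async def main():
    d = Debouncer(delay=0.05)
    # three rapid calls within the delay window; only the last survives
    t1 = asyncio.create_task(d.debounce(record('a')))
    await asyncio.sleep(0.01)
    t2 = asyncio.create_task(d.debounce(record('b')))
    await asyncio.sleep(0.01)
    await d.debounce(record('c'))
    await asyncio.gather(t1, t2)

asyncio.run(main())
```

Only `'c'` runs: each new call within the delay window cancels the pending one.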

Prefetching related data

@app.get('/posts/{post_id}')
async def get_post_with_relations(post_id: int, db = Depends(get_db)):
    # Fetch post and related data in parallel
    post_task = db.get(Post, post_id)
    comments_task = db.execute(
        select(Comment).where(Comment.post_id == post_id)
    )
    author_task = db.execute(
        select(User).where(User.id == Post.author_id)
    )

    post, comments_result, author_result = await asyncio.gather(
        post_task, comments_task, author_task
    )

    return {
        'post': post,
        'comments': comments_result.scalars().all(),
        'author': author_result.scalar_one()
    }
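A caveat on this pattern: a single SQLAlchemy AsyncSession does not support concurrent queries, so gather-style fan-out generally needs a separate session or connection per task. The timing win itself is easy to demonstrate with stand-in coroutines: total latency tracks the slowest call rather than the sum.

```python
import asyncio
import time

async def fetch(name, delay):
    await asyncio.sleep(delay)       # stands in for a database or HTTP call
    return name

results = {}

async def main():
    start = time.monotonic()
    # three lookups run concurrently instead of back to back
    post, comments, author = await asyncio.gather(
        fetch('post', 0.2), fetch('comments', 0.15), fetch('author', 0.1)
    )
    results['data'] = (post, comments, author)
    results['elapsed'] = time.monotonic() - start

asyncio.run(main())
```

Sequential awaits would take roughly 0.45 s here; the gathered version finishes in about the time of the slowest call (0.2 s).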

When to Use This Skill

Use fastapi-async-patterns when:
  • Building high-throughput APIs that handle many concurrent requests
  • Working with I/O-bound operations (database, external APIs, file operations)
  • Implementing real-time features (WebSockets, SSE)
  • Processing multiple operations in parallel
  • Streaming large datasets or files
  • Building microservices that communicate with other services
  • Optimizing API response times and resource usage
  • Handling background tasks without blocking responses

FastAPI Async Best Practices

  1. Use async for I/O - Always use async for database, HTTP requests, and file operations
  2. Avoid blocking calls - Never use blocking calls in async functions (time.sleep, requests library)
  3. Connection pooling - Use connection pools for databases and HTTP clients
  4. Proper cleanup - Always clean up resources with try/finally or async context managers
  5. Concurrent operations - Use asyncio.gather for parallel operations when possible
  6. Background tasks - Use BackgroundTasks for fire-and-forget operations
  7. Stream large data - Use StreamingResponse for large files or generated content
  8. Timeout handling - Set timeouts on all external calls to prevent hanging
  9. Error propagation - Handle exceptions properly in async code
  10. Monitor performance - Use tools like aiomonitor to debug async issues

FastAPI Async Common Pitfalls

  1. Blocking the event loop - Using synchronous I/O in async functions kills performance
  2. Missing await - Forgetting await on async functions causes coroutine warnings
  3. Creating too many tasks - Spawning unlimited tasks can exhaust resources
  4. Not closing connections - Resource leaks from unclosed database/HTTP connections
  5. Mixing sync and async - Incorrect mixing causes event loop issues
  6. Race conditions - Shared state in async code without proper locking
  7. Timeout issues - No timeouts on external calls can hang the server
  8. Memory leaks - Background tasks that never complete accumulate
  9. Error swallowing - Silent failures in background tasks and event handlers
  10. Deadlocks - Circular waits in async dependencies or locks

Resources
