async-python-patterns
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAsync Python Patterns
Python异步编程模式
Comprehensive guidance for implementing asynchronous Python applications using asyncio, concurrent programming patterns, and async/await for building high-performance, non-blocking systems.
本文提供了使用asyncio、并发编程模式以及async/await实现Python异步应用的全面指南,用于构建高性能、非阻塞的系统。
When to Use This Skill
何时使用该技术
- Building async web APIs (FastAPI, aiohttp, Sanic)
- Implementing concurrent I/O operations (database, file, network)
- Creating web scrapers with concurrent requests
- Developing real-time applications (WebSocket servers, chat systems)
- Processing multiple independent tasks simultaneously
- Building microservices with async communication
- Optimizing I/O-bound workloads
- Implementing async background tasks and queues
- 构建异步Web API(FastAPI、aiohttp、Sanic)
- 实现并发I/O操作(数据库、文件、网络)
- 创建支持并发请求的网络爬虫
- 开发实时应用(WebSocket服务器、聊天系统)
- 同时处理多个独立任务
- 构建采用异步通信的微服务
- 优化I/O密集型工作负载
- 实现异步后台任务与队列
Sync vs Async Decision Guide
同步vs异步决策指南
Before adopting async, consider whether it's the right choice for your use case.
| Use Case | Recommended Approach |
|---|---|
| Many concurrent network/DB calls | |
| CPU-bound computation | |
| Mixed I/O + CPU | Offload CPU work with |
| Simple scripts, few connections | Sync (simpler, easier to debug) |
| Web APIs with high concurrency | Async frameworks (FastAPI, aiohttp) |
Key Rule: Stay fully sync or fully async within a call path. Mixing creates hidden blocking and complexity.
在采用异步编程之前,请考虑它是否适合你的使用场景。
| 使用场景 | 推荐方案 |
|---|---|
| 大量并发网络/数据库调用 | |
| CPU密集型计算 | |
| I/O与CPU混合负载 | 使用 |
| 简单脚本、少量连接 | 同步编程(更简单、易于调试) |
| 高并发Web API | 异步框架(FastAPI、aiohttp) |
核心原则: 在调用路径中保持完全同步或完全异步。混合使用会导致隐藏的阻塞和复杂度。
Core Concepts
核心概念
1. Event Loop
1. 事件循环
The event loop is the heart of asyncio, managing and scheduling asynchronous tasks.
Key characteristics:
- Single-threaded cooperative multitasking
- Schedules coroutines for execution
- Handles I/O operations without blocking
- Manages callbacks and futures
事件循环是asyncio的核心,用于管理和调度异步任务。
关键特性:
- 单线程协作式多任务处理
- 调度协程执行
- 无阻塞处理I/O操作
- 管理回调和Future对象
2. Coroutines
2. 协程
Functions defined with that can be paused and resumed.
async defSyntax:
python
async def my_coroutine():
result = await some_async_operation()
return result使用定义的可暂停和恢复的函数。
async def语法:
python
async def my_coroutine():
result = await some_async_operation()
return result3. Tasks
3. 任务
Scheduled coroutines that run concurrently on the event loop.
在事件循环上并发运行的已调度协程。
4. Futures
4. Future对象
Low-level objects representing eventual results of async operations.
表示异步操作最终结果的底层对象。
5. Async Context Managers
5. 异步上下文管理器
Resources that support for proper cleanup.
async with支持语法以实现资源正确清理的资源对象。
async with6. Async Iterators
6. 异步迭代器
Objects that support for iterating over async data sources.
async for支持语法以遍历异步数据源的对象。
async forQuick Start
快速入门
python
import asyncio
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")python
import asyncio
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")Python 3.7+
Python 3.7+
asyncio.run(main())
undefinedasyncio.run(main())
undefinedFundamental Patterns
基础模式
Pattern 1: Basic Async/Await
模式1:基础Async/Await
python
import asyncio
async def fetch_data(url: str) -> dict:
"""Fetch data from URL asynchronously."""
await asyncio.sleep(1) # Simulate I/O
return {"url": url, "data": "result"}
async def main():
result = await fetch_data("https://api.example.com")
print(result)
asyncio.run(main())python
import asyncio
async def fetch_data(url: str) -> dict:
"""异步从URL获取数据。"""
await asyncio.sleep(1) # 模拟I/O操作
return {"url": url, "data": "result"}
async def main():
result = await fetch_data("https://api.example.com")
print(result)
asyncio.run(main())Pattern 2: Concurrent Execution with gather()
模式2:使用gather()实现并发执行
python
import asyncio
from typing import List
async def fetch_user(user_id: int) -> dict:
"""Fetch user data."""
await asyncio.sleep(0.5)
return {"id": user_id, "name": f"User {user_id}"}
async def fetch_all_users(user_ids: List[int]) -> List[dict]:
"""Fetch multiple users concurrently."""
tasks = [fetch_user(uid) for uid in user_ids]
results = await asyncio.gather(*tasks)
return results
async def main():
user_ids = [1, 2, 3, 4, 5]
users = await fetch_all_users(user_ids)
print(f"Fetched {len(users)} users")
asyncio.run(main())python
import asyncio
from typing import List
async def fetch_user(user_id: int) -> dict:
"""获取用户数据。"""
await asyncio.sleep(0.5)
return {"id": user_id, "name": f"User {user_id}"}
async def fetch_all_users(user_ids: List[int]) -> List[dict]:
"""并发获取多个用户数据。"""
tasks = [fetch_user(uid) for uid in user_ids]
results = await asyncio.gather(*tasks)
return results
async def main():
user_ids = [1, 2, 3, 4, 5]
users = await fetch_all_users(user_ids)
print(f"Fetched {len(users)} users")
asyncio.run(main())Pattern 3: Task Creation and Management
模式3:任务创建与管理
python
import asyncio
async def background_task(name: str, delay: int):
"""Long-running background task."""
print(f"{name} started")
await asyncio.sleep(delay)
print(f"{name} completed")
return f"Result from {name}"
async def main():
# Create tasks
task1 = asyncio.create_task(background_task("Task 1", 2))
task2 = asyncio.create_task(background_task("Task 2", 1))
# Do other work
print("Main: doing other work")
await asyncio.sleep(0.5)
# Wait for tasks
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())python
import asyncio
async def background_task(name: str, delay: int):
"""长时间运行的后台任务。"""
print(f"{name} started")
await asyncio.sleep(delay)
print(f"{name} completed")
return f"Result from {name}"
async def main():
# 创建任务
task1 = asyncio.create_task(background_task("Task 1", 2))
task2 = asyncio.create_task(background_task("Task 2", 1))
# 执行其他工作
print("Main: doing other work")
await asyncio.sleep(0.5)
# 等待任务完成
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())Pattern 4: Error Handling in Async Code
模式4:异步代码中的错误处理
python
import asyncio
from typing import List, Optional
async def risky_operation(item_id: int) -> dict:
"""Operation that might fail."""
await asyncio.sleep(0.1)
if item_id % 3 == 0:
raise ValueError(f"Item {item_id} failed")
return {"id": item_id, "status": "success"}
async def safe_operation(item_id: int) -> Optional[dict]:
"""Wrapper with error handling."""
try:
return await risky_operation(item_id)
except ValueError as e:
print(f"Error: {e}")
return None
async def process_items(item_ids: List[int]):
"""Process multiple items with error handling."""
tasks = [safe_operation(iid) for iid in item_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter out failures
successful = [r for r in results if r is not None and not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
print(f"Success: {len(successful)}, Failed: {len(failed)}")
return successful
asyncio.run(process_items([1, 2, 3, 4, 5, 6]))python
import asyncio
from typing import List, Optional
async def risky_operation(item_id: int) -> dict:
"""可能失败的操作。"""
await asyncio.sleep(0.1)
if item_id % 3 == 0:
raise ValueError(f"Item {item_id} failed")
return {"id": item_id, "status": "success"}
async def safe_operation(item_id: int) -> Optional[dict]:
"""带有错误处理的包装函数。"""
try:
return await risky_operation(item_id)
except ValueError as e:
print(f"Error: {e}")
return None
async def process_items(item_ids: List[int]):
"""处理多个带有错误处理的任务。"""
tasks = [safe_operation(iid) for iid in item_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 过滤失败的任务
successful = [r for r in results if r is not None and not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
print(f"Success: {len(successful)}, Failed: {len(failed)}")
return successful
asyncio.run(process_items([1, 2, 3, 4, 5, 6]))Pattern 5: Timeout Handling
模式5:超时处理
python
import asyncio
async def slow_operation(delay: int) -> str:
"""Operation that takes time."""
await asyncio.sleep(delay)
return f"Completed after {delay}s"
async def with_timeout():
"""Execute operation with timeout."""
try:
result = await asyncio.wait_for(slow_operation(5), timeout=2.0)
print(result)
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(with_timeout())python
import asyncio
async def slow_operation(delay: int) -> str:
"""耗时操作。"""
await asyncio.sleep(delay)
return f"Completed after {delay}s"
async def with_timeout():
"""带超时的操作执行。"""
try:
result = await asyncio.wait_for(slow_operation(5), timeout=2.0)
print(result)
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(with_timeout())Advanced Patterns
高级模式
Pattern 6: Async Context Managers
模式6:异步上下文管理器
python
import asyncio
from typing import Optional
class AsyncDatabaseConnection:
"""Async database connection context manager."""
def __init__(self, dsn: str):
self.dsn = dsn
self.connection: Optional[object] = None
async def __aenter__(self):
print("Opening connection")
await asyncio.sleep(0.1) # Simulate connection
self.connection = {"dsn": self.dsn, "connected": True}
return self.connection
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing connection")
await asyncio.sleep(0.1) # Simulate cleanup
self.connection = None
async def query_database():
"""Use async context manager."""
async with AsyncDatabaseConnection("postgresql://localhost") as conn:
print(f"Using connection: {conn}")
await asyncio.sleep(0.2) # Simulate query
return {"rows": 10}
asyncio.run(query_database())python
import asyncio
from typing import Optional
class AsyncDatabaseConnection:
"""异步数据库连接上下文管理器。"""
def __init__(self, dsn: str):
self.dsn = dsn
self.connection: Optional[object] = None
async def __aenter__(self):
print("Opening connection")
await asyncio.sleep(0.1) # 模拟连接过程
self.connection = {"dsn": self.dsn, "connected": True}
return self.connection
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing connection")
await asyncio.sleep(0.1) # 模拟清理过程
self.connection = None
async def query_database():
"""使用异步上下文管理器。"""
async with AsyncDatabaseConnection("postgresql://localhost") as conn:
print(f"Using connection: {conn}")
await asyncio.sleep(0.2) # 模拟查询
return {"rows": 10}
asyncio.run(query_database())Pattern 7: Async Iterators and Generators
模式7:异步迭代器与生成器
python
import asyncio
from typing import AsyncIterator
async def async_range(start: int, end: int, delay: float = 0.1) -> AsyncIterator[int]:
"""Async generator that yields numbers with delay."""
for i in range(start, end):
await asyncio.sleep(delay)
yield i
async def fetch_pages(url: str, max_pages: int) -> AsyncIterator[dict]:
"""Fetch paginated data asynchronously."""
for page in range(1, max_pages + 1):
await asyncio.sleep(0.2) # Simulate API call
yield {
"page": page,
"url": f"{url}?page={page}",
"data": [f"item_{page}_{i}" for i in range(5)]
}
async def consume_async_iterator():
"""Consume async iterator."""
async for number in async_range(1, 5):
print(f"Number: {number}")
print("\nFetching pages:")
async for page_data in fetch_pages("https://api.example.com/items", 3):
print(f"Page {page_data['page']}: {len(page_data['data'])} items")
asyncio.run(consume_async_iterator())python
import asyncio
from typing import AsyncIterator
async def async_range(start: int, end: int, delay: float = 0.1) -> AsyncIterator[int]:
"""带有延迟的异步生成器,生成数字序列。"""
for i in range(start, end):
await asyncio.sleep(delay)
yield i
async def fetch_pages(url: str, max_pages: int) -> AsyncIterator[dict]:
"""异步获取分页数据。"""
for page in range(1, max_pages + 1):
await asyncio.sleep(0.2) # 模拟API调用
yield {
"page": page,
"url": f"{url}?page={page}",
"data": [f"item_{page}_{i}" for i in range(5)]
}
async def consume_async_iterator():
"""消费异步迭代器。"""
async for number in async_range(1, 5):
print(f"Number: {number}")
print("\nFetching pages:")
async for page_data in fetch_pages("https://api.example.com/items", 3):
print(f"Page {page_data['page']}: {len(page_data['data'])} items")
asyncio.run(consume_async_iterator())Pattern 8: Producer-Consumer Pattern
模式8:生产者-消费者模式
python
import asyncio
from asyncio import Queue
from typing import Optional
async def producer(queue: Queue, producer_id: int, num_items: int):
"""Produce items and put them in queue."""
for i in range(num_items):
item = f"Item-{producer_id}-{i}"
await queue.put(item)
print(f"Producer {producer_id} produced: {item}")
await asyncio.sleep(0.1)
await queue.put(None) # Signal completion
async def consumer(queue: Queue, consumer_id: int):
"""Consume items from queue."""
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"Consumer {consumer_id} processing: {item}")
await asyncio.sleep(0.2) # Simulate work
queue.task_done()
async def producer_consumer_example():
"""Run producer-consumer pattern."""
queue = Queue(maxsize=10)
# Create tasks
producers = [
asyncio.create_task(producer(queue, i, 5))
for i in range(2)
]
consumers = [
asyncio.create_task(consumer(queue, i))
for i in range(3)
]
# Wait for producers
await asyncio.gather(*producers)
# Wait for queue to be empty
await queue.join()
# Cancel consumers
for c in consumers:
c.cancel()
asyncio.run(producer_consumer_example())python
import asyncio
from asyncio import Queue
from typing import Optional
async def producer(queue: Queue, producer_id: int, num_items: int):
"""生产任务项并放入队列。"""
for i in range(num_items):
item = f"Item-{producer_id}-{i}"
await queue.put(item)
print(f"Producer {producer_id} produced: {item}")
await asyncio.sleep(0.1)
await queue.put(None) # 发送完成信号
async def consumer(queue: Queue, consumer_id: int):
"""从队列中消费任务项。"""
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"Consumer {consumer_id} processing: {item}")
await asyncio.sleep(0.2) # 模拟处理过程
queue.task_done()
async def producer_consumer_example():
"""运行生产者-消费者模式示例。"""
queue = Queue(maxsize=10)
# 创建任务
producers = [
asyncio.create_task(producer(queue, i, 5))
for i in range(2)
]
consumers = [
asyncio.create_task(consumer(queue, i))
for i in range(3)
]
# 等待生产者完成
await asyncio.gather(*producers)
# 等待队列任务全部完成
await queue.join()
# 取消消费者任务
for c in consumers:
c.cancel()
asyncio.run(producer_consumer_example())Pattern 9: Semaphore for Rate Limiting
模式9:使用信号量进行速率限制
python
import asyncio
from typing import List
async def api_call(url: str, semaphore: asyncio.Semaphore) -> dict:
"""Make API call with rate limiting."""
async with semaphore:
print(f"Calling {url}")
await asyncio.sleep(0.5) # Simulate API call
return {"url": url, "status": 200}
async def rate_limited_requests(urls: List[str], max_concurrent: int = 5):
"""Make multiple requests with rate limiting."""
semaphore = asyncio.Semaphore(max_concurrent)
tasks = [api_call(url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
results = await rate_limited_requests(urls, max_concurrent=3)
print(f"Completed {len(results)} requests")
asyncio.run(main())python
import asyncio
from typing import List
async def api_call(url: str, semaphore: asyncio.Semaphore) -> dict:
"""使用信号量进行速率限制的API调用。"""
async with semaphore:
print(f"Calling {url}")
await asyncio.sleep(0.5) # 模拟API调用
return {"url": url, "status": 200}
async def rate_limited_requests(urls: List[str], max_concurrent: int = 5):
"""使用速率限制发送多个请求。"""
semaphore = asyncio.Semaphore(max_concurrent)
tasks = [api_call(url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
results = await rate_limited_requests(urls, max_concurrent=3)
print(f"Completed {len(results)} requests")
asyncio.run(main())Pattern 10: Async Locks and Synchronization
模式10:异步锁与同步机制
python
import asyncio
class AsyncCounter:
"""Thread-safe async counter."""
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def increment(self):
"""Safely increment counter."""
async with self.lock:
current = self.value
await asyncio.sleep(0.01) # Simulate work
self.value = current + 1
async def get_value(self) -> int:
"""Get current value."""
async with self.lock:
return self.value
async def worker(counter: AsyncCounter, worker_id: int):
"""Worker that increments counter."""
for _ in range(10):
await counter.increment()
print(f"Worker {worker_id} incremented")
async def test_counter():
"""Test concurrent counter."""
counter = AsyncCounter()
workers = [asyncio.create_task(worker(counter, i)) for i in range(5)]
await asyncio.gather(*workers)
final_value = await counter.get_value()
print(f"Final counter value: {final_value}")
asyncio.run(test_counter())python
import asyncio
class AsyncCounter:
"""线程安全的异步计数器。"""
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def increment(self):
"""安全地递增计数器。"""
async with self.lock:
current = self.value
await asyncio.sleep(0.01) # 模拟处理过程
self.value = current + 1
async def get_value(self) -> int:
"""获取当前计数器值。"""
async with self.lock:
return self.value
async def worker(counter: AsyncCounter, worker_id: int):
"""递增计数器的工作任务。"""
for _ in range(10):
await counter.increment()
print(f"Worker {worker_id} incremented")
async def test_counter():
"""测试并发计数器。"""
counter = AsyncCounter()
workers = [asyncio.create_task(worker(counter, i)) for i in range(5)]
await asyncio.gather(*workers)
final_value = await counter.get_value()
print(f"Final counter value: {final_value}")
asyncio.run(test_counter())Real-World Applications
实际应用场景
Web Scraping with aiohttp
使用aiohttp进行网络爬虫
python
import asyncio
import aiohttp
from typing import List, Dict
async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
"""Fetch single URL."""
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
text = await response.text()
return {
"url": url,
"status": response.status,
"length": len(text)
}
except Exception as e:
return {"url": url, "error": str(e)}
async def scrape_urls(urls: List[str]) -> List[Dict]:
"""Scrape multiple URLs concurrently."""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/404",
]
results = await scrape_urls(urls)
for result in results:
print(result)
asyncio.run(main())python
import asyncio
import aiohttp
from typing import List, Dict
async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
"""获取单个URL的内容。"""
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
text = await response.text()
return {
"url": url,
"status": response.status,
"length": len(text)
}
except Exception as e:
return {"url": url, "error": str(e)}
async def scrape_urls(urls: List[str]) -> List[Dict]:
"""并发爬取多个URL。"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/404",
]
results = await scrape_urls(urls)
for result in results:
print(result)
asyncio.run(main())Async Database Operations
异步数据库操作
python
import asyncio
from typing import List, Optionalpython
import asyncio
from typing import List, OptionalSimulated async database client
模拟异步数据库客户端
class AsyncDB:
"""Simulated async database."""
async def execute(self, query: str) -> List[dict]:
"""Execute query."""
await asyncio.sleep(0.1)
return [{"id": 1, "name": "Example"}]
async def fetch_one(self, query: str) -> Optional[dict]:
"""Fetch single row."""
await asyncio.sleep(0.1)
return {"id": 1, "name": "Example"}async def get_user_data(db: AsyncDB, user_id: int) -> dict:
"""Fetch user and related data concurrently."""
user_task = db.fetch_one(f"SELECT * FROM users WHERE id = {user_id}")
orders_task = db.execute(f"SELECT * FROM orders WHERE user_id = {user_id}")
profile_task = db.fetch_one(f"SELECT * FROM profiles WHERE user_id = {user_id}")
user, orders, profile = await asyncio.gather(user_task, orders_task, profile_task)
return {
"user": user,
"orders": orders,
"profile": profile
}async def main():
db = AsyncDB()
user_data = await get_user_data(db, 1)
print(user_data)
asyncio.run(main())
undefinedclass AsyncDB:
"""模拟的异步数据库。"""
async def execute(self, query: str) -> List[dict]:
"""执行查询语句。"""
await asyncio.sleep(0.1)
return [{"id": 1, "name": "Example"}]
async def fetch_one(self, query: str) -> Optional[dict]:
"""获取单行数据。"""
await asyncio.sleep(0.1)
return {"id": 1, "name": "Example"}async def get_user_data(db: AsyncDB, user_id: int) -> dict:
"""并发获取用户及其关联数据。"""
user_task = db.fetch_one(f"SELECT * FROM users WHERE id = {user_id}")
orders_task = db.execute(f"SELECT * FROM orders WHERE user_id = {user_id}")
profile_task = db.fetch_one(f"SELECT * FROM profiles WHERE user_id = {user_id}")
user, orders, profile = await asyncio.gather(user_task, orders_task, profile_task)
return {
"user": user,
"orders": orders,
"profile": profile
}async def main():
db = AsyncDB()
user_data = await get_user_data(db, 1)
print(user_data)
asyncio.run(main())
undefinedWebSocket Server
WebSocket服务器
python
import asyncio
from typing import Setpython
import asyncio
from typing import SetSimulated WebSocket connection
模拟WebSocket连接
class WebSocket:
"""Simulated WebSocket."""
def __init__(self, client_id: str):
self.client_id = client_id
async def send(self, message: str):
"""Send message."""
print(f"Sending to {self.client_id}: {message}")
await asyncio.sleep(0.01)
async def recv(self) -> str:
"""Receive message."""
await asyncio.sleep(1)
return f"Message from {self.client_id}"class WebSocketServer:
"""Simple WebSocket server."""
def __init__(self):
self.clients: Set[WebSocket] = set()
async def register(self, websocket: WebSocket):
"""Register new client."""
self.clients.add(websocket)
print(f"Client {websocket.client_id} connected")
async def unregister(self, websocket: WebSocket):
"""Unregister client."""
self.clients.remove(websocket)
print(f"Client {websocket.client_id} disconnected")
async def broadcast(self, message: str):
"""Broadcast message to all clients."""
if self.clients:
tasks = [client.send(message) for client in self.clients]
await asyncio.gather(*tasks)
async def handle_client(self, websocket: WebSocket):
"""Handle individual client connection."""
await self.register(websocket)
try:
async for message in self.message_iterator(websocket):
await self.broadcast(f"{websocket.client_id}: {message}")
finally:
await self.unregister(websocket)
async def message_iterator(self, websocket: WebSocket):
"""Iterate over messages from client."""
for _ in range(3): # Simulate 3 messages
yield await websocket.recv()undefinedclass WebSocket:
"""模拟的WebSocket连接。"""
def __init__(self, client_id: str):
self.client_id = client_id
async def send(self, message: str):
"""发送消息。"""
print(f"Sending to {self.client_id}: {message}")
await asyncio.sleep(0.01)
async def recv(self) -> str:
"""接收消息。"""
await asyncio.sleep(1)
return f"Message from {self.client_id}"class WebSocketServer:
"""简单的WebSocket服务器。"""
def __init__(self):
self.clients: Set[WebSocket] = set()
async def register(self, websocket: WebSocket):
"""注册新客户端。"""
self.clients.add(websocket)
print(f"Client {websocket.client_id} connected")
async def unregister(self, websocket: WebSocket):
"""注销客户端。"""
self.clients.remove(websocket)
print(f"Client {websocket.client_id} disconnected")
async def broadcast(self, message: str):
"""向所有客户端广播消息。"""
if self.clients:
tasks = [client.send(message) for client in self.clients]
await asyncio.gather(*tasks)
async def handle_client(self, websocket: WebSocket):
"""处理单个客户端连接。"""
await self.register(websocket)
try:
async for message in self.message_iterator(websocket):
await self.broadcast(f"{websocket.client_id}: {message}")
finally:
await self.unregister(websocket)
async def message_iterator(self, websocket: WebSocket):
"""遍历客户端发送的消息。"""
for _ in range(3): # 模拟3条消息
yield await websocket.recv()undefinedPerformance Best Practices
性能最佳实践
1. Use Connection Pools
1. 使用连接池
python
import asyncio
import aiohttp
async def with_connection_pool():
"""Use connection pool for efficiency."""
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [session.get(f"https://api.example.com/item/{i}") for i in range(50)]
responses = await asyncio.gather(*tasks)
return responsespython
import asyncio
import aiohttp
async def with_connection_pool():
"""使用连接池提升效率。"""
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [session.get(f"https://api.example.com/item/{i}") for i in range(50)]
responses = await asyncio.gather(*tasks)
return responses2. Batch Operations
2. 批量操作
python
async def batch_process(items: List[str], batch_size: int = 10):
"""Process items in batches."""
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
tasks = [process_item(item) for item in batch]
await asyncio.gather(*tasks)
print(f"Processed batch {i // batch_size + 1}")
async def process_item(item: str):
"""Process single item."""
await asyncio.sleep(0.1)
return f"Processed: {item}"python
async def batch_process(items: List[str], batch_size: int = 10):
"""批量处理任务项。"""
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
tasks = [process_item(item) for item in batch]
await asyncio.gather(*tasks)
print(f"Processed batch {i // batch_size + 1}")
async def process_item(item: str):
"""处理单个任务项。"""
await asyncio.sleep(0.1)
return f"Processed: {item}"3. Avoid Blocking Operations
3. 避免阻塞操作
Never block the event loop with synchronous operations. A single blocking call stalls all concurrent tasks.
python
undefined切勿使用同步操作阻塞事件循环。单个阻塞调用会导致所有并发任务停滞。
python
undefinedBAD - blocks the entire event loop
错误示例 - 阻塞整个事件循环
async def fetch_data_bad():
import time
import requests
time.sleep(1) # Blocks!
response = requests.get(url) # Also blocks!
async def fetch_data_bad():
import time
import requests
time.sleep(1) # 阻塞!
response = requests.get(url) # 同样阻塞!
GOOD - use async-native libraries (e.g., httpx for async HTTP)
正确示例 - 使用异步原生库(例如,使用httpx进行异步HTTP请求)
import httpx
async def fetch_data_good(url: str):
await asyncio.sleep(1)
async with httpx.AsyncClient() as client:
response = await client.get(url)
**Wrapping Blocking Code with `asyncio.to_thread()` (Python 3.9+):**
When you must use synchronous libraries, offload to a thread pool:
```python
import asyncio
from pathlib import Path
async def read_file_async(path: str) -> str:
"""Read file without blocking event loop."""
# asyncio.to_thread() runs sync code in a thread pool
return await asyncio.to_thread(Path(path).read_text)
async def call_sync_library(data: dict) -> dict:
"""Wrap a synchronous library call."""
# Useful for sync database drivers, file I/O, CPU work
return await asyncio.to_thread(sync_library.process, data)Lower-level approach with :
run_in_executor()python
import asyncio
import concurrent.futures
from typing import Any
def blocking_operation(data: Any) -> Any:
"""CPU-intensive blocking operation."""
import time
time.sleep(1)
return data * 2
async def run_in_executor(data: Any) -> Any:
"""Run blocking operation in thread pool."""
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_operation, data)
return result
async def main():
results = await asyncio.gather(*[run_in_executor(i) for i in range(5)])
print(results)
asyncio.run(main())import httpx
async def fetch_data_good(url: str):
await asyncio.sleep(1)
async with httpx.AsyncClient() as client:
response = await client.get(url)
**使用`asyncio.to_thread()`包装阻塞代码(Python 3.9+):**
当必须使用同步库时,将其卸载到线程池:
```python
import asyncio
from pathlib import Path
async def read_file_async(path: str) -> str:
"""异步读取文件,不阻塞事件循环。"""
# asyncio.to_thread() 在线程池中运行同步代码
return await asyncio.to_thread(Path(path).read_text)
async def call_sync_library(data: dict) -> dict:
"""包装同步库调用。"""
# 适用于同步数据库驱动、文件I/O、CPU密集型工作
return await asyncio.to_thread(sync_library.process, data)使用的底层实现方式:
run_in_executor()python
import asyncio
import concurrent.futures
from typing import Any
def blocking_operation(data: Any) -> Any:
"""CPU密集型阻塞操作。"""
import time
time.sleep(1)
return data * 2
async def run_in_executor(data: Any) -> Any:
"""在线程池中运行阻塞操作。"""
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_operation, data)
return result
async def main():
results = await asyncio.gather(*[run_in_executor(i) for i in range(5)])
print(results)
asyncio.run(main())Common Pitfalls
常见陷阱
1. Forgetting await
1. 忘记使用await
python
undefinedpython
undefinedWrong - returns coroutine object, doesn't execute
错误 - 返回协程对象,未执行
result = async_function()
result = async_function()
Correct
正确
result = await async_function()
undefinedresult = await async_function()
undefined2. Blocking the Event Loop
2. 阻塞事件循环
python
undefinedpython
undefinedWrong - blocks event loop
错误 - 阻塞事件循环
import time
async def bad():
time.sleep(1) # Blocks!
import time
async def bad():
time.sleep(1) # 阻塞!
Correct
正确
async def good():
await asyncio.sleep(1) # Non-blocking
undefinedasync def good():
await asyncio.sleep(1) # 非阻塞
undefined3. Not Handling Cancellation
3. 未处理任务取消
python
async def cancelable_task():
"""Task that handles cancellation."""
try:
while True:
await asyncio.sleep(1)
print("Working...")
except asyncio.CancelledError:
print("Task cancelled, cleaning up...")
# Perform cleanup
raise # Re-raise to propagate cancellationpython
async def cancelable_task():
"""可取消的任务,包含取消处理逻辑。"""
try:
while True:
await asyncio.sleep(1)
print("Working...")
except asyncio.CancelledError:
print("Task cancelled, cleaning up...")
# 执行清理操作
raise # 重新抛出异常以传播取消信号4. Mixing Sync and Async Code
4. 混合同步与异步代码
python
undefinedpython
undefinedWrong - can't call async from sync directly
错误 - 无法直接从同步函数调用异步函数
def sync_function():
result = await async_function() # SyntaxError!
def sync_function():
result = await async_function() # 语法错误!
Correct
正确
def sync_function():
result = asyncio.run(async_function())
undefineddef sync_function():
result = asyncio.run(async_function())
undefinedTesting Async Code
异步代码测试
python
import asyncio
import pytestpython
import asyncio
import pytestUsing pytest-asyncio
使用pytest-asyncio
@pytest.mark.asyncio
async def test_async_function():
"""Test async function."""
result = await fetch_data("https://api.example.com")
assert result is not None
@pytest.mark.asyncio
async def test_with_timeout():
"""Test with timeout."""
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(slow_operation(5), timeout=1.0)
undefined@pytest.mark.asyncio
async def test_async_function():
"""测试异步函数。"""
result = await fetch_data("https://api.example.com")
assert result is not None
@pytest.mark.asyncio
async def test_with_timeout():
"""测试超时场景。"""
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(slow_operation(5), timeout=1.0)
undefinedResources
参考资源
- Python asyncio documentation: https://docs.python.org/3/library/asyncio.html
- aiohttp: Async HTTP client/server
- FastAPI: Modern async web framework
- asyncpg: Async PostgreSQL driver
- motor: Async MongoDB driver
- Python asyncio官方文档: https://docs.python.org/3/library/asyncio.html
- aiohttp: 异步HTTP客户端/服务器
- FastAPI: 现代异步Web框架
- asyncpg: 异步PostgreSQL驱动
- motor: 异步MongoDB驱动
Best Practices Summary
最佳实践总结
- Use asyncio.run() for entry point (Python 3.7+)
- Always await coroutines to execute them
- Limit concurrency with semaphores - unbounded can exhaust resources
gather() - Implement proper error handling with try/except
- Use timeouts to prevent hanging operations
- Pool connections for better performance
- Never block the event loop - use for sync code
asyncio.to_thread() - Use semaphores for rate limiting external API calls
- Handle task cancellation properly - always re-raise
CancelledError - Test async code with pytest-asyncio
- Stay consistent - fully sync or fully async, avoid mixing
- 使用asyncio.run() 作为入口点(Python 3.7+)
- 始终使用await执行协程
- 使用信号量限制并发数 - 无限制的会耗尽资源
gather() - 实现完善的错误处理 - 使用try/except捕获异常
- 为操作设置超时 - 防止任务挂起
- 使用连接池 - 提升性能
- 切勿阻塞事件循环 - 对同步代码使用
asyncio.to_thread() - 使用信号量限制外部API调用速率
- 正确处理任务取消 - 始终重新抛出
CancelledError - 使用pytest-asyncio测试异步代码
- 保持代码一致性 - 完全同步或完全异步,避免混合使用