asyncio-concurrency-patterns
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAsyncio Concurrency Patterns
Asyncio并发模式
A comprehensive skill for mastering Python's asyncio library and concurrent programming patterns. This skill covers event loops, coroutines, tasks, futures, synchronization primitives, async context managers, and production-ready patterns for building high-performance asynchronous applications.
这是一份帮助你掌握Python asyncio库和并发编程模式的全面指南。本指南涵盖Event Loop、协程(Coroutines)、任务(Tasks)、Futures、同步原语、异步上下文管理器,以及用于构建高性能异步应用的生产级模式。
When to Use This Skill
何时使用本指南
Use this skill when:
- Building I/O-bound applications that need to handle many concurrent operations
- Creating web servers, API clients, or websocket applications
- Implementing real-time systems with event-driven architecture
- Optimizing application performance with concurrent request handling
- Managing multiple async operations with proper coordination and error handling
- Building background task processors or job queues
- Implementing async database operations and connection pooling
- Creating chat applications, real-time dashboards, or notification systems
- Handling parallel HTTP requests efficiently
- Managing websocket connections with multiple event sources
- Building microservices with async communication patterns
- Optimizing resource utilization in network applications
在以下场景使用本指南:
- 构建需要处理大量并发操作的I/O密集型应用
- 创建Web服务器、API客户端或WebSocket应用
- 实现基于事件驱动架构的实时系统
- 通过并发请求处理优化应用性能
- 通过适当的协调和错误处理管理多个异步操作
- 构建后台任务处理器或作业队列
- 实现异步数据库操作和连接池
- 创建聊天应用、实时仪表盘或通知系统
- 高效处理并行HTTP请求
- 管理带有多个事件源的WebSocket连接
- 构建采用异步通信模式的微服务
- 优化网络应用中的资源利用率
Core Concepts
核心概念
What is Asyncio?
什么是Asyncio?
Asyncio is Python's built-in library for writing concurrent code using the async/await syntax. It provides:
- Event Loop: The core of asyncio that schedules and runs asynchronous tasks
- Coroutines: Functions defined with that can be paused and resumed
async def - Tasks: Scheduled coroutines that run concurrently
- Futures: Low-level objects representing results of async operations
- Synchronization Primitives: Locks, semaphores, events for coordination
Asyncio是Python的内置库,用于使用async/await语法编写并发代码。它提供:
- Event Loop:Asyncio的核心,用于调度和运行异步任务
- Coroutines:使用定义的可暂停和恢复的函数
async def - Tasks:已调度的、可并发运行的协程
- Futures:表示异步操作结果的底层对象
- 同步原语:用于协调的锁、信号量、事件
Event Loop Fundamentals
Event Loop基础
The event loop is the central execution mechanism in asyncio:
python
import asyncioEvent Loop是Asyncio中的核心执行机制:
python
import asyncioGet or create an event loop
获取或创建事件循环
loop = asyncio.get_event_loop()
loop = asyncio.get_event_loop()
Run a coroutine until complete
运行协程直到完成
loop.run_until_complete(my_coroutine())
loop.run_until_complete(my_coroutine())
Modern approach (Python 3.7+)
现代方式(Python 3.7+)
asyncio.run(my_coroutine())
**Key Event Loop Concepts:**
1. **Single-threaded concurrency**: One thread, many tasks
2. **Cooperative multitasking**: Tasks yield control voluntarily
3. **I/O multiplexing**: Efficient handling of many I/O operations
4. **Non-blocking operations**: Don't wait for I/O, do other workasyncio.run(my_coroutine())
**Event Loop核心概念:**
1. **单线程并发**:一个线程,多个任务
2. **协作式多任务**:任务主动让出控制权
3. **I/O多路复用**:高效处理大量I/O操作
4. **非阻塞操作**:不等待I/O,转而处理其他工作Coroutines vs Functions
协程 vs 普通函数
Regular Function:
python
def fetch_data():
# Blocks until complete
return requests.get('http://api.example.com')Coroutine:
python
async def fetch_data():
# Yields control while waiting
async with aiohttp.ClientSession() as session:
async with session.get('http://api.example.com') as resp:
return await resp.text()普通函数:
python
def fetch_data():
# 阻塞直到完成
return requests.get('http://api.example.com')协程:
python
async def fetch_data():
# 等待时让出控制权
async with aiohttp.ClientSession() as session:
async with session.get('http://api.example.com') as resp:
return await resp.text()Tasks and Futures
任务与Futures
Tasks wrap coroutines and schedule them on the event loop:
python
undefinedTasks用于包装协程并在Event Loop上调度:
python
undefinedCreate a task
创建任务
task = asyncio.create_task(my_coroutine())
task = asyncio.create_task(my_coroutine())
Task runs in background
任务在后台运行
... do other work ...
...执行其他工作...
Wait for result
等待结果
result = await task
**Futures** represent eventual results:
```pythonresult = await task
**Futures**表示异步操作的最终结果:
```pythonLow-level future (rarely used directly)
底层Future(很少直接使用)
future = asyncio.Future()
future = asyncio.Future()
Set result
设置结果
future.set_result(42)
future.set_result(42)
Get result
获取结果
result = await future
undefinedresult = await future
undefinedAsync Context Managers
异步上下文管理器
Manage resources with async setup/teardown:
python
class AsyncResource:
async def __aenter__(self):
# Async setup
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# Async cleanup
await self.disconnect()通过异步的初始化/清理流程管理资源:
python
class AsyncResource:
async def __aenter__(self):
# 异步初始化
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 异步清理
await self.disconnect()Usage
使用方式
async with AsyncResource() as resource:
await resource.do_work()
undefinedasync with AsyncResource() as resource:
await resource.do_work()
undefinedConcurrency Patterns
并发模式
Pattern 1: Gather - Concurrent Execution
模式1:Gather - 并发执行
Run multiple coroutines concurrently and wait for all to complete:
python
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
# Run all fetches concurrently
results = await asyncio.gather(
fetch(session, 'http://python.org'),
fetch(session, 'http://docs.python.org'),
fetch(session, 'http://pypi.org')
)
return results同时运行多个协程并等待所有完成:
python
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
# 并发运行所有fetch操作
results = await asyncio.gather(
fetch(session, 'http://python.org'),
fetch(session, 'http://docs.python.org'),
fetch(session, 'http://pypi.org')
)
return resultsResults is a list in the same order as inputs
结果列表与输入顺序一致
results = asyncio.run(main())
**When to use:**
- Need all results
- Order matters
- Want to fail fast on first exception (default)
- Can handle partial results with `return_exceptions=True`results = asyncio.run(main())
**适用场景:**
- 需要获取所有结果
- 结果顺序重要
- 希望第一个异常出现时立即终止(默认行为)
- 通过`return_exceptions=True`处理部分结果Pattern 2: Wait - Flexible Waiting
模式2:Wait - 灵活等待
More control over how to wait for multiple tasks:
python
import asyncio
async def task_a():
await asyncio.sleep(2)
return 'A'
async def task_b():
await asyncio.sleep(1)
return 'B'
async def main():
tasks = [
asyncio.create_task(task_a()),
asyncio.create_task(task_b())
]
# Wait for first to complete
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# Get first result
first_result = done.pop().result()
# Cancel remaining
for task in pending:
task.cancel()
return first_result
result = asyncio.run(main()) # Returns 'B' after 1 secondWait strategies:
- : Return when first task finishes
FIRST_COMPLETED - : Return when first task raises exception
FIRST_EXCEPTION - : Wait for all tasks (default)
ALL_COMPLETED
对多任务等待方式提供更多控制:
python
import asyncio
async def task_a():
await asyncio.sleep(2)
return 'A'
async def task_b():
await asyncio.sleep(1)
return 'B'
async def main():
tasks = [
asyncio.create_task(task_a()),
asyncio.create_task(task_b())
]
# 等待第一个完成的任务
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# 获取第一个结果
first_result = done.pop().result()
# 取消剩余任务
for task in pending:
task.cancel()
return first_result
result = asyncio.run(main()) # 1秒后返回'B'等待策略:
- :第一个任务完成时返回
FIRST_COMPLETED - :第一个任务抛出异常时返回
FIRST_EXCEPTION - :等待所有任务完成(默认)
ALL_COMPLETED
Pattern 3: Semaphore - Limit Concurrency
模式3:Semaphore - 限制并发数
Control maximum number of concurrent operations:
python
import asyncio
import aiohttp
async def fetch_with_limit(session, url, semaphore):
async with semaphore:
# Only N requests run concurrently
async with session.get(url) as resp:
return await resp.text()
async def main():
# Limit to 5 concurrent requests
semaphore = asyncio.Semaphore(5)
urls = [f'http://api.example.com/item/{i}' for i in range(100)]
async with aiohttp.ClientSession() as session:
tasks = [
fetch_with_limit(session, url, semaphore)
for url in urls
]
results = await asyncio.gather(*tasks)
return results
asyncio.run(main())When to use:
- Rate limiting API requests
- Controlling database connection usage
- Preventing resource exhaustion
- Respecting external service limits
控制最大并发操作数:
python
import asyncio
import aiohttp
async def fetch_with_limit(session, url, semaphore):
async with semaphore:
# 最多N个请求同时运行
async with session.get(url) as resp:
return await resp.text()
async def main():
# 限制为5个并发请求
semaphore = asyncio.Semaphore(5)
urls = [f'http://api.example.com/item/{i}' for i in range(100)]
async with aiohttp.ClientSession() as session:
tasks = [
fetch_with_limit(session, url, semaphore)
for url in urls
]
results = await asyncio.gather(*tasks)
return results
asyncio.run(main())适用场景:
- API请求限流
- 控制数据库连接使用
- 防止资源耗尽
- 遵守外部服务的限制
Pattern 4: Lock - Mutual Exclusion
模式4:Lock - 互斥访问
Ensure only one coroutine accesses a resource at a time:
python
import asyncio
class SharedCounter:
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def increment(self):
async with self.lock:
# Critical section - only one coroutine at a time
current = self.value
await asyncio.sleep(0) # Simulate async work
self.value = current + 1
async def worker(counter):
for _ in range(100):
await counter.increment()
async def main():
counter = SharedCounter()
# Run 10 workers concurrently
await asyncio.gather(*[worker(counter) for _ in range(10)])
print(f"Final count: {counter.value}") # Always 1000
asyncio.run(main())确保同一时间只有一个协程访问资源:
python
import asyncio
class SharedCounter:
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def increment(self):
async with self.lock:
# 临界区 - 同一时间仅一个协程可进入
current = self.value
await asyncio.sleep(0) # 模拟异步工作
self.value = current + 1
async def worker(counter):
for _ in range(100):
await counter.increment()
async def main():
counter = SharedCounter()
# 并发运行10个工作协程
await asyncio.gather(*[worker(counter) for _ in range(10)])
print(f"最终计数: {counter.value}") # 始终为1000
asyncio.run(main())Pattern 5: Event - Signaling
模式5:Event - 信号通知
Coordinate multiple coroutines with events:
python
import asyncio
async def waiter(event, name):
print(f'{name} waiting for event')
await event.wait()
print(f'{name} received event')
async def setter(event):
await asyncio.sleep(2)
print('Setting event')
event.set()
async def main():
event = asyncio.Event()
# Multiple waiters
await asyncio.gather(
waiter(event, 'Waiter 1'),
waiter(event, 'Waiter 2'),
waiter(event, 'Waiter 3'),
setter(event)
)
asyncio.run(main())通过事件协调多个协程:
python
import asyncio
async def waiter(event, name):
print(f'{name} 等待事件')
await event.wait()
print(f'{name} 收到事件')
async def setter(event):
await asyncio.sleep(2)
print('触发事件')
event.set()
async def main():
event = asyncio.Event()
# 多个等待协程
await asyncio.gather(
waiter(event, 'Waiter 1'),
waiter(event, 'Waiter 2'),
waiter(event, 'Waiter 3'),
setter(event)
)
asyncio.run(main())Pattern 6: Queue - Producer/Consumer
模式6:Queue - 生产者/消费者
Coordinate work between producers and consumers:
python
import asyncio
async def producer(queue, n):
for i in range(n):
await asyncio.sleep(0.1)
await queue.put(f'item-{i}')
print(f'Produced item-{i}')
# Signal completion
await queue.put(None)
async def consumer(queue, name):
while True:
item = await queue.get()
if item is None:
# Propagate sentinel to other consumers
await queue.put(None)
break
print(f'{name} processing {item}')
await asyncio.sleep(0.2)
queue.task_done()
async def main():
queue = asyncio.Queue()
# Start producer and consumers
await asyncio.gather(
producer(queue, 10),
consumer(queue, 'Consumer-1'),
consumer(queue, 'Consumer-2'),
consumer(queue, 'Consumer-3')
)
asyncio.run(main())协调生产者与消费者之间的工作:
python
import asyncio
async def producer(queue, n):
for i in range(n):
await asyncio.sleep(0.1)
await queue.put(f'item-{i}')
print(f'生产 item-{i}')
# 发送完成信号
await queue.put(None)
async def consumer(queue, name):
while True:
item = await queue.get()
if item is None:
# 将结束信号传递给其他消费者
await queue.put(None)
break
print(f'{name} 处理 {item}')
await asyncio.sleep(0.2)
queue.task_done()
async def main():
queue = asyncio.Queue()
# 启动生产者和消费者
await asyncio.gather(
producer(queue, 10),
consumer(queue, 'Consumer-1'),
consumer(queue, 'Consumer-2'),
consumer(queue, 'Consumer-3')
)
asyncio.run(main())Task Management
任务管理
Creating Tasks
创建任务
Basic Task Creation:
python
import asyncio
async def background_task():
await asyncio.sleep(10)
return 'Done'
async def main():
# Create task - starts running immediately
task = asyncio.create_task(background_task())
# Do other work while task runs
await asyncio.sleep(1)
# Wait for result
result = await task
return result
asyncio.run(main())Named Tasks (Python 3.8+):
python
task = asyncio.create_task(
background_task(),
name='my-background-task'
)
print(task.get_name()) # 'my-background-task'基础任务创建:
python
import asyncio
async def background_task():
await asyncio.sleep(10)
return 'Done'
async def main():
# 创建任务 - 立即开始运行
task = asyncio.create_task(background_task())
# 任务运行时执行其他工作
await asyncio.sleep(1)
# 等待结果
result = await task
return result
asyncio.run(main())命名任务(Python 3.8+):
python
task = asyncio.create_task(
background_task(),
name='my-background-task'
)
print(task.get_name()) # 'my-background-task'Task Cancellation
任务取消
Graceful Cancellation:
python
import asyncio
async def long_running_task():
try:
while True:
await asyncio.sleep(1)
print('Working...')
except asyncio.CancelledError:
print('Task cancelled, cleaning up...')
# Cleanup logic
raise # Re-raise to mark as cancelled
async def main():
task = asyncio.create_task(long_running_task())
# Let it run for 3 seconds
await asyncio.sleep(3)
# Request cancellation
task.cancel()
try:
await task
except asyncio.CancelledError:
print('Task was cancelled')
asyncio.run(main())Cancellation with Context Manager:
python
import asyncio
from contextlib import suppress
async def run_with_timeout():
task = asyncio.create_task(long_running_task())
try:
# Wait with timeout
await asyncio.wait_for(task, timeout=5.0)
except asyncio.TimeoutError:
task.cancel()
with suppress(asyncio.CancelledError):
await task优雅取消:
python
import asyncio
async def long_running_task():
try:
while True:
await asyncio.sleep(1)
print('工作中...')
except asyncio.CancelledError:
print('任务已取消,正在清理...')
# 清理逻辑
raise # 重新抛出以标记任务为已取消
async def main():
task = asyncio.create_task(long_running_task())
# 让任务运行3秒
await asyncio.sleep(3)
# 请求取消任务
task.cancel()
try:
await task
except asyncio.CancelledError:
print('任务已被取消')
asyncio.run(main())使用上下文管理器取消:
python
import asyncio
from contextlib import suppress
async def run_with_timeout():
task = asyncio.create_task(long_running_task())
try:
# 带超时等待
await asyncio.wait_for(task, timeout=5.0)
except asyncio.TimeoutError:
task.cancel()
with suppress(asyncio.CancelledError):
await taskException Handling in Tasks
任务中的异常处理
Gather with Exception Handling:
python
import asyncio
async def failing_task(n):
await asyncio.sleep(n)
raise ValueError(f'Task {n} failed')
async def successful_task(n):
await asyncio.sleep(n)
return f'Task {n} succeeded'
async def main():
# return_exceptions=True: Returns exceptions instead of raising
results = await asyncio.gather(
successful_task(1),
failing_task(2),
successful_task(3),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f'Task {i} failed: {result}')
else:
print(f'Task {i} result: {result}')
asyncio.run(main())Task Exception Retrieval:
python
import asyncio
async def main():
task = asyncio.create_task(failing_task(1))
# Wait for task
await asyncio.sleep(2)
# Check if task failed
if task.done() and task.exception():
print(f'Task failed with: {task.exception()}')
asyncio.run(main())Gather的异常处理:
python
import asyncio
async def failing_task(n):
await asyncio.sleep(n)
raise ValueError(f'Task {n} 失败')
async def successful_task(n):
await asyncio.sleep(n)
return f'Task {n} 成功'
async def main():
# return_exceptions=True:返回异常而非抛出
results = await asyncio.gather(
successful_task(1),
failing_task(2),
successful_task(3),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f'Task {i} 失败: {result}')
else:
print(f'Task {i} 结果: {result}')
asyncio.run(main())任务异常获取:
python
import asyncio
async def main():
task = asyncio.create_task(failing_task(1))
# 等待任务执行
await asyncio.sleep(2)
# 检查任务是否失败
if task.done() and task.exception():
print(f'Task 失败原因: {task.exception()}')
asyncio.run(main())Event Loop Management
Event Loop管理
Event Loop Policies
Event Loop策略
Default Event Loop:
python
import asyncio
async def main():
# Get running loop
loop = asyncio.get_running_loop()
print(f'Loop: {loop}')
asyncio.run(main())Custom Event Loop:
python
import asyncio
async def main():
pass默认Event Loop:
python
import asyncio
async def main():
# 获取当前运行的Event Loop
loop = asyncio.get_running_loop()
print(f'Loop: {loop}')
asyncio.run(main())自定义Event Loop:
python
import asyncio
async def main():
passCreate new event loop
创建新的Event Loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.close()
**Event Loop Best Practices:**
1. **Use `asyncio.run()`** for simple programs (Python 3.7+)
2. **Avoid creating ClientSession outside event loop**
3. **Always close loops when done**
4. **Don't call blocking functions in event loop**loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.close()
**Event Loop最佳实践:**
1. **使用`asyncio.run()`** 编写简单程序(Python 3.7+)
2. **避免在Event Loop外创建ClientSession**
3. **使用完毕后始终关闭Loop**
4. **不要在Event Loop中调用阻塞函数**Running Blocking Code
运行阻塞代码
Using ThreadPoolExecutor:
python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
# Blocking operation
time.sleep(2)
return 'Done'
async def main():
loop = asyncio.get_running_loop()
# Run blocking code in thread pool
result = await loop.run_in_executor(
None, # Use default executor
blocking_io
)
return result
asyncio.run(main())Custom Executor:
python
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def main():
loop = asyncio.get_running_loop()
# Custom executor with 4 threads
with ThreadPoolExecutor(max_workers=4) as executor:
results = await asyncio.gather(*[
loop.run_in_executor(executor, blocking_io)
for _ in range(10)
])
return results
asyncio.run(main())使用ThreadPoolExecutor:
python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_io():
# 阻塞操作
time.sleep(2)
return 'Done'
async def main():
loop = asyncio.get_running_loop()
# 在线程池中运行阻塞代码
result = await loop.run_in_executor(
None, # 使用默认执行器
blocking_io
)
return result
asyncio.run(main())自定义执行器:
python
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def main():
loop = asyncio.get_running_loop()
# 自定义4线程执行器
with ThreadPoolExecutor(max_workers=4) as executor:
results = await asyncio.gather(*[
loop.run_in_executor(executor, blocking_io)
for _ in range(10)
])
return results
asyncio.run(main())Loop Callbacks
Loop回调
Schedule Callback:
python
import asyncio
def callback(arg):
print(f'Callback called with {arg}')
async def main():
loop = asyncio.get_running_loop()
# Schedule callback
loop.call_soon(callback, 'immediate')
# Schedule with delay
loop.call_later(2, callback, 'delayed')
# Schedule at specific time
loop.call_at(loop.time() + 3, callback, 'scheduled')
await asyncio.sleep(4)
asyncio.run(main())调度回调:
python
import asyncio
def callback(arg):
print(f'回调函数被调用,参数: {arg}')
async def main():
loop = asyncio.get_running_loop()
# 立即调度回调
loop.call_soon(callback, 'immediate')
# 延迟2秒调度
loop.call_later(2, callback, 'delayed')
# 在指定时间调度
loop.call_at(loop.time() + 3, callback, 'scheduled')
await asyncio.sleep(4)
asyncio.run(main())Async Context Managers
异步上下文管理器
Creating Async Context Managers
创建异步上下文管理器
Class-Based:
python
import asyncio
class AsyncDatabaseConnection:
def __init__(self, host):
self.host = host
self.connection = None
async def __aenter__(self):
print(f'Connecting to {self.host}')
await asyncio.sleep(0.1) # Simulate connection
self.connection = f'Connection to {self.host}'
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f'Closing connection to {self.host}')
await asyncio.sleep(0.1) # Simulate cleanup
self.connection = None
async def query(self, sql):
if not self.connection:
raise RuntimeError('Not connected')
await asyncio.sleep(0.05)
return f'Results for: {sql}'
async def main():
async with AsyncDatabaseConnection('localhost') as db:
result = await db.query('SELECT * FROM users')
print(result)
asyncio.run(main())Decorator-Based:
python
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_resource(name):
# Setup
print(f'Acquiring {name}')
await asyncio.sleep(0.1)
try:
yield name
finally:
# Cleanup
print(f'Releasing {name}')
await asyncio.sleep(0.1)
async def main():
async with async_resource('database') as db:
print(f'Using {db}')
asyncio.run(main())基于类的实现:
python
import asyncio
class AsyncDatabaseConnection:
def __init__(self, host):
self.host = host
self.connection = None
async def __aenter__(self):
print(f'连接到 {self.host}')
await asyncio.sleep(0.1) # 模拟连接过程
self.connection = f'Connection to {self.host}'
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f'关闭到 {self.host} 的连接')
await asyncio.sleep(0.1) # 模拟清理过程
self.connection = None
async def query(self, sql):
if not self.connection:
raise RuntimeError('未连接')
await asyncio.sleep(0.05)
return f'查询结果: {sql}'
async def main():
async with AsyncDatabaseConnection('localhost') as db:
result = await db.query('SELECT * FROM users')
print(result)
asyncio.run(main())基于装饰器的实现:
python
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_resource(name):
# 初始化
print(f'获取 {name}')
await asyncio.sleep(0.1)
try:
yield name
finally:
# 清理
print(f'释放 {name}')
await asyncio.sleep(0.1)
async def main():
async with async_resource('database') as db:
print(f'使用 {db}')
asyncio.run(main())Real-World Example: aiohttp ClientSession
实战示例:aiohttp ClientSession
python
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
# ClientSession as async context manager
async with aiohttp.ClientSession() as session:
html = await fetch(session, 'http://python.org')
print(f'Body: {html[:100]}...')
asyncio.run(main())Why use async context manager for ClientSession?
- Ensures proper cleanup of connections
- Prevents resource leaks
- Manages SSL connections correctly
- Handles graceful shutdown
python
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
# 将ClientSession作为异步上下文管理器
async with aiohttp.ClientSession() as session:
html = await fetch(session, 'http://python.org')
print(f'响应内容: {html[:100]}...')
asyncio.run(main())为何使用异步上下文管理器管理ClientSession?
- 确保连接被正确清理
- 防止资源泄漏
- 正确管理SSL连接
- 处理优雅关闭
Performance Optimization
性能优化
Profiling Async Code
异步代码分析
Basic Timing:
python
import asyncio
import time
async def slow_operation():
await asyncio.sleep(1)
async def main():
start = time.perf_counter()
await slow_operation()
elapsed = time.perf_counter() - start
print(f'Took {elapsed:.2f} seconds')
asyncio.run(main())Profiling Multiple Operations:
python
import asyncio
import time
async def timed_task(name, duration):
start = time.perf_counter()
await asyncio.sleep(duration)
elapsed = time.perf_counter() - start
print(f'{name} took {elapsed:.2f}s')
return name
async def main():
await asyncio.gather(
timed_task('Task 1', 1),
timed_task('Task 2', 2),
timed_task('Task 3', 0.5)
)
asyncio.run(main())基础计时:
python
import asyncio
import time
async def slow_operation():
await asyncio.sleep(1)
async def main():
start = time.perf_counter()
await slow_operation()
elapsed = time.perf_counter() - start
print(f'耗时 {elapsed:.2f} 秒')
asyncio.run(main())多操作分析:
python
import asyncio
import time
async def timed_task(name, duration):
start = time.perf_counter()
await asyncio.sleep(duration)
elapsed = time.perf_counter() - start
print(f'{name} 耗时 {elapsed:.2f}s')
return name
async def main():
await asyncio.gather(
timed_task('Task 1', 1),
timed_task('Task 2', 2),
timed_task('Task 3', 0.5)
)
asyncio.run(main())Optimizing Concurrency
并发优化
Bad - Sequential Execution:
python
async def slow_approach():
results = []
for i in range(10):
result = await fetch_data(i)
results.append(result)
return results糟糕的实现 - 顺序执行:
python
async def slow_approach():
results = []
for i in range(10):
result = await fetch_data(i)
results.append(result)
return resultsTakes 10 * fetch_time
耗时为10 * 单次fetch时间
**Good - Concurrent Execution:**
```python
async def fast_approach():
tasks = [fetch_data(i) for i in range(10)]
results = await asyncio.gather(*tasks)
return results
**良好的实现 - 并发执行:**
```python
async def fast_approach():
tasks = [fetch_data(i) for i in range(10)]
results = await asyncio.gather(*tasks)
return resultsTakes ~fetch_time
耗时约等于单次fetch时间
**Better - Controlled Concurrency:**
```python
async def controlled_approach():
semaphore = asyncio.Semaphore(5) # Max 5 concurrent
async def fetch_with_limit(i):
async with semaphore:
return await fetch_data(i)
tasks = [fetch_with_limit(i) for i in range(10)]
results = await asyncio.gather(*tasks)
return results
**更优的实现 - 受控并发:**
```python
async def controlled_approach():
semaphore = asyncio.Semaphore(5) # 最大5个并发
async def fetch_with_limit(i):
async with semaphore:
return await fetch_data(i)
tasks = [fetch_with_limit(i) for i in range(10)]
results = await asyncio.gather(*tasks)
return resultsTakes ~2 * fetch_time, but respects limits
耗时约为2 * 单次fetch时间,但符合并发限制
undefinedundefinedAvoiding Common Performance Pitfalls
避免常见性能陷阱
1. Don't create sessions per request:
python
undefined1. 不要为每个请求创建新Session:
python
undefinedBAD - Creates new session each time
糟糕的实现 - 每次请求创建新Session
async def bad_fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()
async def bad_fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()
GOOD - Reuse session
良好的实现 - 复用Session
async def good_fetch():
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(
session.get('http://example.com/1'),
session.get('http://example.com/2'),
session.get('http://example.com/3')
)
return results
**2. Don't use blocking operations:**
```python
import asyncio
import requests # Blocking libraryasync def good_fetch():
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(
session.get('http://example.com/1'),
session.get('http://example.com/2'),
session.get('http://example.com/3')
)
return results
**2. 不要使用阻塞操作:**
```python
import asyncio
import requests # 阻塞式库BAD - Blocks event loop
糟糕的实现 - 阻塞Event Loop
async def bad_request():
response = requests.get('http://example.com') # BLOCKS!
return response.text
async def bad_request():
response = requests.get('http://example.com') # 阻塞!
return response.text
GOOD - Use async library
良好的实现 - 使用异步库
async def good_request():
async with aiohttp.ClientSession() as session:
async with session.get('http://example.com') as resp:
return await resp.text()
async def good_request():
async with aiohttp.ClientSession() as session:
async with session.get('http://example.com') as resp:
return await resp.text()
ACCEPTABLE - If must use blocking, use executor
可接受的实现 - 必须使用阻塞库时,使用执行器
async def acceptable_request():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None,
lambda: requests.get('http://example.com').text
)
return result
**3. Proper cleanup with zero-sleep:**
```python
async def proper_cleanup():
async with aiohttp.ClientSession() as session:
async with session.get('http://example.org/') as resp:
await resp.read()
# Zero-sleep to allow underlying connections to close
await asyncio.sleep(0)async def acceptable_request():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None,
lambda: requests.get('http://example.com').text
)
return result
**3. 使用零延迟sleep确保正确清理:**
```python
async def proper_cleanup():
async with aiohttp.ClientSession() as session:
async with session.get('http://example.org/') as resp:
await resp.read()
# 零延迟sleep以允许底层连接关闭
await asyncio.sleep(0)Common Pitfalls
常见陷阱
Pitfall 1: Creating ClientSession Outside Event Loop
陷阱1:在Event Loop外创建ClientSession
Problem:
python
import aiohttp问题:
python
import aiohttpBAD - Session created outside event loop
糟糕的实现 - Session在Event Loop外创建
session = aiohttp.ClientSession()
async def fetch(url):
async with session.get(url) as resp:
return await resp.text()
**Why it's bad:**
- Session binds to event loop at creation time
- If loop changes (e.g., uvloop), session becomes invalid
- Can cause program to hang
**Solution:**
```python
import aiohttp
import asyncio
async def main():
# Create session inside async function
async with aiohttp.ClientSession() as session:
async with session.get('http://python.org') as resp:
print(await resp.text())
asyncio.run(main())session = aiohttp.ClientSession()
async def fetch(url):
async with session.get(url) as resp:
return await resp.text()
**为何糟糕:**
- Session在创建时绑定到当前Event Loop
- 如果Loop发生变化(如使用uvloop),Session会失效
- 可能导致程序挂起
**解决方案:**
```python
import aiohttp
import asyncio
async def main():
# 在异步函数内创建Session
async with aiohttp.ClientSession() as session:
async with session.get('http://python.org') as resp:
print(await resp.text())
asyncio.run(main())Pitfall 2: Session as Class Variable
陷阱2:将Session作为类变量
Problem:
python
class API:
session = aiohttp.ClientSession() # BAD - global instance
async def fetch(self, url):
async with self.session.get(url) as resp:
return await resp.text()Solution:
python
class API:
def __init__(self):
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *args):
await self.session.close()
async def fetch(self, url):
async with self.session.get(url) as resp:
return await resp.text()问题:
python
class API:
session = aiohttp.ClientSession() # 糟糕的实现 - 全局实例
async def fetch(self, url):
async with self.session.get(url) as resp:
return await resp.text()解决方案:
python
class API:
def __init__(self):
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *args):
await self.session.close()
async def fetch(self, url):
async with self.session.get(url) as resp:
return await resp.text()Usage
使用方式
async def main():
async with API() as api:
result = await api.fetch('http://example.com')
undefinedasync def main():
async with API() as api:
result = await api.fetch('http://example.com')
undefinedPitfall 3: Forgetting await
陷阱3:忘记使用await
Problem:
python
async def process_data():
# Forgot await - returns coroutine, doesn't execute!
result = fetch_data() # Missing await
return resultSolution:
python
async def process_data():
result = await fetch_data() # Proper await
return result问题:
python
async def process_data():
# 忘记await - 返回协程对象,不会执行!
result = fetch_data() # 缺少await
return result解决方案:
python
async def process_data():
result = await fetch_data() # 正确使用await
return resultPitfall 4: Blocking the Event Loop
陷阱4:阻塞Event Loop
Problem:
python
import asyncio
import time
async def bad_sleep():
time.sleep(5) # BAD - Blocks entire event loop!
async def main():
await asyncio.gather(
bad_sleep(),
another_task() # Blocked for 5 seconds
)Solution:
python
import asyncio
async def good_sleep():
await asyncio.sleep(5) # GOOD - Yields control
async def main():
await asyncio.gather(
good_sleep(),
another_task() # Runs concurrently
)问题:
python
import asyncio
import time
async def bad_sleep():
time.sleep(5) # 糟糕的实现 - 阻塞整个Event Loop!
async def main():
await asyncio.gather(
bad_sleep(),
another_task() # 被阻塞5秒
)解决方案:
python
import asyncio
async def good_sleep():
await asyncio.sleep(5) # 良好的实现 - 让出控制权
async def main():
await asyncio.gather(
good_sleep(),
another_task() # 并发执行
)Pitfall 5: Not Handling Task Cancellation
陷阱5:未处理任务取消
Problem:
python
async def bad_task():
while True:
await asyncio.sleep(1)
process_data()
# No cleanup on cancellation!Solution:
python
async def good_task():
try:
while True:
await asyncio.sleep(1)
process_data()
except asyncio.CancelledError:
# Cleanup resources
cleanup()
raise # Re-raise to mark as cancelled问题:
python
async def bad_task():
while True:
await asyncio.sleep(1)
process_data()
# 取消时无清理逻辑!解决方案:
python
async def good_task():
try:
while True:
await asyncio.sleep(1)
process_data()
except asyncio.CancelledError:
# 清理资源
cleanup()
raise # 重新抛出以标记任务为已取消Pitfall 6: Deadlocks with Locks
陷阱6:锁导致死锁
Problem:
python
import asyncio
lock1 = asyncio.Lock()
lock2 = asyncio.Lock()
async def task_a():
async with lock1:
await asyncio.sleep(0.1)
async with lock2: # Deadlock potential
pass
async def task_b():
async with lock2:
await asyncio.sleep(0.1)
async with lock1: # Deadlock potential
passSolution:
python
undefined问题:
python
import asyncio
lock1 = asyncio.Lock()
lock2 = asyncio.Lock()
async def task_a():
async with lock1:
await asyncio.sleep(0.1)
async with lock2: # 可能导致死锁
pass
async def task_b():
async with lock2:
await asyncio.sleep(0.1)
async with lock1: # 可能导致死锁
pass解决方案:
python
undefinedAlways acquire locks in same order
始终按相同顺序获取锁
async def safe_task_a():
async with lock1:
async with lock2:
pass
async def safe_task_b():
async with lock1: # Same order
async with lock2:
pass
undefinedasync def safe_task_a():
async with lock1:
async with lock2:
pass
async def safe_task_b():
async with lock1: # 相同顺序
async with lock2:
pass
undefinedProduction Patterns
生产级模式
Pattern 1: Graceful Shutdown
模式1:优雅关闭
Complete Shutdown Example:
python
import asyncio
import signal
from contextlib import suppress
class Application:
def __init__(self):
self.should_exit = False
self.tasks = []
async def worker(self, name):
try:
while not self.should_exit:
print(f'{name} working...')
await asyncio.sleep(1)
except asyncio.CancelledError:
print(f'{name} cancelled, cleaning up...')
raise
def handle_signal(self, sig):
print(f'Received signal {sig}, shutting down...')
self.should_exit = True
async def run(self):
# Setup signal handlers
loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
sig,
lambda s=sig: self.handle_signal(s)
)
# Start workers
self.tasks = [
asyncio.create_task(self.worker(f'Worker-{i}'))
for i in range(3)
]
# Wait for shutdown signal
while not self.should_exit:
await asyncio.sleep(0.1)
# Cancel all tasks
for task in self.tasks:
task.cancel()
# Wait for cancellation to complete
await asyncio.gather(*self.tasks, return_exceptions=True)
print('Shutdown complete')完整关闭示例:
python
import asyncio
import signal
from contextlib import suppress
class Application:
def __init__(self):
self.should_exit = False
self.tasks = []
async def worker(self, name):
try:
while not self.should_exit:
print(f'{name} 工作中...')
await asyncio.sleep(1)
except asyncio.CancelledError:
print(f'{name} 已取消,正在清理...')
raise
def handle_signal(self, sig):
print(f'收到信号 {sig},正在关闭...')
self.should_exit = True
async def run(self):
# 设置信号处理器
loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(
sig,
lambda s=sig: self.handle_signal(s)
)
# 启动工作协程
self.tasks = [
asyncio.create_task(self.worker(f'Worker-{i}'))
for i in range(3)
]
# 等待关闭信号
while not self.should_exit:
await asyncio.sleep(0.1)
# 取消所有任务
for task in self.tasks:
task.cancel()
# 等待取消完成
await asyncio.gather(*self.tasks, return_exceptions=True)
print('关闭完成')Run application
运行应用
app = Application()
asyncio.run(app.run())
undefinedapp = Application()
asyncio.run(app.run())
undefinedPattern 2: Background Tasks with Application Lifecycle
模式2:与应用生命周期绑定的后台任务
aiohttp Application with Background Tasks:
python
import asyncio
from contextlib import suppress
from aiohttp import web
async def listen_to_redis(app):
"""Background task that listens to Redis"""
# Simulated Redis listening
try:
while True:
# Process messages
await asyncio.sleep(1)
print('Processing Redis message...')
except asyncio.CancelledError:
print('Redis listener stopped')
raise
async def background_tasks(app):
"""Cleanup context for managing background tasks"""
# Startup: Create background task
app['redis_listener'] = asyncio.create_task(listen_to_redis(app))
yield # App is running
# Cleanup: Cancel background task
app['redis_listener'].cancel()
with suppress(asyncio.CancelledError):
await app['redis_listener']带后台任务的aiohttp应用:
python
import asyncio
from contextlib import suppress
from aiohttp import web
async def listen_to_redis(app):
"""监听Redis的后台任务"""
# 模拟Redis监听
try:
while True:
# 处理消息
await asyncio.sleep(1)
print('处理Redis消息...')
except asyncio.CancelledError:
print('Redis监听器已停止')
raise
async def background_tasks(app):
"""管理后台任务的清理上下文"""
# 启动:创建后台任务
app['redis_listener'] = asyncio.create_task(listen_to_redis(app))
yield # 应用运行中
# 清理:取消后台任务
app['redis_listener'].cancel()
with suppress(asyncio.CancelledError):
await app['redis_listener']Setup application
配置应用
app = web.Application()
app.cleanup_ctx.append(background_tasks)
undefinedapp = web.Application()
app.cleanup_ctx.append(background_tasks)
undefinedPattern 3: Retry Logic with Exponential Backoff
模式3:带指数退避的重试逻辑
python
import asyncio
import aiohttp
from typing import Any, Callable
async def retry_with_backoff(
coro_func: Callable,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
*args,
**kwargs
) -> Any:
"""
Retry async function with exponential backoff
Args:
coro_func: Async function to retry
max_retries: Maximum number of retries
base_delay: Initial delay between retries
max_delay: Maximum delay between retries
"""
for attempt in range(max_retries):
try:
return await coro_func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
# Last attempt failed
raise
# Calculate delay with exponential backoff
delay = min(base_delay * (2 ** attempt), max_delay)
print(f'Attempt {attempt + 1} failed: {e}')
print(f'Retrying in {delay:.1f} seconds...')
await asyncio.sleep(delay)python
import asyncio
import aiohttp
from typing import Any, Callable
async def retry_with_backoff(
coro_func: Callable,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
*args,
**kwargs
) -> Any:
"""
带指数退避的异步函数重试机制
参数:
coro_func: 要重试的异步函数
max_retries: 最大重试次数
base_delay: 初始重试延迟
max_delay: 最大重试延迟
"""
for attempt in range(max_retries):
try:
return await coro_func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
# 最后一次尝试失败
raise
# 计算指数退避延迟
delay = min(base_delay * (2 ** attempt), max_delay)
print(f'第 {attempt + 1} 次尝试失败: {e}')
print(f'将在 {delay:.1f} 秒后重试...')
await asyncio.sleep(delay)Usage
使用方式
async def unstable_api_call():
async with aiohttp.ClientSession() as session:
async with session.get('http://unstable-api.com') as resp:
return await resp.json()
async def main():
result = await retry_with_backoff(
unstable_api_call,
max_retries=5,
base_delay=1.0
)
return result
undefinedasync def unstable_api_call():
async with aiohttp.ClientSession() as session:
async with session.get('http://unstable-api.com') as resp:
return await resp.json()
async def main():
result = await retry_with_backoff(
unstable_api_call,
max_retries=5,
base_delay=1.0
)
return result
undefinedPattern 4: Circuit Breaker
模式4:熔断器
python
import asyncio
from datetime import datetime, timedelta
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if recovered
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 60.0,
success_threshold: int = 2
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.failure_count = 0
self.success_count = 0
self.state = CircuitState.CLOSED
self.opened_at = None
async def call(self, coro_func, *args, **kwargs):
if self.state == CircuitState.OPEN:
# Check if should try recovery
if datetime.now() - self.opened_at > timedelta(seconds=self.recovery_timeout):
self.state = CircuitState.HALF_OPEN
self.success_count = 0
else:
raise Exception('Circuit breaker is OPEN')
try:
result = await coro_func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.success_count = 0
def _on_failure(self):
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
self.opened_at = datetime.now()python
import asyncio
from datetime import datetime, timedelta
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # 正常运行
OPEN = "open" # 故障中,拒绝请求
HALF_OPEN = "half_open" # 测试服务是否恢复
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 60.0,
success_threshold: int = 2
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.failure_count = 0
self.success_count = 0
self.state = CircuitState.CLOSED
self.opened_at = None
async def call(self, coro_func, *args, **kwargs):
if self.state == CircuitState.OPEN:
# 检查是否应尝试恢复
if datetime.now() - self.opened_at > timedelta(seconds=self.recovery_timeout):
self.state = CircuitState.HALF_OPEN
self.success_count = 0
else:
raise Exception('熔断器已打开')
try:
result = await coro_func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.success_count = 0
def _on_failure(self):
self.failure_count += 1
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
self.opened_at = datetime.now()Usage
使用方式
async def flaky_service():
# Simulated flaky service
import random
await asyncio.sleep(0.1)
if random.random() < 0.5:
raise Exception('Service error')
return 'Success'
async def main():
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=5.0)
for i in range(20):
try:
result = await breaker.call(flaky_service)
print(f'Request {i}: {result} - State: {breaker.state.value}')
except Exception as e:
print(f'Request {i}: Failed - State: {breaker.state.value}')
await asyncio.sleep(0.5)undefinedasync def flaky_service():
# 模拟不稳定服务
import random
await asyncio.sleep(0.1)
if random.random() < 0.5:
raise Exception('服务错误')
return 'Success'
async def main():
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=5.0)
for i in range(20):
try:
result = await breaker.call(flaky_service)
print(f'请求 {i}: {result} - 状态: {breaker.state.value}')
except Exception as e:
print(f'请求 {i}: 失败 - 状态: {breaker.state.value}')
await asyncio.sleep(0.5)undefinedPattern 5: WebSocket with Multiple Event Sources
模式5:多事件源WebSocket
Handling Parallel WebSocket and Background Events:
python
import asyncio
from aiohttp import web
async def read_subscription(ws, redis):
"""Background task reading from Redis and sending to WebSocket"""
# Simulated Redis subscription
channel = await redis.subscribe('channel:1')
try:
# Simulate receiving messages
for i in range(10):
await asyncio.sleep(1)
message = f'Redis message {i}'
await ws.send_str(message)
finally:
await redis.unsubscribe('channel:1')
async def websocket_handler(request):
"""WebSocket handler with parallel event sources"""
ws = web.WebSocketResponse()
await ws.prepare(request)
# Create background task for Redis subscription
redis = request.app['redis']
task = asyncio.create_task(read_subscription(ws, redis))
try:
# Handle incoming WebSocket messages
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
# Process incoming message
await ws.send_str(f'Echo: {msg.data}')
elif msg.type == web.WSMsgType.ERROR:
print(f'WebSocket error: {ws.exception()}')
finally:
# Cleanup: Cancel background task
task.cancel()
return ws处理并行WebSocket与后台事件:
python
import asyncio
from aiohttp import web
async def read_subscription(ws, redis):
"""从Redis读取并发送到WebSocket的后台任务"""
# 模拟Redis订阅
channel = await redis.subscribe('channel:1')
try:
# 模拟接收消息
for i in range(10):
await asyncio.sleep(1)
message = f'Redis消息 {i}'
await ws.send_str(message)
finally:
await redis.unsubscribe('channel:1')
async def websocket_handler(request):
"""带并行事件源的WebSocket处理器"""
ws = web.WebSocketResponse()
await ws.prepare(request)
# 创建Redis订阅后台任务
redis = request.app['redis']
task = asyncio.create_task(read_subscription(ws, redis))
try:
# 处理WebSocket传入消息
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
# 处理传入消息
await ws.send_str(f'回声: {msg.data}')
elif msg.type == web.WSMsgType.ERROR:
print(f'WebSocket错误: {ws.exception()}')
finally:
# 清理:取消后台任务
task.cancel()
return wsBest Practices
最佳实践
Testing Async Code
测试异步代码
Using pytest-asyncio:
python
import pytest
import asyncio
@pytest.mark.asyncio
async def test_async_function():
result = await async_operation()
assert result == 'expected'
@pytest.mark.asyncio
async def test_with_fixture(aiohttp_client):
client = await aiohttp_client(create_app())
resp = await client.get('/')
assert resp.status == 200Manual Event Loop Setup:
python
import asyncio
import unittest
class TestAsyncCode(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def tearDown(self):
self.loop.close()
def test_coroutine(self):
async def test_impl():
result = await async_function()
self.assertEqual(result, 'expected')
self.loop.run_until_complete(test_impl())使用pytest-asyncio:
python
import pytest
import asyncio
@pytest.mark.asyncio
async def test_async_function():
result = await async_operation()
assert result == 'expected'
@pytest.mark.asyncio
async def test_with_fixture(aiohttp_client):
client = await aiohttp_client(create_app())
resp = await client.get('/')
assert resp.status == 200手动Event Loop配置:
python
import asyncio
import unittest
class TestAsyncCode(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def tearDown(self):
self.loop.close()
def test_coroutine(self):
async def test_impl():
result = await async_function()
self.assertEqual(result, 'expected')
self.loop.run_until_complete(test_impl())Debugging Async Code
调试异步代码
Enable Debug Mode:
python
import asyncio
import warnings启用调试模式:
python
import asyncio
import warningsEnable asyncio debug mode
启用asyncio调试模式
asyncio.run(main(), debug=True)
asyncio.run(main(), debug=True)
Or manually
或手动配置
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
**What debug mode detects:**
- Coroutines that were never awaited
- Callbacks taking too long
- Tasks destroyed while pending
**Logging Slow Callbacks:**
```python
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.slow_callback_duration = 0.1 # 100ms threshold
loop.set_debug(True)loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
**调试模式检测内容:**
- 从未被await的协程
- 执行时间过长的回调
- 被销毁时仍处于挂起状态的任务
**记录慢回调:**
```python
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
loop = asyncio.get_event_loop()
loop.slow_callback_duration = 0.1 # 100ms阈值
loop.set_debug(True)Documentation
文档编写
Documenting Async Functions:
python
async def fetch_user_data(user_id: int) -> dict:
"""
Fetch user data from the database.
Args:
user_id: The unique identifier of the user
Returns:
Dictionary containing user data
Raises:
UserNotFoundError: If user doesn't exist
DatabaseError: If database connection fails
Example:
>>> async def main():
... user = await fetch_user_data(123)
... print(user['name'])
Note:
This function must be called within an async context.
Connection pooling is handled automatically.
"""
async with get_db_connection() as conn:
return await conn.fetch_one(
'SELECT * FROM users WHERE id = $1',
user_id
)异步函数文档:
python
async def fetch_user_data(user_id: int) -> dict:
"""
从数据库获取用户数据。
参数:
user_id: 用户的唯一标识符
返回:
包含用户数据的字典
抛出:
UserNotFoundError: 用户不存在时抛出
DatabaseError: 数据库连接失败时抛出
示例:
>>> async def main():
... user = await fetch_user_data(123)
... print(user['name'])
注意:
此函数必须在异步上下文中调用。
连接池已自动处理。
"""
async with get_db_connection() as conn:
return await conn.fetch_one(
'SELECT * FROM users WHERE id = $1',
user_id
)Complete Examples
完整示例
Example 1: Parallel HTTP Requests
示例1:并行HTTP请求
python
import asyncio
import aiohttp
import time
async def fetch(session, url):
"""Fetch a single URL"""
async with session.get(url) as response:
return {
'url': url,
'status': response.status,
'length': len(await response.text())
}
async def fetch_all(urls):
"""Fetch multiple URLs concurrently"""
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [
'http://python.org',
'http://docs.python.org',
'http://pypi.org',
'http://github.com/python',
'http://www.python.org/dev/peps/'
]
start = time.perf_counter()
results = await fetch_all(urls)
elapsed = time.perf_counter() - start
for result in results:
print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
print(f"\nFetched {len(urls)} URLs in {elapsed:.2f} seconds")
asyncio.run(main())python
import asyncio
import aiohttp
import time
async def fetch(session, url):
"""获取单个URL"""
async with session.get(url) as response:
return {
'url': url,
'status': response.status,
'length': len(await response.text())
}
async def fetch_all(urls):
"""并发获取多个URL"""
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [
'http://python.org',
'http://docs.python.org',
'http://pypi.org',
'http://github.com/python',
'http://www.python.org/dev/peps/'
]
start = time.perf_counter()
results = await fetch_all(urls)
elapsed = time.perf_counter() - start
for result in results:
print(f"{result['url']}: {result['status']} ({result['length']} 字节)")
print(f"\n在 {elapsed:.2f} 秒内获取了 {len(urls)} 个URL")
asyncio.run(main())Example 2: Rate-Limited API Client
示例2:限流API客户端
python
import asyncio
import aiohttp
from typing import List, Dict, Any
class RateLimitedClient:
def __init__(self, rate_limit: int = 10):
"""
Args:
rate_limit: Maximum concurrent requests
"""
self.semaphore = asyncio.Semaphore(rate_limit)
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *args):
await self.session.close()
# Allow connections to close
await asyncio.sleep(0)
async def fetch(self, url: str) -> Dict[str, Any]:
"""Fetch URL with rate limiting"""
async with self.semaphore:
print(f'Fetching {url}')
async with self.session.get(url) as resp:
return {
'url': url,
'status': resp.status,
'data': await resp.json()
}
async def fetch_all(self, urls: List[str]) -> List[Dict[str, Any]]:
"""Fetch all URLs with rate limiting"""
tasks = [self.fetch(url) for url in urls]
return await asyncio.gather(*tasks, return_exceptions=True)
async def main():
urls = [f'https://api.github.com/users/{user}'
for user in ['python', 'django', 'flask', 'requests', 'aiohttp']]
async with RateLimitedClient(rate_limit=2) as client:
results = await client.fetch_all(urls)
for result in results:
if isinstance(result, Exception):
print(f'Error: {result}')
else:
print(f"User: {result['data'].get('login', 'unknown')}")
asyncio.run(main())python
import asyncio
import aiohttp
from typing import List, Dict, Any
class RateLimitedClient:
def __init__(self, rate_limit: int = 10):
"""
参数:
rate_limit: 最大并发请求数
"""
self.semaphore = asyncio.Semaphore(rate_limit)
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *args):
await self.session.close()
# 等待连接关闭
await asyncio.sleep(0)
async def fetch(self, url: str) -> Dict[str, Any]:
"""带限流的URL获取"""
async with self.semaphore:
print(f'正在获取 {url}')
async with self.session.get(url) as resp:
return {
'url': url,
'status': resp.status,
'data': await resp.json()
}
async def fetch_all(self, urls: List[str]) -> List[Dict[str, Any]]:
"""带限流的批量URL获取"""
tasks = [self.fetch(url) for url in urls]
return await asyncio.gather(*tasks, return_exceptions=True)
async def main():
urls = [f'https://api.github.com/users/{user}'
for user in ['python', 'django', 'flask', 'requests', 'aiohttp']]
async with RateLimitedClient(rate_limit=2) as client:
results = await client.fetch_all(urls)
for result in results:
if isinstance(result, Exception):
print(f'错误: {result}')
else:
print(f"用户: {result['data'].get('login', 'unknown')}")
asyncio.run(main())Example 3: Database Connection Pool
示例3:数据库连接池
python
import asyncio
from typing import List, Any
class AsyncConnectionPool:
def __init__(self, size: int = 10):
self.pool = asyncio.Queue(maxsize=size)
self.size = size
async def init(self):
"""Initialize connection pool"""
for i in range(self.size):
conn = await self._create_connection(i)
await self.pool.put(conn)
async def _create_connection(self, conn_id: int):
"""Create a database connection (simulated)"""
await asyncio.sleep(0.1) # Simulate connection time
return {'id': conn_id, 'connected': True}
async def acquire(self):
"""Acquire connection from pool"""
return await self.pool.get()
async def release(self, conn):
"""Release connection back to pool"""
await self.pool.put(conn)
async def execute(self, query: str) -> Any:
"""Execute query using pooled connection"""
conn = await self.acquire()
try:
# Simulate query execution
await asyncio.sleep(0.05)
return f"Query '{query}' executed on connection {conn['id']}"
finally:
await self.release(conn)
async def close(self):
"""Close all connections"""
while not self.pool.empty():
conn = await self.pool.get()
# Close connection (simulated)
conn['connected'] = False
async def worker(pool: AsyncConnectionPool, worker_id: int):
"""Worker that executes queries"""
for i in range(5):
result = await pool.execute(f'SELECT * FROM table WHERE id={i}')
print(f'Worker {worker_id}: {result}')
async def main():
# Create and initialize pool
pool = AsyncConnectionPool(size=5)
await pool.init()
# Run multiple workers concurrently
await asyncio.gather(*[
worker(pool, i) for i in range(10)
])
# Cleanup
await pool.close()
asyncio.run(main())python
import asyncio
from typing import List, Any
class AsyncConnectionPool:
def __init__(self, size: int = 10):
self.pool = asyncio.Queue(maxsize=size)
self.size = size
async def init(self):
"""初始化连接池"""
for i in range(self.size):
conn = await self._create_connection(i)
await self.pool.put(conn)
async def _create_connection(self, conn_id: int):
"""创建数据库连接(模拟)"""
await asyncio.sleep(0.1) # 模拟连接耗时
return {'id': conn_id, 'connected': True}
async def acquire(self):
"""从池获取连接"""
return await self.pool.get()
async def release(self, conn):
"""将连接放回池"""
await self.pool.put(conn)
async def execute(self, query: str) -> Any:
"""使用池连接执行查询"""
conn = await self.acquire()
try:
# 模拟查询执行
await asyncio.sleep(0.05)
return f"查询 '{query}' 在连接 {conn['id']} 上执行"
finally:
await self.release(conn)
async def close(self):
"""关闭所有连接"""
while not self.pool.empty():
conn = await self.pool.get()
# 关闭连接(模拟)
conn['connected'] = False
async def worker(pool: AsyncConnectionPool, worker_id: int):
"""执行查询的工作协程"""
for i in range(5):
result = await pool.execute(f'SELECT * FROM table WHERE id={i}')
print(f'工作协程 {worker_id}: {result}')
async def main():
# 创建并初始化连接池
pool = AsyncConnectionPool(size=5)
await pool.init()
# 并发运行多个工作协程
await asyncio.gather(*[
worker(pool, i) for i in range(10)
])
# 清理
await pool.close()
asyncio.run(main())Example 4: Real-Time Data Processor
示例4:实时数据处理器
python
import asyncio
import random
from datetime import datetime
class DataProcessor:
def __init__(self):
self.queue = asyncio.Queue()
self.processed = 0
self.errors = 0
async def producer(self, producer_id: int):
"""Produce data items"""
for i in range(10):
await asyncio.sleep(random.uniform(0.1, 0.5))
item = {
'producer_id': producer_id,
'item_id': i,
'timestamp': datetime.now(),
'data': random.randint(1, 100)
}
await self.queue.put(item)
print(f'Producer {producer_id} generated item {i}')
# Signal completion
await self.queue.put(None)
async def consumer(self, consumer_id: int):
"""Consume and process data items"""
while True:
item = await self.queue.get()
if item is None:
# Propagate sentinel
await self.queue.put(None)
break
try:
# Simulate processing
await asyncio.sleep(random.uniform(0.05, 0.2))
# Process item
result = item['data'] * 2
print(f"Consumer {consumer_id} processed: {item['item_id']} -> {result}")
self.processed += 1
except Exception as e:
print(f'Consumer {consumer_id} error: {e}')
self.errors += 1
finally:
self.queue.task_done()
async def monitor(self):
"""Monitor processing statistics"""
while True:
await asyncio.sleep(2)
print(f'\n=== Stats: Processed={self.processed}, Errors={self.errors}, Queue={self.queue.qsize()} ===\n')
async def run(self, num_producers: int = 3, num_consumers: int = 5):
"""Run the data processor"""
# Start monitor
monitor_task = asyncio.create_task(self.monitor())
# Start producers and consumers
await asyncio.gather(
*[self.producer(i) for i in range(num_producers)],
*[self.consumer(i) for i in range(num_consumers)]
)
# Cancel monitor
monitor_task.cancel()
print(f'\nFinal Stats: Processed={self.processed}, Errors={self.errors}')
async def main():
processor = DataProcessor()
await processor.run(num_producers=3, num_consumers=5)
asyncio.run(main())python
import asyncio
import random
from datetime import datetime
class DataProcessor:
def __init__(self):
self.queue = asyncio.Queue()
self.processed = 0
self.errors = 0
async def producer(self, producer_id: int):
"""生产数据项"""
for i in range(10):
await asyncio.sleep(random.uniform(0.1, 0.5))
item = {
'producer_id': producer_id,
'item_id': i,
'timestamp': datetime.now(),
'data': random.randint(1, 100)
}
await self.queue.put(item)
print(f'生产者 {producer_id} 生成项 {i}')
# 发送完成信号
await self.queue.put(None)
async def consumer(self, consumer_id: int):
"""消费并处理数据项"""
while True:
item = await self.queue.get()
if item is None:
# 传递结束信号
await self.queue.put(None)
break
try:
# 模拟处理
await asyncio.sleep(random.uniform(0.05, 0.2))
# 处理数据项
result = item['data'] * 2
print(f"消费者 {consumer_id} 已处理: {item['item_id']} -> {result}")
self.processed += 1
except Exception as e:
print(f'消费者 {consumer_id} 错误: {e}')
self.errors += 1
finally:
self.queue.task_done()
async def monitor(self):
"""监控处理统计数据"""
while True:
await asyncio.sleep(2)
print(f'\n=== 统计: 已处理={self.processed}, 错误={self.errors}, 队列长度={self.queue.qsize()} ===\n')
async def run(self, num_producers: int = 3, num_consumers: int = 5):
"""运行数据处理器"""
# 启动监控
monitor_task = asyncio.create_task(self.monitor())
# 启动生产者和消费者
await asyncio.gather(
*[self.producer(i) for i in range(num_producers)],
*[self.consumer(i) for i in range(num_consumers)]
)
# 取消监控
monitor_task.cancel()
print(f'\n最终统计: 已处理={self.processed}, 错误={self.errors}')
async def main():
processor = DataProcessor()
await processor.run(num_producers=3, num_consumers=5)
asyncio.run(main())Example 5: Async File I/O with aiofiles
示例5:使用aiofiles的异步文件I/O
python
import asyncio
import aiofiles
from pathlib import Path
async def write_file(path: str, content: str):
"""Write content to file asynchronously"""
async with aiofiles.open(path, 'w') as f:
await f.write(content)
async def read_file(path: str) -> str:
"""Read file content asynchronously"""
async with aiofiles.open(path, 'r') as f:
return await f.read()
async def process_files(file_paths: list):
"""Process multiple files concurrently"""
tasks = [read_file(path) for path in file_paths]
contents = await asyncio.gather(*tasks)
# Process contents
results = []
for path, content in zip(file_paths, contents):
result = {
'path': path,
'lines': len(content.split('\n')),
'words': len(content.split()),
'chars': len(content)
}
results.append(result)
return results
async def main():
# Create test files
test_files = ['test1.txt', 'test2.txt', 'test3.txt']
# Write files concurrently
await asyncio.gather(*[
write_file(f, f'Content of file {f}\n' * 10)
for f in test_files
])
# Process files
results = await process_files(test_files)
for result in results:
print(f"{result['path']}: {result['lines']} lines, "
f"{result['words']} words, {result['chars']} chars")
# Cleanup
for f in test_files:
Path(f).unlink(missing_ok=True)python
import asyncio
import aiofiles
from pathlib import Path
async def write_file(path: str, content: str):
"""异步写入文件"""
async with aiofiles.open(path, 'w') as f:
await f.write(content)
async def read_file(path: str) -> str:
"""异步读取文件"""
async with aiofiles.open(path, 'r') as f:
return await f.read()
async def process_files(file_paths: list):
"""并发处理多个文件"""
tasks = [read_file(path) for path in file_paths]
contents = await asyncio.gather(*tasks)
# 处理内容
results = []
for path, content in zip(file_paths, contents):
result = {
'path': path,
'lines': len(content.split('\n')),
'words': len(content.split()),
'chars': len(content)
}
results.append(result)
return results
async def main():
# 创建测试文件
test_files = ['test1.txt', 'test2.txt', 'test3.txt']
# 并发写入文件
await asyncio.gather(*[
write_file(f, f'文件 {f} 的内容\n' * 10)
for f in test_files
])
# 处理文件
results = await process_files(test_files)
for result in results:
print(f"{result['path']}: {result['lines']} 行, "
f"{result['words']} 词, {result['chars']} 字符")
# 清理
for f in test_files:
Path(f).unlink(missing_ok=True)asyncio.run(main()) # Uncomment to run (requires aiofiles)
asyncio.run(main()) # 取消注释以运行(需要安装aiofiles)
undefinedundefinedResources
资源
- Python asyncio Documentation: https://docs.python.org/3/library/asyncio.html
- aiohttp Documentation: https://docs.aiohttp.org/
- Real Python asyncio Guide: https://realpython.com/async-io-python/
- PEP 492 - Coroutines with async and await syntax: https://www.python.org/dev/peps/pep-0492/
- asyncio Cheat Sheet: https://www.pythonsheets.com/notes/python-asyncio.html
- Effective Python: Item 60 - Consider asyncio: https://effectivepython.com/
Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: Concurrency, Performance, Async Programming
Compatible With: Python 3.7+, aiohttp, asyncio, uvloop
- Python asyncio官方文档: https://docs.python.org/3/library/asyncio.html
- aiohttp官方文档: https://docs.aiohttp.org/
- Real Python asyncio指南: https://realpython.com/async-io-python/
- PEP 492 - 带async和await语法的协: https://www.python.org/dev/peps/pep-0492/
- asyncio速查表: https://www.pythonsheets.com/notes/python-asyncio.html
- Effective Python: Item 60 - 考虑使用asyncio: https://effectivepython.com/
指南版本: 1.0.0
最后更新: 2025年10月
指南分类: 并发、性能、异步编程
兼容版本: Python 3.7+, aiohttp, asyncio, uvloop