# Python Async/Await Patterns
## Overview

Use asynchronous programming for I/O-bound work: network requests, database queries, file operations, and inter-process communication. These operations spend most of their time waiting, and `asyncio` allows other work to proceed during that wait.

Use synchronous (or multiprocessing-based) code for CPU-bound work: image processing, cryptographic hashing, data compression, and heavy computation. The Python GIL prevents true parallel execution of Python bytecode in threads, so CPU-bound work does not benefit from `asyncio`.

A useful heuristic: if the bottleneck is waiting, go async. If the bottleneck is computing, use `multiprocessing` or a task queue.
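The heuristic is easy to see in a timing sketch (using `asyncio.sleep` as a stand-in for real I/O waits): three overlapping 0.1-second waits finish in roughly 0.1 seconds rather than 0.3, because the event loop switches to another coroutine whenever one is waiting.

```python
import asyncio
import time

async def fake_io(label: str) -> str:
    await asyncio.sleep(0.1)  # stands in for a network or disk wait
    return label

async def main() -> list[str]:
    start = time.perf_counter()
    # The three waits overlap, so total time is ~0.1 s, not ~0.3 s
    results = await asyncio.gather(fake_io("a"), fake_io("b"), fake_io("c"))
    elapsed = time.perf_counter() - start
    assert elapsed < 0.25  # well under the 0.3 s a sequential run would take
    return results

results = asyncio.run(main())
print(results)  # ['a', 'b', 'c']
```

The same sketch with CPU-bound loops instead of `asyncio.sleep` would show no speedup at all, which is the other half of the heuristic.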
## Core Concepts
Three building blocks underpin all async Python code:

- Coroutine -- a function declared with `async def`. Calling it returns a coroutine object that must be awaited or scheduled.
- Event loop -- the scheduler that runs coroutines, handles I/O callbacks, and manages timers. `asyncio.run()` creates and manages the loop.
- Awaitable -- anything that can appear after `await`: coroutines, `Task` objects, and `Future` objects.

```python
import asyncio

async def fetch_data() -> str:
    await asyncio.sleep(1)  # simulate I/O
    return "result"

async def main() -> None:
    data = await fetch_data()
    print(data)

asyncio.run(main())
```

## Task Creation

### asyncio.create_task()
Wrap a coroutine in a `Task` to schedule it concurrently on the running event loop:

```python
async def main() -> None:
    task_a = asyncio.create_task(fetch_data("a"))
    task_b = asyncio.create_task(fetch_data("b"))
    result_a = await task_a
    result_b = await task_b
```

Always keep a reference to created tasks. A task without a live reference can be garbage-collected before completion.

### TaskGroup (Python 3.11+)
`TaskGroup` provides structured concurrency: tasks created inside an `async with` block are all awaited before the block exits.

```python
async def main() -> None:
    async with asyncio.TaskGroup() as tg:
        task_a = tg.create_task(fetch_data("a"))
        task_b = tg.create_task(fetch_data("b"))
    # Both tasks are done here
    print(task_a.result(), task_b.result())
```

If any task raises, the remaining tasks are cancelled and the exception propagates as an `ExceptionGroup`.

## Gathering Results

### asyncio.gather()
Run multiple awaitables concurrently and collect results in order:

```python
results = await asyncio.gather(
    fetch_data("a"),
    fetch_data("b"),
    fetch_data("c"),
    return_exceptions=True,  # exceptions returned as values instead of raised
)
```

With `return_exceptions=False` (the default), the first exception propagates immediately to the caller of `gather()`; the other awaitables are not cancelled and keep running. With `return_exceptions=True`, exceptions appear in the results list alongside successful values.
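With `return_exceptions=True` the caller must separate failures from values after the fact. A minimal sketch (the failing `fetch_data` here is hypothetical, rigged so one key raises):

```python
import asyncio

async def fetch_data(key: str) -> str:
    if key == "b":
        raise ValueError(f"bad key: {key}")  # simulate one failed fetch
    await asyncio.sleep(0.01)
    return f"result-{key}"

async def main() -> tuple[list[str], list[BaseException]]:
    results = await asyncio.gather(
        fetch_data("a"), fetch_data("b"), fetch_data("c"),
        return_exceptions=True,
    )
    # Exceptions occupy the same positions as their inputs, so order is preserved
    values = [r for r in results if not isinstance(r, BaseException)]
    errors = [r for r in results if isinstance(r, BaseException)]
    return values, errors

values, errors = asyncio.run(main())
print(values)  # ['result-a', 'result-c']
```

The `isinstance(r, BaseException)` check is the standard way to partition a mixed results list, since any successful value type might otherwise collide with an exception type.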
### gather() vs TaskGroup
| Feature | `gather()` | `TaskGroup` |
|---|---|---|
| Python version | 3.4+ | 3.11+ |
| Error handling | first exception or `return_exceptions` | `ExceptionGroup`, cancels siblings |
| Structured concurrency | No | Yes |
| Dynamic task creation | No (fixed at call time) | Yes (`create_task` inside block) |

Prefer `TaskGroup` in new code targeting Python 3.11+. Use `gather()` when supporting older runtimes or when `return_exceptions=True` behavior is needed.
## Running Blocking Code
Never call blocking functions directly inside a coroutine -- this stalls the entire event loop.

### asyncio.to_thread()
Offload a synchronous function to a separate thread (Python 3.9+):

```python
import time

def cpu_light_blocking() -> str:
    time.sleep(2)  # blocking I/O
    return "done"

async def main() -> None:
    result = await asyncio.to_thread(cpu_light_blocking)
```

### loop.run_in_executor()
For finer control or process-pool execution:

```python
from concurrent.futures import ProcessPoolExecutor

async def main() -> None:
    loop = asyncio.get_running_loop()

    # Thread pool (default)
    result = await loop.run_in_executor(None, blocking_io_func)

    # Process pool for CPU-bound work
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy_func, arg1)
```

## Async Context Managers

### Class-based
```python
class AsyncDBConnection:
    async def __aenter__(self):
        self.conn = await connect_to_db()
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.conn.close()
        return False  # do not suppress exceptions
```

### Decorator-based
```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def db_connection(url: str):
    conn = await connect_to_db(url)
    try:
        yield conn
    finally:
        await conn.close()

async def main() -> None:
    async with db_connection("postgres://...") as conn:
        await conn.execute("SELECT 1")
```

## Async Iterators and Generators

### Async generators
```python
async def stream_rows(query: str):
    async with db_connection() as conn:
        cursor = await conn.execute(query)
        async for row in cursor:
            yield row

async def main() -> None:
    async for row in stream_rows("SELECT * FROM users"):
        process(row)
```

### Class-based async iterator
```python
class Countdown:
    def __init__(self, start: int):
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self.current <= 0:
            raise StopAsyncIteration
        self.current -= 1
        await asyncio.sleep(0.1)
        return self.current + 1
```

## Concurrency Primitives

### Semaphore -- limit concurrent operations
```python
sem = asyncio.Semaphore(10)

async def rate_limited_fetch(url: str) -> bytes:
    async with sem:
        return await http_get(url)
```
### Lock -- mutual exclusion
```python
lock = asyncio.Lock()

async def update_shared_state():
    async with lock:
        # only one coroutine at a time
        state["counter"] += 1
```

### Event -- notify waiting coroutines
```python
event = asyncio.Event()

async def waiter():
    await event.wait()
    print("event fired")

async def trigger():
    event.set()
```

### Condition -- wait for a predicate
```python
condition = asyncio.Condition()

async def consumer():
    async with condition:
        await condition.wait_for(lambda: len(queue) > 0)
        item = queue.pop()
```
## Timeouts

### asyncio.timeout() (Python 3.11+)
```python
async def main() -> None:
    async with asyncio.timeout(5.0):
        data = await slow_operation()
```

### asyncio.wait_for()
```python
try:
    result = await asyncio.wait_for(slow_operation(), timeout=5.0)
except asyncio.TimeoutError:
    print("operation timed out")
```

Prefer `asyncio.timeout()` on Python 3.11+: it integrates with structured concurrency, and it is an async context manager that can wrap multiple `await` operations.
## Error Handling

### ExceptionGroup (Python 3.11+)
When a child task fails, `TaskGroup` raises an `ExceptionGroup`:

```python
try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(may_fail_a())
        tg.create_task(may_fail_b())
except* ValueError as eg:
    for exc in eg.exceptions:
        log_error(exc)
except* TypeError as eg:
    for exc in eg.exceptions:
        log_error(exc)
```

Use `except*` to handle subgroups of exceptions selectively.

### Task cancellation
task = asyncio.create_task(long_running())
task.cancel()
try:
await task
except asyncio.CancelledError:
print("task was cancelled")Inside a coroutine, catch only to perform cleanup, then re-raise or let it propagate. Swallowing silently breaks cancellation semantics.
CancelledErrorCancelledErrorpython
task = asyncio.create_task(long_running())
task.cancel()
try:
await task
except asyncio.CancelledError:
print("任务已取消")在协程内部,仅应在执行清理操作时捕获,然后重新抛出或让其传播。静默吞掉会破坏取消语义。
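The cleanup-then-re-raise pattern looks like this (the "resource" released is hypothetical, represented by a flag):

```python
import asyncio

cleaned_up = False

async def worker() -> None:
    try:
        await asyncio.sleep(10)  # long-running work
    except asyncio.CancelledError:
        global cleaned_up
        cleaned_up = True        # release resources, flush buffers, etc.
        raise                    # re-raise so cancellation propagates

async def main() -> bool:
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.01)    # let the worker start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass                     # the caller observes the cancellation
    return task.cancelled()

print(asyncio.run(main()))  # True
```

Because `worker()` re-raises, `task.cancelled()` reports `True`; if it swallowed the exception instead, the task would appear to finish normally and callers relying on cancellation would misbehave.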
## Common Pitfalls
- Blocking the event loop -- calling `time.sleep()`, synchronous HTTP libraries, or CPU-heavy code directly in a coroutine. Use `asyncio.to_thread()` or `run_in_executor()`.

- Forgotten awaits -- calling an `async def` function without `await` produces a coroutine object that never executes. Enable Python's `-W default` or `asyncio` debug mode to catch these.

- Fire-and-forget tasks without references -- `asyncio.create_task(coro())` without storing the return value risks silent garbage collection. Always assign to a variable or add to a set:

  ```python
  background_tasks: set[asyncio.Task] = set()

  def schedule(coro):
      task = asyncio.create_task(coro)
      background_tasks.add(task)
      task.add_done_callback(background_tasks.discard)
  ```

- Mixing sync and async incorrectly -- calling `asyncio.run()` from within a running loop raises `RuntimeError`. Use `await` or `create_task()` from async code; use `asyncio.run()` only at the top level.

- Assuming thread safety -- asyncio coroutines run on a single thread, so shared state needs no locks for code that contains no `await`; use `asyncio.Lock` when a critical section spans an `await`, and thread-safe primitives when using `to_thread()` or `run_in_executor()`, which introduce true threads.
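The forgotten-await pitfall can be demonstrated directly: calling the coroutine function only builds a coroutine object, and the body does not run until that object is awaited. A sketch using a flag to observe execution:

```python
import asyncio

ran = False

async def do_work() -> None:
    global ran
    ran = True

async def main() -> None:
    coro = do_work()                 # BUG pattern: call without await -- nothing runs yet
    assert asyncio.iscoroutine(coro)
    assert not ran                   # the body has not executed
    await coro                       # awaiting the object actually runs the body
    assert ran

asyncio.run(main())
print(ran)  # True
```

Dropping the coroutine object without ever awaiting it triggers CPython's "coroutine ... was never awaited" `RuntimeWarning`, which is what `-W default` and asyncio debug mode surface.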
## Testing Async Code
Use `pytest-asyncio` to test coroutines with pytest:

```python
import pytest

@pytest.mark.asyncio
async def test_fetch_data():
    result = await fetch_data()
    assert result == "expected"
```

### Async fixtures
```python
@pytest.fixture
async def db_session():
    session = await create_session()
    yield session
    await session.close()

@pytest.mark.asyncio
async def test_query(db_session):
    rows = await db_session.execute("SELECT 1")
    assert len(rows) == 1
```

Configure `pytest-asyncio` mode in `pyproject.toml`:

```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"  # all async tests detected automatically
```

With `asyncio_mode = "auto"`, the `@pytest.mark.asyncio` decorator is optional -- any `async def test_*` function is treated as an async test.

## Cross-References
- For FastAPI async endpoints, consult the `fastapi` skill.
- For advanced concurrency patterns including producer-consumer queues, fan-out/fan-in, structured concurrency, and async HTTP client patterns, see references/concurrency.md.