async-patterns

# Python Async/Await Patterns

## Overview

Use asynchronous programming for I/O-bound work: network requests, database queries, file operations, and inter-process communication. These operations spend most of their time waiting, and `asyncio` allows other work to proceed during that wait.

Use synchronous (or multiprocessing-based) code for CPU-bound work: image processing, cryptographic hashing, data compression, and heavy computation. The Python GIL prevents true parallel execution of Python bytecode in threads, so CPU-bound work does not benefit from `asyncio`.

A useful heuristic: if the bottleneck is waiting, go async. If the bottleneck is computing, use `multiprocessing` or a task queue.

## Core Concepts

Three building blocks underpin all async Python code:

- **Coroutine** -- a function declared with `async def`. Calling it returns a coroutine object that must be awaited or scheduled.
- **Event loop** -- the scheduler that runs coroutines, handles I/O callbacks, and manages timers. `asyncio.run()` creates and manages the loop.
- **Awaitable** -- anything that can appear after `await`: coroutines, `Task` objects, and `Future` objects.
```python
import asyncio

async def fetch_data() -> str:
    await asyncio.sleep(1)  # simulate I/O
    return "result"

async def main() -> None:
    data = await fetch_data()
    print(data)

asyncio.run(main())
```

`asyncio.run()` is the standard entry point. Avoid calling it more than once in a program; instead, structure the application so a single `asyncio.run(main())` drives everything.

## Task Creation

### asyncio.create_task()

Wrap a coroutine in a `Task` to schedule it concurrently on the running event loop:
```python
async def main() -> None:
    task_a = asyncio.create_task(fetch_data("a"))
    task_b = asyncio.create_task(fetch_data("b"))
    result_a = await task_a
    result_b = await task_b
```

Always keep a reference to created tasks. A task without a live reference can be garbage-collected before completion.

### TaskGroup (Python 3.11+)

`TaskGroup` provides structured concurrency -- all tasks are guaranteed to finish (or be cancelled) before the `async with` block exits:
```python
async def main() -> None:
    async with asyncio.TaskGroup() as tg:
        task_a = tg.create_task(fetch_data("a"))
        task_b = tg.create_task(fetch_data("b"))
    # Both tasks are done here
    print(task_a.result(), task_b.result())
```

If any task raises, the remaining tasks are cancelled and the exception propagates as an `ExceptionGroup`.

## Gathering Results

### asyncio.gather()

Run multiple awaitables concurrently and collect results in order:
```python
results = await asyncio.gather(
    fetch_data("a"),
    fetch_data("b"),
    fetch_data("c"),
    return_exceptions=True,  # exceptions returned as values instead of raised
)
```

With `return_exceptions=False` (the default), the first exception propagates immediately to the awaiting caller; note that the other awaitables are not cancelled and continue to run. With `return_exceptions=True`, exceptions appear in the results list alongside successful values.
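With `return_exceptions=True`, the caller must separate failures from values after the fact. A runnable sketch of that triage, where `fetch_data` is a hypothetical stand-in that deliberately fails for one key:

```python
import asyncio

async def fetch_data(key: str) -> str:
    await asyncio.sleep(0.01)  # simulate I/O
    if key == "b":
        raise ValueError(f"failed on {key}")  # simulated failure
    return f"result-{key}"

async def main() -> tuple[list[str], list[BaseException]]:
    results = await asyncio.gather(
        fetch_data("a"), fetch_data("b"), fetch_data("c"),
        return_exceptions=True,
    )
    # Results keep argument order; exceptions appear in place of values.
    values = [r for r in results if not isinstance(r, BaseException)]
    errors = [r for r in results if isinstance(r, BaseException)]
    return values, errors

values, errors = asyncio.run(main())
print(values, errors)
```

Checking with `isinstance(r, BaseException)` (rather than `Exception`) also catches `CancelledError`, which derives from `BaseException`.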

## gather() vs TaskGroup

| Feature | `asyncio.gather()` | `asyncio.TaskGroup` |
| --- | --- | --- |
| Python version | 3.4+ | 3.11+ |
| Error handling | first exception or `return_exceptions` | `ExceptionGroup`, cancels siblings |
| Structured concurrency | No | Yes |
| Dynamic task creation | No (fixed at call time) | Yes (`create_task` inside block) |

Prefer `TaskGroup` in new code targeting Python 3.11+. Use `gather()` when supporting older runtimes or when `return_exceptions=True` behavior is needed.
## Running Blocking Code

Never call blocking functions directly inside a coroutine -- this stalls the entire event loop.

### asyncio.to_thread()

Offload a synchronous function to a separate thread (Python 3.9+):

```python
import time

def cpu_light_blocking() -> str:
    time.sleep(2)  # blocking I/O
    return "done"

async def main() -> None:
    result = await asyncio.to_thread(cpu_light_blocking)
```

### loop.run_in_executor()

For finer control or process-pool execution:

```python
from concurrent.futures import ProcessPoolExecutor

async def main() -> None:
    loop = asyncio.get_running_loop()

    # Thread pool (default)
    result = await loop.run_in_executor(None, blocking_io_func)

    # Process pool for CPU-bound work
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy_func, arg1)
```
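One wrinkle: `run_in_executor()` forwards only positional arguments, so keyword arguments must be bound with `functools.partial` first. A minimal runnable sketch, where `read_config` is a made-up blocking function used only for illustration:

```python
import asyncio
from functools import partial

def read_config(path: str, *, mode: str = "r") -> str:
    # Stand-in for a blocking call that takes a keyword-only argument.
    return f"{path} opened with mode={mode}"

async def main() -> str:
    loop = asyncio.get_running_loop()
    # partial binds mode= up front; the executor sees a zero-extra-arg callable.
    return await loop.run_in_executor(None, partial(read_config, "app.toml", mode="rb"))

result = asyncio.run(main())
print(result)
```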

## Async Context Managers

### Class-based

```python
class AsyncDBConnection:
    async def __aenter__(self):
        self.conn = await connect_to_db()
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.conn.close()
        return False  # do not suppress exceptions
```
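Under `async with`, `__aenter__` supplies the bound value and `__aexit__` always runs, even if the block raises. A runnable sketch of that lifecycle with a stubbed connection (the `FakeConn` class is invented here to stand in for whatever `connect_to_db()` would return):

```python
import asyncio

class FakeConn:
    """Stub standing in for a real database connection."""
    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True

class AsyncDBConnection:
    async def __aenter__(self):
        self.conn = FakeConn()  # real code would: await connect_to_db()
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.conn.close()
        return False  # do not suppress exceptions

async def main() -> FakeConn:
    async with AsyncDBConnection() as conn:
        assert conn.closed is False  # open inside the block
    return conn  # __aexit__ has closed it by the time the block exits

conn = asyncio.run(main())
print(conn.closed)
```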

### Decorator-based

```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def db_connection(url: str):
    conn = await connect_to_db(url)
    try:
        yield conn
    finally:
        await conn.close()

async def main() -> None:
    async with db_connection("postgres://...") as conn:
        await conn.execute("SELECT 1")
```

## Async Iterators and Generators

### Async generators

```python
async def stream_rows(query: str):
    async with db_connection() as conn:
        cursor = await conn.execute(query)
        async for row in cursor:
            yield row

async def main() -> None:
    async for row in stream_rows("SELECT * FROM users"):
        process(row)
```

### Class-based async iterator

```python
class Countdown:
    def __init__(self, start: int):
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self.current <= 0:
            raise StopAsyncIteration
        self.current -= 1
        await asyncio.sleep(0.1)
        return self.current + 1
```
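Consuming the iterator with `async for` looks like this (the class is repeated so the sketch runs standalone):

```python
import asyncio

class Countdown:
    def __init__(self, start: int):
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self.current <= 0:
            raise StopAsyncIteration
        self.current -= 1
        await asyncio.sleep(0.1)
        return self.current + 1

async def main() -> list[int]:
    # async comprehensions work with any async iterator
    return [n async for n in Countdown(3)]

print(asyncio.run(main()))  # → [3, 2, 1]
```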

## Concurrency Primitives

### Semaphore -- limit concurrent operations

```python
sem = asyncio.Semaphore(10)

async def rate_limited_fetch(url: str) -> bytes:
    async with sem:
        return await http_get(url)
```

### Lock -- mutual exclusion

```python
lock = asyncio.Lock()

async def update_shared_state():
    async with lock:
        # only one coroutine at a time
        state["counter"] += 1
```

### Event -- notify waiting coroutines

```python
event = asyncio.Event()

async def waiter():
    await event.wait()
    print("event fired")

async def trigger():
    event.set()
```

### Condition -- wait for a predicate

```python
condition = asyncio.Condition()

async def consumer():
    async with condition:
        await condition.wait_for(lambda: len(queue) > 0)
        item = queue.pop()
```

## Timeouts

### asyncio.timeout() (Python 3.11+)

```python
async def main() -> None:
    async with asyncio.timeout(5.0):
        data = await slow_operation()
```

### asyncio.wait_for()

```python
try:
    result = await asyncio.wait_for(slow_operation(), timeout=5.0)
except asyncio.TimeoutError:
    print("operation timed out")
```

`asyncio.timeout()` is preferred in Python 3.11+ because it integrates with structured concurrency and is an async context manager that can wrap multiple awaits.

## Error Handling

### ExceptionGroup (Python 3.11+)

`TaskGroup` raises `ExceptionGroup` when child tasks fail:
```python
try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(may_fail_a())
        tg.create_task(may_fail_b())
except* ValueError as eg:
    for exc in eg.exceptions:
        log_error(exc)
except* TypeError as eg:
    for exc in eg.exceptions:
        log_error(exc)
```

Use `except*` to handle subgroups of exceptions selectively.

### Task cancellation

```python
task = asyncio.create_task(long_running())
task.cancel()

try:
    await task
except asyncio.CancelledError:
    print("task was cancelled")
```

Inside a coroutine, catch `CancelledError` only to perform cleanup, then re-raise or let it propagate. Swallowing `CancelledError` silently breaks cancellation semantics.
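The catch/clean-up/re-raise pattern looks like this end to end. A runnable sketch in which a module-level flag stands in for releasing a real resource:

```python
import asyncio

cleaned_up = False

async def worker() -> None:
    try:
        await asyncio.sleep(10)  # stand-in for long-running work
    except asyncio.CancelledError:
        global cleaned_up
        cleaned_up = True        # release resources here
        raise                    # re-raise to preserve cancellation semantics

async def main() -> bool:
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.01)    # let the worker start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass                     # the caller observes the cancellation
    return cleaned_up

print(asyncio.run(main()))  # → True
```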

## Common Pitfalls

1. **Blocking the event loop** -- calling `time.sleep()`, synchronous HTTP libraries, or CPU-heavy code directly in a coroutine. Use `asyncio.to_thread()` or `run_in_executor()`.
2. **Forgotten awaits** -- calling an `async def` function without `await` produces a coroutine object that never executes. Enable Python's `-W default` or `asyncio` debug mode to catch these.
3. **Fire-and-forget tasks without references** -- `asyncio.create_task(coro())` without storing the return value risks silent garbage collection. Always assign to a variable or add to a set:

   ```python
   background_tasks: set[asyncio.Task] = set()

   def schedule(coro):
       task = asyncio.create_task(coro)
       background_tasks.add(task)
       task.add_done_callback(background_tasks.discard)
   ```

4. **Mixing sync and async incorrectly** -- calling `asyncio.run()` from within a running loop raises `RuntimeError`. Use `await` or `create_task()` from async code; use `asyncio.run()` only at the top level.
5. **Assuming thread safety** -- asyncio coroutines run on a single thread, so they cannot preempt each other mid-statement and data races in the threading sense do not occur. An `asyncio.Lock` is still needed when an invariant must hold across an `await` point, and real thread-safety concerns return once `to_thread()` or `run_in_executor()` introduce true threads.

## Testing Async Code

Use `pytest-asyncio` to test coroutines with pytest:

```python
import pytest

@pytest.mark.asyncio
async def test_fetch_data():
    result = await fetch_data()
    assert result == "expected"
```

### Async fixtures

```python
@pytest.fixture
async def db_session():
    session = await create_session()
    yield session
    await session.close()

@pytest.mark.asyncio
async def test_query(db_session):
    rows = await db_session.execute("SELECT 1")
    assert len(rows) == 1
```

Configure `pytest-asyncio` mode in `pyproject.toml`:

```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"  # all async tests detected automatically
```

With `asyncio_mode = "auto"`, the `@pytest.mark.asyncio` decorator is optional -- any `async def test_*` function is treated as an async test.

## Cross-References

- For FastAPI async endpoints, consult the `fastapi` skill.
- For advanced concurrency patterns including producer-consumer queues, fan-out/fan-in, structured concurrency, and async HTTP client patterns, see `references/concurrency.md`.