asyncio-concurrency-patterns


Asyncio Concurrency Patterns


A comprehensive skill for mastering Python's asyncio library and concurrent programming patterns. This skill covers event loops, coroutines, tasks, futures, synchronization primitives, async context managers, and production-ready patterns for building high-performance asynchronous applications.

When to Use This Skill


Use this skill when:
  • Building I/O-bound applications that need to handle many concurrent operations
  • Creating web servers, API clients, or websocket applications
  • Implementing real-time systems with event-driven architecture
  • Optimizing application performance with concurrent request handling
  • Managing multiple async operations with proper coordination and error handling
  • Building background task processors or job queues
  • Implementing async database operations and connection pooling
  • Creating chat applications, real-time dashboards, or notification systems
  • Handling parallel HTTP requests efficiently
  • Managing websocket connections with multiple event sources
  • Building microservices with async communication patterns
  • Optimizing resource utilization in network applications

Core Concepts


What is Asyncio?


Asyncio is Python's built-in library for writing concurrent code using the async/await syntax. It provides:
  • Event Loop: The core of asyncio that schedules and runs asynchronous tasks
  • Coroutines: Functions defined with `async def` that can be paused and resumed
  • Tasks: Scheduled coroutines that run concurrently
  • Futures: Low-level objects representing results of async operations
  • Synchronization Primitives: Locks, semaphores, events for coordination

Event Loop Fundamentals


The event loop is the central execution mechanism in asyncio:

```python
import asyncio

async def my_coroutine():
    await asyncio.sleep(1)

# Get or create an event loop (legacy API; prefer asyncio.run)
loop = asyncio.get_event_loop()

# Run a coroutine until complete
loop.run_until_complete(my_coroutine())

# Modern approach (Python 3.7+)
asyncio.run(my_coroutine())
```

**Key Event Loop Concepts:**

1. **Single-threaded concurrency**: One thread, many tasks
2. **Cooperative multitasking**: Tasks yield control voluntarily
3. **I/O multiplexing**: Efficient handling of many I/O operations
4. **Non-blocking operations**: Don't wait for I/O, do other work
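The single-threaded, cooperative model is easy to see in a minimal sketch (the `worker` name is ours): two tasks share one thread and interleave at every `await`.

```python
import asyncio

async def worker(name, results):
    # Each await is a yield point where the loop may switch tasks
    for i in range(3):
        results.append(f'{name}-{i}')
        await asyncio.sleep(0)  # Yield control voluntarily

async def main():
    results = []
    # Two tasks, one thread; the loop alternates between them
    await asyncio.gather(worker('a', results), worker('b', results))
    return results

out = asyncio.run(main())
print(out)
```

The output alternates `a-…` and `b-…` entries: neither task blocks the other, yet no second thread is involved.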

Coroutines vs Functions


Regular Function:

```python
import requests

def fetch_data():
    # Blocks until complete
    return requests.get('http://api.example.com')
```

Coroutine:

```python
import aiohttp

async def fetch_data():
    # Yields control while waiting
    async with aiohttp.ClientSession() as session:
        async with session.get('http://api.example.com') as resp:
            return await resp.text()
```
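A consequence worth remembering: calling a coroutine function does not run it. It returns a coroutine object that only executes when awaited or handed to the event loop. A minimal sketch (`fetch_value` is a made-up name):

```python
import asyncio

async def fetch_value():
    await asyncio.sleep(0)
    return 42

coro = fetch_value()        # Nothing has run yet
print(type(coro).__name__)  # A coroutine object, not a result

result = asyncio.run(coro)  # The event loop drives it to completion
print(result)
```

Forgetting the `await` is a common bug; Python warns about "coroutine was never awaited" when a coroutine object is discarded unrun.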

Tasks and Futures


Tasks wrap coroutines and schedule them on the event loop:

```python
# Create a task (it starts running immediately)
task = asyncio.create_task(my_coroutine())

# The task runs in the background
# ... do other work ...

# Wait for the result
result = await task
```

**Futures** represent eventual results:

```python
# Low-level future (rarely used directly)
future = asyncio.Future()

# Set the result
future.set_result(42)

# Get the result
result = await future
```

Async Context Managers


Manage resources with async setup/teardown:
```python
class AsyncResource:
    async def __aenter__(self):
        # Async setup
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Async cleanup
        await self.disconnect()
```

Usage:

```python
async with AsyncResource() as resource:
    await resource.do_work()
```

Concurrency Patterns


Pattern 1: Gather - Concurrent Execution


Run multiple coroutines concurrently and wait for all to complete:
```python
import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        # Run all fetches concurrently
        results = await asyncio.gather(
            fetch(session, 'http://python.org'),
            fetch(session, 'http://docs.python.org'),
            fetch(session, 'http://pypi.org')
        )
        return results

# Results come back as a list in the same order as the inputs
results = asyncio.run(main())
```

**When to use:**
- You need all results
- Order matters
- You want to fail fast on the first exception (the default)
- You can handle partial results with `return_exceptions=True`

Pattern 2: Wait - Flexible Waiting


More control over how to wait for multiple tasks:
```python
import asyncio

async def task_a():
    await asyncio.sleep(2)
    return 'A'

async def task_b():
    await asyncio.sleep(1)
    return 'B'

async def main():
    tasks = [
        asyncio.create_task(task_a()),
        asyncio.create_task(task_b())
    ]

    # Wait for the first task to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    # Get the first result
    first_result = done.pop().result()

    # Cancel the remaining tasks
    for task in pending:
        task.cancel()

    return first_result

result = asyncio.run(main())  # Returns 'B' after 1 second
```

Wait strategies:
  • `FIRST_COMPLETED`: Return when the first task finishes
  • `FIRST_EXCEPTION`: Return when the first task raises an exception
  • `ALL_COMPLETED`: Wait for all tasks (default)
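`FIRST_EXCEPTION` can be sketched the same way (the `ok`/`boom` task names are ours): `wait` returns as soon as any task raises, and the failed task appears in `done` with its exception attached.

```python
import asyncio

async def ok():
    await asyncio.sleep(2)
    return 'ok'

async def boom():
    await asyncio.sleep(0.1)
    raise ValueError('boom')

async def main():
    tasks = [asyncio.create_task(ok()), asyncio.create_task(boom())]

    # Returns as soon as any task raises (or when all complete)
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_EXCEPTION
    )

    # The still-running tasks are ours to cancel
    for task in pending:
        task.cancel()

    # Retrieving the exception also silences "never retrieved" warnings
    return [t.exception() for t in done if t.exception() is not None]

errors = asyncio.run(main())
```

Here `wait` returns after roughly 0.1 s instead of 2 s, because the failing task trips the `FIRST_EXCEPTION` condition.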

Pattern 3: Semaphore - Limit Concurrency


Control maximum number of concurrent operations:
```python
import asyncio
import aiohttp

async def fetch_with_limit(session, url, semaphore):
    async with semaphore:
        # Only N requests run concurrently
        async with session.get(url) as resp:
            return await resp.text()

async def main():
    # Limit to 5 concurrent requests
    semaphore = asyncio.Semaphore(5)

    urls = [f'http://api.example.com/item/{i}' for i in range(100)]

    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(session, url, semaphore)
            for url in urls
        ]
        results = await asyncio.gather(*tasks)

    return results

asyncio.run(main())
```

When to use:
  • Rate limiting API requests
  • Controlling database connection usage
  • Preventing resource exhaustion
  • Respecting external service limits

Pattern 4: Lock - Mutual Exclusion


Ensure only one coroutine accesses a resource at a time:
```python
import asyncio

class SharedCounter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        async with self.lock:
            # Critical section - only one coroutine at a time
            current = self.value
            await asyncio.sleep(0)  # Simulate async work
            self.value = current + 1

async def worker(counter):
    for _ in range(100):
        await counter.increment()

async def main():
    counter = SharedCounter()

    # Run 10 workers concurrently
    await asyncio.gather(*[worker(counter) for _ in range(10)])

    print(f"Final count: {counter.value}")  # Always 1000

asyncio.run(main())
```
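To see why the lock matters, here is a counter-example sketch (the same increment pattern with the lock removed): since every `await` is a potential switch point, concurrent increments read stale values and updates are lost.

```python
import asyncio

class UnsafeCounter:
    def __init__(self):
        self.value = 0

    async def increment(self):
        # No lock: read, yield to the loop, then write back a stale value
        current = self.value
        await asyncio.sleep(0)
        self.value = current + 1

async def main():
    counter = UnsafeCounter()
    # 100 concurrent increments, no mutual exclusion
    await asyncio.gather(*[counter.increment() for _ in range(100)])
    return counter.value

final = asyncio.run(main())
print(final)  # Far less than 100: most increments were lost
```

Note that single-threaded asyncio code only needs a lock when a critical section spans an `await`; purely synchronous mutations between awaits cannot be interrupted.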

Pattern 5: Event - Signaling


Coordinate multiple coroutines with events:
```python
import asyncio

async def waiter(event, name):
    print(f'{name} waiting for event')
    await event.wait()
    print(f'{name} received event')

async def setter(event):
    await asyncio.sleep(2)
    print('Setting event')
    event.set()

async def main():
    event = asyncio.Event()

    # Multiple waiters
    await asyncio.gather(
        waiter(event, 'Waiter 1'),
        waiter(event, 'Waiter 2'),
        waiter(event, 'Waiter 3'),
        setter(event)
    )

asyncio.run(main())
```

Pattern 6: Queue - Producer/Consumer


Coordinate work between producers and consumers:
```python
import asyncio

async def producer(queue, n):
    for i in range(n):
        await asyncio.sleep(0.1)
        await queue.put(f'item-{i}')
        print(f'Produced item-{i}')

    # Signal completion
    await queue.put(None)

async def consumer(queue, name):
    while True:
        item = await queue.get()

        if item is None:
            # Propagate the sentinel to the other consumers
            await queue.put(None)
            break

        print(f'{name} processing {item}')
        await asyncio.sleep(0.2)
        queue.task_done()

async def main():
    queue = asyncio.Queue()

    # Start the producer and consumers
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue, 'Consumer-1'),
        consumer(queue, 'Consumer-2'),
        consumer(queue, 'Consumer-3')
    )

asyncio.run(main())
```
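A sentinel-free variant, sketched below under the same producer/consumer setup, uses `queue.join()` (which blocks until every queued item has been matched by a `task_done()` call) and then cancels the idle consumers:

```python
import asyncio

async def consumer(queue, name, processed):
    while True:
        item = await queue.get()
        processed.append((name, item))
        queue.task_done()  # Each get() must be balanced by task_done()

async def main():
    queue = asyncio.Queue()
    processed = []

    consumers = [
        asyncio.create_task(consumer(queue, f'c{i}', processed))
        for i in range(3)
    ]

    for i in range(10):
        await queue.put(f'item-{i}')

    # Block until every queued item has been processed
    await queue.join()

    # Consumers are now idle inside queue.get(); cancel them to shut down
    for task in consumers:
        task.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)

    return processed

items = asyncio.run(main())
```

The trade-off: `join()` needs every consumer to call `task_done()` faithfully, while the sentinel pattern needs one `None` per shutdown chain; pick whichever failure mode is easier to audit in your code.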

Task Management


Creating Tasks


Basic Task Creation:
```python
import asyncio

async def background_task():
    await asyncio.sleep(10)
    return 'Done'

async def main():
    # Create a task; it starts running immediately
    task = asyncio.create_task(background_task())

    # Do other work while the task runs
    await asyncio.sleep(1)

    # Wait for the result
    result = await task
    return result

asyncio.run(main())
```

Named Tasks (Python 3.8+):

```python
task = asyncio.create_task(
    background_task(),
    name='my-background-task'
)

print(task.get_name())  # 'my-background-task'
```

Task Cancellation


Graceful Cancellation:
```python
import asyncio

async def long_running_task():
    try:
        while True:
            await asyncio.sleep(1)
            print('Working...')
    except asyncio.CancelledError:
        print('Task cancelled, cleaning up...')
        # Cleanup logic
        raise  # Re-raise to mark the task as cancelled

async def main():
    task = asyncio.create_task(long_running_task())

    # Let it run for 3 seconds
    await asyncio.sleep(3)

    # Request cancellation
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print('Task was cancelled')

asyncio.run(main())
```

Cancellation with Context Manager:

```python
import asyncio
from contextlib import suppress

async def run_with_timeout():
    task = asyncio.create_task(long_running_task())

    try:
        # Wait with a timeout
        await asyncio.wait_for(task, timeout=5.0)
    except asyncio.TimeoutError:
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task
```

Exception Handling in Tasks


Gather with Exception Handling:
```python
import asyncio

async def failing_task(n):
    await asyncio.sleep(n)
    raise ValueError(f'Task {n} failed')

async def successful_task(n):
    await asyncio.sleep(n)
    return f'Task {n} succeeded'

async def main():
    # return_exceptions=True: exceptions are returned instead of raised
    results = await asyncio.gather(
        successful_task(1),
        failing_task(2),
        successful_task(3),
        return_exceptions=True
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f'Task {i} failed: {result}')
        else:
            print(f'Task {i} result: {result}')

asyncio.run(main())
```

Task Exception Retrieval:

```python
import asyncio

async def main():
    # failing_task as defined above
    task = asyncio.create_task(failing_task(1))

    # Give the task time to finish
    await asyncio.sleep(2)

    # Check whether the task failed
    if task.done() and task.exception():
        print(f'Task failed with: {task.exception()}')

asyncio.run(main())
```

Event Loop Management


Event Loop Policies


Default Event Loop:
```python
import asyncio

async def main():
    # Get the running loop
    loop = asyncio.get_running_loop()
    print(f'Loop: {loop}')

asyncio.run(main())
```

Custom Event Loop:

```python
import asyncio

async def main():
    pass

# Create a new event loop and drive it manually
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
finally:
    loop.close()
```

**Event Loop Best Practices:**

1. **Use `asyncio.run()`** for simple programs (Python 3.7+)
2. **Avoid creating ClientSession outside event loop**
3. **Always close loops when done**
4. **Don't call blocking functions in event loop**

Running Blocking Code


Using ThreadPoolExecutor:
```python
import asyncio
import time

def blocking_io():
    # Blocking operation
    time.sleep(2)
    return 'Done'

async def main():
    loop = asyncio.get_running_loop()

    # Run the blocking call in a thread pool
    result = await loop.run_in_executor(
        None,  # Use the default executor
        blocking_io
    )

    return result

asyncio.run(main())
```

Custom Executor:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def main():
    loop = asyncio.get_running_loop()

    # Custom executor with 4 threads (blocking_io as defined above)
    with ThreadPoolExecutor(max_workers=4) as executor:
        results = await asyncio.gather(*[
            loop.run_in_executor(executor, blocking_io)
            for _ in range(10)
        ])

    return results

asyncio.run(main())
```

Loop Callbacks


Schedule Callback:
```python
import asyncio

def callback(arg):
    print(f'Callback called with {arg}')

async def main():
    loop = asyncio.get_running_loop()

    # Schedule a callback to run as soon as possible
    loop.call_soon(callback, 'immediate')

    # Schedule with a delay
    loop.call_later(2, callback, 'delayed')

    # Schedule at a specific loop time
    loop.call_at(loop.time() + 3, callback, 'scheduled')

    await asyncio.sleep(4)

asyncio.run(main())
```

Async Context Managers


Creating Async Context Managers


Class-Based:
```python
import asyncio

class AsyncDatabaseConnection:
    def __init__(self, host):
        self.host = host
        self.connection = None

    async def __aenter__(self):
        print(f'Connecting to {self.host}')
        await asyncio.sleep(0.1)  # Simulate connection
        self.connection = f'Connection to {self.host}'
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f'Closing connection to {self.host}')
        await asyncio.sleep(0.1)  # Simulate cleanup
        self.connection = None

    async def query(self, sql):
        if not self.connection:
            raise RuntimeError('Not connected')
        await asyncio.sleep(0.05)
        return f'Results for: {sql}'

async def main():
    async with AsyncDatabaseConnection('localhost') as db:
        result = await db.query('SELECT * FROM users')
        print(result)

asyncio.run(main())
```

Decorator-Based:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_resource(name):
    # Setup
    print(f'Acquiring {name}')
    await asyncio.sleep(0.1)

    try:
        yield name
    finally:
        # Cleanup
        print(f'Releasing {name}')
        await asyncio.sleep(0.1)

async def main():
    async with async_resource('database') as db:
        print(f'Using {db}')

asyncio.run(main())
```

Real-World Example: aiohttp ClientSession


```python
import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    # ClientSession as an async context manager
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://python.org')
        print(f'Body: {html[:100]}...')

asyncio.run(main())
```

Why use an async context manager for ClientSession?
  1. Ensures proper cleanup of connections
  2. Prevents resource leaks
  3. Manages SSL connections correctly
  4. Handles graceful shutdown

Performance Optimization


Profiling Async Code


Basic Timing:
```python
import asyncio
import time

async def slow_operation():
    await asyncio.sleep(1)

async def main():
    start = time.perf_counter()

    await slow_operation()

    elapsed = time.perf_counter() - start
    print(f'Took {elapsed:.2f} seconds')

asyncio.run(main())
```

Profiling Multiple Operations:

```python
import asyncio
import time

async def timed_task(name, duration):
    start = time.perf_counter()
    await asyncio.sleep(duration)
    elapsed = time.perf_counter() - start
    print(f'{name} took {elapsed:.2f}s')
    return name

async def main():
    await asyncio.gather(
        timed_task('Task 1', 1),
        timed_task('Task 2', 2),
        timed_task('Task 3', 0.5)
    )

asyncio.run(main())
```
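The per-task timing above can be factored into a small reusable decorator; a sketch (the `timed` helper is ours, not part of asyncio):

```python
import asyncio
import functools
import time

def timed(func):
    # Wrap a coroutine function and report its wall-clock duration
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            print(f'{func.__name__} took {elapsed:.2f}s')
    return wrapper

@timed
async def slow_operation():
    await asyncio.sleep(0.1)
    return 'done'

result = asyncio.run(slow_operation())
```

Note that this measures wall-clock time, which for a concurrent task includes time spent suspended waiting on other tasks, not just its own work.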

Optimizing Concurrency


**Bad - Sequential Execution:**

```python
async def slow_approach():
    # Takes 10 * fetch_time
    results = []
    for i in range(10):
        result = await fetch_data(i)
        results.append(result)
    return results
```

**Good - Concurrent Execution:**

```python
async def fast_approach():
    # Takes ~fetch_time
    tasks = [fetch_data(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    return results
```

**Better - Controlled Concurrency:**

```python
async def controlled_approach():
    # Takes ~2 * fetch_time, but respects the concurrency limit
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent

    async def fetch_with_limit(i):
        async with semaphore:
            return await fetch_data(i)

    tasks = [fetch_with_limit(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    return results
```

Avoiding Common Performance Pitfalls


**1. Don't create a session per request:**

```python
import asyncio
import aiohttp

# BAD - Creates a new session for every request
async def bad_fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

# GOOD - Reuse one session across requests
async def good_fetch():
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            session.get('http://example.com/1'),
            session.get('http://example.com/2'),
            session.get('http://example.com/3')
        )
        return results
```

**2. Don't use blocking operations:**

```python
import asyncio
import aiohttp
import requests  # Blocking library

# BAD - Blocks the event loop
async def bad_request():
    response = requests.get('http://example.com')  # BLOCKS!
    return response.text

# GOOD - Use an async library
async def good_request():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://example.com') as resp:
            return await resp.text()

# ACCEPTABLE - If you must call blocking code, use an executor
async def acceptable_request():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,
        lambda: requests.get('http://example.com').text
    )
    return result
```

**3. Proper cleanup with zero-sleep:**

```python
import asyncio
import aiohttp

async def proper_cleanup():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://example.org/') as resp:
            await resp.read()

    # Zero-sleep to allow the underlying connections to close
    await asyncio.sleep(0)
```

Common Pitfalls


Pitfall 1: Creating ClientSession Outside Event Loop


**Problem:**

```python
import aiohttp

# BAD - session created outside the event loop
session = aiohttp.ClientSession()

async def fetch(url):
    async with session.get(url) as resp:
        return await resp.text()
```

**Why it's bad:**
- Session binds to event loop at creation time
- If loop changes (e.g., uvloop), session becomes invalid
- Can cause program to hang

**Solution:**

```python
import aiohttp
import asyncio

async def main():
    # Create session inside async function
    async with aiohttp.ClientSession() as session:
        async with session.get('http://python.org') as resp:
            print(await resp.text())

asyncio.run(main())
```

Pitfall 2: Session as Class Variable


**Problem:**

```python
import aiohttp

class API:
    session = aiohttp.ClientSession()  # BAD - global instance

    async def fetch(self, url):
        async with self.session.get(url) as resp:
            return await resp.text()
```

**Solution:**

```python
import aiohttp

class API:
    def __init__(self):
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *args):
        await self.session.close()

    async def fetch(self, url):
        async with self.session.get(url) as resp:
            return await resp.text()
```

**Usage:**

```python
async def main():
    async with API() as api:
        result = await api.fetch('http://example.com')
```

Pitfall 3: Forgetting await


**Problem:**

```python
async def process_data():
    # Forgot await - returns a coroutine object, nothing executes!
    result = fetch_data()  # Missing await
    return result
```

**Solution:**

```python
async def process_data():
    result = await fetch_data()  # Proper await
    return result
```
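One way to catch this bug class in tests is to check whether a value is still an unawaited coroutine object. A minimal, stdlib-only sketch (the `fetch_data` here is a stand-in, not a real API):

```python
import asyncio

async def fetch_data():
    return 'data'

async def process_data():
    result = fetch_data()  # bug: missing await
    return result

result = asyncio.run(process_data())
print(asyncio.iscoroutine(result))  # True - the coroutine was created but never run
result.close()  # silence the "never awaited" warning
```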

Pitfall 4: Blocking the Event Loop


**Problem:**

```python
import asyncio
import time

async def bad_sleep():
    time.sleep(5)  # BAD - blocks the entire event loop!

async def main():
    await asyncio.gather(
        bad_sleep(),
        another_task()  # Blocked for 5 seconds
    )
```

**Solution:**

```python
import asyncio

async def good_sleep():
    await asyncio.sleep(5)  # GOOD - yields control

async def main():
    await asyncio.gather(
        good_sleep(),
        another_task()  # Runs concurrently
    )
```
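The effect is easy to measure with the stdlib alone: ten cooperative 0.1-second sleeps overlap on one event loop and finish in roughly the time of one (a sketch; exact timing varies by machine):

```python
import asyncio
import time

async def main():
    start = time.perf_counter()
    # Ten cooperative 0.1 s sleeps run concurrently on one loop
    await asyncio.gather(*(asyncio.sleep(0.1) for _ in range(10)))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f'{elapsed:.2f}s')  # close to 0.1s, not 1.0s
```

Had each sleep been a blocking `time.sleep(0.1)`, the total would be the full 1.0 seconds.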

Pitfall 5: Not Handling Task Cancellation


**Problem:**

```python
async def bad_task():
    while True:
        await asyncio.sleep(1)
        process_data()
        # No cleanup on cancellation!
```

**Solution:**

```python
async def good_task():
    try:
        while True:
            await asyncio.sleep(1)
            process_data()
    except asyncio.CancelledError:
        # Clean up resources
        cleanup()
        raise  # Re-raise to mark the task as cancelled
```
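A self-contained sketch showing that the cancellation path actually runs the cleanup branch, and that re-raising leaves the task marked as cancelled:

```python
import asyncio

cleanup_ran = []

async def good_task():
    try:
        while True:
            await asyncio.sleep(0.05)
    except asyncio.CancelledError:
        cleanup_ran.append(True)  # cleanup executes on cancellation
        raise                     # re-raise to mark the task cancelled

async def main():
    task = asyncio.create_task(good_task())
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()

was_cancelled = asyncio.run(main())
print(was_cancelled, cleanup_ran)  # True [True]
```

Swallowing `CancelledError` without re-raising would make `task.cancelled()` return `False`, which confuses callers waiting on the task.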

Pitfall 6: Deadlocks with Locks


**Problem:**

```python
import asyncio

lock1 = asyncio.Lock()
lock2 = asyncio.Lock()

async def task_a():
    async with lock1:
        await asyncio.sleep(0.1)
        async with lock2:  # Deadlock potential
            pass

async def task_b():
    async with lock2:
        await asyncio.sleep(0.1)
        async with lock1:  # Deadlock potential
            pass
```

**Solution:**

```python
# Always acquire locks in the same order
async def safe_task_a():
    async with lock1:
        async with lock2:
            pass

async def safe_task_b():
    async with lock1:  # Same order
        async with lock2:
            pass
```

Production Patterns


Pattern 1: Graceful Shutdown


**Complete Shutdown Example:**

```python
import asyncio
import signal

class Application:
    def __init__(self):
        self.should_exit = False
        self.tasks = []

    async def worker(self, name):
        try:
            while not self.should_exit:
                print(f'{name} working...')
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print(f'{name} cancelled, cleaning up...')
            raise

    def handle_signal(self, sig):
        print(f'Received signal {sig}, shutting down...')
        self.should_exit = True

    async def run(self):
        # Set up signal handlers
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(
                sig,
                lambda s=sig: self.handle_signal(s)
            )

        # Start workers
        self.tasks = [
            asyncio.create_task(self.worker(f'Worker-{i}'))
            for i in range(3)
        ]

        # Wait for shutdown signal
        while not self.should_exit:
            await asyncio.sleep(0.1)

        # Cancel all tasks
        for task in self.tasks:
            task.cancel()

        # Wait for cancellation to complete
        await asyncio.gather(*self.tasks, return_exceptions=True)

        print('Shutdown complete')

# Run the application
app = Application()
asyncio.run(app.run())
```

Pattern 2: Background Tasks with Application Lifecycle


**aiohttp Application with Background Tasks:**

```python
import asyncio
from contextlib import suppress
from aiohttp import web

async def listen_to_redis(app):
    """Background task that listens to Redis"""
    # Simulated Redis listening
    try:
        while True:
            # Process messages
            await asyncio.sleep(1)
            print('Processing Redis message...')
    except asyncio.CancelledError:
        print('Redis listener stopped')
        raise

async def background_tasks(app):
    """Cleanup context for managing background tasks"""
    # Startup: create the background task
    app['redis_listener'] = asyncio.create_task(listen_to_redis(app))

    yield  # App is running

    # Cleanup: cancel the background task
    app['redis_listener'].cancel()
    with suppress(asyncio.CancelledError):
        await app['redis_listener']

# Set up the application
app = web.Application()
app.cleanup_ctx.append(background_tasks)
```

Pattern 3: Retry Logic with Exponential Backoff


```python
import asyncio
import aiohttp
from typing import Any, Callable

async def retry_with_backoff(
    coro_func: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    *args,
    **kwargs
) -> Any:
    """
    Retry an async function with exponential backoff.

    Args:
        coro_func: Async function to retry
        max_retries: Maximum number of attempts
        base_delay: Initial delay between retries, in seconds
        max_delay: Maximum delay between retries, in seconds
    """
    for attempt in range(max_retries):
        try:
            return await coro_func(*args, **kwargs)
        except Exception as e:
            if attempt == max_retries - 1:
                # Last attempt failed
                raise

            # Calculate delay with exponential backoff
            delay = min(base_delay * (2 ** attempt), max_delay)

            print(f'Attempt {attempt + 1} failed: {e}')
            print(f'Retrying in {delay:.1f} seconds...')

            await asyncio.sleep(delay)
```

**Usage:**

```python
async def unstable_api_call():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://unstable-api.com') as resp:
            return await resp.json()

async def main():
    result = await retry_with_backoff(
        unstable_api_call,
        max_retries=5,
        base_delay=1.0
    )
    return result
```
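For intuition, the `min(base_delay * 2 ** attempt, max_delay)` schedule grows geometrically and then plateaus at the cap:

```python
base_delay, max_delay = 1.0, 60.0

# Delay before each retry, capped at max_delay
delays = [min(base_delay * (2 ** attempt), max_delay) for attempt in range(8)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```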

Pattern 4: Circuit Breaker


```python
import asyncio
from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold

        self.failure_count = 0
        self.success_count = 0
        self.state = CircuitState.CLOSED
        self.opened_at = None

    async def call(self, coro_func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            # Check whether to attempt recovery
            if datetime.now() - self.opened_at > timedelta(seconds=self.recovery_timeout):
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise Exception('Circuit breaker is OPEN')

        try:
            result = await coro_func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0

        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.success_count = 0

    def _on_failure(self):
        self.failure_count += 1

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self.opened_at = datetime.now()
```

**Usage:**

```python
import random

async def flaky_service():
    # Simulated flaky service
    await asyncio.sleep(0.1)
    if random.random() < 0.5:
        raise Exception('Service error')
    return 'Success'

async def main():
    breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=5.0)

    for i in range(20):
        try:
            result = await breaker.call(flaky_service)
            print(f'Request {i}: {result} - State: {breaker.state.value}')
        except Exception:
            print(f'Request {i}: Failed - State: {breaker.state.value}')

        await asyncio.sleep(0.5)
```

Pattern 5: WebSocket with Multiple Event Sources


**Handling Parallel WebSocket and Background Events:**

```python
import asyncio
from aiohttp import web

async def read_subscription(ws, redis):
    """Background task reading from Redis and sending to the WebSocket"""
    # Simulated Redis subscription
    channel = await redis.subscribe('channel:1')

    try:
        # Simulate receiving messages
        for i in range(10):
            await asyncio.sleep(1)
            message = f'Redis message {i}'
            await ws.send_str(message)
    finally:
        await redis.unsubscribe('channel:1')

async def websocket_handler(request):
    """WebSocket handler with parallel event sources"""
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    # Create a background task for the Redis subscription
    redis = request.app['redis']
    task = asyncio.create_task(read_subscription(ws, redis))

    try:
        # Handle incoming WebSocket messages
        async for msg in ws:
            if msg.type == web.WSMsgType.TEXT:
                # Process the incoming message
                await ws.send_str(f'Echo: {msg.data}')
            elif msg.type == web.WSMsgType.ERROR:
                print(f'WebSocket error: {ws.exception()}')
    finally:
        # Cleanup: cancel the background task
        task.cancel()

    return ws
```

Best Practices


Testing Async Code


**Using pytest-asyncio:**

```python
import pytest

@pytest.mark.asyncio
async def test_async_function():
    result = await async_operation()
    assert result == 'expected'

@pytest.mark.asyncio
async def test_with_fixture(aiohttp_client):
    client = await aiohttp_client(create_app())
    resp = await client.get('/')
    assert resp.status == 200
```

**Manual Event Loop Setup:**

```python
import asyncio
import unittest

class TestAsyncCode(unittest.TestCase):
    def setUp(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    def tearDown(self):
        self.loop.close()

    def test_coroutine(self):
        async def test_impl():
            result = await async_function()
            self.assertEqual(result, 'expected')

        self.loop.run_until_complete(test_impl())
```

Debugging Async Code


**Enable Debug Mode:**

```python
import asyncio

# Enable asyncio debug mode
asyncio.run(main(), debug=True)

# Or manually
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
```

**What debug mode detects:**
- Coroutines that were never awaited
- Callbacks taking too long
- Tasks destroyed while pending

**Logging Slow Callbacks:**

```python
import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)

loop = asyncio.get_event_loop()
loop.slow_callback_duration = 0.1  # 100 ms threshold
loop.set_debug(True)
```
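The never-awaited detection can be observed directly. This sketch captures the `RuntimeWarning` that CPython emits when an un-awaited coroutine object is garbage-collected (relies on CPython's immediate refcount-based collection):

```python
import asyncio
import warnings

async def forgotten():
    return 42

async def main():
    forgotten()  # coroutine object created but never awaited

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    asyncio.run(main(), debug=True)

messages = [str(w.message) for w in caught]
print(any('never awaited' in m for m in messages))  # True
```

With `debug=True`, the warning also includes a traceback showing where the coroutine was created.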

Documentation


**Documenting Async Functions:**

```python
async def fetch_user_data(user_id: int) -> dict:
    """
    Fetch user data from the database.

    Args:
        user_id: The unique identifier of the user

    Returns:
        Dictionary containing user data

    Raises:
        UserNotFoundError: If the user doesn't exist
        DatabaseError: If the database connection fails

    Example:
        >>> async def main():
        ...     user = await fetch_user_data(123)
        ...     print(user['name'])

    Note:
        This function must be called within an async context.
        Connection pooling is handled automatically.
    """
    async with get_db_connection() as conn:
        return await conn.fetch_one(
            'SELECT * FROM users WHERE id = $1',
            user_id
        )
```

Complete Examples


Example 1: Parallel HTTP Requests


```python
import asyncio
import aiohttp
import time

async def fetch(session, url):
    """Fetch a single URL"""
    async with session.get(url) as response:
        return {
            'url': url,
            'status': response.status,
            'length': len(await response.text())
        }

async def fetch_all(urls):
    """Fetch multiple URLs concurrently"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        'http://python.org',
        'http://docs.python.org',
        'http://pypi.org',
        'http://github.com/python',
        'http://www.python.org/dev/peps/'
    ]

    start = time.perf_counter()
    results = await fetch_all(urls)
    elapsed = time.perf_counter() - start

    for result in results:
        print(f"{result['url']}: {result['status']} ({result['length']} bytes)")

    print(f"\nFetched {len(urls)} URLs in {elapsed:.2f} seconds")

asyncio.run(main())
```

Example 2: Rate-Limited API Client


```python
import asyncio
import aiohttp
from typing import Any, Dict, List

class RateLimitedClient:
    def __init__(self, rate_limit: int = 10):
        """
        Args:
            rate_limit: Maximum number of concurrent requests
        """
        self.semaphore = asyncio.Semaphore(rate_limit)
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *args):
        await self.session.close()
        # Allow underlying connections to close
        await asyncio.sleep(0)

    async def fetch(self, url: str) -> Dict[str, Any]:
        """Fetch a URL with rate limiting"""
        async with self.semaphore:
            print(f'Fetching {url}')
            async with self.session.get(url) as resp:
                return {
                    'url': url,
                    'status': resp.status,
                    'data': await resp.json()
                }

    async def fetch_all(self, urls: List[str]) -> List[Dict[str, Any]]:
        """Fetch all URLs with rate limiting"""
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

async def main():
    urls = [f'https://api.github.com/users/{user}'
            for user in ['python', 'django', 'flask', 'requests', 'aiohttp']]

    async with RateLimitedClient(rate_limit=2) as client:
        results = await client.fetch_all(urls)

        for result in results:
            if isinstance(result, Exception):
                print(f'Error: {result}')
            else:
                print(f"User: {result['data'].get('login', 'unknown')}")

asyncio.run(main())
```
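The semaphore is what bounds concurrency here. This stdlib-only sketch tracks the peak number of tasks inside the guarded section (the network call is replaced by a short sleep), confirming it never exceeds the limit:

```python
import asyncio

async def main(limit=2, jobs=10):
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def job():
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for the HTTP request
            active -= 1

    await asyncio.gather(*(job() for _ in range(jobs)))
    return peak

peak = asyncio.run(main())
print(peak)  # 2 - concurrency never exceeds the semaphore limit
```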

Example 3: Database Connection Pool


python
import asyncio
from typing import List, Any

class AsyncConnectionPool:
    def __init__(self, size: int = 10):
        self.pool = asyncio.Queue(maxsize=size)
        self.size = size

    async def init(self):
        """Initialize connection pool"""
        for i in range(self.size):
            conn = await self._create_connection(i)
            await self.pool.put(conn)

    async def _create_connection(self, conn_id: int):
        """Create a database connection (simulated)"""
        await asyncio.sleep(0.1)  # Simulate connection time
        return {'id': conn_id, 'connected': True}

    async def acquire(self):
        """Acquire connection from pool"""
        return await self.pool.get()

    async def release(self, conn):
        """Release connection back to pool"""
        await self.pool.put(conn)

    async def execute(self, query: str) -> Any:
        """Execute query using pooled connection"""
        conn = await self.acquire()
        try:
            # Simulate query execution
            await asyncio.sleep(0.05)
            return f"Query '{query}' executed on connection {conn['id']}"
        finally:
            await self.release(conn)

    async def close(self):
        """Close all connections"""
        while not self.pool.empty():
            conn = await self.pool.get()
            # Close connection (simulated)
            conn['connected'] = False

async def worker(pool: AsyncConnectionPool, worker_id: int):
    """Worker that executes queries"""
    for i in range(5):
        result = await pool.execute(f'SELECT * FROM table WHERE id={i}')
        print(f'Worker {worker_id}: {result}')

async def main():
    # Create and initialize pool
    pool = AsyncConnectionPool(size=5)
    await pool.init()

    # Run multiple workers concurrently
    await asyncio.gather(*[
        worker(pool, i) for i in range(10)
    ])

    # Cleanup
    await pool.close()

asyncio.run(main())
python
import asyncio
from typing import Any

class AsyncConnectionPool:
    def __init__(self, size: int = 10):
        self.pool = asyncio.Queue(maxsize=size)
        self.size = size

    async def init(self):
        """初始化连接池"""
        for i in range(self.size):
            conn = await self._create_connection(i)
            await self.pool.put(conn)

    async def _create_connection(self, conn_id: int):
        """创建数据库连接(模拟)"""
        await asyncio.sleep(0.1)  # 模拟连接耗时
        return {'id': conn_id, 'connected': True}

    async def acquire(self):
        """从池获取连接"""
        return await self.pool.get()

    async def release(self, conn):
        """将连接放回池"""
        await self.pool.put(conn)

    async def execute(self, query: str) -> Any:
        """使用池连接执行查询"""
        conn = await self.acquire()
        try:
            # 模拟查询执行
            await asyncio.sleep(0.05)
            return f"查询 '{query}' 在连接 {conn['id']} 上执行"
        finally:
            await self.release(conn)

    async def close(self):
        """关闭所有连接"""
        while not self.pool.empty():
            conn = await self.pool.get()
            # 关闭连接(模拟)
            conn['connected'] = False

async def worker(pool: AsyncConnectionPool, worker_id: int):
    """执行查询的工作协程"""
    for i in range(5):
        result = await pool.execute(f'SELECT * FROM table WHERE id={i}')
        print(f'工作协程 {worker_id}: {result}')

async def main():
    # 创建并初始化连接池
    pool = AsyncConnectionPool(size=5)
    await pool.init()

    # 并发运行多个工作协程
    await asyncio.gather(*[
        worker(pool, i) for i in range(10)
    ])

    # 清理
    await pool.close()

asyncio.run(main())
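The acquire/release pair inside `execute` can also be packaged as an async context manager, so the connection is returned to the pool even when the query raises. A sketch under the same simulated-connection assumptions (`PoolSketch` and `connection` are illustrative names, not part of the class above):

```python
import asyncio
from contextlib import asynccontextmanager

class PoolSketch:
    """Minimal pool showing a context-managed acquire."""
    def __init__(self, size: int = 3):
        self.pool = asyncio.Queue(maxsize=size)
        for i in range(size):
            self.pool.put_nowait({'id': i, 'connected': True})

    @asynccontextmanager
    async def connection(self):
        conn = await self.pool.get()
        try:
            yield conn
        finally:
            # Runs even if the body raises, so the connection
            # is always returned to the pool.
            await self.pool.put(conn)

async def main():
    pool = PoolSketch(size=2)
    async with pool.connection() as conn:
        return conn['id']

first_id = asyncio.run(main())
```

Callers then write `async with pool.connection() as conn:` instead of a manual `try/finally`, which removes the most common leak in pooled code: forgetting to release on the error path.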

Example 4: Real-Time Data Processor

示例4:实时数据处理器

python
import asyncio
import random
from datetime import datetime

class DataProcessor:
    def __init__(self):
        self.queue = asyncio.Queue()
        self.processed = 0
        self.errors = 0

    async def producer(self, producer_id: int):
        """Produce data items"""
        for i in range(10):
            await asyncio.sleep(random.uniform(0.1, 0.5))
            item = {
                'producer_id': producer_id,
                'item_id': i,
                'timestamp': datetime.now(),
                'data': random.randint(1, 100)
            }
            await self.queue.put(item)
            print(f'Producer {producer_id} generated item {i}')

        # Signal completion
        await self.queue.put(None)

    async def consumer(self, consumer_id: int):
        """Consume and process data items"""
        while True:
            item = await self.queue.get()

            if item is None:
                # Propagate sentinel
                await self.queue.put(None)
                break

            try:
                # Simulate processing
                await asyncio.sleep(random.uniform(0.05, 0.2))

                # Process item
                result = item['data'] * 2
                print(f"Consumer {consumer_id} processed: {item['item_id']} -> {result}")

                self.processed += 1
            except Exception as e:
                print(f'Consumer {consumer_id} error: {e}')
                self.errors += 1
            finally:
                self.queue.task_done()

    async def monitor(self):
        """Monitor processing statistics"""
        while True:
            await asyncio.sleep(2)
            print(f'\n=== Stats: Processed={self.processed}, Errors={self.errors}, Queue={self.queue.qsize()} ===\n')

    async def run(self, num_producers: int = 3, num_consumers: int = 5):
        """Run the data processor"""
        # Start monitor
        monitor_task = asyncio.create_task(self.monitor())

        # Start producers and consumers
        await asyncio.gather(
            *[self.producer(i) for i in range(num_producers)],
            *[self.consumer(i) for i in range(num_consumers)]
        )

        # Cancel monitor
        monitor_task.cancel()

        print(f'\nFinal Stats: Processed={self.processed}, Errors={self.errors}')

async def main():
    processor = DataProcessor()
    await processor.run(num_producers=3, num_consumers=5)

asyncio.run(main())
python
import asyncio
import random
from datetime import datetime

class DataProcessor:
    def __init__(self):
        self.queue = asyncio.Queue()
        self.processed = 0
        self.errors = 0

    async def producer(self, producer_id: int):
        """生产数据项"""
        for i in range(10):
            await asyncio.sleep(random.uniform(0.1, 0.5))
            item = {
                'producer_id': producer_id,
                'item_id': i,
                'timestamp': datetime.now(),
                'data': random.randint(1, 100)
            }
            await self.queue.put(item)
            print(f'生产者 {producer_id} 生成项 {i}')

        # 发送完成信号
        await self.queue.put(None)

    async def consumer(self, consumer_id: int):
        """消费并处理数据项"""
        while True:
            item = await self.queue.get()

            if item is None:
                # 传递结束信号
                await self.queue.put(None)
                break

            try:
                # 模拟处理
                await asyncio.sleep(random.uniform(0.05, 0.2))

                # 处理数据项
                result = item['data'] * 2
                print(f"消费者 {consumer_id} 已处理: {item['item_id']} -> {result}")

                self.processed += 1
            except Exception as e:
                print(f'消费者 {consumer_id} 错误: {e}')
                self.errors += 1
            finally:
                self.queue.task_done()

    async def monitor(self):
        """监控处理统计数据"""
        while True:
            await asyncio.sleep(2)
            print(f'\n=== 统计: 已处理={self.processed}, 错误={self.errors}, 队列长度={self.queue.qsize()} ===\n')

    async def run(self, num_producers: int = 3, num_consumers: int = 5):
        """运行数据处理器"""
        # 启动监控
        monitor_task = asyncio.create_task(self.monitor())

        # 启动生产者和消费者
        await asyncio.gather(
            *[self.producer(i) for i in range(num_producers)],
            *[self.consumer(i) for i in range(num_consumers)]
        )

        # 取消监控
        monitor_task.cancel()

        print(f'\n最终统计: 已处理={self.processed}, 错误={self.errors}')

async def main():
    processor = DataProcessor()
    await processor.run(num_producers=3, num_consumers=5)

asyncio.run(main())
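An alternative to propagating a `None` sentinel is to let `queue.join()` track completion: producers finish, `join()` blocks until every item has been marked with `task_done()`, and only then are the idle consumers cancelled. This avoids leaving sentinels in the queue and decouples the consumer count from the producer count. A minimal sketch:

```python
import asyncio

async def producer(queue: asyncio.Queue, n: int):
    for i in range(n):
        await queue.put(i)

async def consumer(queue: asyncio.Queue, processed: list):
    while True:
        item = await queue.get()
        try:
            processed.append(item * 2)
        finally:
            queue.task_done()

async def main():
    queue = asyncio.Queue()
    processed = []
    consumers = [asyncio.create_task(consumer(queue, processed))
                 for _ in range(3)]

    # Wait for producers, then for every item to be task_done().
    await asyncio.gather(producer(queue, 5), producer(queue, 5))
    await queue.join()

    # All work is finished; stop the consumers blocked on get().
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return processed

processed = asyncio.run(main())
```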

Example 5: Async File I/O with aiofiles

示例5:使用aiofiles的异步文件I/O

python
import asyncio
import aiofiles
from pathlib import Path

async def write_file(path: str, content: str):
    """Write content to file asynchronously"""
    async with aiofiles.open(path, 'w') as f:
        await f.write(content)

async def read_file(path: str) -> str:
    """Read file content asynchronously"""
    async with aiofiles.open(path, 'r') as f:
        return await f.read()

async def process_files(file_paths: list):
    """Process multiple files concurrently"""
    tasks = [read_file(path) for path in file_paths]
    contents = await asyncio.gather(*tasks)

    # Process contents
    results = []
    for path, content in zip(file_paths, contents):
        result = {
            'path': path,
            'lines': len(content.split('\n')),
            'words': len(content.split()),
            'chars': len(content)
        }
        results.append(result)

    return results

async def main():
    # Create test files
    test_files = ['test1.txt', 'test2.txt', 'test3.txt']

    # Write files concurrently
    await asyncio.gather(*[
        write_file(f, f'Content of file {f}\n' * 10)
        for f in test_files
    ])

    # Process files
    results = await process_files(test_files)

    for result in results:
        print(f"{result['path']}: {result['lines']} lines, "
              f"{result['words']} words, {result['chars']} chars")

    # Cleanup
    for f in test_files:
        Path(f).unlink(missing_ok=True)
python
import asyncio
import aiofiles
from pathlib import Path

async def write_file(path: str, content: str):
    """异步写入文件"""
    async with aiofiles.open(path, 'w') as f:
        await f.write(content)

async def read_file(path: str) -> str:
    """异步读取文件"""
    async with aiofiles.open(path, 'r') as f:
        return await f.read()

async def process_files(file_paths: list):
    """并发处理多个文件"""
    tasks = [read_file(path) for path in file_paths]
    contents = await asyncio.gather(*tasks)

    # 处理内容
    results = []
    for path, content in zip(file_paths, contents):
        result = {
            'path': path,
            'lines': len(content.split('\n')),
            'words': len(content.split()),
            'chars': len(content)
        }
        results.append(result)

    return results

async def main():
    # 创建测试文件
    test_files = ['test1.txt', 'test2.txt', 'test3.txt']

    # 并发写入文件
    await asyncio.gather(*[
        write_file(f, f'文件 {f} 的内容\n' * 10)
        for f in test_files
    ])

    # 处理文件
    results = await process_files(test_files)

    for result in results:
        print(f"{result['path']}: {result['lines']} 行, "
              f"{result['words']} 词, {result['chars']} 字符")

    # 清理
    for f in test_files:
        Path(f).unlink(missing_ok=True)

# asyncio.run(main())  # Uncomment to run (requires aiofiles)

# asyncio.run(main())  # 取消注释以运行(需要安装aiofiles)
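If adding the `aiofiles` dependency is not an option, the same non-blocking effect can be had from the standard library by pushing the blocking file calls into a thread with `asyncio.to_thread` (Python 3.9+), which keeps the event loop free for other tasks. A stdlib-only sketch (the filename is illustrative):

```python
import asyncio
from pathlib import Path

def write_sync(path: Path, content: str) -> None:
    path.write_text(content)

def read_sync(path: Path) -> str:
    return path.read_text()

async def main() -> str:
    path = Path('to_thread_demo.txt')
    # Run the blocking file calls in the default thread pool
    # instead of blocking the event loop.
    await asyncio.to_thread(write_sync, path, 'hello\n' * 3)
    content = await asyncio.to_thread(read_sync, path)
    path.unlink(missing_ok=True)
    return content

content = asyncio.run(main())
```

For occasional file access this is usually sufficient; `aiofiles` is worth the dependency when file I/O dominates the workload.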


Resources

资源


Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: Concurrency, Performance, Async Programming
Compatible With: Python 3.7+, aiohttp, asyncio, uvloop

指南版本: 1.0.0
最后更新: 2025年10月
指南分类: 并发、性能、异步编程
兼容版本: Python 3.7+, aiohttp, asyncio, uvloop