Python asyncio - Async/Await Concurrency


Overview

Python's asyncio library enables writing concurrent code using async/await syntax. It's ideal for I/O-bound operations like HTTP requests, database queries, file operations, and WebSocket connections. asyncio provides non-blocking execution without the complexity of threading or multiprocessing.
Key Features:
  • async/await syntax for readable concurrent code
  • Event loop for managing concurrent operations
  • Tasks for running multiple coroutines concurrently
  • Primitives: locks, semaphores, events, queues
  • HTTP client/server with aiohttp
  • Database async support (asyncpg, aiomysql, motor)
  • FastAPI async endpoints
  • WebSocket support
  • Background task management
Installation:

```bash
# asyncio is built-in (Python 3.7+)

# Async HTTP client
pip install aiohttp

# Async HTTP requests (alternative)
pip install httpx

# Async database drivers
pip install asyncpg aiomysql motor  # PostgreSQL, MySQL, MongoDB

# FastAPI with async support
pip install fastapi "uvicorn[standard]"

# Async testing
pip install pytest-asyncio
```

Basic Async/Await Patterns

1. Simple Async Function

```python
import asyncio

async def hello():
    """Basic async function (coroutine)."""
    print("Hello")
    await asyncio.sleep(1)  # Async sleep (non-blocking)
    print("World")
    return "Done"
```

Run async function

```python
result = asyncio.run(hello())
print(result)  # "Done"
```

**Key Points**:
- `async def` defines a coroutine function
- `await` suspends execution until awaitable completes
- `asyncio.run()` is the entry point for async programs
- Coroutines must be awaited or scheduled
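The last point is easy to demonstrate: calling a coroutine function only creates a coroutine object; nothing executes until it is awaited or scheduled. A minimal sketch (the `greet`/`main` names are illustrative):

```python
import asyncio

async def greet():
    """Runs only when awaited, not when called."""
    return "hi"

async def main():
    coro = greet()       # Just creates a coroutine object; greet() has not run yet
    result = await coro  # Execution happens here
    return result

print(asyncio.run(main()))  # hi
```

A coroutine that is created but never awaited triggers a "coroutine was never awaited" RuntimeWarning.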

2. Multiple Concurrent Tasks

```python
import asyncio
import time

async def task(name, duration):
    """Simulate async task."""
    print(f"{name}: Starting (duration: {duration}s)")
    await asyncio.sleep(duration)
    print(f"{name}: Complete")
    return f"{name} result"

async def run_concurrent():
    """Run multiple tasks concurrently."""
    start = time.time()

    # Sequential (slow) - 6 seconds total
    # result1 = await task("Task 1", 3)
    # result2 = await task("Task 2", 2)
    # result3 = await task("Task 3", 1)

    # Concurrent (fast) - 3 seconds total
    results = await asyncio.gather(
        task("Task 1", 3),
        task("Task 2", 2),
        task("Task 3", 1)
    )

    elapsed = time.time() - start
    print(f"Total time: {elapsed:.2f}s")
    print(f"Results: {results}")

asyncio.run(run_concurrent())
```

Output: Total time: 3.00s (tasks ran concurrently)

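`asyncio.gather` returns results in the order the awaitables were passed in. When you want to react to each result as soon as it is ready, `asyncio.as_completed` yields them in completion order instead. A minimal sketch (the `task` coroutine mirrors the one above, with shorter delays):

```python
import asyncio

async def task(name, duration):
    await asyncio.sleep(duration)
    return f"{name} result"

async def first_come_first_served():
    coros = [task("Slow", 0.3), task("Fast", 0.1), task("Medium", 0.2)]
    finished = []
    # Yields awaitables in completion order, not submission order
    for fut in asyncio.as_completed(coros):
        finished.append(await fut)
    return finished

print(asyncio.run(first_come_first_served()))  # ['Fast result', 'Medium result', 'Slow result']
```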

3. Task Creation and Management

```python
import asyncio

async def background_task(name):
    """Long-running background task."""
    for i in range(5):
        print(f"{name}: iteration {i}")
        await asyncio.sleep(1)
    return f"{name} complete"

async def main():
    # Create task (starts immediately)
    task1 = asyncio.create_task(background_task("Task-1"))
    task2 = asyncio.create_task(background_task("Task-2"))

    # Do other work while tasks run
    print("Main: doing other work")
    await asyncio.sleep(2)

    # Wait for tasks to complete
    result1 = await task1
    result2 = await task2

    print(f"Results: {result1}, {result2}")

asyncio.run(main())
```
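Tasks created with `create_task` can also be cancelled: `task.cancel()` raises `asyncio.CancelledError` inside the coroutine at its next `await` point, giving it a chance to clean up. A minimal sketch (the `long_job`/`supervisor` names are illustrative):

```python
import asyncio

async def long_job():
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        print("long_job: cancelled, cleaning up")
        raise  # Re-raise so the task is marked as cancelled

async def supervisor():
    job = asyncio.create_task(long_job())
    await asyncio.sleep(0.1)  # Let the job start
    job.cancel()
    try:
        await job
    except asyncio.CancelledError:
        return "cancelled"

print(asyncio.run(supervisor()))  # cancelled
```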

4. Error Handling in Async Code

```python
import asyncio

async def risky_operation(fail=False):
    """Operation that might fail."""
    await asyncio.sleep(1)
    if fail:
        raise ValueError("Operation failed")
    return "Success"

async def handle_errors():
    # Individual try/except
    try:
        result = await risky_operation(fail=True)
    except ValueError as e:
        print(f"Caught error: {e}")
        result = "Fallback value"

    # Gather with error handling
    results = await asyncio.gather(
        risky_operation(fail=False),
        risky_operation(fail=True),
        risky_operation(fail=False),
        return_exceptions=True  # Return exceptions instead of raising
    )

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

asyncio.run(handle_errors())
```
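Timeouts are another common failure mode. `asyncio.wait_for` cancels the awaitable and raises `asyncio.TimeoutError` when the deadline passes, so it can be handled like any other exception. A minimal sketch:

```python
import asyncio

async def slow_operation():
    await asyncio.sleep(5)
    return "done"

async def with_timeout():
    try:
        # Cancels slow_operation and raises TimeoutError after 0.1s
        return await asyncio.wait_for(slow_operation(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(with_timeout()))  # timed out
```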

Event Loop Fundamentals

1. Event Loop Lifecycle

```python
import asyncio

# Modern approach (Python 3.7+)
async def main():
    print("Main coroutine")
    await asyncio.sleep(1)

asyncio.run(main())  # Creates loop, runs main, closes loop

# Manual loop management (advanced use cases)
# (some_coroutine and callback_function are placeholders)
async def manual_example():
    # Inside a coroutine, prefer get_running_loop();
    # get_event_loop() here is deprecated since Python 3.10
    loop = asyncio.get_running_loop()

    # Schedule coroutine
    task = loop.create_task(some_coroutine())

    # Schedule callback
    loop.call_later(5, callback_function)

    # Run until complete
    result = await task
    return result

# Get current event loop
async def get_current_loop():
    loop = asyncio.get_running_loop()
    print(f"Loop: {loop}")

    # Schedule callback in event loop
    loop.call_soon(lambda: print("Callback executed"))

    await asyncio.sleep(0)  # Let callback execute
```

2. Loop Scheduling and Callbacks

```python
import asyncio
from datetime import datetime

def callback(name, loop):
    """Callback function (not async)."""
    print(f"{datetime.now()}: {name} callback executed")

    # Stop loop after callback
    # loop.stop()

async def schedule_callbacks():
    loop = asyncio.get_running_loop()

    # Schedule immediate callback
    loop.call_soon(callback, "Immediate", loop)

    # Schedule callback after delay
    loop.call_later(2, callback, "Delayed 2s", loop)

    # Schedule callback at specific time
    loop.call_at(loop.time() + 3, callback, "Delayed 3s", loop)

    # Wait for callbacks to execute
    await asyncio.sleep(5)

asyncio.run(schedule_callbacks())
```

3. Running Blocking Code

```python
import asyncio
import time

def blocking_io():
    """CPU-intensive or blocking I/O operation."""
    print("Blocking operation started")
    time.sleep(2)  # Blocks thread
    print("Blocking operation complete")
    return "Blocking result"

async def run_in_executor():
    """Run blocking code in thread pool."""
    loop = asyncio.get_running_loop()

    # Run in default executor (thread pool)
    result = await loop.run_in_executor(
        None,  # Use default executor
        blocking_io
    )

    print(f"Result: {result}")
```

Run blocking operations concurrently

```python
async def concurrent_blocking():
    loop = asyncio.get_running_loop()

    # These run in the thread pool, don't block the event loop
    results = await asyncio.gather(
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io),
        loop.run_in_executor(None, blocking_io)
    )

    print(f"All results: {results}")

asyncio.run(concurrent_blocking())
```
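On Python 3.9+, `asyncio.to_thread` is a shorthand for this executor pattern: it runs a plain function in the default thread pool and returns an awaitable. A minimal sketch (function names are illustrative):

```python
import asyncio
import time

def blocking_io(duration):
    time.sleep(duration)  # Would block the event loop if called directly
    return f"slept {duration}s"

async def run_blocking_concurrently():
    # Both calls run in worker threads concurrently (Python 3.9+)
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.2),
        asyncio.to_thread(blocking_io, 0.2),
    )
    return results

print(asyncio.run(run_blocking_concurrently()))  # ['slept 0.2s', 'slept 0.2s']
```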

Asyncio Primitives

1. Locks for Mutual Exclusion

```python
import asyncio

# Shared resource
counter = 0
lock = asyncio.Lock()

async def increment_with_lock(name):
    """Increment counter with lock protection."""
    global counter
    async with lock:
        # Critical section - only one task at a time
        print(f"{name}: acquired lock")
        current = counter
        await asyncio.sleep(0.1)  # Simulate processing
        counter = current + 1
        print(f"{name}: released lock, counter={counter}")

async def increment_without_lock(name):
    """Increment without lock - race condition!"""
    global counter
    current = counter
    await asyncio.sleep(0.1)  # Race condition window
    counter = current + 1
    print(f"{name}: counter={counter}")

async def test_locks():
    global counter

    # Without lock (race condition)
    counter = 0
    await asyncio.gather(
        increment_without_lock("Task-1"),
        increment_without_lock("Task-2"),
        increment_without_lock("Task-3")
    )
    print(f"Without lock: {counter}")  # Often wrong (< 3)

    # With lock (correct)
    counter = 0
    await asyncio.gather(
        increment_with_lock("Task-1"),
        increment_with_lock("Task-2"),
        increment_with_lock("Task-3")
    )
    print(f"With lock: {counter}")  # Always 3

asyncio.run(test_locks())
```

2. Semaphores for Resource Limiting

```python
import asyncio

# Limit concurrent operations
semaphore = asyncio.Semaphore(2)  # Max 2 concurrent

async def limited_operation(name):
    """Operation limited by semaphore."""
    print(f"{name}: waiting for semaphore")
    async with semaphore:
        print(f"{name}: acquired semaphore")
        await asyncio.sleep(2)  # Simulate work
        print(f"{name}: releasing semaphore")

async def test_semaphore():
    # Create 5 tasks, but only 2 run concurrently
    await asyncio.gather(
        limited_operation("Task-1"),
        limited_operation("Task-2"),
        limited_operation("Task-3"),
        limited_operation("Task-4"),
        limited_operation("Task-5")
    )

asyncio.run(test_semaphore())
```

Only two tasks hold the semaphore at any time.


3. Events for Signaling

```python
import asyncio

event = asyncio.Event()

async def waiter(name):
    """Wait for event to be set."""
    print(f"{name}: waiting for event")
    await event.wait()  # Block until event is set
    print(f"{name}: event received!")

async def setter():
    """Set event after delay."""
    await asyncio.sleep(2)
    print("Setter: setting event")
    event.set()  # Wake up all waiters

async def test_event():
    # Create waiters
    await asyncio.gather(
        waiter("Waiter-1"),
        waiter("Waiter-2"),
        waiter("Waiter-3"),
        setter()
    )

asyncio.run(test_event())
```
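An Event is not strictly one-shot: `event.clear()` resets it so later `wait()` calls block again, which is useful for repeating start/stop signals. A minimal sketch:

```python
import asyncio

async def demo():
    event = asyncio.Event()
    assert not event.is_set()
    event.set()
    await event.wait()   # Returns immediately once the event is set
    event.clear()        # Subsequent waiters block again
    return event.is_set()

print(asyncio.run(demo()))  # False
```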

4. Queues for Task Distribution

```python
import asyncio
import random

async def producer(queue, name):
    """Produce items and add to queue."""
    for i in range(5):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"{name}: produced {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue, name):
    """Consume items from queue."""
    while True:
        item = await queue.get()  # Block until item available

        if item is None:  # Shutdown signal
            queue.task_done()
            break

        print(f"{name}: consumed {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def test_queue():
    queue = asyncio.Queue(maxsize=10)

    # Start consumers as background tasks
    consumers = [
        asyncio.create_task(consumer(queue, f"Consumer-{i}"))
        for i in range(1, 4)
    ]

    # Run producers to completion
    await asyncio.gather(
        producer(queue, "Producer-1"),
        producer(queue, "Producer-2")
    )

    # One shutdown signal per consumer
    # (a single None would stop only one consumer and hang the rest)
    for _ in consumers:
        await queue.put(None)

    # Wait for all items (including signals) to be processed
    await queue.join()
    await asyncio.gather(*consumers)
    print("All tasks complete")

asyncio.run(test_queue())
```

5. Condition Variables

```python
import asyncio

condition = asyncio.Condition()
items = []

async def consumer(name):
    """Wait for items to be available."""
    async with condition:
        # Wait until items are available
        await condition.wait_for(lambda: len(items) > 0)

        item = items.pop(0)
        print(f"{name}: consumed {item}")

async def producer(name):
    """Add items and notify consumers."""
    async with condition:
        item = f"{name}-item"
        items.append(item)
        print(f"{name}: produced {item}")

        # Notify one waiting consumer
        condition.notify(n=1)
        # Or notify all: condition.notify_all()

async def test_condition():
    await asyncio.gather(
        consumer("Consumer-1"),
        consumer("Consumer-2"),
        producer("Producer-1"),
        producer("Producer-2")
    )

asyncio.run(test_condition())
```

Async HTTP with aiohttp

1. Basic HTTP Client

```python
import asyncio
import aiohttp

async def fetch_url(session, url):
    """Fetch single URL."""
    async with session.get(url) as response:
        status = response.status
        text = await response.text()
        return {"url": url, "status": status, "length": len(text)}

async def fetch_multiple_urls():
    """Fetch multiple URLs concurrently."""
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/json",
    ]

    async with aiohttp.ClientSession() as session:
        # Concurrent requests
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for result in results:
            print(f"{result['url']}: {result['status']} ({result['length']} bytes)")

asyncio.run(fetch_multiple_urls())
```

2. HTTP Client with Error Handling

```python
import asyncio
import aiohttp
from typing import Dict, Any

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    max_retries: int = 3
) -> Dict[str, Any]:
    """Fetch URL with retry logic."""
    for attempt in range(max_retries):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                response.raise_for_status()  # Raise for 4xx/5xx
                data = await response.json()
                return {"success": True, "data": data}

        except aiohttp.ClientError as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                return {"success": False, "error": str(e)}

            # Exponential backoff
            await asyncio.sleep(2 ** attempt)

        except asyncio.TimeoutError:
            print(f"Attempt {attempt + 1} timed out")
            if attempt == max_retries - 1:
                return {"success": False, "error": "Timeout"}

            await asyncio.sleep(2 ** attempt)

async def parallel_api_calls():
    """Make parallel API calls with error handling."""
    urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/status/500",  # Will fail
        "https://httpbin.org/delay/1",
    ]

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *[fetch_with_retry(session, url) for url in urls],
            return_exceptions=True  # Don't stop on errors
        )

        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                print(f"{url}: Exception - {result}")
            elif result["success"]:
                print(f"{url}: Success")
            else:
                print(f"{url}: Failed - {result['error']}")

asyncio.run(parallel_api_calls())
```

3. HTTP Server with aiohttp

```python
from aiohttp import web
import asyncio

async def handle_hello(request):
    """Simple GET handler."""
    name = request.query.get("name", "World")
    return web.json_response({"message": f"Hello, {name}!"})

async def handle_post(request):
    """POST handler with JSON body."""
    data = await request.json()

    # Simulate async processing
    await asyncio.sleep(1)

    return web.json_response({
        "received": data,
        "status": "processed"
    })

async def handle_stream(request):
    """Streaming response."""
    response = web.StreamResponse()
    await response.prepare(request)

    for i in range(10):
        await response.write(f"Chunk {i}\n".encode())
        await asyncio.sleep(0.5)

    await response.write_eof()
    return response
```

Create application

```python
app = web.Application()
app.router.add_get("/hello", handle_hello)
app.router.add_post("/process", handle_post)
app.router.add_get("/stream", handle_stream)
```

Run server

```python
if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=8080)
```

4. WebSocket Client

```python
import asyncio
import aiohttp

async def websocket_client():
    """Connect to WebSocket server."""
    url = "wss://echo.websocket.org"

    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(url) as ws:
            # Send messages
            await ws.send_str("Hello WebSocket")
            await ws.send_json({"type": "greeting", "data": "test"})

            # Receive messages
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"Received: {msg.data}")

                    if msg.data == "close":
                        await ws.close()
                        break

                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f"Error: {ws.exception()}")
                    break

asyncio.run(websocket_client())
```

Async Database Operations

1. PostgreSQL with asyncpg

```python
import asyncio
import asyncpg

async def database_operations():
    """Async PostgreSQL operations."""
    # Create connection pool
    pool = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="user",
        password="password",
        min_size=5,
        max_size=20
    )

    try:
        # Acquire connection from pool
        async with pool.acquire() as conn:
            # Execute query
            rows = await conn.fetch(
                "SELECT id, name, email FROM users WHERE active = $1",
                True
            )

            for row in rows:
                print(f"User: {row['name']} ({row['email']})")

            # Insert data
            await conn.execute(
                "INSERT INTO users (name, email) VALUES ($1, $2)",
                "Alice", "alice@example.com"
            )

            # Transaction
            async with conn.transaction():
                await conn.execute("UPDATE users SET active = $1 WHERE id = $2", False, 1)
                await conn.execute("INSERT INTO audit_log (action) VALUES ($1)", "deactivate")

    finally:
        await pool.close()

asyncio.run(database_operations())
```

2. MongoDB with motor

```python
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def mongodb_operations():
    """Async MongoDB operations."""
    # Create client
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.mydb
    collection = db.users

    try:
        # Insert document
        result = await collection.insert_one({
            "name": "Alice",
            "email": "alice@example.com",
            "age": 30
        })
        print(f"Inserted ID: {result.inserted_id}")

        # Find documents
        cursor = collection.find({"age": {"$gte": 25}})
        async for document in cursor:
            print(f"User: {document['name']}")

        # Update document
        await collection.update_one(
            {"name": "Alice"},
            {"$set": {"age": 31}}
        )

        # Aggregation pipeline
        pipeline = [
            {"$match": {"age": {"$gte": 25}}},
            {"$group": {"_id": None, "avg_age": {"$avg": "$age"}}}
        ]
        async for result in collection.aggregate(pipeline):
            print(f"Average age: {result['avg_age']}")

    finally:
        client.close()

asyncio.run(mongodb_operations())
```

3. Connection Pool Pattern

3. 连接池模式

python
import asyncio
import asyncpg
from typing import Optional

class DatabasePool:
    """Async database connection pool manager."""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        """Create connection pool."""
        self.pool = await asyncpg.create_pool(self.dsn, min_size=5, max_size=20)

    async def close(self):
        """Close connection pool."""
        if self.pool:
            await self.pool.close()

    async def execute(self, query: str, *args):
        """Execute query."""
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args):
        """Fetch multiple rows."""
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

    async def fetchrow(self, query: str, *args):
        """Fetch single row."""
        async with self.pool.acquire() as conn:
            return await conn.fetchrow(query, *args)
python
import asyncio
import asyncpg
from typing import Optional

class DatabasePool:
    """异步数据库连接池管理器。"""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def connect(self):
        """创建连接池。"""
        self.pool = await asyncpg.create_pool(self.dsn, min_size=5, max_size=20)

    async def close(self):
        """关闭连接池。"""
        if self.pool:
            await self.pool.close()

    async def execute(self, query: str, *args):
        """执行查询。"""
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args):
        """获取多行结果。"""
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

    async def fetchrow(self, query: str, *args):
        """获取单行结果。"""
        async with self.pool.acquire() as conn:
            return await conn.fetchrow(query, *args)

Usage

使用示例

async def use_pool():
    db = DatabasePool("postgresql://user:pass@localhost/mydb")
    await db.connect()

    try:
        # Execute operations
        rows = await db.fetch("SELECT * FROM users")
        for row in rows:
            print(row)
    finally:
        await db.close()

asyncio.run(use_pool())
async def use_pool():
    db = DatabasePool("postgresql://user:pass@localhost/mydb")
    await db.connect()

    try:
        # 执行操作
        rows = await db.fetch("SELECT * FROM users")
        for row in rows:
            print(row)
    finally:
        await db.close()

asyncio.run(use_pool())

FastAPI Async Patterns

FastAPI异步模式

1. Async Endpoints

1. 异步端点

python
from fastapi import FastAPI, HTTPException
import asyncio
import httpx

app = FastAPI()

@app.get("/")
async def root():
    """Simple async endpoint."""
    return {"message": "Hello World"}

@app.get("/delay/{seconds}")
async def delayed_response(seconds: int):
    """Endpoint with async delay."""
    await asyncio.sleep(seconds)
    return {"message": f"Waited {seconds} seconds"}

@app.get("/fetch")
async def fetch_external():
    """Fetch data from external API."""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/json")
        return response.json()

@app.get("/parallel")
async def parallel_requests():
    """Make parallel API calls."""
    async with httpx.AsyncClient() as client:
        responses = await asyncio.gather(
            client.get("https://httpbin.org/delay/1"),
            client.get("https://httpbin.org/delay/2"),
            client.get("https://httpbin.org/json")
        )

        return {
            "results": [r.json() for r in responses]
        }
python
from fastapi import FastAPI, HTTPException
import asyncio
import httpx

app = FastAPI()

@app.get("/")
async def root():
    """简单异步端点。"""
    return {"message": "Hello World"}

@app.get("/delay/{seconds}")
async def delayed_response(seconds: int):
    """带异步延迟的端点。"""
    await asyncio.sleep(seconds)
    return {"message": f"等待了 {seconds} 秒"}

@app.get("/fetch")
async def fetch_external():
    """从外部API获取数据。"""
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/json")
        return response.json()

@app.get("/parallel")
async def parallel_requests():
    """并行调用API。"""
    async with httpx.AsyncClient() as client:
        responses = await asyncio.gather(
            client.get("https://httpbin.org/delay/1"),
            client.get("https://httpbin.org/delay/2"),
            client.get("https://httpbin.org/json")
        )

        return {
            "results": [r.json() for r in responses]
        }

2. Background Tasks

2. 后台任务

python
from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

async def send_email(email: str, message: str):
    """Simulate sending email."""
    print(f"Sending email to {email}")
    await asyncio.sleep(5)  # Simulate slow email service
    print(f"Email sent to {email}: {message}")

@app.post("/send-notification")
async def send_notification(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    """Send notification in background."""
    # Add task to background
    background_tasks.add_task(send_email, email, message)

    # Return immediately
    return {"status": "notification queued"}
python
from fastapi import FastAPI, BackgroundTasks
import asyncio

app = FastAPI()

async def send_email(email: str, message: str):
    """模拟发送邮件。"""
    print(f"正在向 {email} 发送邮件")
    await asyncio.sleep(5)  # 模拟慢速邮件服务
    print(f"已向 {email} 发送邮件: {message}")

@app.post("/send-notification")
async def send_notification(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    """在后台发送通知。"""
    # 将任务添加到后台
    background_tasks.add_task(send_email, email, message)

    # 立即返回
    return {"status": "通知已排队"}

Alternative: manual task creation

替代方案:手动创建任务

@app.post("/send-notification-manual")
async def send_notification_manual(email: str, message: str):
    """Create background task manually."""
    asyncio.create_task(send_email(email, message))
    return {"status": "notification queued"}
@app.post("/send-notification-manual")
async def send_notification_manual(email: str, message: str):
    """手动创建后台任务。"""
    asyncio.create_task(send_email(email, message))
    return {"status": "通知已排队"}
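One caveat with the manual `asyncio.create_task` approach: the event loop holds only a weak reference to tasks, so a fire-and-forget task can be garbage-collected before it finishes. A minimal, framework-agnostic sketch of the keep-a-reference pattern (the `spawn` and `job` names are illustrative, not part of FastAPI or asyncio):

```python
import asyncio

background_tasks = set()

def spawn(coro):
    """Schedule coro and keep a strong reference until it completes.

    asyncio only keeps a weak reference to tasks, so without this a
    fire-and-forget task may be garbage-collected mid-flight.
    """
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task

async def job(results):
    await asyncio.sleep(0.05)
    results.append("done")

async def main():
    results = []
    spawn(job(results))
    await asyncio.sleep(0.1)  # give the background job time to finish
    return results

outcome = asyncio.run(main())
print(outcome)
```

The done-callback removes the finished task from the set, so references do not accumulate over a long-running process.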

3. Async Dependencies

3. 异步依赖

python
from fastapi import FastAPI, Depends, HTTPException
import asyncpg

app = FastAPI()
python
from fastapi import FastAPI, Depends, HTTPException
import asyncpg

app = FastAPI()

Database pool (global)

数据库池(全局)

db_pool = None

async def get_db():
    """Dependency: database connection."""
    async with db_pool.acquire() as conn:
        yield conn

@app.on_event("startup")
async def startup():
    """Initialize database pool on startup."""
    global db_pool
    db_pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb"
    )

@app.on_event("shutdown")
async def shutdown():
    """Close database pool on shutdown."""
    await db_pool.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int, conn=Depends(get_db)):
    """Get user with async database dependency."""
    user = await conn.fetchrow(
        "SELECT * FROM users WHERE id = $1", user_id
    )

    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    return dict(user)
db_pool = None

async def get_db():
    """依赖项:数据库连接。"""
    async with db_pool.acquire() as conn:
        yield conn

@app.on_event("startup")
async def startup():
    """启动时初始化数据库池。"""
    global db_pool
    db_pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb"
    )

@app.on_event("shutdown")
async def shutdown():
    """关闭时关闭数据库池。"""
    await db_pool.close()

@app.get("/users/{user_id}")
async def get_user(user_id: int, conn=Depends(get_db)):
    """使用异步数据库依赖获取用户。"""
    user = await conn.fetchrow(
        "SELECT * FROM users WHERE id = $1", user_id
    )

    if not user:
        raise HTTPException(status_code=404, detail="用户未找到")

    return dict(user)

4. WebSocket Endpoints

4. WebSocket端点

python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio

app = FastAPI()
python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio

app = FastAPI()

Active connections

活跃连接

active_connections: List[WebSocket] = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket endpoint."""
    await websocket.accept()
    active_connections.append(websocket)

    try:
        while True:
            # Receive message
            data = await websocket.receive_text()

            # Broadcast to all connections
            for connection in active_connections:
                await connection.send_text(f"Broadcast: {data}")

    except WebSocketDisconnect:
        active_connections.remove(websocket)
        print("Client disconnected")
active_connections: List[WebSocket] = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """WebSocket端点。"""
    await websocket.accept()
    active_connections.append(websocket)

    try:
        while True:
            # 接收消息
            data = await websocket.receive_text()

            # 广播到所有连接
            for connection in active_connections:
                await connection.send_text(f"广播: {data}")

    except WebSocketDisconnect:
        active_connections.remove(websocket)
        print("客户端已断开连接")

Background task to send periodic updates

用于发送定期更新的后台任务

async def broadcast_updates():
    """Send periodic updates to all clients."""
    while True:
        await asyncio.sleep(5)

        # Iterate over a copy: removing from the list while iterating
        # it directly would skip connections
        for connection in list(active_connections):
            try:
                await connection.send_text("Periodic update")
            except Exception:
                active_connections.remove(connection)

@app.on_event("startup")
async def startup():
    """Start background update task."""
    asyncio.create_task(broadcast_updates())
async def broadcast_updates():
    """向所有客户端发送定期更新。"""
    while True:
        await asyncio.sleep(5)

        # 遍历副本:在直接遍历列表时移除元素会跳过连接
        for connection in list(active_connections):
            try:
                await connection.send_text("定期更新")
            except Exception:
                active_connections.remove(connection)

@app.on_event("startup")
async def startup():
    """启动后台更新任务。"""
    asyncio.create_task(broadcast_updates())

Common Patterns and Best Practices

常见模式与最佳实践

1. Timeout Handling

1. 超时处理

python
import asyncio

async def slow_operation():
    """Slow operation."""
    await asyncio.sleep(10)
    return "Result"

async def with_timeout():
    """Run operation with timeout."""
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
        print(f"Result: {result}")

    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(with_timeout())
python
import asyncio

async def slow_operation():
    """慢速操作。"""
    await asyncio.sleep(10)
    return "Result"

async def with_timeout():
    """带超时的操作运行。"""
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
        print(f"结果: {result}")

    except asyncio.TimeoutError:
        print("操作超时")

asyncio.run(with_timeout())
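A variation that often comes up in practice is returning a cached or default value instead of propagating the timeout. A minimal sketch (the `with_fallback` helper is illustrative, not part of asyncio):

```python
import asyncio

async def fetch_slow():
    await asyncio.sleep(1)  # pretend this is a slow upstream call
    return "fresh"

async def with_fallback(coro, timeout, default):
    """Await coro, but return `default` instead of raising on timeout."""
    try:
        return await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        return default

result = asyncio.run(with_fallback(fetch_slow(), 0.1, "cached"))
print(result)
```

Note that `wait_for` cancels the inner coroutine on timeout. On Python 3.11+, the `asyncio.timeout()` context manager offers an alternative to `wait_for()`.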

2. Cancellation Handling

2. 取消处理

python
import asyncio

async def cancellable_task():
    """Task that can be cancelled."""
    try:
        for i in range(10):
            print(f"Working: {i}")
            await asyncio.sleep(1)

        return "Complete"

    except asyncio.CancelledError:
        print("Task was cancelled")
        # Cleanup
        raise  # Re-raise to propagate cancellation

async def cancel_example():
    """Example of task cancellation."""
    task = asyncio.create_task(cancellable_task())

    # Let it run for a bit
    await asyncio.sleep(3)

    # Cancel the task
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Confirmed: task was cancelled")

asyncio.run(cancel_example())
python
import asyncio

async def cancellable_task():
    """可取消的任务。"""
    try:
        for i in range(10):
            print(f"工作中: {i}")
            await asyncio.sleep(1)

        return "Complete"

    except asyncio.CancelledError:
        print("任务已取消")
        # 清理资源
        raise  # 重新抛出以传播取消信号

async def cancel_example():
    """任务取消示例。"""
    task = asyncio.create_task(cancellable_task())

    # 让任务运行一段时间
    await asyncio.sleep(3)

    # 取消任务
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("已确认: 任务已取消")

asyncio.run(cancel_example())
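When part of a task must run to completion even if the caller is cancelled (for example, committing a write), `asyncio.shield()` protects the inner operation from the cancellation. A sketch with illustrative names:

```python
import asyncio

results = []

async def critical_section():
    """Work that must complete once started (e.g. a commit)."""
    await asyncio.sleep(0.2)
    results.append("committed")
    return "committed"

async def request_handler():
    task = asyncio.ensure_future(critical_section())
    try:
        # shield: cancelling request_handler does not cancel `task`
        return await asyncio.shield(task)
    except asyncio.CancelledError:
        # We were cancelled, but let the critical work finish first
        await task
        raise

async def main():
    handler = asyncio.create_task(request_handler())
    await asyncio.sleep(0.05)  # handler is mid-flight
    handler.cancel()
    try:
        await handler
    except asyncio.CancelledError:
        pass
    return results

print(asyncio.run(main()))
```

The handler still ends up cancelled (it re-raises), but the shielded work runs to completion.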

3. Resource Cleanup with Context Managers

3. 使用上下文管理器进行资源清理

python
import asyncio

class AsyncResource:
    """Async context manager for resource management."""

    async def __aenter__(self):
        """Async setup."""
        print("Acquiring resource")
        await asyncio.sleep(1)  # Simulate async setup
        self.connection = "connected"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async cleanup."""
        print("Releasing resource")
        await asyncio.sleep(1)  # Simulate async cleanup
        self.connection = None

async def use_resource():
    """Use async resource."""
    async with AsyncResource() as resource:
        print(f"Using resource: {resource.connection}")
        await asyncio.sleep(1)
    # Resource automatically cleaned up

asyncio.run(use_resource())
python
import asyncio

class AsyncResource:
    """用于资源管理的异步上下文管理器。"""

    async def __aenter__(self):
        """异步初始化。"""
        print("获取资源")
        await asyncio.sleep(1)  # 模拟异步初始化
        self.connection = "connected"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """异步清理。"""
        print("释放资源")
        await asyncio.sleep(1)  # 模拟异步清理
        self.connection = None

async def use_resource():
    """使用异步资源。"""
    async with AsyncResource() as resource:
        print(f"使用资源: {resource.connection}")
        await asyncio.sleep(1)
    # 资源自动清理

asyncio.run(use_resource())
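For one-off resources, `contextlib.asynccontextmanager` avoids writing a full class with `__aenter__`/`__aexit__`. A sketch equivalent to the `AsyncResource` class above (the dict stands in for a real connection object):

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def managed_resource():
    """Function-based async context manager."""
    conn = {"state": "connected"}  # stand-in for a real connection
    await asyncio.sleep(0.01)      # simulate async setup
    try:
        yield conn
    finally:
        await asyncio.sleep(0.01)  # simulate async cleanup
        conn["state"] = "closed"

async def main():
    async with managed_resource() as conn:
        state_inside = conn["state"]
    return state_inside, conn["state"]

inside, after = asyncio.run(main())
print(inside, after)
```

The `finally` block guarantees cleanup even if the body raises, mirroring `__aexit__`.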

4. Debouncing and Throttling

4. 防抖与节流

python
import asyncio
from datetime import datetime

class Debouncer:
    """Debounce async function calls."""

    def __init__(self, delay: float):
        self.delay = delay
        self.task = None

    async def call(self, func, *args, **kwargs):
        """Debounced function call."""
        # Cancel previous task
        if self.task:
            self.task.cancel()

        # Create new task
        async def delayed_call():
            await asyncio.sleep(self.delay)
            await func(*args, **kwargs)

        self.task = asyncio.create_task(delayed_call())

async def api_call(query: str):
    """Simulated API call."""
    print(f"{datetime.now()}: API call with query: {query}")

async def debounce_example():
    """Example of debouncing."""
    debouncer = Debouncer(delay=1.0)

    # Rapid calls - only last one executes
    await debouncer.call(api_call, "query1")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query2")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query3")

    # Wait for debounced call
    await asyncio.sleep(2)

asyncio.run(debounce_example())
python
import asyncio
from datetime import datetime

class Debouncer:
    """异步函数调用防抖。"""

    def __init__(self, delay: float):
        self.delay = delay
        self.task = None

    async def call(self, func, *args, **kwargs):
        """防抖函数调用。"""
        # 取消之前的任务
        if self.task:
            self.task.cancel()

        # 创建新任务
        async def delayed_call():
            await asyncio.sleep(self.delay)
            await func(*args, **kwargs)

        self.task = asyncio.create_task(delayed_call())

async def api_call(query: str):
    """模拟API调用。"""
    print(f"{datetime.now()}: API调用,查询参数: {query}")

async def debounce_example():
    """防抖示例。"""
    debouncer = Debouncer(delay=1.0)

    # 快速连续调用 - 仅最后一个执行
    await debouncer.call(api_call, "query1")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query2")
    await asyncio.sleep(0.1)
    await debouncer.call(api_call, "query3")

    # 等待防抖调用执行
    await asyncio.sleep(2)

asyncio.run(debounce_example())

Output: Only "query3" API call executes

输出: 仅"query3"的API调用执行

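The section title also mentions throttling, which differs from debouncing: a throttle lets every call through but enforces a minimum spacing between them. A minimal sketch (the `Throttler` class is illustrative):

```python
import asyncio

class Throttler:
    """Enforce a minimum interval between successive calls.

    Unlike the Debouncer above, no call is dropped; later calls
    are simply delayed.
    """

    def __init__(self, interval: float):
        self.interval = interval
        self._last = float("-inf")
        self._lock = asyncio.Lock()

    async def call(self, func, *args, **kwargs):
        async with self._lock:
            loop = asyncio.get_running_loop()
            wait = self.interval - (loop.time() - self._last)
            if wait > 0:
                await asyncio.sleep(wait)
            self._last = loop.time()
            return await func(*args, **kwargs)

async def main():
    throttler = Throttler(interval=0.1)
    calls = []

    async def record(n):
        calls.append(n)

    loop = asyncio.get_running_loop()
    start = loop.time()
    for n in range(3):
        await throttler.call(record, n)
    return calls, loop.time() - start

calls, elapsed = asyncio.run(main())
print(calls, elapsed)
```

All three calls execute (unlike the debounce example, where only the last did), spaced at least 0.1 s apart.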

5. Rate Limiting

5. 速率限制

python
import asyncio
from datetime import datetime

class RateLimiter:
    """Limit rate of async operations."""

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self.semaphore = asyncio.Semaphore(max_calls)
        self.calls = []

    async def __aenter__(self):
        """Acquire rate limit slot."""
        await self.semaphore.acquire()

        now = asyncio.get_running_loop().time()

        # Remove old calls outside period
        self.calls = [t for t in self.calls if now - t < self.period]

        if len(self.calls) >= self.max_calls:
            # Wait until oldest call expires
            sleep_time = self.period - (now - self.calls[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)

        self.calls.append(now)
        return self

    async def __aexit__(self, *args):
        """Release semaphore."""
        self.semaphore.release()

async def rate_limited_operation(limiter, name):
    """Operation with rate limiting."""
    async with limiter:
        print(f"{datetime.now()}: {name}")
        await asyncio.sleep(0.1)

async def rate_limit_example():
    """Example of rate limiting."""
    # Max 3 calls per 2 seconds
    limiter = RateLimiter(max_calls=3, period=2.0)

    # Try to make 6 calls
    await asyncio.gather(*[
        rate_limited_operation(limiter, f"Call-{i}")
        for i in range(6)
    ])

asyncio.run(rate_limit_example())
python
import asyncio
from datetime import datetime

class RateLimiter:
    """限制异步操作速率。"""

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self.semaphore = asyncio.Semaphore(max_calls)
        self.calls = []

    async def __aenter__(self):
        """获取速率限制槽位。"""
        await self.semaphore.acquire()

        now = asyncio.get_running_loop().time()

        # 移除周期外的旧调用记录
        self.calls = [t for t in self.calls if now - t < self.period]

        if len(self.calls) >= self.max_calls:
            # 等待最早的调用记录过期
            sleep_time = self.period - (now - self.calls[0])
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)

        self.calls.append(now)
        return self

    async def __aexit__(self, *args):
        """释放信号量。"""
        self.semaphore.release()

async def rate_limited_operation(limiter, name):
    """带速率限制的操作。"""
    async with limiter:
        print(f"{datetime.now()}: {name}")
        await asyncio.sleep(0.1)

async def rate_limit_example():
    """速率限制示例。"""
    # 每2秒最多3次调用
    limiter = RateLimiter(max_calls=3, period=2.0)

    # 尝试进行6次调用
    await asyncio.gather(*[
        rate_limited_operation(limiter, f"Call-{i}")
        for i in range(6)
    ])

asyncio.run(rate_limit_example())
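A simpler variant covers many real cases: bounding how many operations run at once, rather than how many start per period. `asyncio.Semaphore` does this directly; a sketch (the `active` dict only instruments concurrency for demonstration):

```python
import asyncio

async def fetch(n, sem, active):
    async with sem:  # at most 2 coroutines inside at a time
        active["now"] += 1
        active["peak"] = max(active["peak"], active["now"])
        await asyncio.sleep(0.05)  # simulate I/O
        active["now"] -= 1
        return n

async def main():
    sem = asyncio.Semaphore(2)
    active = {"now": 0, "peak": 0}
    results = await asyncio.gather(*[fetch(n, sem, active) for n in range(6)])
    return results, active["peak"]

results, peak = asyncio.run(main())
print(results, peak)
```

Six coroutines are scheduled at once, but the semaphore guarantees no more than two are in flight simultaneously.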

Debugging Async Code

调试异步代码

1. Enable Debug Mode

1. 启用调试模式

python
import asyncio
import logging
python
import asyncio
import logging

Enable asyncio debug mode

启用asyncio调试模式

asyncio.run(main(), debug=True)
asyncio.run(main(), debug=True)

Or set environment variable:

或设置环境变量:

PYTHONASYNCIODEBUG=1 python script.py

PYTHONASYNCIODEBUG=1 python script.py

Configure logging

配置日志

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def debug_example():
    logger.debug("Starting operation")
    await asyncio.sleep(1)
    logger.debug("Operation complete")
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def debug_example():
    logger.debug("启动操作")
    await asyncio.sleep(1)
    logger.debug("操作完成")

2. Detect Blocking Code

2. 检测阻塞代码

python
import asyncio
import time

async def problematic_code():
    """Code with blocking operation."""
    print("Starting")

    # BAD: Blocking sleep
    time.sleep(2)  # This blocks the event loop!

    print("Complete")
python
import asyncio
import time

async def problematic_code():
    """包含阻塞操作的代码。"""
    print("启动")

    # ❌ 错误:异步函数中的阻塞调用
    time.sleep(2)  # 这会阻塞事件循环!

    print("完成")

Run with debug mode to detect blocking

启用调试模式运行以检测阻塞

asyncio.run(problematic_code(), debug=True)
asyncio.run(problematic_code(), debug=True)

Warning: Executing <Task> took 2.001 seconds

警告: Executing <Task> took 2.001 seconds


3. Track Pending Tasks

3. 跟踪待处理任务

python
import asyncio

async def track_tasks():
    """Track all pending tasks."""
    # Get all tasks
    tasks = asyncio.all_tasks()

    print(f"Total tasks: {len(tasks)}")
    for task in tasks:
        print(f"  - {task.get_name()}: {task}")

        # Check if task is done
        if task.done():
            try:
                result = task.result()
                print(f"    Result: {result}")
            except Exception as e:
                print(f"    Exception: {e}")
python
import asyncio

async def track_tasks():
    """跟踪所有待处理任务。"""
    # 获取所有任务
    tasks = asyncio.all_tasks()

    print(f"总任务数: {len(tasks)}")
    for task in tasks:
        print(f"  - {task.get_name()}: {task}")

        # 检查任务是否完成
        if task.done():
            try:
                result = task.result()
                print(f"    结果: {result}")
            except Exception as e:
                print(f"    异常: {e}")

Create some tasks

创建一些任务

async def main():
    task1 = asyncio.create_task(asyncio.sleep(5), name="sleep-task")
    task2 = asyncio.create_task(track_tasks(), name="tracking")

    await task2
    task1.cancel()

asyncio.run(main())
async def main():
    task1 = asyncio.create_task(asyncio.sleep(5), name="sleep-task")
    task2 = asyncio.create_task(track_tasks(), name="tracking")

    await task2
    task1.cancel()

asyncio.run(main())

Testing Async Code

测试异步代码

1. pytest-asyncio Setup

1. pytest-asyncio设置

python
python

test_async.py

test_async.py

import pytest
import asyncio
import aiohttp
import pytest
import asyncio
import aiohttp

Mark test as async

将测试标记为异步

@pytest.mark.asyncio
async def test_async_function():
    """Test async function."""
    result = await some_async_function()
    assert result == "expected"

@pytest.mark.asyncio
async def test_async_http():
    """Test async HTTP client."""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/json") as response:
            assert response.status == 200
            data = await response.json()
            assert "slideshow" in data
@pytest.mark.asyncio
async def test_async_function():
    """测试异步函数。"""
    result = await some_async_function()
    assert result == "expected"

@pytest.mark.asyncio
async def test_async_http():
    """测试异步HTTP客户端。"""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/json") as response:
            assert response.status == 200
            data = await response.json()
            assert "slideshow" in data

Async fixture

异步fixture

@pytest.fixture
async def async_client():
    """Async test fixture."""
    client = await create_async_client()
    yield client
    await client.close()

@pytest.mark.asyncio
async def test_with_fixture(async_client):
    """Test using async fixture."""
    result = await async_client.fetch_data()
    assert result is not None
@pytest.fixture
async def async_client():
    """异步测试fixture。"""
    client = await create_async_client()
    yield client
    await client.close()

@pytest.mark.asyncio
async def test_with_fixture(async_client):
    """使用异步fixture测试。"""
    result = await async_client.fetch_data()
    assert result is not None

2. Mocking Async Functions

2. 模拟异步函数

python
import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_with_mock():
    """Test with async mock."""
    # Create async mock
    mock_func = AsyncMock(return_value="mocked result")

    result = await mock_func()
    assert result == "mocked result"
    mock_func.assert_called_once()

@pytest.mark.asyncio
@patch("module.async_function", new_callable=AsyncMock)
async def test_with_patch(mock_async):
    """Test with patched async function."""
    mock_async.return_value = {"status": "success"}

    result = await some_function_that_calls_async()

    assert result["status"] == "success"
    mock_async.assert_called_once()
python
import pytest
from unittest.mock import AsyncMock, patch

@pytest.mark.asyncio
async def test_with_mock():
    """使用异步模拟测试。"""
    # 创建异步模拟
    mock_func = AsyncMock(return_value="mocked result")

    result = await mock_func()
    assert result == "mocked result"
    mock_func.assert_called_once()

@pytest.mark.asyncio
@patch("module.async_function", new_callable=AsyncMock)
async def test_with_patch(mock_async):
    """使用补丁异步函数测试。"""
    mock_async.return_value = {"status": "success"}

    result = await some_function_that_calls_async()

    assert result["status"] == "success"
    mock_async.assert_called_once()
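`AsyncMock`'s `side_effect` is also handy for simulating failures across retries: pass a list, and each await consumes the next entry (an exception instance is raised rather than returned). A sketch with an illustrative retry helper:

```python
import asyncio
from unittest.mock import AsyncMock

async def fetch_with_retry(client, retries=3):
    """Retry an async callable on ConnectionError."""
    for attempt in range(retries):
        try:
            return await client()
        except ConnectionError:
            if attempt == retries - 1:
                raise

# First await raises, second returns the dict
mock_client = AsyncMock(side_effect=[ConnectionError("down"), {"status": "ok"}])

result = asyncio.run(fetch_with_retry(mock_client))
print(result, mock_client.await_count)
```

`await_count` (rather than `call_count`) confirms how many times the mock was actually awaited.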

Performance Optimization

性能优化

1. Use asyncio.gather() for Parallelism

1. 使用asyncio.gather()实现并行

python
import asyncio
import time

async def slow_task(n):
    await asyncio.sleep(1)
    return n * 2

async def optimized():
    """Parallel execution."""
    start = time.time()

    # Sequential (slow) - 5 seconds
    # results = []
    # for i in range(5):
    #     result = await slow_task(i)
    #     results.append(result)

    # Parallel (fast) - 1 second
    results = await asyncio.gather(*[slow_task(i) for i in range(5)])

    elapsed = time.time() - start
    print(f"Time: {elapsed:.2f}s, Results: {results}")

asyncio.run(optimized())
python
import asyncio
import time

async def slow_task(n):
    await asyncio.sleep(1)
    return n * 2

async def optimized():
    """并行执行。"""
    start = time.time()

    # 顺序执行(慢)- 5秒
    # results = []
    # for i in range(5):
    #     result = await slow_task(i)
    #     results.append(result)

    # 并行执行(快)- 1秒
    results = await asyncio.gather(*[slow_task(i) for i in range(5)])

    elapsed = time.time() - start
    print(f"耗时: {elapsed:.2f}s, 结果: {results}")

asyncio.run(optimized())
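By default, the first exception inside `gather()` propagates and the other results are lost. With `return_exceptions=True`, failures come back as values alongside successes, so one bad task doesn't discard the rest; a sketch:

```python
import asyncio

async def work(n):
    await asyncio.sleep(0.01 * n)
    if n == 2:
        raise ValueError("task 2 failed")
    return n * 10

async def main():
    # Exceptions are returned in place rather than raised
    return await asyncio.gather(*[work(n) for n in range(4)],
                                return_exceptions=True)

results = asyncio.run(main())
print(results)
```

The caller then inspects each slot with `isinstance(r, Exception)` to separate successes from failures.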

2. Connection Pooling

2. 连接池

python
import asyncio
import aiohttp
python
import asyncio
import aiohttp

BAD: Create new session for each request

❌ 错误模式:每个请求创建新会话

async def bad_pattern():
    for i in range(10):
        async with aiohttp.ClientSession() as session:
            async with session.get("https://httpbin.org/json") as response:
                await response.json()
async def bad_pattern():
    for i in range(10):
        async with aiohttp.ClientSession() as session:
            async with session.get("https://httpbin.org/json") as response:
                await response.json()

GOOD: Reuse session with connection pool

✅ 正确模式:复用带连接池的会话

async def good_pattern():
    async with aiohttp.ClientSession() as session:
        tasks = [
            session.get("https://httpbin.org/json")
            for i in range(10)
        ]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            await response.json()
async def good_pattern():
    async with aiohttp.ClientSession() as session:
        tasks = [
            session.get("https://httpbin.org/json")
            for i in range(10)
        ]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            await response.json()

3. Avoid Blocking Operations

3. 避免阻塞操作

python
import asyncio
python
import asyncio

BAD: Blocking I/O in async function

❌ 错误:异步函数中的阻塞I/O

async def bad_file_read():
    with open("large_file.txt") as f:  # Blocks event loop!
        data = f.read()
    return data
async def bad_file_read():
    with open("large_file.txt") as f:  # 阻塞事件循环!
        data = f.read()
    return data

GOOD: Use async file I/O or run in executor

✅ 正确:使用异步文件I/O或在执行器中运行

async def good_file_read():
    loop = asyncio.get_running_loop()

    # Run blocking operation in thread pool
    data = await loop.run_in_executor(
        None,
        lambda: open("large_file.txt").read()
    )
    return data
async def good_file_read():
    loop = asyncio.get_running_loop()

    # 在线程池中运行阻塞操作
    data = await loop.run_in_executor(
        None,
        lambda: open("large_file.txt").read()
    )
    return data

BETTER: Use aiofiles for async file I/O

✅ 更好:使用aiofiles进行异步文件I/O

import aiofiles

async def better_file_read():
    async with aiofiles.open("large_file.txt") as f:
        data = await f.read()
    return data
import aiofiles

async def better_file_read():
    async with aiofiles.open("large_file.txt") as f:
        data = await f.read()
    return data
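`run_in_executor` also helps with CPU-bound helpers, not just blocking file I/O. A sketch hashing several buffers in an explicit thread pool (the `checksum` helper and buffer contents are illustrative):

```python
import asyncio
import hashlib
from concurrent.futures import ThreadPoolExecutor

def checksum(data: bytes) -> str:
    """Blocking/CPU work kept off the event loop."""
    return hashlib.sha256(data).hexdigest()

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as pool:
        digests = await asyncio.gather(*[
            loop.run_in_executor(pool, checksum, bytes([n]) * 1024)
            for n in range(4)
        ])
    return digests

digests = asyncio.run(main())
print(digests)
```

Passing an explicit executor (instead of `None`) lets you size and reuse the pool; for heavy pure-Python CPU work, a `ProcessPoolExecutor` sidesteps the GIL the same way.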

Common Pitfalls

常见陷阱

❌ Anti-Pattern 1: Not Awaiting Coroutines

❌ 反模式1:未等待协程

python
python

WRONG

错误

async def bad():
    result = async_function()  # Returns coroutine, doesn't execute!
    print(result)  # Prints: <coroutine object>
async def bad():
    result = async_function()  # 返回协程,未执行!
    print(result)  # 输出: <coroutine object>

CORRECT

正确

async def good():
    result = await async_function()  # Actually executes
    print(result)
async def good():
    result = await async_function()  # 实际执行
    print(result)

❌ Anti-Pattern 2: Blocking the Event Loop

❌ 反模式2:阻塞事件循环

python
python

WRONG

错误

import time

async def bad():
    time.sleep(5)  # Blocks entire event loop!
import time

async def bad():
    time.sleep(5)  # 阻塞整个事件循环!

CORRECT

正确

async def good():
    await asyncio.sleep(5)  # Non-blocking
async def good():
    await asyncio.sleep(5)  # 非阻塞

❌ Anti-Pattern 3: Not Handling Cancellation

❌ 反模式3:未处理取消

python
python

WRONG

错误

async def bad():
    await asyncio.sleep(10)  # No cleanup if cancelled
async def bad():
    await asyncio.sleep(10)  # 取消时无清理

CORRECT

正确

async def good():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Cleanup resources
        await cleanup()
        raise  # Re-raise to propagate
async def good():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # 清理资源
        await cleanup()
        raise  # 重新抛出以传播取消

❌ Anti-Pattern 4: Creating Event Loop Incorrectly

❌ 反模式4:错误创建事件循环

python
python

WRONG (Python 3.7+)

错误(Python 3.7+)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

CORRECT (Python 3.7+)

正确(Python 3.7+)

asyncio.run(main())
asyncio.run(main())
undefined

❌ Anti-Pattern 5: Not Closing Resources

❌ 反模式5:未关闭资源

python

python

WRONG

错误

async def bad():
    session = aiohttp.ClientSession()
    response = await session.get(url)
    # Session never closed - resource leak!

async def bad():
    session = aiohttp.ClientSession()
    response = await session.get(url)
    # 会话从未关闭 - 资源泄漏!

CORRECT

正确

async def good():
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
    # Session automatically closed

async def good():
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
    # 会话自动关闭
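When several async resources must be opened together, the stdlib `contextlib.AsyncExitStack` guarantees all of them are closed, in reverse order, even if opening a later one fails. A self-contained sketch with a hypothetical `Resource` class standing in for sessions or connections:

```python
import asyncio
from contextlib import AsyncExitStack

closed = []

class Resource:
    """Hypothetical async resource (stands in for a session or connection)."""
    def __init__(self, name):
        self.name = name
    async def __aenter__(self):
        return self
    async def __aexit__(self, *exc):
        closed.append(self.name)  # record close order

async def main():
    async with AsyncExitStack() as stack:
        a = await stack.enter_async_context(Resource("a"))
        b = await stack.enter_async_context(Resource("b"))
        # ... use a and b together ...
    # Both resources are closed here, last-opened first.
    return closed

print(asyncio.run(main()))  # ['b', 'a']
```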

Best Practices

最佳实践

  1. Use asyncio.run() for entry point (Python 3.7+)
  2. Always await coroutines - don't forget await
  3. Use async context managers for resource cleanup
  4. Connection pooling for HTTP and database clients
  5. Handle CancelledError for graceful shutdown
  6. Use asyncio.gather() for parallel operations
  7. Avoid blocking operations in async functions
  8. Use timeouts to prevent hanging operations
  9. Debug mode during development to catch issues
  10. Test async code with pytest-asyncio
  1. 使用asyncio.run() 作为入口点(Python 3.7+)
  2. 始终等待协程 - 不要忘记await关键字
  3. 使用异步上下文管理器 进行资源清理
  4. 连接池 用于HTTP和数据库客户端
  5. 处理CancelledError 以实现优雅关闭
  6. 使用asyncio.gather() 进行并行操作
  7. 异步函数中避免阻塞操作
  8. 使用超时 防止操作挂起
  9. 开发期间启用调试模式 以发现问题
  10. 使用pytest-asyncio测试异步代码
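Practice 8 (timeouts) can be sketched with `asyncio.wait_for`, which cancels the awaited operation and raises `asyncio.TimeoutError` when the deadline passes; `slow_operation` is a placeholder:

```python
import asyncio

async def slow_operation():
    # Placeholder for a call that might hang (network, lock, ...).
    await asyncio.sleep(10)

async def main():
    try:
        # Bound every potentially hanging await with a timeout.
        await asyncio.wait_for(slow_operation(), timeout=0.1)
        return "completed"
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))  # timed out
```

On Python 3.11+, the `async with asyncio.timeout(0.1): ...` context manager is an alternative that can wrap several awaits under one deadline.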

Quick Reference

快速参考

Common Commands

常用命令

bash

bash

Run async script

运行异步脚本

python script.py
python script.py

Run with debug mode

启用调试模式运行

PYTHONASYNCIODEBUG=1 python script.py
PYTHONASYNCIODEBUG=1 python script.py

Run tests

运行测试

pytest -v --asyncio-mode=auto
pytest -v --asyncio-mode=auto

Install async dependencies

安装异步依赖

pip install aiohttp asyncpg motor pytest-asyncio

pip install aiohttp asyncpg motor pytest-asyncio

Essential Imports

必要导入

python
import asyncio
import aiohttp
import asyncpg
from typing import List, Dict, Any
python
import asyncio
import aiohttp
import asyncpg
from typing import List, Dict, Any

Resources

资源

Related Skills

相关技能

When using asyncio, consider these complementary skills:
  • fastapi-local-dev: FastAPI async server patterns and production deployment
  • pytest: Testing async code with pytest-asyncio and fixtures
  • systematic-debugging: Debugging async race conditions and deadlocks
使用asyncio时,可结合以下互补技能:
  • fastapi-local-dev: FastAPI异步服务端模式与生产部署
  • pytest: 使用pytest-asyncio和fixture测试异步代码
  • systematic-debugging: 调试异步竞态条件与死锁

Quick FastAPI Async Patterns (Inlined for Standalone Use)

独立使用的FastAPI异步模式速览

python

python

FastAPI async endpoint pattern

FastAPI异步端点模式

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

app = FastAPI()

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

app = FastAPI()

Async database setup

异步数据库设置

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Async database query
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # 异步数据库查询
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(status_code=404, detail="用户未找到")
    return user

Background tasks with asyncio

基于asyncio的后台任务

@app.post("/send-email")
async def send_email_endpoint(email: EmailSchema):
    # Non-blocking background task
    asyncio.create_task(send_email_async(email))
    return {"status": "email queued"}

@app.post("/send-email")
async def send_email_endpoint(email: EmailSchema):
    # 非阻塞后台任务
    asyncio.create_task(send_email_async(email))
    return {"status": "邮件已排队"}

Quick pytest-asyncio Patterns (Inlined for Standalone Use)

独立使用的pytest-asyncio模式速览

python

python

Testing async functions with pytest

使用pytest测试异步函数

import pytest
import pytest_asyncio
from httpx import AsyncClient

import pytest
import pytest_asyncio
from httpx import AsyncClient

Async test fixture

异步测试fixture

@pytest_asyncio.fixture
async def async_client():
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client

@pytest_asyncio.fixture
async def async_client():
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client

Async test function

异步测试函数

@pytest.mark.asyncio
async def test_get_user(async_client):
    response = await async_client.get("/users/1")
    assert response.status_code == 200
    assert response.json()["id"] == 1

@pytest.mark.asyncio
async def test_get_user(async_client):
    response = await async_client.get("/users/1")
    assert response.status_code == 200
    assert response.json()["id"] == 1

Testing concurrent operations

测试并发操作

@pytest.mark.asyncio
async def test_concurrent_requests():
    async with AsyncClient(app=app, base_url="http://test") as client:
        # Run 10 requests concurrently
        responses = await asyncio.gather(
            *[client.get(f"/users/{i}") for i in range(1, 11)]
        )
        assert all(r.status_code == 200 for r in responses)

@pytest.mark.asyncio
async def test_concurrent_requests():
    async with AsyncClient(app=app, base_url="http://test") as client:
        # 并发运行10个请求
        responses = await asyncio.gather(
            *[client.get(f"/users/{i}") for i in range(1, 11)]
        )
        assert all(r.status_code == 200 for r in responses)

Mock async dependencies

模拟异步依赖

from unittest.mock import AsyncMock

@pytest_asyncio.fixture
async def mock_db():
    # Setup mock database
    db = AsyncMock()
    yield db
    # Cleanup

from unittest.mock import AsyncMock

@pytest_asyncio.fixture
async def mock_db():
    # 设置模拟数据库
    db = AsyncMock()
    yield db
    # 清理
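`AsyncMock` (in the stdlib since Python 3.8) returns awaitable attributes and records await calls, so async dependencies can be verified without pytest machinery. A minimal sketch; `get_user` and `fetch_user` are illustrative names:

```python
import asyncio
from unittest.mock import AsyncMock

async def get_user(db, user_id):
    # Hypothetical function under test: awaits a database call.
    return await db.fetch_user(user_id)

async def main():
    db = AsyncMock()
    db.fetch_user.return_value = {"id": 1, "name": "alice"}

    user = await get_user(db, 1)

    # AsyncMock records awaits, so call arguments can be asserted.
    db.fetch_user.assert_awaited_once_with(1)
    return user

print(asyncio.run(main()))  # {'id': 1, 'name': 'alice'}
```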

Quick Async Debugging Reference (Inlined for Standalone Use)

独立使用的异步调试参考速览

Common Async Pitfalls:
  1. Blocking the Event Loop
    python
    # ❌ WRONG: Blocking call in async function
    async def bad_function():
        time.sleep(5)  # Blocks entire event loop!
        return "done"
    
    # ✅ CORRECT: Use asyncio.sleep
    async def good_function():
        await asyncio.sleep(5)  # Releases event loop
        return "done"
  2. Debugging Race Conditions
    python
    # Add logging to track execution order
    import logging
    logging.basicConfig(level=logging.DEBUG)
    
    async def debug_task(name):
        logging.debug(f"{name}: Starting")
        await asyncio.sleep(1)
        logging.debug(f"{name}: Finished")
        return name
    
    # Run with detailed tracing; wrap the gather in a coroutine,
    # since asyncio.run() expects a coroutine, not a Future
    async def main():
        await asyncio.gather(debug_task("A"), debug_task("B"))
    
    asyncio.run(main(), debug=True)
  3. Deadlock Detection
    python
    # Use timeout to detect deadlocks
    try:
        result = await asyncio.wait_for(some_async_function(), timeout=5.0)
    except asyncio.TimeoutError:
        logging.error("Deadlock detected: operation timed out")
        # Investigate what's blocking
  4. Inspecting Running Tasks
    python
    # Check all pending tasks
    tasks = asyncio.all_tasks()
    for task in tasks:
        print(f"Task: {task.get_name()}, Done: {task.done()}")
        if not task.done():
            print(f"  Current coro: {task.get_coro()}")
[Full FastAPI, pytest, and debugging patterns available in respective skills if deployed together]

Python Version Compatibility: This skill covers asyncio in Python 3.7+ and reflects current best practices for async programming in 2025.
常见异步陷阱:
  1. 阻塞事件循环
    python
    # ❌ 错误:异步函数中的阻塞调用
    async def bad_function():
        time.sleep(5)  # 阻塞整个事件循环!
        return "done"
    
    # ✅ 正确:使用asyncio.sleep
    async def good_function():
        await asyncio.sleep(5)  # 释放事件循环
        return "done"
  2. 调试竞态条件
    python
    # 添加日志跟踪执行顺序
    import logging
    logging.basicConfig(level=logging.DEBUG)
    
    async def debug_task(name):
        logging.debug(f"{name}: 启动")
        await asyncio.sleep(1)
        logging.debug(f"{name}: 完成")
        return name
    
    # 启用详细跟踪运行;需将gather包装在协程中,
    # 因为asyncio.run()需要协程而非Future
    async def main():
        await asyncio.gather(debug_task("A"), debug_task("B"))
    
    asyncio.run(main(), debug=True)
  3. 死锁检测
    python
    # 使用超时检测死锁
    try:
        result = await asyncio.wait_for(some_async_function(), timeout=5.0)
    except asyncio.TimeoutError:
        logging.error("检测到死锁:操作超时")
        # 调查阻塞原因
  4. 检查运行中任务
    python
    # 检查所有待处理任务
    tasks = asyncio.all_tasks()
    for task in tasks:
        print(f"任务: {task.get_name()}, 完成: {task.done()}")
        if not task.done():
            print(f"  当前协程: {task.get_coro()}")
[若同时部署,可查看对应技能中的完整FastAPI、pytest及调试模式]

Python版本兼容性: 本技能覆盖Python 3.7+中的asyncio,反映了2025年异步编程的当前最佳实践。
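Pitfall 4 above is easier to act on when tasks are named at creation (`asyncio.create_task(..., name=...)`, Python 3.8+), so the `all_tasks()` listing shows meaningful labels instead of `Task-1`, `Task-2`. A small runnable sketch; `worker` is a placeholder coroutine:

```python
import asyncio

async def worker(delay):
    await asyncio.sleep(delay)

async def main():
    # Name tasks at creation so debugging output is readable.
    asyncio.create_task(worker(0.1), name="worker-slow")
    asyncio.create_task(worker(0.01), name="worker-fast")
    await asyncio.sleep(0)  # yield so the tasks are scheduled

    # List pending tasks, excluding the coroutine doing the inspecting.
    names = sorted(t.get_name() for t in asyncio.all_tasks()
                   if t is not asyncio.current_task())
    return names

print(asyncio.run(main()))  # ['worker-fast', 'worker-slow']
```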