async-python-patterns

Async Python Patterns

Comprehensive guidance for implementing asynchronous Python applications using asyncio, concurrent programming patterns, and async/await for building high-performance, non-blocking systems.

When to Use This Skill

  • Building async web APIs (FastAPI, aiohttp, Sanic)
  • Implementing concurrent I/O operations (database, file, network)
  • Creating web scrapers with concurrent requests
  • Developing real-time applications (WebSocket servers, chat systems)
  • Processing multiple independent tasks simultaneously
  • Building microservices with async communication
  • Optimizing I/O-bound workloads
  • Implementing async background tasks and queues

Sync vs Async Decision Guide

Before adopting async, consider whether it's the right choice for your use case.

| Use Case | Recommended Approach |
| --- | --- |
| Many concurrent network/DB calls | asyncio |
| CPU-bound computation | multiprocessing or thread pool |
| Mixed I/O + CPU | Offload CPU work with asyncio.to_thread() |
| Simple scripts, few connections | Sync (simpler, easier to debug) |
| Web APIs with high concurrency | Async frameworks (FastAPI, aiohttp) |

Key Rule: Stay fully sync or fully async within a call path. Mixing creates hidden blocking and complexity.

Core Concepts

1. Event Loop

The event loop is the heart of asyncio, managing and scheduling asynchronous tasks.
Key characteristics:
  • Single-threaded cooperative multitasking
  • Schedules coroutines for execution
  • Handles I/O operations without blocking
  • Manages callbacks and futures
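
The cooperative scheduling described above can be seen in a minimal sketch (coroutine names are illustrative): each `await` hands control back to the loop, so two coroutines interleave on a single thread.

```python
import asyncio

async def tick(name: str, count: int) -> None:
    """Print a few labeled steps, yielding to the loop between them."""
    for i in range(count):
        print(f"{name}: {i}")
        await asyncio.sleep(0)  # Yield control to the loop without delay

async def main() -> None:
    # Both coroutines run on one thread; the loop alternates between them
    await asyncio.gather(tick("A", 2), tick("B", 2))

asyncio.run(main())
```

Because `asyncio.sleep(0)` reschedules the caller behind the other ready coroutine, the output alternates between A and B.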

2. Coroutines

Functions defined with `async def` that can be paused and resumed.
Syntax:
python
async def my_coroutine():
    result = await some_async_operation()
    return result

3. Tasks

Scheduled coroutines that run concurrently on the event loop.

4. Futures

Low-level objects representing eventual results of async operations.
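
A minimal sketch of creating and resolving a Future by hand (application code usually receives futures from libraries rather than building them):

```python
import asyncio

async def main() -> str:
    loop = asyncio.get_running_loop()
    future = loop.create_future()  # An empty slot for an eventual result

    # Resolve the future later via a callback scheduled on the loop
    loop.call_later(0.1, future.set_result, "done")

    # Awaiting suspends this coroutine until set_result() runs
    result = await future
    print(result)
    return result

asyncio.run(main())
```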

5. Async Context Managers

Resources that support `async with` for proper cleanup.

6. Async Iterators

Objects that support `async for` for iterating over async data sources.

Quick Start

python
import asyncio

async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Python 3.7+
asyncio.run(main())

Fundamental Patterns

Pattern 1: Basic Async/Await

python
import asyncio

async def fetch_data(url: str) -> dict:
    """Fetch data from URL asynchronously."""
    await asyncio.sleep(1)  # Simulate I/O
    return {"url": url, "data": "result"}

async def main():
    result = await fetch_data("https://api.example.com")
    print(result)

asyncio.run(main())

Pattern 2: Concurrent Execution with gather()

python
import asyncio
from typing import List

async def fetch_user(user_id: int) -> dict:
    """Fetch user data."""
    await asyncio.sleep(0.5)
    return {"id": user_id, "name": f"User {user_id}"}

async def fetch_all_users(user_ids: List[int]) -> List[dict]:
    """Fetch multiple users concurrently."""
    tasks = [fetch_user(uid) for uid in user_ids]
    results = await asyncio.gather(*tasks)
    return results

async def main():
    user_ids = [1, 2, 3, 4, 5]
    users = await fetch_all_users(user_ids)
    print(f"Fetched {len(users)} users")

asyncio.run(main())

Pattern 3: Task Creation and Management

python
import asyncio

async def background_task(name: str, delay: int):
    """Long-running background task."""
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} completed")
    return f"Result from {name}"

async def main():
    # Create tasks
    task1 = asyncio.create_task(background_task("Task 1", 2))
    task2 = asyncio.create_task(background_task("Task 2", 1))

    # Do other work
    print("Main: doing other work")
    await asyncio.sleep(0.5)

    # Wait for tasks
    result1 = await task1
    result2 = await task2

    print(f"Results: {result1}, {result2}")

asyncio.run(main())

Pattern 4: Error Handling in Async Code

python
import asyncio
from typing import List, Optional

async def risky_operation(item_id: int) -> dict:
    """Operation that might fail."""
    await asyncio.sleep(0.1)
    if item_id % 3 == 0:
        raise ValueError(f"Item {item_id} failed")
    return {"id": item_id, "status": "success"}

async def safe_operation(item_id: int) -> Optional[dict]:
    """Wrapper with error handling."""
    try:
        return await risky_operation(item_id)
    except ValueError as e:
        print(f"Error: {e}")
        return None

async def process_items(item_ids: List[int]):
    """Process multiple items with error handling."""
    tasks = [safe_operation(iid) for iid in item_ids]
    # safe_operation already catches ValueError, so failures surface as None;
    # return_exceptions=True still guards against unexpected exceptions
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter out failures
    successful = [r for r in results if r is not None and not isinstance(r, Exception)]
    failed = [r for r in results if r is None or isinstance(r, Exception)]

    print(f"Success: {len(successful)}, Failed: {len(failed)}")
    return successful

asyncio.run(process_items([1, 2, 3, 4, 5, 6]))

Pattern 5: Timeout Handling

python
import asyncio

async def slow_operation(delay: int) -> str:
    """Operation that takes time."""
    await asyncio.sleep(delay)
    return f"Completed after {delay}s"

async def with_timeout():
    """Execute operation with timeout."""
    try:
        result = await asyncio.wait_for(slow_operation(5), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(with_timeout())

Advanced Patterns

Pattern 6: Async Context Managers

python
import asyncio
from typing import Optional

class AsyncDatabaseConnection:
    """Async database connection context manager."""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.connection: Optional[object] = None

    async def __aenter__(self):
        print("Opening connection")
        await asyncio.sleep(0.1)  # Simulate connection
        self.connection = {"dsn": self.dsn, "connected": True}
        return self.connection

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection")
        await asyncio.sleep(0.1)  # Simulate cleanup
        self.connection = None

async def query_database():
    """Use async context manager."""
    async with AsyncDatabaseConnection("postgresql://localhost") as conn:
        print(f"Using connection: {conn}")
        await asyncio.sleep(0.2)  # Simulate query
        return {"rows": 10}

asyncio.run(query_database())

Pattern 7: Async Iterators and Generators

python
import asyncio
from typing import AsyncIterator

async def async_range(start: int, end: int, delay: float = 0.1) -> AsyncIterator[int]:
    """Async generator that yields numbers with delay."""
    for i in range(start, end):
        await asyncio.sleep(delay)
        yield i

async def fetch_pages(url: str, max_pages: int) -> AsyncIterator[dict]:
    """Fetch paginated data asynchronously."""
    for page in range(1, max_pages + 1):
        await asyncio.sleep(0.2)  # Simulate API call
        yield {
            "page": page,
            "url": f"{url}?page={page}",
            "data": [f"item_{page}_{i}" for i in range(5)]
        }

async def consume_async_iterator():
    """Consume async iterator."""
    async for number in async_range(1, 5):
        print(f"Number: {number}")

    print("\nFetching pages:")
    async for page_data in fetch_pages("https://api.example.com/items", 3):
        print(f"Page {page_data['page']}: {len(page_data['data'])} items")

asyncio.run(consume_async_iterator())

Pattern 8: Producer-Consumer Pattern

python
import asyncio
from asyncio import Queue
from typing import Optional

async def producer(queue: Queue, producer_id: int, num_items: int):
    """Produce items and put them in queue."""
    for i in range(num_items):
        item = f"Item-{producer_id}-{i}"
        await queue.put(item)
        print(f"Producer {producer_id} produced: {item}")
        await asyncio.sleep(0.1)
    await queue.put(None)  # Signal completion

async def consumer(queue: Queue, consumer_id: int):
    """Consume items from queue."""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break

        print(f"Consumer {consumer_id} processing: {item}")
        await asyncio.sleep(0.2)  # Simulate work
        queue.task_done()

async def producer_consumer_example():
    """Run producer-consumer pattern."""
    queue = Queue(maxsize=10)

    # Create tasks
    producers = [
        asyncio.create_task(producer(queue, i, 5))
        for i in range(2)
    ]

    consumers = [
        asyncio.create_task(consumer(queue, i))
        for i in range(3)
    ]

    # Wait for producers
    await asyncio.gather(*producers)

    # Wait for queue to be empty
    await queue.join()

    # Cancel consumers
    for c in consumers:
        c.cancel()

asyncio.run(producer_consumer_example())

Pattern 9: Semaphore for Rate Limiting

python
import asyncio
from typing import List

async def api_call(url: str, semaphore: asyncio.Semaphore) -> dict:
    """Make API call with rate limiting."""
    async with semaphore:
        print(f"Calling {url}")
        await asyncio.sleep(0.5)  # Simulate API call
        return {"url": url, "status": 200}

async def rate_limited_requests(urls: List[str], max_concurrent: int = 5):
    """Make multiple requests with rate limiting."""
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = [api_call(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(20)]
    results = await rate_limited_requests(urls, max_concurrent=3)
    print(f"Completed {len(results)} requests")

asyncio.run(main())

Pattern 10: Async Locks and Synchronization

python
import asyncio

class AsyncCounter:
    """Thread-safe async counter."""

    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        """Safely increment counter."""
        async with self.lock:
            current = self.value
            await asyncio.sleep(0.01)  # Simulate work
            self.value = current + 1

    async def get_value(self) -> int:
        """Get current value."""
        async with self.lock:
            return self.value

async def worker(counter: AsyncCounter, worker_id: int):
    """Worker that increments counter."""
    for _ in range(10):
        await counter.increment()
        print(f"Worker {worker_id} incremented")

async def test_counter():
    """Test concurrent counter."""
    counter = AsyncCounter()

    workers = [asyncio.create_task(worker(counter, i)) for i in range(5)]
    await asyncio.gather(*workers)

    final_value = await counter.get_value()
    print(f"Final counter value: {final_value}")

asyncio.run(test_counter())

Real-World Applications

Web Scraping with aiohttp

python
import asyncio
import aiohttp
from typing import List, Dict

async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
    """Fetch single URL."""
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            text = await response.text()
            return {
                "url": url,
                "status": response.status,
                "length": len(text)
            }
    except Exception as e:
        return {"url": url, "error": str(e)}

async def scrape_urls(urls: List[str]) -> List[Dict]:
    """Scrape multiple URLs concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/404",
    ]

    results = await scrape_urls(urls)
    for result in results:
        print(result)

asyncio.run(main())

Async Database Operations

python
import asyncio
from typing import List, Optional

# Simulated async database client
class AsyncDB:
    """Simulated async database."""

    async def execute(self, query: str) -> List[dict]:
        """Execute query."""
        await asyncio.sleep(0.1)
        return [{"id": 1, "name": "Example"}]

    async def fetch_one(self, query: str) -> Optional[dict]:
        """Fetch single row."""
        await asyncio.sleep(0.1)
        return {"id": 1, "name": "Example"}

async def get_user_data(db: AsyncDB, user_id: int) -> dict:
    """Fetch user and related data concurrently."""
    # f-string SQL is for illustration only; use parameterized queries in real code
    user_task = db.fetch_one(f"SELECT * FROM users WHERE id = {user_id}")
    orders_task = db.execute(f"SELECT * FROM orders WHERE user_id = {user_id}")
    profile_task = db.fetch_one(f"SELECT * FROM profiles WHERE user_id = {user_id}")

    user, orders, profile = await asyncio.gather(user_task, orders_task, profile_task)

    return {
        "user": user,
        "orders": orders,
        "profile": profile
    }

async def main():
    db = AsyncDB()
    user_data = await get_user_data(db, 1)
    print(user_data)

asyncio.run(main())

WebSocket Server

python
import asyncio
from typing import Set

# Simulated WebSocket connection
class WebSocket:
    """Simulated WebSocket."""

    def __init__(self, client_id: str):
        self.client_id = client_id

    async def send(self, message: str):
        """Send message."""
        print(f"Sending to {self.client_id}: {message}")
        await asyncio.sleep(0.01)

    async def recv(self) -> str:
        """Receive message."""
        await asyncio.sleep(1)
        return f"Message from {self.client_id}"

class WebSocketServer:
    """Simple WebSocket server."""

    def __init__(self):
        self.clients: Set[WebSocket] = set()

    async def register(self, websocket: WebSocket):
        """Register new client."""
        self.clients.add(websocket)
        print(f"Client {websocket.client_id} connected")

    async def unregister(self, websocket: WebSocket):
        """Unregister client."""
        self.clients.remove(websocket)
        print(f"Client {websocket.client_id} disconnected")

    async def broadcast(self, message: str):
        """Broadcast message to all clients."""
        if self.clients:
            tasks = [client.send(message) for client in self.clients]
            await asyncio.gather(*tasks)

    async def handle_client(self, websocket: WebSocket):
        """Handle individual client connection."""
        await self.register(websocket)
        try:
            async for message in self.message_iterator(websocket):
                await self.broadcast(f"{websocket.client_id}: {message}")
        finally:
            await self.unregister(websocket)

    async def message_iterator(self, websocket: WebSocket):
        """Iterate over messages from client."""
        for _ in range(3):  # Simulate 3 messages
            yield await websocket.recv()

Performance Best Practices

1. Use Connection Pools

python
import asyncio
import aiohttp

async def with_connection_pool():
    """Use connection pool for efficiency."""
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)

    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [session.get(f"https://api.example.com/item/{i}") for i in range(50)]
        responses = await asyncio.gather(*tasks)
        return responses

2. Batch Operations

python
import asyncio
from typing import List

async def batch_process(items: List[str], batch_size: int = 10):
    """Process items in batches."""
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        tasks = [process_item(item) for item in batch]
        await asyncio.gather(*tasks)
        print(f"Processed batch {i // batch_size + 1}")

async def process_item(item: str):
    """Process single item."""
    await asyncio.sleep(0.1)
    return f"Processed: {item}"

3. Avoid Blocking Operations

Never block the event loop with synchronous operations. A single blocking call stalls all concurrent tasks.

python
import asyncio
import time
import requests

# BAD - blocks the entire event loop
async def fetch_data_bad(url: str):
    time.sleep(1)  # Blocks!
    response = requests.get(url)  # Also blocks!

# GOOD - use async-native libraries (e.g., httpx for async HTTP)
import httpx

async def fetch_data_good(url: str):
    await asyncio.sleep(1)
    async with httpx.AsyncClient() as client:
        response = await client.get(url)

**Wrapping Blocking Code with `asyncio.to_thread()` (Python 3.9+):**

When you must use synchronous libraries, offload to a thread pool:

python
import asyncio
from pathlib import Path

async def read_file_async(path: str) -> str:
    """Read file without blocking event loop."""
    # asyncio.to_thread() runs sync code in a thread pool
    return await asyncio.to_thread(Path(path).read_text)

async def call_sync_library(data: dict) -> dict:
    """Wrap a synchronous library call."""
    # Useful for sync database drivers, file I/O, CPU work
    return await asyncio.to_thread(sync_library.process, data)

Lower-level approach with `run_in_executor()`:

python
import asyncio
import concurrent.futures
from typing import Any

def blocking_operation(data: Any) -> Any:
    """CPU-intensive blocking operation."""
    import time
    time.sleep(1)
    return data * 2

async def run_in_executor(data: Any) -> Any:
    """Run blocking operation in thread pool."""
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_operation, data)
        return result

async def main():
    results = await asyncio.gather(*[run_in_executor(i) for i in range(5)])
    print(results)

asyncio.run(main())

Common Pitfalls

1. Forgetting await

python
# Wrong - returns a coroutine object, doesn't execute
result = async_function()

# Correct
result = await async_function()

2. Blocking the Event Loop

python
import time

# Wrong - blocks the event loop
async def bad():
    time.sleep(1)  # Blocks!

# Correct
async def good():
    await asyncio.sleep(1)  # Non-blocking

3. Not Handling Cancellation

python
async def cancelable_task():
    """Task that handles cancellation."""
    try:
        while True:
            await asyncio.sleep(1)
            print("Working...")
    except asyncio.CancelledError:
        print("Task cancelled, cleaning up...")
        # Perform cleanup
        raise  # Re-raise to propagate cancellation

4. Mixing Sync and Async Code

python
# Wrong - await is a SyntaxError inside a plain def
def sync_function():
    result = await async_function()  # SyntaxError!

# Correct
def sync_function():
    result = asyncio.run(async_function())

Testing Async Code

python
import asyncio
import pytest

# Using pytest-asyncio

@pytest.mark.asyncio
async def test_async_function():
    """Test async function."""
    result = await fetch_data("https://api.example.com")
    assert result is not None

@pytest.mark.asyncio
async def test_with_timeout():
    """Test with timeout."""
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(slow_operation(5), timeout=1.0)

Resources

Best Practices Summary

  1. Use asyncio.run() for entry point (Python 3.7+)
  2. Always await coroutines to execute them
  3. Limit concurrency with semaphores - unbounded gather() can exhaust resources
  4. Implement proper error handling with try/except
  5. Use timeouts to prevent hanging operations
  6. Pool connections for better performance
  7. Never block the event loop - use asyncio.to_thread() for sync code
  8. Use semaphores for rate limiting external API calls
  9. Handle task cancellation properly - always re-raise CancelledError
  10. Test async code with pytest-asyncio
  11. Stay consistent - fully sync or fully async, avoid mixing
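
Several of these practices can be combined in one small sketch (semaphore-bounded concurrency, a per-call timeout, and error capture); `fetch_one` is an illustrative stand-in for a real network call:

```python
import asyncio
from typing import List, Optional

async def fetch_one(i: int) -> str:
    """Stand-in for a real network call."""
    await asyncio.sleep(0.05)
    return f"result-{i}"

async def bounded_fetch(i: int, sem: asyncio.Semaphore) -> Optional[str]:
    async with sem:  # Limit concurrency (practice 3)
        try:
            # Per-call timeout (practice 5)
            return await asyncio.wait_for(fetch_one(i), timeout=1.0)
        except asyncio.TimeoutError:
            return None  # Capture errors instead of failing the batch (practice 4)

async def main() -> List[Optional[str]]:
    sem = asyncio.Semaphore(3)
    return await asyncio.gather(*(bounded_fetch(i, sem) for i in range(5)))

results = asyncio.run(main())  # Entry point (practice 1)
print(results)
```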