python-async-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Python Async Patterns

Python 异步编程模式

Master asynchronous programming in Python using asyncio, async/await syntax, and concurrent execution patterns for I/O-bound and CPU-bound tasks.
使用asyncio、async/await语法以及并发执行模式,掌握Python中的异步编程,处理I/O密集型和CPU密集型任务。

Basic Async/Await

基础Async/Await

Core async syntax:
python
import asyncio
核心异步语法:
python
import asyncio

Define async function with async def

Define async function with async def

async def fetch_data(url: str) -> str: print(f"Fetching {url}...") await asyncio.sleep(1) # Simulate I/O operation return f"Data from {url}"
async def fetch_data(url: str) -> str: print(f"Fetching {url}...") await asyncio.sleep(1) # Simulate I/O operation return f"Data from {url}"

Call async function

Call async function

async def main() -> None: result = await fetch_data("https://api.example.com") print(result)
async def main() -> None: result = await fetch_data("https://api.example.com") print(result)

Run async function

Run async function

asyncio.run(main())

**Multiple concurrent operations:**

```python
import asyncio

async def fetch_url(url: str) -> str:
    await asyncio.sleep(1)
    return f"Content from {url}"

async def main() -> None:
    # Run concurrently with gather
    results = await asyncio.gather(
        fetch_url("https://example.com/1"),
        fetch_url("https://example.com/2"),
        fetch_url("https://example.com/3")
    )
    for result in results:
        print(result)

asyncio.run(main())
asyncio.run(main())

**多任务并发操作:**

```python
import asyncio

async def fetch_url(url: str) -> str:
    await asyncio.sleep(1)
    return f"Content from {url}"

async def main() -> None:
    # Run concurrently with gather
    results = await asyncio.gather(
        fetch_url("https://example.com/1"),
        fetch_url("https://example.com/2"),
        fetch_url("https://example.com/3")
    )
    for result in results:
        print(result)

asyncio.run(main())

asyncio.create_task

asyncio.create_task

Creating and managing tasks:
python
import asyncio

async def process_item(item: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Processed {item}"

async def main() -> None:
    # Create tasks for concurrent execution
    task1 = asyncio.create_task(process_item("A", 2.0))
    task2 = asyncio.create_task(process_item("B", 1.0))
    task3 = asyncio.create_task(process_item("C", 1.5))

    # Do other work while tasks run
    print("Tasks started")

    # Wait for tasks to complete
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(result1, result2, result3)

asyncio.run(main())
Task with name and context:
python
import asyncio

async def background_task(name: str) -> None:
    print(f"Task {name} starting")
    await asyncio.sleep(2)
    print(f"Task {name} completed")

async def main() -> None:
    # Create named task
    task = asyncio.create_task(
        background_task("worker"),
        name="background-worker"
    )

    # Check task status
    print(f"Task name: {task.get_name()}")
    print(f"Task done: {task.done()}")

    await task

asyncio.run(main())
任务创建与管理:
python
import asyncio

async def process_item(item: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Processed {item}"

async def main() -> None:
    # Create tasks for concurrent execution
    task1 = asyncio.create_task(process_item("A", 2.0))
    task2 = asyncio.create_task(process_item("B", 1.0))
    task3 = asyncio.create_task(process_item("C", 1.5))

    # Do other work while tasks run
    print("Tasks started")

    # Wait for tasks to complete
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(result1, result2, result3)

asyncio.run(main())
带名称与上下文的任务:
python
import asyncio

async def background_task(name: str) -> None:
    print(f"Task {name} starting")
    await asyncio.sleep(2)
    print(f"Task {name} completed")

async def main() -> None:
    # Create named task
    task = asyncio.create_task(
        background_task("worker"),
        name="background-worker"
    )

    # Check task status
    print(f"Task name: {task.get_name()}")
    print(f"Task done: {task.done()}")

    await task

asyncio.run(main())

asyncio.gather vs asyncio.wait

asyncio.gather vs asyncio.wait

Using gather for results:
python
import asyncio

async def fetch(n: int) -> int:
    await asyncio.sleep(1)
    return n * 2

async def main() -> None:
    # gather returns results in order
    results = await asyncio.gather(
        fetch(1),
        fetch(2),
        fetch(3)
    )
    print(results)  # [2, 4, 6]

    # Return exceptions instead of raising
    results = await asyncio.gather(
        fetch(1),
        fetch(2),
        return_exceptions=True
    )

asyncio.run(main())
Using wait for more control:
python
import asyncio

async def worker(n: int) -> int:
    await asyncio.sleep(n)
    return n

async def main() -> None:
    tasks = [
        asyncio.create_task(worker(1)),
        asyncio.create_task(worker(2)),
        asyncio.create_task(worker(3))
    ]

    # Wait for first task to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    print(f"Done: {len(done)}, Pending: {len(pending)}")

    # Cancel pending tasks
    for task in pending:
        task.cancel()

    # Wait for all with timeout
    done, pending = await asyncio.wait(
        tasks,
        timeout=2.0
    )

asyncio.run(main())
使用gather获取结果:
python
import asyncio

async def fetch(n: int) -> int:
    await asyncio.sleep(1)
    return n * 2

async def main() -> None:
    # gather returns results in order
    results = await asyncio.gather(
        fetch(1),
        fetch(2),
        fetch(3)
    )
    print(results)  # [2, 4, 6]

    # Return exceptions instead of raising
    results = await asyncio.gather(
        fetch(1),
        fetch(2),
        return_exceptions=True
    )

asyncio.run(main())
使用wait实现更精细控制:
python
import asyncio

async def worker(n: int) -> int:
    await asyncio.sleep(n)
    return n

async def main() -> None:
    tasks = [
        asyncio.create_task(worker(1)),
        asyncio.create_task(worker(2)),
        asyncio.create_task(worker(3))
    ]

    # Wait for first task to complete
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    print(f"Done: {len(done)}, Pending: {len(pending)}")

    # Cancel pending tasks
    for task in pending:
        task.cancel()

    # Wait for all with timeout
    done, pending = await asyncio.wait(
        tasks,
        timeout=2.0
    )

asyncio.run(main())

Async Context Managers

异步上下文管理器

Creating async context managers:
python
import asyncio
from typing import AsyncIterator

class AsyncResource:
    async def __aenter__(self) -> "AsyncResource":
        print("Acquiring resource")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        print("Releasing resource")
        await asyncio.sleep(0.1)

    async def query(self) -> str:
        return "data"

async def main() -> None:
    async with AsyncResource() as resource:
        result = await resource.query()
        print(result)

asyncio.run(main())
Using asynccontextmanager decorator:
python
from contextlib import asynccontextmanager
import asyncio

@asynccontextmanager
async def get_connection(url: str) -> AsyncIterator[str]:
    # Setup
    print(f"Connecting to {url}")
    await asyncio.sleep(0.1)
    conn = f"connection-{url}"

    try:
        yield conn
    finally:
        # Teardown
        print(f"Closing connection to {url}")
        await asyncio.sleep(0.1)

async def main() -> None:
    async with get_connection("localhost") as conn:
        print(f"Using {conn}")

asyncio.run(main())
创建异步上下文管理器:
python
import asyncio
from typing import AsyncIterator

class AsyncResource:
    async def __aenter__(self) -> "AsyncResource":
        print("Acquiring resource")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        print("Releasing resource")
        await asyncio.sleep(0.1)

    async def query(self) -> str:
        return "data"

async def main() -> None:
    async with AsyncResource() as resource:
        result = await resource.query()
        print(result)

asyncio.run(main())
使用asynccontextmanager装饰器:
python
from contextlib import asynccontextmanager
import asyncio

@asynccontextmanager
async def get_connection(url: str) -> AsyncIterator[str]:
    # Setup
    print(f"Connecting to {url}")
    await asyncio.sleep(0.1)
    conn = f"connection-{url}"

    try:
        yield conn
    finally:
        # Teardown
        print(f"Closing connection to {url}")
        await asyncio.sleep(0.1)

async def main() -> None:
    async with get_connection("localhost") as conn:
        print(f"Using {conn}")

asyncio.run(main())

Async Iterators

异步迭代器

Creating async iterators:
python
import asyncio
from typing import AsyncIterator

class AsyncRange:
    def __init__(self, count: int) -> None:
        self.count = count

    def __aiter__(self) -> AsyncIterator[int]:
        return self

    async def __anext__(self) -> int:
        if self.count <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.count -= 1
        return self.count

async def main() -> None:
    async for i in AsyncRange(5):
        print(i)

asyncio.run(main())
Async generator functions:
python
import asyncio
from typing import AsyncIterator

async def async_range(count: int) -> AsyncIterator[int]:
    for i in range(count):
        await asyncio.sleep(0.1)
        yield i

async def fetch_pages(urls: list[str]) -> AsyncIterator[str]:
    for url in urls:
        await asyncio.sleep(0.5)
        yield f"Page content from {url}"

async def main() -> None:
    async for num in async_range(5):
        print(num)

    urls = ["url1", "url2", "url3"]
    async for page in fetch_pages(urls):
        print(page)

asyncio.run(main())
创建异步迭代器:
python
import asyncio
from typing import AsyncIterator

class AsyncRange:
    def __init__(self, count: int) -> None:
        self.count = count

    def __aiter__(self) -> AsyncIterator[int]:
        return self

    async def __anext__(self) -> int:
        if self.count <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.count -= 1
        return self.count

async def main() -> None:
    async for i in AsyncRange(5):
        print(i)

asyncio.run(main())
异步生成器函数:
python
import asyncio
from typing import AsyncIterator

async def async_range(count: int) -> AsyncIterator[int]:
    for i in range(count):
        await asyncio.sleep(0.1)
        yield i

async def fetch_pages(urls: list[str]) -> AsyncIterator[str]:
    for url in urls:
        await asyncio.sleep(0.5)
        yield f"Page content from {url}"

async def main() -> None:
    async for num in async_range(5):
        print(num)

    urls = ["url1", "url2", "url3"]
    async for page in fetch_pages(urls):
        print(page)

asyncio.run(main())

Async Queues

异步队列

Producer-consumer pattern with Queue:
python
import asyncio
from asyncio import Queue

async def producer(queue: Queue[int], n: int) -> None:
    for i in range(n):
        await asyncio.sleep(0.1)
        await queue.put(i)
        print(f"Produced {i}")
    await queue.put(None)  # Sentinel value

async def consumer(queue: Queue[int], name: str) -> None:
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        await asyncio.sleep(0.2)
        print(f"Consumer {name} processed {item}")
        queue.task_done()

async def main() -> None:
    queue: Queue[int] = Queue(maxsize=5)

    # Start producer and consumers
    prod = asyncio.create_task(producer(queue, 10))
    cons1 = asyncio.create_task(consumer(queue, "A"))
    cons2 = asyncio.create_task(consumer(queue, "B"))

    await prod
    await queue.join()  # Wait for all tasks to be processed

    # Signal consumers to exit
    await queue.put(None)
    await queue.put(None)

    await cons1
    await cons2

asyncio.run(main())
基于Queue的生产者-消费者模式:
python
import asyncio
from asyncio import Queue

async def producer(queue: Queue[int], n: int) -> None:
    for i in range(n):
        await asyncio.sleep(0.1)
        await queue.put(i)
        print(f"Produced {i}")
    await queue.put(None)  # Sentinel value

async def consumer(queue: Queue[int], name: str) -> None:
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        await asyncio.sleep(0.2)
        print(f"Consumer {name} processed {item}")
        queue.task_done()

async def main() -> None:
    queue: Queue[int] = Queue(maxsize=5)

    # Start producer and consumers
    prod = asyncio.create_task(producer(queue, 10))
    cons1 = asyncio.create_task(consumer(queue, "A"))
    cons2 = asyncio.create_task(consumer(queue, "B"))

    await prod
    await queue.join()  # Wait for all tasks to be processed

    # Signal consumers to exit
    await queue.put(None)
    await queue.put(None)

    await cons1
    await cons2

asyncio.run(main())

Semaphore and Lock

信号量与锁

Limiting concurrent operations:
python
import asyncio

async def fetch_with_limit(
    url: str,
    semaphore: asyncio.Semaphore
) -> str:
    async with semaphore:
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Data from {url}"

async def main() -> None:
    # Limit to 3 concurrent operations
    semaphore = asyncio.Semaphore(3)

    urls = [f"https://example.com/{i}" for i in range(10)]

    tasks = [
        fetch_with_limit(url, semaphore)
        for url in urls
    ]

    results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())
Using Lock for mutual exclusion:
python
import asyncio

class Counter:
    def __init__(self) -> None:
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self) -> None:
        async with self.lock:
            # Critical section
            current = self.value
            await asyncio.sleep(0.01)
            self.value = current + 1

async def main() -> None:
    counter = Counter()

    # Run increments concurrently
    await asyncio.gather(*[
        counter.increment()
        for _ in range(100)
    ])

    print(f"Final value: {counter.value}")  # Should be 100

asyncio.run(main())
限制并发操作数:
python
import asyncio

async def fetch_with_limit(
    url: str,
    semaphore: asyncio.Semaphore
) -> str:
    async with semaphore:
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Data from {url}"

async def main() -> None:
    # Limit to 3 concurrent operations
    semaphore = asyncio.Semaphore(3)

    urls = [f"https://example.com/{i}" for i in range(10)]

    tasks = [
        fetch_with_limit(url, semaphore)
        for url in urls
    ]

    results = await asyncio.gather(*tasks)
    print(f"Fetched {len(results)} URLs")

asyncio.run(main())
使用Lock实现互斥:
python
import asyncio

class Counter:
    def __init__(self) -> None:
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self) -> None:
        async with self.lock:
            # Critical section
            current = self.value
            await asyncio.sleep(0.01)
            self.value = current + 1

async def main() -> None:
    counter = Counter()

    # Run increments concurrently
    await asyncio.gather(*[
        counter.increment()
        for _ in range(100)
    ])

    print(f"Final value: {counter.value}")  # Should be 100

asyncio.run(main())

Timeouts and Cancellation

超时与取消

Using timeout:
python
import asyncio

async def slow_operation() -> str:
    await asyncio.sleep(5)
    return "completed"

async def main() -> None:
    # Timeout after 2 seconds
    try:
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=2.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(main())
Handling cancellation:
python
import asyncio

async def cancellable_task() -> None:
    try:
        while True:
            print("Working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task was cancelled")
        # Cleanup
        raise

async def main() -> None:
    task = asyncio.create_task(cancellable_task())

    await asyncio.sleep(3)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task cancellation confirmed")

asyncio.run(main())
使用超时:
python
import asyncio

async def slow_operation() -> str:
    await asyncio.sleep(5)
    return "completed"

async def main() -> None:
    # Timeout after 2 seconds
    try:
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=2.0
        )
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(main())
处理取消操作:
python
import asyncio

async def cancellable_task() -> None:
    try:
        while True:
            print("Working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task was cancelled")
        # Cleanup
        raise

async def main() -> None:
    task = asyncio.create_task(cancellable_task())

    await asyncio.sleep(3)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task cancellation confirmed")

asyncio.run(main())

Event Loop Management

事件循环管理

Direct event loop control:
python
import asyncio

async def task1() -> None:
    print("Task 1")

async def task2() -> None:
    print("Task 2")
直接控制事件循环:
python
import asyncio

async def task1() -> None:
    print("Task 1")

async def task2() -> None:
    print("Task 2")

Create new event loop

Create new event loop

loop = asyncio.new_event_loop() asyncio.set_event_loop(loop)
try: # Schedule callbacks loop.call_soon(lambda: print("Callback"))
# Schedule delayed callback
loop.call_later(1.0, lambda: print("Delayed"))

# Run coroutine
loop.run_until_complete(task1())

# Run multiple tasks
loop.run_until_complete(
    asyncio.gather(task1(), task2())
)
finally: loop.close()
undefined
loop = asyncio.new_event_loop() asyncio.set_event_loop(loop)
try: # Schedule callbacks loop.call_soon(lambda: print("Callback"))
# Schedule delayed callback
loop.call_later(1.0, lambda: print("Delayed"))

# Run coroutine
loop.run_until_complete(task1())

# Run multiple tasks
loop.run_until_complete(
    asyncio.gather(task1(), task2())
)
finally: loop.close()
undefined

concurrent.futures

concurrent.futures

ThreadPoolExecutor for I/O-bound tasks:
python
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def blocking_io_task(n: int) -> int:
    print(f"Task {n} starting")
    time.sleep(2)  # Blocking I/O
    return n * 2

async def main() -> None:
    loop = asyncio.get_event_loop()

    with ThreadPoolExecutor(max_workers=3) as executor:
        # Run blocking function in thread pool
        tasks = [
            loop.run_in_executor(executor, blocking_io_task, i)
            for i in range(5)
        ]

        results = await asyncio.gather(*tasks)
        print(results)

asyncio.run(main())
ProcessPoolExecutor for CPU-bound tasks:
python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive_task(n: int) -> int:
    # CPU-intensive computation
    result = sum(i * i for i in range(n))
    return result

async def main() -> None:
    loop = asyncio.get_event_loop()

    with ProcessPoolExecutor(max_workers=4) as executor:
        # Run CPU-bound function in process pool
        tasks = [
            loop.run_in_executor(
                executor,
                cpu_intensive_task,
                10_000_000
            )
            for _ in range(4)
        ]

        results = await asyncio.gather(*tasks)
        print(f"Completed {len(results)} tasks")

asyncio.run(main())
针对I/O密集型任务的ThreadPoolExecutor:
python
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def blocking_io_task(n: int) -> int:
    print(f"Task {n} starting")
    time.sleep(2)  # Blocking I/O
    return n * 2

async def main() -> None:
    loop = asyncio.get_event_loop()

    with ThreadPoolExecutor(max_workers=3) as executor:
        # Run blocking function in thread pool
        tasks = [
            loop.run_in_executor(executor, blocking_io_task, i)
            for i in range(5)
        ]

        results = await asyncio.gather(*tasks)
        print(results)

asyncio.run(main())
针对CPU密集型任务的ProcessPoolExecutor:
python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive_task(n: int) -> int:
    # CPU-intensive computation
    result = sum(i * i for i in range(n))
    return result

async def main() -> None:
    loop = asyncio.get_event_loop()

    with ProcessPoolExecutor(max_workers=4) as executor:
        # Run CPU-bound function in process pool
        tasks = [
            loop.run_in_executor(
                executor,
                cpu_intensive_task,
                10_000_000
            )
            for _ in range(4)
        ]

        results = await asyncio.gather(*tasks)
        print(f"Completed {len(results)} tasks")

asyncio.run(main())

Async HTTP with aiohttp

基于aiohttp的异步HTTP请求

Making async HTTP requests:
python
import asyncio
import aiohttp

async def fetch_url(
    session: aiohttp.ClientSession,
    url: str
) -> str:
    async with session.get(url) as response:
        return await response.text()

async def main() -> None:
    async with aiohttp.ClientSession() as session:
        urls = [
            "https://example.com/1",
            "https://example.com/2",
            "https://example.com/3"
        ]

        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        print(f"Fetched {len(results)} pages")

asyncio.run(main())
发起异步HTTP请求:
python
import asyncio
import aiohttp

async def fetch_url(
    session: aiohttp.ClientSession,
    url: str
) -> str:
    async with session.get(url) as response:
        return await response.text()

async def main() -> None:
    async with aiohttp.ClientSession() as session:
        urls = [
            "https://example.com/1",
            "https://example.com/2",
            "https://example.com/3"
        ]

        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        print(f"Fetched {len(results)} pages")

asyncio.run(main())

Error Handling

错误处理

Handling exceptions in async code:
python
import asyncio

async def failing_task() -> None:
    await asyncio.sleep(1)
    raise ValueError("Task failed")

async def main() -> None:
    # Handle exception in single task
    try:
        await failing_task()
    except ValueError as e:
        print(f"Caught: {e}")

    # Handle exceptions with gather
    results = await asyncio.gather(
        failing_task(),
        failing_task(),
        return_exceptions=True
    )

    for result in results:
        if isinstance(result, Exception):
            print(f"Task failed: {result}")

asyncio.run(main())
异步代码中的异常处理:
python
import asyncio

async def failing_task() -> None:
    await asyncio.sleep(1)
    raise ValueError("Task failed")

async def main() -> None:
    # Handle exception in single task
    try:
        await failing_task()
    except ValueError as e:
        print(f"Caught: {e}")

    # Handle exceptions with gather
    results = await asyncio.gather(
        failing_task(),
        failing_task(),
        return_exceptions=True
    )

    for result in results:
        if isinstance(result, Exception):
            print(f"Task failed: {result}")

asyncio.run(main())

When to Use This Skill

何时使用该技术

Use python-async-patterns when you need to:
  • Handle multiple I/O operations concurrently (API calls, database queries)
  • Build async web servers or clients
  • Process data streams asynchronously
  • Implement producer-consumer patterns with async queues
  • Run blocking I/O operations without blocking the event loop
  • Create async context managers for resource management
  • Implement async iterators for streaming data
  • Control concurrency with semaphores and locks
  • Handle timeouts and cancellation in async operations
  • Mix CPU-bound and I/O-bound operations efficiently
当你需要以下场景时,使用Python异步编程模式:
  • 并发处理多个I/O操作(API调用、数据库查询)
  • 构建异步Web服务器或客户端
  • 异步处理数据流
  • 基于异步队列实现生产者-消费者模式
  • 在不阻塞事件循环的情况下运行阻塞I/O操作
  • 创建用于资源管理的异步上下文管理器
  • 实现用于流数据的异步迭代器
  • 使用信号量与锁控制并发数
  • 处理异步操作中的超时与取消
  • 高效混合CPU密集型与I/O密集型操作

Best Practices

最佳实践

  • Use asyncio.run() for the main entry point
  • Create tasks with asyncio.create_task() for concurrent execution
  • Use gather() when you need all results
  • Use wait() when you need fine-grained control
  • Always handle CancelledError in long-running tasks
  • Use semaphores to limit concurrent operations
  • Prefer async context managers for resource management
  • Use asyncio.Queue for producer-consumer patterns
  • Run blocking I/O in thread pool with run_in_executor()
  • Run CPU-bound tasks in process pool
  • Set appropriate timeouts for network operations
  • Use structured concurrency patterns (nurseries)
  • 使用asyncio.run()作为主入口
  • 使用asyncio.create_task()创建任务以实现并发执行
  • 当需要所有结果时使用gather()
  • 当需要精细控制时使用wait()
  • 在长时间运行的任务中始终处理CancelledError
  • 使用信号量限制并发操作数
  • 优先使用异步上下文管理器进行资源管理
  • 使用asyncio.Queue实现生产者-消费者模式
  • 使用run_in_executor()在线程池中运行阻塞I/O操作
  • 在进程池中运行CPU密集型任务
  • 为网络操作设置合适的超时时间
  • 使用结构化并发模式(nurseries)

Common Pitfalls

常见陷阱

  • Forgetting to await coroutines (creates coroutine object, doesn't run)
  • Blocking the event loop with CPU-intensive work
  • Not handling task cancellation properly
  • Using time.sleep() instead of asyncio.sleep()
  • Creating too many concurrent tasks without limits
  • Not closing resources properly in async context
  • Mixing blocking and async code incorrectly
  • Not handling exceptions in background tasks
  • Forgetting to call task_done() with Queue
  • Using global event loop instead of asyncio.run()
  • 忘记await协程(仅创建协程对象,不会实际运行)
  • 用CPU密集型工作阻塞事件循环
  • 未正确处理任务取消
  • 使用time.sleep()而非asyncio.sleep()
  • 无限制创建过多并发任务
  • 未在异步上下文中正确关闭资源
  • 错误混合阻塞与异步代码
  • 未处理后台任务中的异常
  • 使用Queue时忘记调用task_done()
  • 使用全局事件循环而非asyncio.run()

Resources

参考资源