asyncio
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChinesePython asyncio - Async/Await Concurrency
Python asyncio - Async/Await 并发编程
Overview
概述
Python's asyncio library enables writing concurrent code using async/await syntax. It's ideal for I/O-bound operations like HTTP requests, database queries, file operations, and WebSocket connections. asyncio provides non-blocking execution without the complexity of threading or multiprocessing.
Key Features:
- async/await syntax for readable concurrent code
- Event loop for managing concurrent operations
- Tasks for running multiple coroutines concurrently
- Primitives: locks, semaphores, events, queues
- HTTP client/server with aiohttp
- Database async support (asyncpg, aiomysql, motor)
- FastAPI async endpoints
- WebSocket support
- Background task management
Installation:
bash
undefinedPython的asyncio库支持使用async/await语法编写并发代码。它非常适合I/O密集型操作,如HTTP请求、数据库查询、文件操作和WebSocket连接。asyncio无需线程或多进程的复杂操作即可实现非阻塞执行。
核心特性:
- 采用async/await语法编写易读的并发代码
- 事件循环用于管理并发操作
- 任务用于同时运行多个协程
- 原语:锁、信号量、事件、队列
- 基于aiohttp的HTTP客户端/服务器
- 异步数据库支持(asyncpg、aiomysql、motor)
- FastAPI异步端点
- WebSocket支持
- 后台任务管理
安装:
bash
undefinedasyncio is built-in (Python 3.7+)
asyncio是Python 3.7+的内置库
Async HTTP client
异步HTTP客户端
pip install aiohttp
pip install aiohttp
Async HTTP requests (alternative)
异步HTTP请求(替代方案)
pip install httpx
pip install httpx
Async database drivers
异步数据库驱动
pip install asyncpg aiomysql motor # PostgreSQL, MySQL, MongoDB
pip install asyncpg aiomysql motor # PostgreSQL、MySQL、MongoDB
FastAPI with async support
带异步支持的FastAPI
pip install fastapi uvicorn[standard]
pip install fastapi uvicorn[standard]
Async testing
异步测试
pip install pytest-asyncio
undefinedpip install pytest-asyncio
undefinedBasic Async/Await Patterns
基础Async/Await模式
1. Simple Async Function
1. 简单异步函数
python
import asyncio
async def hello():
"""Basic async function (coroutine)."""
print("Hello")
await asyncio.sleep(1) # Async sleep (non-blocking)
print("World")
return "Done"python
import asyncio
async def hello():
"""基础异步函数(协程)。"""
print("Hello")
await asyncio.sleep(1) # 异步休眠(非阻塞)
print("World")
return "Done"Run async function
运行异步函数
result = asyncio.run(hello())
print(result) # "Done"
**Key Points**:
- `async def` defines a coroutine function
- `await` suspends execution until awaitable completes
- `asyncio.run()` is the entry point for async programs
- Coroutines must be awaited or scheduledresult = asyncio.run(hello())
print(result) # "Done"
**关键点**:
- `async def` 定义协程函数
- `await` 暂停执行,直到等待对象完成
- `asyncio.run()` 是异步程序的入口点
- 协程必须被await或调度执行2. Multiple Concurrent Tasks
2. 多任务并发执行
python
import asyncio
import time
async def task(name, duration):
"""Simulate async task."""
print(f"{name}: Starting (duration: {duration}s)")
await asyncio.sleep(duration)
print(f"{name}: Complete")
return f"{name} result"
async def run_concurrent():
"""Run multiple tasks concurrently."""
start = time.time()
# Sequential (slow) - 6 seconds total
# result1 = await task("Task 1", 3)
# result2 = await task("Task 2", 2)
# result3 = await task("Task 3", 1)
# Concurrent (fast) - 3 seconds total
results = await asyncio.gather(
task("Task 1", 3),
task("Task 2", 2),
task("Task 3", 1)
)
elapsed = time.time() - start
print(f"Total time: {elapsed:.2f}s")
print(f"Results: {results}")
asyncio.run(run_concurrent())python
import asyncio
import time
async def task(name, duration):
"""模拟异步任务。"""
print(f"{name}: 启动(时长: {duration}s)")
await asyncio.sleep(duration)
print(f"{name}: 完成")
return f"{name} result"
async def run_concurrent():
"""同时运行多个任务。"""
start = time.time()
# 顺序执行(慢)- 总耗时6秒
# result1 = await task("Task 1", 3)
# result2 = await task("Task 2", 2)
# result3 = await task("Task 3", 1)
# 并发执行(快)- 总耗时3秒
results = await asyncio.gather(
task("Task 1", 3),
task("Task 2", 2),
task("Task 3", 1)
)
elapsed = time.time() - start
print(f"总耗时: {elapsed:.2f}s")
print(f"结果: {results}")
asyncio.run(run_concurrent())Output: Total time: 3.00s (tasks ran concurrently)
输出: 总耗时: 3.00s(任务并发执行)
undefinedundefined3. Task Creation and Management
3. 任务创建与管理
python
import asyncio
async def background_task(name):
"""Long-running background task."""
for i in range(5):
print(f"{name}: iteration {i}")
await asyncio.sleep(1)
return f"{name} complete"
async def main():
# Create task (starts immediately)
task1 = asyncio.create_task(background_task("Task-1"))
task2 = asyncio.create_task(background_task("Task-2"))
# Do other work while tasks run
print("Main: doing other work")
await asyncio.sleep(2)
# Wait for tasks to complete
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())python
import asyncio
async def background_task(name):
"""长期运行的后台任务。"""
for i in range(5):
print(f"{name}: 迭代 {i}")
await asyncio.sleep(1)
return f"{name} complete"
async def main():
# 创建任务(立即启动)
task1 = asyncio.create_task(background_task("Task-1"))
task2 = asyncio.create_task(background_task("Task-2"))
# 任务运行时处理其他工作
print("Main: 处理其他工作")
await asyncio.sleep(2)
# 等待任务完成
result1 = await task1
result2 = await task2
print(f"结果: {result1}, {result2}")
asyncio.run(main())4. Error Handling in Async Code
4. 异步代码中的错误处理
python
import asyncio
async def risky_operation(fail=False):
"""Operation that might fail."""
await asyncio.sleep(1)
if fail:
raise ValueError("Operation failed")
return "Success"
async def handle_errors():
# Individual try/except
try:
result = await risky_operation(fail=True)
except ValueError as e:
print(f"Caught error: {e}")
result = "Fallback value"
# Gather with error handling
results = await asyncio.gather(
risky_operation(fail=False),
risky_operation(fail=True),
risky_operation(fail=False),
return_exceptions=True # Return exceptions instead of raising
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
print(f"Task {i} succeeded: {result}")
asyncio.run(handle_errors())python
import asyncio
async def risky_operation(fail=False):
"""可能失败的操作。"""
await asyncio.sleep(1)
if fail:
raise ValueError("操作失败")
return "Success"
async def handle_errors():
# 单独的try/except
try:
result = await risky_operation(fail=True)
except ValueError as e:
print(f"捕获错误: {e}")
result = "回退值"
# 带错误处理的gather
results = await asyncio.gather(
risky_operation(fail=False),
risky_operation(fail=True),
risky_operation(fail=False),
return_exceptions=True # 返回异常而非抛出
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务 {i} 失败: {result}")
else:
print(f"任务 {i} 成功: {result}")
asyncio.run(handle_errors())Event Loop Fundamentals
事件循环基础
1. Event Loop Lifecycle
1. 事件循环生命周期
python
import asynciopython
import asyncioModern approach (Python 3.7+)
现代方式(Python 3.7+)
async def main():
print("Main coroutine")
await asyncio.sleep(1)
asyncio.run(main()) # Creates loop, runs main, closes loop
async def main():
print("主协程")
await asyncio.sleep(1)
asyncio.run(main()) # 创建循环、运行主协程、关闭循环
Manual loop management (advanced use cases)
手动管理循环(高级用例)
async def manual_example():
loop = asyncio.get_event_loop()
# Schedule coroutine
task = loop.create_task(some_coroutine())
# Schedule callback
loop.call_later(5, callback_function)
# Run until complete
result = await task
return resultasync def manual_example():
loop = asyncio.get_event_loop()
# 调度协程
task = loop.create_task(some_coroutine())
# 调度回调
loop.call_later(5, callback_function)
# 运行直到完成
result = await task
return resultGet current event loop
获取当前事件循环
async def get_current_loop():
loop = asyncio.get_running_loop()
print(f"Loop: {loop}")
# Schedule callback in event loop
loop.call_soon(lambda: print("Callback executed"))
await asyncio.sleep(0) # Let callback executeundefinedasync def get_current_loop():
loop = asyncio.get_running_loop()
print(f"循环: {loop}")
# 在事件循环中调度回调
loop.call_soon(lambda: print("回调执行"))
await asyncio.sleep(0) # 让回调执行undefined2. Loop Scheduling and Callbacks
2. 循环调度与回调
python
import asyncio
from datetime import datetime
def callback(name, loop):
"""Callback function (not async)."""
print(f"{datetime.now()}: {name} callback executed")
# Stop loop after callback
# loop.stop()
async def schedule_callbacks():
loop = asyncio.get_running_loop()
# Schedule immediate callback
loop.call_soon(callback, "Immediate", loop)
# Schedule callback after delay
loop.call_later(2, callback, "Delayed 2s", loop)
# Schedule callback at specific time
loop.call_at(loop.time() + 3, callback, "Delayed 3s", loop)
# Wait for callbacks to execute
await asyncio.sleep(5)
asyncio.run(schedule_callbacks())python
import asyncio
from datetime import datetime
def callback(name, loop):
"""回调函数(非异步)。"""
print(f"{datetime.now()}: {name} 回调执行")
# 回调后停止循环
# loop.stop()
async def schedule_callbacks():
loop = asyncio.get_running_loop()
# 调度立即执行的回调
loop.call_soon(callback, "Immediate", loop)
# 调度延迟执行的回调
loop.call_later(2, callback, "Delayed 2s", loop)
# 调度在特定时间执行的回调
loop.call_at(loop.time() + 3, callback, "Delayed 3s", loop)
# 等待回调执行
await asyncio.sleep(5)
asyncio.run(schedule_callbacks())3. Running Blocking Code
3. 运行阻塞代码
python
import asyncio
import time
def blocking_io():
"""CPU-intensive or blocking I/O operation."""
print("Blocking operation started")
time.sleep(2) # Blocks thread
print("Blocking operation complete")
return "Blocking result"
async def run_in_executor():
"""Run blocking code in thread pool."""
loop = asyncio.get_running_loop()
# Run in default executor (thread pool)
result = await loop.run_in_executor(
None, # Use default executor
blocking_io
)
print(f"Result: {result}")python
import asyncio
import time
def blocking_io():
"""CPU密集型或阻塞I/O操作。"""
print("阻塞操作启动")
time.sleep(2) # 阻塞线程
print("阻塞操作完成")
return "Blocking result"
async def run_in_executor():
"""在线程池中运行阻塞代码。"""
loop = asyncio.get_running_loop()
# 在默认执行器(线程池)中运行
result = await loop.run_in_executor(
None, # 使用默认执行器
blocking_io
)
print(f"结果: {result}")Run blocking operations concurrently
并发运行阻塞操作
async def concurrent_blocking():
loop = asyncio.get_running_loop()
# These run in thread pool, don't block event loop
results = await asyncio.gather(
loop.run_in_executor(None, blocking_io),
loop.run_in_executor(None, blocking_io),
loop.run_in_executor(None, blocking_io)
)
print(f"All results: {results}")asyncio.run(concurrent_blocking())
undefinedasync def concurrent_blocking():
loop = asyncio.get_running_loop()
# 这些操作在线程池中运行,不会阻塞事件循环
results = await asyncio.gather(
loop.run_in_executor(None, blocking_io),
loop.run_in_executor(None, blocking_io),
loop.run_in_executor(None, blocking_io)
)
print(f"所有结果: {results}")asyncio.run(concurrent_blocking())
undefinedAsyncio Primitives
Asyncio原语
1. Locks for Mutual Exclusion
1. 用于互斥的锁
python
import asynciopython
import asyncioShared resource
共享资源
counter = 0
lock = asyncio.Lock()
async def increment_with_lock(name):
"""Increment counter with lock protection."""
global counter
async with lock:
# Critical section - only one task at a time
print(f"{name}: acquired lock")
current = counter
await asyncio.sleep(0.1) # Simulate processing
counter = current + 1
print(f"{name}: released lock, counter={counter}")async def increment_without_lock(name):
"""Increment without lock - race condition!"""
global counter
current = counter
await asyncio.sleep(0.1) # Race condition window
counter = current + 1
print(f"{name}: counter={counter}")async def test_locks():
global counter
# Without lock (race condition)
counter = 0
await asyncio.gather(
increment_without_lock("Task-1"),
increment_without_lock("Task-2"),
increment_without_lock("Task-3")
)
print(f"Without lock: {counter}") # Often wrong (< 3)
# With lock (correct)
counter = 0
await asyncio.gather(
increment_with_lock("Task-1"),
increment_with_lock("Task-2"),
increment_with_lock("Task-3")
)
print(f"With lock: {counter}") # Always 3asyncio.run(test_locks())
undefinedcounter = 0
lock = asyncio.Lock()
async def increment_with_lock(name):
"""使用锁保护的计数器递增。"""
global counter
async with lock:
# 临界区 - 同一时间仅一个任务执行
print(f"{name}: 获取锁")
current = counter
await asyncio.sleep(0.1) # 模拟处理
counter = current + 1
print(f"{name}: 释放锁, counter={counter}")async def increment_without_lock(name):
"""无锁递增 - 存在竞态条件!"""
global counter
current = counter
await asyncio.sleep(0.1) # 竞态条件窗口
counter = current + 1
print(f"{name}: counter={counter}")async def test_locks():
global counter
# 无锁(竞态条件)
counter = 0
await asyncio.gather(
increment_without_lock("Task-1"),
increment_without_lock("Task-2"),
increment_without_lock("Task-3")
)
print(f"无锁结果: {counter}") # 通常不正确(< 3)
# 有锁(正确)
counter = 0
await asyncio.gather(
increment_with_lock("Task-1"),
increment_with_lock("Task-2"),
increment_with_lock("Task-3")
)
print(f"有锁结果: {counter}") # 始终为3asyncio.run(test_locks())
undefined2. Semaphores for Resource Limiting
2. 用于资源限制的信号量
python
import asynciopython
import asyncioLimit concurrent operations
限制并发操作数
semaphore = asyncio.Semaphore(2) # Max 2 concurrent
async def limited_operation(name):
"""Operation limited by semaphore."""
print(f"{name}: waiting for semaphore")
async with semaphore:
print(f"{name}: acquired semaphore")
await asyncio.sleep(2) # Simulate work
print(f"{name}: releasing semaphore")async def test_semaphore():
# Create 5 tasks, but only 2 run concurrently
await asyncio.gather(
limited_operation("Task-1"),
limited_operation("Task-2"),
limited_operation("Task-3"),
limited_operation("Task-4"),
limited_operation("Task-5")
)
asyncio.run(test_semaphore())
semaphore = asyncio.Semaphore(2) # 最大2个并发
async def limited_operation(name):
"""受信号量限制的操作。"""
print(f"{name}: 等待信号量")
async with semaphore:
print(f"{name}: 获取信号量")
await asyncio.sleep(2) # 模拟工作
print(f"{name}: 释放信号量")async def test_semaphore():
# 创建5个任务,但仅2个可并发运行
await asyncio.gather(
limited_operation("Task-1"),
limited_operation("Task-2"),
limited_operation("Task-3"),
limited_operation("Task-4"),
limited_operation("Task-5")
)
asyncio.run(test_semaphore())
Only 2 tasks hold semaphore at any time
同一时间仅2个任务持有信号量
undefinedundefined3. Events for Signaling
3. 用于信号通知的事件
python
import asyncio
event = asyncio.Event()
async def waiter(name):
"""Wait for event to be set."""
print(f"{name}: waiting for event")
await event.wait() # Block until event is set
print(f"{name}: event received!")
async def setter():
"""Set event after delay."""
await asyncio.sleep(2)
print("Setter: setting event")
event.set() # Wake up all waiters
async def test_event():
# Create waiters
await asyncio.gather(
waiter("Waiter-1"),
waiter("Waiter-2"),
waiter("Waiter-3"),
setter()
)
asyncio.run(test_event())python
import asyncio
event = asyncio.Event()
async def waiter(name):
"""等待事件被设置。"""
print(f"{name}: 等待事件")
await event.wait() # 阻塞直到事件被设置
print(f"{name}: 收到事件!")
async def setter():
"""延迟后设置事件。"""
await asyncio.sleep(2)
print("Setter: 设置事件")
event.set() # 唤醒所有等待者
async def test_event():
# 创建等待者
await asyncio.gather(
waiter("Waiter-1"),
waiter("Waiter-2"),
waiter("Waiter-3"),
setter()
)
asyncio.run(test_event())4. Queues for Task Distribution
4. 用于任务分发的队列
python
import asyncio
import random
async def producer(queue, name):
"""Produce items and add to queue."""
for i in range(5):
item = f"{name}-item-{i}"
await queue.put(item)
print(f"{name}: produced {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
# Signal completion
await queue.put(None)
async def consumer(queue, name):
"""Consume items from queue."""
while True:
item = await queue.get() # Block until item available
if item is None: # Shutdown signal
queue.task_done()
break
print(f"{name}: consumed {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def test_queue():
queue = asyncio.Queue(maxsize=10)
# Create producers and consumers
await asyncio.gather(
producer(queue, "Producer-1"),
producer(queue, "Producer-2"),
consumer(queue, "Consumer-1"),
consumer(queue, "Consumer-2"),
consumer(queue, "Consumer-3")
)
# Wait for all items to be processed
await queue.join()
print("All tasks complete")
asyncio.run(test_queue())python
import asyncio
import random
async def producer(queue, name):
"""生成项目并添加到队列。"""
for i in range(5):
item = f"{name}-item-{i}"
await queue.put(item)
print(f"{name}: 生成 {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
# 发送完成信号
await queue.put(None)
async def consumer(queue, name):
"""从队列消费项目。"""
while True:
item = await queue.get() # 阻塞直到项目可用
if item is None: # 关闭信号
queue.task_done()
break
print(f"{name}: 消费 {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def test_queue():
queue = asyncio.Queue(maxsize=10)
# 创建生产者和消费者
await asyncio.gather(
producer(queue, "Producer-1"),
producer(queue, "Producer-2"),
consumer(queue, "Consumer-1"),
consumer(queue, "Consumer-2"),
consumer(queue, "Consumer-3")
)
# 等待所有项目处理完成
await queue.join()
print("所有任务完成")
asyncio.run(test_queue())5. Condition Variables
5. 条件变量
python
import asyncio
condition = asyncio.Condition()
items = []
async def consumer(name):
"""Wait for items to be available."""
async with condition:
# Wait until items are available
await condition.wait_for(lambda: len(items) > 0)
item = items.pop(0)
print(f"{name}: consumed {item}")
async def producer(name):
"""Add items and notify consumers."""
async with condition:
item = f"{name}-item"
items.append(item)
print(f"{name}: produced {item}")
# Notify one waiting consumer
condition.notify(n=1)
# Or notify all: condition.notify_all()
async def test_condition():
await asyncio.gather(
consumer("Consumer-1"),
consumer("Consumer-2"),
producer("Producer-1"),
producer("Producer-2")
)
asyncio.run(test_condition())python
import asyncio
condition = asyncio.Condition()
items = []
async def consumer(name):
"""等待项目可用。"""
async with condition:
# 等待直到项目可用
await condition.wait_for(lambda: len(items) > 0)
item = items.pop(0)
print(f"{name}: 消费 {item}")
async def producer(name):
"""添加项目并通知消费者。"""
async with condition:
item = f"{name}-item"
items.append(item)
print(f"{name}: 生成 {item}")
# 通知一个等待的消费者
condition.notify(n=1)
# 或通知所有: condition.notify_all()
async def test_condition():
await asyncio.gather(
consumer("Consumer-1"),
consumer("Consumer-2"),
producer("Producer-1"),
producer("Producer-2")
)
asyncio.run(test_condition())Async HTTP with aiohttp
基于aiohttp的异步HTTP
1. Basic HTTP Client
1. 基础HTTP客户端
python
import asyncio
import aiohttp
async def fetch_url(session, url):
"""Fetch single URL."""
async with session.get(url) as response:
status = response.status
text = await response.text()
return {"url": url, "status": status, "length": len(text)}
async def fetch_multiple_urls():
"""Fetch multiple URLs concurrently."""
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/json",
]
async with aiohttp.ClientSession() as session:
# Concurrent requests
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(f"{result['url']}: {result['status']} ({result['length']} bytes)")
asyncio.run(fetch_multiple_urls())python
import asyncio
import aiohttp
async def fetch_url(session, url):
"""获取单个URL。"""
async with session.get(url) as response:
status = response.status
text = await response.text()
return {"url": url, "status": status, "length": len(text)}
async def fetch_multiple_urls():
"""并发获取多个URL。"""
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/json",
]
async with aiohttp.ClientSession() as session:
# 并发请求
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(f"{result['url']}: {result['status']} ({result['length']} 字节)")
asyncio.run(fetch_multiple_urls())2. HTTP Client with Error Handling
2. 带错误处理的HTTP客户端
python
import asyncio
import aiohttp
from typing import Dict, Any
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
max_retries: int = 3
) -> Dict[str, Any]:
"""Fetch URL with retry logic."""
for attempt in range(max_retries):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
response.raise_for_status() # Raise for 4xx/5xx
data = await response.json()
return {"success": True, "data": data}
except aiohttp.ClientError as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt == max_retries - 1:
return {"success": False, "error": str(e)}
# Exponential backoff
await asyncio.sleep(2 ** attempt)
except asyncio.TimeoutError:
print(f"Attempt {attempt + 1} timed out")
if attempt == max_retries - 1:
return {"success": False, "error": "Timeout"}
await asyncio.sleep(2 ** attempt)
async def parallel_api_calls():
"""Make parallel API calls with error handling."""
urls = [
"https://httpbin.org/json",
"https://httpbin.org/status/500", # Will fail
"https://httpbin.org/delay/1",
]
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(
*[fetch_with_retry(session, url) for url in urls],
return_exceptions=True # Don't stop on errors
)
for url, result in zip(urls, results):
if isinstance(result, Exception):
print(f"{url}: Exception - {result}")
elif result["success"]:
print(f"{url}: Success")
else:
print(f"{url}: Failed - {result['error']}")
asyncio.run(parallel_api_calls())python
import asyncio
import aiohttp
from typing import Dict, Any
async def fetch_with_retry(
session: aiohttp.ClientSession,
url: str,
max_retries: int = 3
) -> Dict[str, Any]:
"""带重试逻辑的URL获取。"""
for attempt in range(max_retries):
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
response.raise_for_status() # 为4xx/5xx状态码抛出异常
data = await response.json()
return {"success": True, "data": data}
except aiohttp.ClientError as e:
print(f"第 {attempt + 1} 次尝试失败: {e}")
if attempt == max_retries - 1:
return {"success": False, "error": str(e)}
# 指数退避
await asyncio.sleep(2 ** attempt)
except asyncio.TimeoutError:
print(f"第 {attempt + 1} 次尝试超时")
if attempt == max_retries - 1:
return {"success": False, "error": "超时"}
await asyncio.sleep(2 ** attempt)
async def parallel_api_calls():
"""并行调用API并处理错误。"""
urls = [
"https://httpbin.org/json",
"https://httpbin.org/status/500", # 会失败
"https://httpbin.org/delay/1",
]
async with aiohttp.ClientSession() as session:
results = await asyncio.gather(
*[fetch_with_retry(session, url) for url in urls],
return_exceptions=True # 遇到错误不停止
)
for url, result in zip(urls, results):
if isinstance(result, Exception):
print(f"{url}: 异常 - {result}")
elif result["success"]:
print(f"{url}: 成功")
else:
print(f"{url}: 失败 - {result['error']}")
asyncio.run(parallel_api_calls())3. HTTP Server with aiohttp
3. 基于aiohttp的HTTP服务器
python
from aiohttp import web
import asyncio
async def handle_hello(request):
"""Simple GET handler."""
name = request.query.get("name", "World")
return web.json_response({"message": f"Hello, {name}!"})
async def handle_post(request):
"""POST handler with JSON body."""
data = await request.json()
# Simulate async processing
await asyncio.sleep(1)
return web.json_response({
"received": data,
"status": "processed"
})
async def handle_stream(request):
"""Streaming response."""
response = web.StreamResponse()
await response.prepare(request)
for i in range(10):
await response.write(f"Chunk {i}\n".encode())
await asyncio.sleep(0.5)
await response.write_eof()
return responsepython
from aiohttp import web
import asyncio
async def handle_hello(request):
"""简单GET处理器。"""
name = request.query.get("name", "World")
return web.json_response({"message": f"Hello, {name}!"})
async def handle_post(request):
"""带JSON体的POST处理器。"""
data = await request.json()
# 模拟异步处理
await asyncio.sleep(1)
return web.json_response({
"received": data,
"status": "processed"
})
async def handle_stream(request):
"""流式响应。"""
response = web.StreamResponse()
await response.prepare(request)
for i in range(10):
await response.write(f"Chunk {i}\n".encode())
await asyncio.sleep(0.5)
await response.write_eof()
return responseCreate application
创建应用
app = web.Application()
app.router.add_get("/hello", handle_hello)
app.router.add_post("/process", handle_post)
app.router.add_get("/stream", handle_stream)
app = web.Application()
app.router.add_get("/hello", handle_hello)
app.router.add_post("/process", handle_post)
app.router.add_get("/stream", handle_stream)
Run server
运行服务器
if name == "main":
web.run_app(app, host="0.0.0.0", port=8080)
undefinedif name == "main":
web.run_app(app, host="0.0.0.0", port=8080)
undefined4. WebSocket Client
4. WebSocket客户端
python
import asyncio
import aiohttp
async def websocket_client():
"""Connect to WebSocket server."""
url = "wss://echo.websocket.org"
async with aiohttp.ClientSession() as session:
async with session.ws_connect(url) as ws:
# Send messages
await ws.send_str("Hello WebSocket")
await ws.send_json({"type": "greeting", "data": "test"})
# Receive messages
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(f"Received: {msg.data}")
if msg.data == "close":
await ws.close()
break
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f"Error: {ws.exception()}")
break
asyncio.run(websocket_client())python
import asyncio
import aiohttp
async def websocket_client():
"""连接到WebSocket服务器。"""
url = "wss://echo.websocket.org"
async with aiohttp.ClientSession() as session:
async with session.ws_connect(url) as ws:
# 发送消息
await ws.send_str("Hello WebSocket")
await ws.send_json({"type": "greeting", "data": "test"})
# 接收消息
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(f"收到: {msg.data}")
if msg.data == "close":
await ws.close()
break
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f"错误: {ws.exception()}")
break
asyncio.run(websocket_client())Async Database Operations
异步数据库操作
1. PostgreSQL with asyncpg
1. 基于asyncpg的PostgreSQL
python
import asyncio
import asyncpg
async def database_operations():
"""Async PostgreSQL operations."""
# Create connection pool
pool = await asyncpg.create_pool(
host="localhost",
database="mydb",
user="user",
password="password",
min_size=5,
max_size=20
)
try:
# Acquire connection from pool
async with pool.acquire() as conn:
# Execute query
rows = await conn.fetch(
"SELECT id, name, email FROM users WHERE active = $1",
True
)
for row in rows:
print(f"User: {row['name']} ({row['email']})")
# Insert data
await conn.execute(
"INSERT INTO users (name, email) VALUES ($1, $2)",
"Alice", "alice@example.com"
)
# Transaction
async with conn.transaction():
await conn.execute("UPDATE users SET active = $1 WHERE id = $2", False, 1)
await conn.execute("INSERT INTO audit_log (action) VALUES ($1)", "deactivate")
finally:
await pool.close()
asyncio.run(database_operations())python
import asyncio
import asyncpg
async def database_operations():
"""异步PostgreSQL操作。"""
# 创建连接池
pool = await asyncpg.create_pool(
host="localhost",
database="mydb",
user="user",
password="password",
min_size=5,
max_size=20
)
try:
# 从连接池获取连接
async with pool.acquire() as conn:
# 执行查询
rows = await conn.fetch(
"SELECT id, name, email FROM users WHERE active = $1",
True
)
for row in rows:
print(f"用户: {row['name']} ({row['email']})")
# 插入数据
await conn.execute(
"INSERT INTO users (name, email) VALUES ($1, $2)",
"Alice", "alice@example.com"
)
# 事务
async with conn.transaction():
await conn.execute("UPDATE users SET active = $1 WHERE id = $2", False, 1)
await conn.execute("INSERT INTO audit_log (action) VALUES ($1)", "deactivate")
finally:
await pool.close()
asyncio.run(database_operations())2. MongoDB with motor
2. 基于motor的MongoDB
python
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
async def mongodb_operations():
"""Async MongoDB operations."""
# Create client
client = AsyncIOMotorClient("mongodb://localhost:27017")
db = client.mydb
collection = db.users
try:
# Insert document
result = await collection.insert_one({
"name": "Alice",
"email": "alice@example.com",
"age": 30
})
print(f"Inserted ID: {result.inserted_id}")
# Find documents
cursor = collection.find({"age": {"$gte": 25}})
async for document in cursor:
print(f"User: {document['name']}")
# Update document
await collection.update_one(
{"name": "Alice"},
{"$set": {"age": 31}}
)
# Aggregation pipeline
pipeline = [
{"$match": {"age": {"$gte": 25}}},
{"$group": {"_id": None, "avg_age": {"$avg": "$age"}}}
]
async for result in collection.aggregate(pipeline):
print(f"Average age: {result['avg_age']}")
finally:
client.close()
asyncio.run(mongodb_operations())python
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
async def mongodb_operations():
"""异步MongoDB操作。"""
# 创建客户端
client = AsyncIOMotorClient("mongodb://localhost:27017")
db = client.mydb
collection = db.users
try:
# 插入文档
result = await collection.insert_one({
"name": "Alice",
"email": "alice@example.com",
"age": 30
})
print(f"插入ID: {result.inserted_id}")
# 查询文档
cursor = collection.find({"age": {"$gte": 25}})
async for document in cursor:
print(f"用户: {document['name']}")
# 更新文档
await collection.update_one(
{"name": "Alice"},
{"$set": {"age": 31}}
)
# 聚合管道
pipeline = [
{"$match": {"age": {"$gte": 25}}},
{"$group": {"_id": None, "avg_age": {"$avg": "$age"}}}
]
async for result in collection.aggregate(pipeline):
print(f"平均年龄: {result['avg_age']}")
finally:
client.close()
asyncio.run(mongodb_operations())3. Connection Pool Pattern
3. 连接池模式
python
import asyncio
import asyncpg
from typing import Optional
class DatabasePool:
"""Async database connection pool manager."""
def __init__(self, dsn: str):
self.dsn = dsn
self.pool: Optional[asyncpg.Pool] = None
async def connect(self):
"""Create connection pool."""
self.pool = await asyncpg.create_pool(self.dsn, min_size=5, max_size=20)
async def close(self):
"""Close connection pool."""
if self.pool:
await self.pool.close()
async def execute(self, query: str, *args):
"""Execute query."""
async with self.pool.acquire() as conn:
return await conn.execute(query, *args)
async def fetch(self, query: str, *args):
"""Fetch multiple rows."""
async with self.pool.acquire() as conn:
return await conn.fetch(query, *args)
async def fetchrow(self, query: str, *args):
"""Fetch single row."""
async with self.pool.acquire() as conn:
return await conn.fetchrow(query, *args)python
import asyncio
import asyncpg
from typing import Optional
class DatabasePool:
"""异步数据库连接池管理器。"""
def __init__(self, dsn: str):
self.dsn = dsn
self.pool: Optional[asyncpg.Pool] = None
async def connect(self):
"""创建连接池。"""
self.pool = await asyncpg.create_pool(self.dsn, min_size=5, max_size=20)
async def close(self):
"""关闭连接池。"""
if self.pool:
await self.pool.close()
async def execute(self, query: str, *args):
"""执行查询。"""
async with self.pool.acquire() as conn:
return await conn.execute(query, *args)
async def fetch(self, query: str, *args):
"""获取多行结果。"""
async with self.pool.acquire() as conn:
return await conn.fetch(query, *args)
async def fetchrow(self, query: str, *args):
"""获取单行结果。"""
async with self.pool.acquire() as conn:
return await conn.fetchrow(query, *args)Usage
使用示例
async def use_pool():
db = DatabasePool("postgresql://user:pass@localhost/mydb")
await db.connect()
try:
# Execute operations
rows = await db.fetch("SELECT * FROM users")
for row in rows:
print(row)
finally:
await db.close()asyncio.run(use_pool())
undefinedasync def use_pool():
db = DatabasePool("postgresql://user:pass@localhost/mydb")
await db.connect()
try:
# 执行操作
rows = await db.fetch("SELECT * FROM users")
for row in rows:
print(row)
finally:
await db.close()asyncio.run(use_pool())
undefinedFastAPI Async Patterns
FastAPI异步模式
1. Async Endpoints
1. 异步端点
python
from fastapi import FastAPI, HTTPException
import asyncio
import httpx
app = FastAPI()
@app.get("/")
async def root():
"""Simple async endpoint."""
return {"message": "Hello World"}
@app.get("/delay/{seconds}")
async def delayed_response(seconds: int):
"""Endpoint with async delay."""
await asyncio.sleep(seconds)
return {"message": f"Waited {seconds} seconds"}
@app.get("/fetch")
async def fetch_external():
"""Fetch data from external API."""
async with httpx.AsyncClient() as client:
response = await client.get("https://httpbin.org/json")
return response.json()
@app.get("/parallel")
async def parallel_requests():
"""Make parallel API calls."""
async with httpx.AsyncClient() as client:
responses = await asyncio.gather(
client.get("https://httpbin.org/delay/1"),
client.get("https://httpbin.org/delay/2"),
client.get("https://httpbin.org/json")
)
return {
"results": [r.json() for r in responses]
}python
from fastapi import FastAPI, HTTPException
import asyncio
import httpx
app = FastAPI()
@app.get("/")
async def root():
"""简单异步端点。"""
return {"message": "Hello World"}
@app.get("/delay/{seconds}")
async def delayed_response(seconds: int):
"""带异步延迟的端点。"""
await asyncio.sleep(seconds)
return {"message": f"等待了 {seconds} 秒"}
@app.get("/fetch")
async def fetch_external():
"""从外部API获取数据。"""
async with httpx.AsyncClient() as client:
response = await client.get("https://httpbin.org/json")
return response.json()
@app.get("/parallel")
async def parallel_requests():
"""并行调用API。"""
async with httpx.AsyncClient() as client:
responses = await asyncio.gather(
client.get("https://httpbin.org/delay/1"),
client.get("https://httpbin.org/delay/2"),
client.get("https://httpbin.org/json")
)
return {
"results": [r.json() for r in responses]
}2. Background Tasks
2. 后台任务
python
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
async def send_email(email: str, message: str):
"""Simulate sending email."""
print(f"Sending email to {email}")
await asyncio.sleep(5) # Simulate slow email service
print(f"Email sent to {email}: {message}")
@app.post("/send-notification")
async def send_notification(
email: str,
message: str,
background_tasks: BackgroundTasks
):
"""Send notification in background."""
# Add task to background
background_tasks.add_task(send_email, email, message)
# Return immediately
return {"status": "notification queued"}python
from fastapi import FastAPI, BackgroundTasks
import asyncio
app = FastAPI()
async def send_email(email: str, message: str):
"""模拟发送邮件。"""
print(f"正在向 {email} 发送邮件")
await asyncio.sleep(5) # 模拟慢速邮件服务
print(f"已向 {email} 发送邮件: {message}")
@app.post("/send-notification")
async def send_notification(
email: str,
message: str,
background_tasks: BackgroundTasks
):
"""在后台发送通知。"""
# 将任务添加到后台
background_tasks.add_task(send_email, email, message)
# 立即返回
return {"status": "通知已排队"}Alternative: manual task creation
替代方案:手动创建任务
@app.post("/send-notification-manual")
async def send_notification_manual(email: str, message: str):
"""Create background task manually."""
asyncio.create_task(send_email(email, message))
return {"status": "notification queued"}
undefined@app.post("/send-notification-manual")
async def send_notification_manual(email: str, message: str):
"""手动创建后台任务。"""
asyncio.create_task(send_email(email, message))
return {"status": "通知已排队"}
undefined3. Async Dependencies
3. 异步依赖
python
from fastapi import FastAPI, Depends
import asyncpg
app = FastAPI()python
from fastapi import FastAPI, Depends
import asyncpg
app = FastAPI()Database pool (global)
数据库池(全局)
db_pool = None
async def get_db():
"""Dependency: database connection."""
async with db_pool.acquire() as conn:
yield conn
@app.on_event("startup")
async def startup():
"""Initialize database pool on startup."""
global db_pool
db_pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/mydb"
)
@app.on_event("shutdown")
async def shutdown():
"""Close database pool on shutdown."""
await db_pool.close()
@app.get("/users/{user_id}")
async def get_user(user_id: int, conn=Depends(get_db)):
"""Get user with async database dependency."""
user = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1",
user_id
)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return dict(user)undefineddb_pool = None
async def get_db():
"""依赖项:数据库连接。"""
async with db_pool.acquire() as conn:
yield conn
@app.on_event("startup")
async def startup():
"""启动时初始化数据库池。"""
global db_pool
db_pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/mydb"
)
@app.on_event("shutdown")
async def shutdown():
"""关闭时关闭数据库池。"""
await db_pool.close()
@app.get("/users/{user_id}")
async def get_user(user_id: int, conn=Depends(get_db)):
"""使用异步数据库依赖获取用户。"""
user = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1",
user_id
)
if not user:
raise HTTPException(status_code=404, detail="用户未找到")
return dict(user)undefined4. WebSocket Endpoints
4. WebSocket端点
python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio
app = FastAPI()python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import asyncio
app = FastAPI()Active connections
活跃连接
active_connections: List[WebSocket] = []
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket endpoint."""
await websocket.accept()
active_connections.append(websocket)
try:
while True:
# Receive message
data = await websocket.receive_text()
# Broadcast to all connections
for connection in active_connections:
await connection.send_text(f"Broadcast: {data}")
except WebSocketDisconnect:
active_connections.remove(websocket)
print("Client disconnected")active_connections: List[WebSocket] = []
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket端点。"""
await websocket.accept()
active_connections.append(websocket)
try:
while True:
# 接收消息
data = await websocket.receive_text()
# 广播到所有连接
for connection in active_connections:
await connection.send_text(f"广播: {data}")
except WebSocketDisconnect:
active_connections.remove(websocket)
print("客户端已断开连接")Background task to send periodic updates
用于发送定期更新的后台任务
async def broadcast_updates():
"""Send periodic updates to all clients."""
while True:
await asyncio.sleep(5)
for connection in active_connections:
try:
await connection.send_text("Periodic update")
except:
active_connections.remove(connection)@app.on_event("startup")
async def startup():
"""Start background update task."""
asyncio.create_task(broadcast_updates())
undefinedasync def broadcast_updates():
"""向所有客户端发送定期更新。"""
while True:
await asyncio.sleep(5)
for connection in active_connections:
try:
await connection.send_text("定期更新")
except:
active_connections.remove(connection)@app.on_event("startup")
async def startup():
"""启动后台更新任务。"""
asyncio.create_task(broadcast_updates())
undefinedCommon Patterns and Best Practices
常见模式与最佳实践
1. Timeout Handling
1. 超时处理
python
import asyncio
async def slow_operation():
"""Slow operation."""
await asyncio.sleep(10)
return "Result"
async def with_timeout():
"""Run operation with timeout."""
try:
result = await asyncio.wait_for(slow_operation(), timeout=5.0)
print(f"Result: {result}")
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(with_timeout())python
import asyncio
async def slow_operation():
"""慢速操作。"""
await asyncio.sleep(10)
return "Result"
async def with_timeout():
"""带超时的操作运行。"""
try:
result = await asyncio.wait_for(slow_operation(), timeout=5.0)
print(f"结果: {result}")
except asyncio.TimeoutError:
print("操作超时")
asyncio.run(with_timeout())2. Cancellation Handling
2. 取消处理
python
import asyncio
async def cancellable_task():
"""Task that can be cancelled."""
try:
for i in range(10):
print(f"Working: {i}")
await asyncio.sleep(1)
return "Complete"
except asyncio.CancelledError:
print("Task was cancelled")
# Cleanup
raise # Re-raise to propagate cancellation
async def cancel_example():
"""Example of task cancellation."""
task = asyncio.create_task(cancellable_task())
# Let it run for a bit
await asyncio.sleep(3)
# Cancel the task
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Confirmed: task was cancelled")
asyncio.run(cancel_example())python
import asyncio
async def cancellable_task():
"""可取消的任务。"""
try:
for i in range(10):
print(f"工作中: {i}")
await asyncio.sleep(1)
return "Complete"
except asyncio.CancelledError:
print("任务已取消")
# 清理资源
raise # 重新抛出以传播取消信号
async def cancel_example():
"""任务取消示例。"""
task = asyncio.create_task(cancellable_task())
# 让任务运行一段时间
await asyncio.sleep(3)
# 取消任务
task.cancel()
try:
await task
except asyncio.CancelledError:
print("已确认: 任务已取消")
asyncio.run(cancel_example())3. Resource Cleanup with Context Managers
3. 使用上下文管理器进行资源清理
python
import asyncio
class AsyncResource:
"""Async context manager for resource management."""
async def __aenter__(self):
"""Async setup."""
print("Acquiring resource")
await asyncio.sleep(1) # Simulate async setup
self.connection = "connected"
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async cleanup."""
print("Releasing resource")
await asyncio.sleep(1) # Simulate async cleanup
self.connection = None
async def use_resource():
"""Use async resource."""
async with AsyncResource() as resource:
print(f"Using resource: {resource.connection}")
await asyncio.sleep(1)
# Resource automatically cleaned up
asyncio.run(use_resource())python
import asyncio
class AsyncResource:
"""用于资源管理的异步上下文管理器。"""
async def __aenter__(self):
"""异步初始化。"""
print("获取资源")
await asyncio.sleep(1) # 模拟异步初始化
self.connection = "connected"
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""异步清理。"""
print("释放资源")
await asyncio.sleep(1) # 模拟异步清理
self.connection = None
async def use_resource():
"""使用异步资源。"""
async with AsyncResource() as resource:
print(f"使用资源: {resource.connection}")
await asyncio.sleep(1)
# 资源自动清理
asyncio.run(use_resource())4. Debouncing and Throttling
4. 防抖与节流
python
import asyncio
from datetime import datetime
class Debouncer:
"""Debounce async function calls."""
def __init__(self, delay: float):
self.delay = delay
self.task = None
async def call(self, func, *args, **kwargs):
"""Debounced function call."""
# Cancel previous task
if self.task:
self.task.cancel()
# Create new task
async def delayed_call():
await asyncio.sleep(self.delay)
await func(*args, **kwargs)
self.task = asyncio.create_task(delayed_call())
async def api_call(query: str):
"""Simulated API call."""
print(f"{datetime.now()}: API call with query: {query}")
async def debounce_example():
"""Example of debouncing."""
debouncer = Debouncer(delay=1.0)
# Rapid calls - only last one executes
await debouncer.call(api_call, "query1")
await asyncio.sleep(0.1)
await debouncer.call(api_call, "query2")
await asyncio.sleep(0.1)
await debouncer.call(api_call, "query3")
# Wait for debounced call
await asyncio.sleep(2)
asyncio.run(debounce_example())python
import asyncio
from datetime import datetime
class Debouncer:
"""异步函数调用防抖。"""
def __init__(self, delay: float):
self.delay = delay
self.task = None
async def call(self, func, *args, **kwargs):
"""防抖函数调用。"""
# 取消之前的任务
if self.task:
self.task.cancel()
# 创建新任务
async def delayed_call():
await asyncio.sleep(self.delay)
await func(*args, **kwargs)
self.task = asyncio.create_task(delayed_call())
async def api_call(query: str):
"""模拟API调用。"""
print(f"{datetime.now()}: API调用,查询参数: {query}")
async def debounce_example():
"""防抖示例。"""
debouncer = Debouncer(delay=1.0)
# 快速连续调用 - 仅最后一个执行
await debouncer.call(api_call, "query1")
await asyncio.sleep(0.1)
await debouncer.call(api_call, "query2")
await asyncio.sleep(0.1)
await debouncer.call(api_call, "query3")
# 等待防抖调用执行
await asyncio.sleep(2)
asyncio.run(debounce_example())Output: Only "query3" API call executes
输出: 仅"query3"的API调用执行
undefinedundefined5. Rate Limiting
5. 速率限制
python
import asyncio
from datetime import datetime
class RateLimiter:
"""Limit rate of async operations."""
def __init__(self, max_calls: int, period: float):
self.max_calls = max_calls
self.period = period
self.semaphore = asyncio.Semaphore(max_calls)
self.calls = []
async def __aenter__(self):
"""Acquire rate limit slot."""
await self.semaphore.acquire()
now = asyncio.get_event_loop().time()
# Remove old calls outside period
self.calls = [t for t in self.calls if now - t < self.period]
if len(self.calls) >= self.max_calls:
# Wait until oldest call expires
sleep_time = self.period - (now - self.calls[0])
if sleep_time > 0:
await asyncio.sleep(sleep_time)
self.calls.append(now)
return self
async def __aexit__(self, *args):
"""Release semaphore."""
self.semaphore.release()
async def rate_limited_operation(limiter, name):
"""Operation with rate limiting."""
async with limiter:
print(f"{datetime.now()}: {name}")
await asyncio.sleep(0.1)
async def rate_limit_example():
"""Example of rate limiting."""
# Max 3 calls per 2 seconds
limiter = RateLimiter(max_calls=3, period=2.0)
# Try to make 6 calls
await asyncio.gather(*[
rate_limited_operation(limiter, f"Call-{i}")
for i in range(6)
])
asyncio.run(rate_limit_example())python
import asyncio
from datetime import datetime
class RateLimiter:
"""限制异步操作速率。"""
def __init__(self, max_calls: int, period: float):
self.max_calls = max_calls
self.period = period
self.semaphore = asyncio.Semaphore(max_calls)
self.calls = []
async def __aenter__(self):
"""获取速率限制槽位。"""
await self.semaphore.acquire()
now = asyncio.get_event_loop().time()
# 移除周期外的旧调用记录
self.calls = [t for t in self.calls if now - t < self.period]
if len(self.calls) >= self.max_calls:
# 等待最早的调用记录过期
sleep_time = self.period - (now - self.calls[0])
if sleep_time > 0:
await asyncio.sleep(sleep_time)
self.calls.append(now)
return self
async def __aexit__(self, *args):
"""释放信号量。"""
self.semaphore.release()
async def rate_limited_operation(limiter, name):
"""带速率限制的操作。"""
async with limiter:
print(f"{datetime.now()}: {name}")
await asyncio.sleep(0.1)
async def rate_limit_example():
"""速率限制示例。"""
# 每2秒最多3次调用
limiter = RateLimiter(max_calls=3, period=2.0)
# 尝试进行6次调用
await asyncio.gather(*[
rate_limited_operation(limiter, f"Call-{i}")
for i in range(6)
])
asyncio.run(rate_limit_example())Debugging Async Code
调试异步代码
1. Enable Debug Mode
1. 启用调试模式
python
import asyncio
import loggingpython
import asyncio
import loggingEnable asyncio debug mode
启用asyncio调试模式
asyncio.run(main(), debug=True)
asyncio.run(main(), debug=True)
Or set environment variable:
或设置环境变量:
PYTHONASYNCIODEBUG=1 python script.py
PYTHONASYNCIODEBUG=1 python script.py
Configure logging
配置日志
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(name)
async def debug_example():
logger.debug("Starting operation")
await asyncio.sleep(1)
logger.debug("Operation complete")
undefinedlogging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(name)
async def debug_example():
logger.debug("启动操作")
await asyncio.sleep(1)
logger.debug("操作完成")
undefined2. Detect Blocking Code
2. 检测阻塞代码
python
import asyncio
import time
async def problematic_code():
"""Code with blocking operation."""
print("Starting")
# BAD: Blocking sleep
time.sleep(2) # This blocks the event loop!
print("Complete")python
import asyncio
import time
async def problematic_code():
"""包含阻塞操作的代码。"""
print("启动")
# ❌ 错误:异步函数中的阻塞调用
time.sleep(2) # 这会阻塞事件循环!
print("完成")Run with debug mode to detect blocking
启用调试模式运行以检测阻塞
asyncio.run(problematic_code(), debug=True)
asyncio.run(problematic_code(), debug=True)
Warning: Executing <Task> took 2.001 seconds
警告: Executing <Task> took 2.001 seconds
undefinedundefined3. Track Pending Tasks
3. 跟踪待处理任务
python
import asyncio
async def track_tasks():
"""Track all pending tasks."""
# Get all tasks
tasks = asyncio.all_tasks()
print(f"Total tasks: {len(tasks)}")
for task in tasks:
print(f" - {task.get_name()}: {task}")
# Check if task is done
if task.done():
try:
result = task.result()
print(f" Result: {result}")
except Exception as e:
print(f" Exception: {e}")python
import asyncio
async def track_tasks():
"""跟踪所有待处理任务。"""
# 获取所有任务
tasks = asyncio.all_tasks()
print(f"总任务数: {len(tasks)}")
for task in tasks:
print(f" - {task.get_name()}: {task}")
# 检查任务是否完成
if task.done():
try:
result = task.result()
print(f" 结果: {result}")
except Exception as e:
print(f" 异常: {e}")Create some tasks
创建一些任务
async def main():
task1 = asyncio.create_task(asyncio.sleep(5), name="sleep-task")
task2 = asyncio.create_task(track_tasks(), name="tracking")
await task2
task1.cancel()asyncio.run(main())
undefinedasync def main():
task1 = asyncio.create_task(asyncio.sleep(5), name="sleep-task")
task2 = asyncio.create_task(track_tasks(), name="tracking")
await task2
task1.cancel()asyncio.run(main())
undefinedTesting Async Code
测试异步代码
1. pytest-asyncio Setup
1. pytest-asyncio设置
python
undefinedpython
undefinedtest_async.py
test_async.py
import pytest
import asyncio
import pytest
import asyncio
Mark test as async
将测试标记为异步
@pytest.mark.asyncio
async def test_async_function():
"""Test async function."""
result = await some_async_function()
assert result == "expected"
@pytest.mark.asyncio
async def test_async_http():
"""Test async HTTP client."""
async with aiohttp.ClientSession() as session:
async with session.get("https://httpbin.org/json") as response:
assert response.status == 200
data = await response.json()
assert "slideshow" in data
@pytest.mark.asyncio
async def test_async_function():
"""测试异步函数。"""
result = await some_async_function()
assert result == "expected"
@pytest.mark.asyncio
async def test_async_http():
"""测试异步HTTP客户端。"""
async with aiohttp.ClientSession() as session:
async with session.get("https://httpbin.org/json") as response:
assert response.status == 200
data = await response.json()
assert "slideshow" in data
Async fixture
异步fixture
@pytest.fixture
async def async_client():
"""Async test fixture."""
client = await create_async_client()
yield client
await client.close()
@pytest.mark.asyncio
async def test_with_fixture(async_client):
"""Test using async fixture."""
result = await async_client.fetch_data()
assert result is not None
undefined@pytest.fixture
async def async_client():
"""异步测试fixture。"""
client = await create_async_client()
yield client
await client.close()
@pytest.mark.asyncio
async def test_with_fixture(async_client):
"""使用异步fixture测试。"""
result = await async_client.fetch_data()
assert result is not None
undefined2. Mocking Async Functions
2. 模拟异步函数
python
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_with_mock():
"""Test with async mock."""
# Create async mock
mock_func = AsyncMock(return_value="mocked result")
result = await mock_func()
assert result == "mocked result"
mock_func.assert_called_once()
@pytest.mark.asyncio
@patch("module.async_function", new_callable=AsyncMock)
async def test_with_patch(mock_async):
"""Test with patched async function."""
mock_async.return_value = {"status": "success"}
result = await some_function_that_calls_async()
assert result["status"] == "success"
mock_async.assert_called_once()python
import pytest
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_with_mock():
"""使用异步模拟测试。"""
# 创建异步模拟
mock_func = AsyncMock(return_value="mocked result")
result = await mock_func()
assert result == "mocked result"
mock_func.assert_called_once()
@pytest.mark.asyncio
@patch("module.async_function", new_callable=AsyncMock)
async def test_with_patch(mock_async):
"""使用补丁异步函数测试。"""
mock_async.return_value = {"status": "success"}
result = await some_function_that_calls_async()
assert result["status"] == "success"
mock_async.assert_called_once()Performance Optimization
性能优化
1. Use asyncio.gather() for Parallelism
1. 使用asyncio.gather()实现并行
python
import asyncio
import time
async def slow_task(n):
await asyncio.sleep(1)
return n * 2
async def optimized():
"""Parallel execution."""
start = time.time()
# Sequential (slow) - 5 seconds
# results = []
# for i in range(5):
# result = await slow_task(i)
# results.append(result)
# Parallel (fast) - 1 second
results = await asyncio.gather(*[slow_task(i) for i in range(5)])
elapsed = time.time() - start
print(f"Time: {elapsed:.2f}s, Results: {results}")
asyncio.run(optimized())python
import asyncio
import time
async def slow_task(n):
await asyncio.sleep(1)
return n * 2
async def optimized():
"""并行执行。"""
start = time.time()
# 顺序执行(慢)- 5秒
# results = []
# for i in range(5):
# result = await slow_task(i)
# results.append(result)
# 并行执行(快)- 1秒
results = await asyncio.gather(*[slow_task(i) for i in range(5)])
elapsed = time.time() - start
print(f"耗时: {elapsed:.2f}s, 结果: {results}")
asyncio.run(optimized())2. Connection Pooling
2. 连接池
python
import asyncio
import aiohttppython
import asyncio
import aiohttpBAD: Create new session for each request
❌ 错误模式:每个请求创建新会话
async def bad_pattern():
for i in range(10):
async with aiohttp.ClientSession() as session:
async with session.get("https://httpbin.org/json") as response:
await response.json()
async def bad_pattern():
for i in range(10):
async with aiohttp.ClientSession() as session:
async with session.get("https://httpbin.org/json") as response:
await response.json()
GOOD: Reuse session with connection pool
✅ 正确模式:复用带连接池的会话
async def good_pattern():
async with aiohttp.ClientSession() as session:
tasks = [
session.get("https://httpbin.org/json")
for i in range(10)
]
responses = await asyncio.gather(*tasks)
for response in responses:
await response.json()
undefinedasync def good_pattern():
async with aiohttp.ClientSession() as session:
tasks = [
session.get("https://httpbin.org/json")
for i in range(10)
]
responses = await asyncio.gather(*tasks)
for response in responses:
await response.json()
undefined3. Avoid Blocking Operations
3. 避免阻塞操作
python
import asynciopython
import asyncioBAD: Blocking I/O in async function
❌ 错误:异步函数中的阻塞I/O
async def bad_file_read():
with open("large_file.txt") as f: # Blocks event loop!
data = f.read()
return data
async def bad_file_read():
with open("large_file.txt") as f: # 阻塞事件循环!
data = f.read()
return data
GOOD: Use async file I/O or run in executor
✅ 正确:使用异步文件I/O或在执行器中运行
async def good_file_read():
loop = asyncio.get_running_loop()
# Run blocking operation in thread pool
data = await loop.run_in_executor(
None,
lambda: open("large_file.txt").read()
)
return dataasync def good_file_read():
loop = asyncio.get_running_loop()
# 在线程池中运行阻塞操作
data = await loop.run_in_executor(
None,
lambda: open("large_file.txt").read()
)
return dataBETTER: Use aiofiles for async file I/O
✅ 更好:使用aiofiles进行异步文件I/O
import aiofiles
async def better_file_read():
async with aiofiles.open("large_file.txt") as f:
data = await f.read()
return data
undefinedimport aiofiles
async def better_file_read():
async with aiofiles.open("large_file.txt") as f:
data = await f.read()
return data
undefinedCommon Pitfalls
常见陷阱
❌ Anti-Pattern 1: Not Awaiting Coroutines
❌ 反模式1:未等待协程
python
undefinedpython
undefinedWRONG
错误
async def bad():
result = async_function() # Returns coroutine, doesn't execute!
print(result) # Prints: <coroutine object>
async def bad():
result = async_function() # 返回协程,未执行!
print(result) # 输出: <coroutine object>
CORRECT
正确
async def good():
result = await async_function() # Actually executes
print(result)
undefinedasync def good():
result = await async_function() # 实际执行
print(result)
undefined❌ Anti-Pattern 2: Blocking the Event Loop
❌ 反模式2:阻塞事件循环
python
undefinedpython
undefinedWRONG
错误
import time
async def bad():
time.sleep(5) # Blocks entire event loop!
import time
async def bad():
time.sleep(5) # 阻塞整个事件循环!
CORRECT
正确
async def good():
await asyncio.sleep(5) # Non-blocking
undefinedasync def good():
await asyncio.sleep(5) # 非阻塞
undefined❌ Anti-Pattern 3: Not Handling Cancellation
❌ 反模式3:未处理取消
python
undefinedpython
undefinedWRONG
错误
async def bad():
await asyncio.sleep(10)
# No cleanup if cancelled
async def bad():
await asyncio.sleep(10)
# 取消时无清理
CORRECT
正确
async def good():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
# Cleanup resources
await cleanup()
raise # Re-raise to propagate
undefinedasync def good():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
# 清理资源
await cleanup()
raise # 重新抛出以传播取消
undefined❌ Anti-Pattern 4: Creating Event Loop Incorrectly
❌ 反模式4:错误创建事件循环
python
undefinedpython
undefinedWRONG (Python 3.7+)
错误(Python 3.7+)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
CORRECT (Python 3.7+)
正确(Python 3.7+)
asyncio.run(main())
undefinedasyncio.run(main())
undefined❌ Anti-Pattern 5: Not Closing Resources
❌ 反模式5:未关闭资源
python
undefinedpython
undefinedWRONG
错误
async def bad():
session = aiohttp.ClientSession()
response = await session.get(url)
# Session never closed - resource leak!
async def bad():
session = aiohttp.ClientSession()
response = await session.get(url)
# 会话从未关闭 - 资源泄漏!
CORRECT
正确
async def good():
async with aiohttp.ClientSession() as session:
response = await session.get(url)
# Session automatically closed
undefinedasync def good():
async with aiohttp.ClientSession() as session:
response = await session.get(url)
# 会话自动关闭
undefinedBest Practices
最佳实践
- Use asyncio.run() for entry point (Python 3.7+)
- Always await coroutines - don't forget await
- Use async context managers for resource cleanup
- Connection pooling for HTTP and database clients
- Handle CancelledError for graceful shutdown
- Use asyncio.gather() for parallel operations
- Avoid blocking operations in async functions
- Use timeouts to prevent hanging operations
- Debug mode during development to catch issues
- Test async code with pytest-asyncio
- 使用asyncio.run() 作为入口点(Python 3.7+)
- 始终等待协程 - 不要忘记await关键字
- 使用异步上下文管理器 进行资源清理
- 连接池 用于HTTP和数据库客户端
- 处理CancelledError 以实现优雅关闭
- 使用asyncio.gather() 进行并行操作
- 异步函数中避免阻塞操作
- 使用超时 防止操作挂起
- 开发期间启用调试模式 以发现问题
- 使用pytest-asyncio测试异步代码
Quick Reference
快速参考
Common Commands
常用命令
bash
undefinedbash
undefinedRun async script
运行异步脚本
python script.py
python script.py
Run with debug mode
启用调试模式运行
PYTHONASYNCIODEBUG=1 python script.py
PYTHONASYNCIODEBUG=1 python script.py
Run tests
运行测试
pytest -v --asyncio-mode=auto
pytest -v --asyncio-mode=auto
Install async dependencies
安装异步依赖
pip install aiohttp asyncpg motor pytest-asyncio
undefinedpip install aiohttp asyncpg motor pytest-asyncio
undefinedEssential Imports
必要导入
python
import asyncio
import aiohttp
import asyncpg
from typing import List, Dict, Anypython
import asyncio
import aiohttp
import asyncpg
from typing import List, Dict, AnyResources
资源
- Official Documentation: https://docs.python.org/3/library/asyncio.html
- aiohttp: https://docs.aiohttp.org/
- asyncpg: https://magicstack.github.io/asyncpg/
- FastAPI Async: https://fastapi.tiangolo.com/async/
- pytest-asyncio: https://pytest-asyncio.readthedocs.io/
- 官方文档: https://docs.python.org/3/library/asyncio.html
- aiohttp: https://docs.aiohttp.org/
- asyncpg: https://magicstack.github.io/asyncpg/
- FastAPI异步: https://fastapi.tiangolo.com/async/
- pytest-asyncio: https://pytest-asyncio.readthedocs.io/
Related Skills
相关技能
When using asyncio, consider these complementary skills:
- fastapi-local-dev: FastAPI async server patterns and production deployment
- pytest: Testing async code with pytest-asyncio and fixtures
- systematic-debugging: Debugging async race conditions and deadlocks
使用asyncio时,可结合以下互补技能:
- fastapi-local-dev: FastAPI异步服务端模式与生产部署
- pytest: 使用pytest-asyncio和fixture测试异步代码
- systematic-debugging: 调试异步竞态条件与死锁
Quick FastAPI Async Patterns (Inlined for Standalone Use)
独立使用的FastAPI异步模式速览
python
undefinedpython
undefinedFastAPI async endpoint pattern
FastAPI异步端点模式
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
app = FastAPI()
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
app = FastAPI()
Async database setup
异步数据库设置
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db():
async with AsyncSessionLocal() as session:
yield session
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
# Async database query
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db():
async with AsyncSessionLocal() as session:
yield session
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
# 异步数据库查询
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="用户未找到")
return user
Background tasks with asyncio
基于asyncio的后台任务
@app.post("/send-email")
async def send_email_endpoint(email: EmailSchema):
# Non-blocking background task
asyncio.create_task(send_email_async(email))
return {"status": "email queued"}
undefined@app.post("/send-email")
async def send_email_endpoint(email: EmailSchema):
# 非阻塞后台任务
asyncio.create_task(send_email_async(email))
return {"status": "邮件已排队"}
undefinedQuick pytest-asyncio Patterns (Inlined for Standalone Use)
独立使用的pytest-asyncio模式速览
python
undefinedpython
undefinedTesting async functions with pytest
使用pytest测试异步函数
import pytest
import pytest_asyncio
from httpx import AsyncClient
import pytest
import pytest_asyncio
from httpx import AsyncClient
Async test fixture
异步测试fixture
@pytest_asyncio.fixture
async def async_client():
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
@pytest_asyncio.fixture
async def async_client():
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
Async test function
异步测试函数
@pytest.mark.asyncio
async def test_get_user(async_client):
response = await async_client.get("/users/1")
assert response.status_code == 200
assert response.json()["id"] == 1
@pytest.mark.asyncio
async def test_get_user(async_client):
response = await async_client.get("/users/1")
assert response.status_code == 200
assert response.json()["id"] == 1
Testing concurrent operations
测试并发操作
@pytest.mark.asyncio
async def test_concurrent_requests():
async with AsyncClient(app=app, base_url="http://test") as client:
# Run 10 requests concurrently
responses = await asyncio.gather(
*[client.get(f"/users/{i}") for i in range(1, 11)]
)
assert all(r.status_code == 200 for r in responses)
@pytest.mark.asyncio
async def test_concurrent_requests():
async with AsyncClient(app=app, base_url="http://test") as client:
# 并发运行10个请求
responses = await asyncio.gather(
*[client.get(f"/users/{i}") for i in range(1, 11)]
)
assert all(r.status_code == 200 for r in responses)
Mock async dependencies
模拟异步依赖
@pytest_asyncio.fixture
async def mock_db():
# Setup mock database
db = AsyncMock()
yield db
# Cleanup
undefined@pytest_asyncio.fixture
async def mock_db():
# 设置模拟数据库
db = AsyncMock()
yield db
# 清理
undefinedQuick Async Debugging Reference (Inlined for Standalone Use)
独立使用的异步调试参考速览
Common Async Pitfalls:
-
Blocking the Event Looppython
# ❌ WRONG: Blocking call in async function async def bad_function(): time.sleep(5) # Blocks entire event loop! return "done" # ✅ CORRECT: Use asyncio.sleep async def good_function(): await asyncio.sleep(5) # Releases event loop return "done" -
Debugging Race Conditionspython
# Add logging to track execution order import logging logging.basicConfig(level=logging.DEBUG) async def debug_task(name): logging.debug(f"{name}: Starting") await asyncio.sleep(1) logging.debug(f"{name}: Finished") return name # Run with detailed tracing asyncio.run(asyncio.gather(debug_task("A"), debug_task("B")), debug=True) -
Deadlock Detectionpython
# Use timeout to detect deadlocks try: result = await asyncio.wait_for(some_async_function(), timeout=5.0) except asyncio.TimeoutError: logging.error("Deadlock detected: operation timed out") # Investigate what's blocking -
Inspecting Running Taskspython
# Check all pending tasks tasks = asyncio.all_tasks() for task in tasks: print(f"Task: {task.get_name()}, Done: {task.done()}") if not task.done(): print(f" Current coro: {task.get_coro()}")
[Full FastAPI, pytest, and debugging patterns available in respective skills if deployed together]
Python Version Compatibility: This skill covers asyncio in Python 3.7+ and reflects current best practices for async programming in 2025.
常见异步陷阱:
-
阻塞事件循环python
# ❌ 错误:异步函数中的阻塞调用 async def bad_function(): time.sleep(5) # 阻塞整个事件循环! return "done" # ✅ 正确:使用asyncio.sleep async def good_function(): await asyncio.sleep(5) # 释放事件循环 return "done" -
调试竞态条件python
# 添加日志跟踪执行顺序 import logging logging.basicConfig(level=logging.DEBUG) async def debug_task(name): logging.debug(f"{name}: 启动") await asyncio.sleep(1) logging.debug(f"{name}: 完成") return name # 启用详细跟踪运行 asyncio.run(asyncio.gather(debug_task("A"), debug_task("B")), debug=True) -
死锁检测python
# 使用超时检测死锁 try: result = await asyncio.wait_for(some_async_function(), timeout=5.0) except asyncio.TimeoutError: logging.error("检测到死锁:操作超时") # 调查阻塞原因 -
检查运行中任务python
# 检查所有待处理任务 tasks = asyncio.all_tasks() for task in tasks: print(f"任务: {task.get_name()}, 完成: {task.done()}") if not task.done(): print(f" 当前协程: {task.get_coro()}")
[若同时部署,可查看对应技能中的完整FastAPI、pytest及调试模式]
Python版本兼容性: 本技能覆盖Python 3.7+中的asyncio,反映了2025年异步编程的当前最佳实践。