pytest-async-testing
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChinesePytest Async Testing
Pytest异步测试
Purpose
目的
Async testing requires special handling of event loops, context managers, and async fixtures. This skill provides production-ready patterns for testing all async scenarios in modern Python.
异步测试需要对事件循环、上下文管理器和异步fixtures进行特殊处理。本技能为现代Python中所有异步场景的测试提供了可用于生产环境的模式。
Quick Start
快速开始
Configure pytest for auto-detection of async tests:
toml
undefined配置pytest以自动检测异步测试:
toml
undefinedpyproject.toml
pyproject.toml
[tool.pytest.ini_options]
asyncio_mode = "auto" # Auto-detect async tests
asyncio_default_fixture_loop_scope = "function" # Max isolation per test
Then write async tests naturally:
```python
@pytest.mark.asyncio
async def test_async_function() -> None:
"""Async test with full type safety."""
result = await fetch_data()
assert result is not None[tool.pytest.ini_options]
asyncio_mode = "auto" # 自动检测异步测试
asyncio_default_fixture_loop_scope = "function" # 每个测试最大程度隔离
然后自然编写异步测试:
```python
@pytest.mark.asyncio
async def test_async_function() -> None:
"""具备完整类型安全的异步测试。"""
result = await fetch_data()
assert result is not NoneWith pytest-asyncio auto-mode, @pytest.mark.asyncio is optional:
在pytest-asyncio自动模式下,@pytest.mark.asyncio是可选的:
async def test_auto_detected() -> None:
"""Async test without decorator."""
result = await fetch_data()
assert result is not None
undefinedasync def test_auto_detected() -> None:
"""无需装饰器的异步测试。"""
result = await fetch_data()
assert result is not None
undefinedInstructions
操作步骤
Step 1: Configure pytest-asyncio in pyproject.toml
步骤1:在pyproject.toml中配置pytest-asyncio
toml
[tool.pytest.ini_options]toml
[tool.pytest.ini_options]Auto-detect async tests (don't require @pytest.mark.asyncio)
自动检测异步测试(无需@pytest.mark.asyncio)
asyncio_mode = "auto"
asyncio_mode = "auto"
Function scope: New event loop per test (maximum isolation)
函数作用域:每个测试使用新的事件循环(最大隔离性)
asyncio_default_fixture_loop_scope = "function"
asyncio_default_fixture_loop_scope = "function"
Alternative scopes for performance:
为提升性能可选择其他作用域:
asyncio_default_fixture_loop_scope = "module" # Shared per module
asyncio_default_fixture_loop_scope = "module" # 每个模块共享
asyncio_default_fixture_loop_scope = "session" # Single for all tests
asyncio_default_fixture_loop_scope = "session" # 所有测试共用一个
undefinedundefinedStep 2: Create Async Fixtures
步骤2:创建异步Fixtures
python
from typing import AsyncGenerator
import pytestpython
from typing import AsyncGenerator
import pytest✅ GOOD: Async fixture with proper type hints
✅ 推荐:带有正确类型提示的异步fixture
@pytest.fixture
async def async_client() -> AsyncGenerator[AsyncClient, None]:
"""Provide async HTTP client."""
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
# Cleanup happens after yield
@pytest.fixture
async def async_client() -> AsyncGenerator[AsyncClient, None]:
"""提供异步HTTP客户端。"""
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
# 清理操作在yield之后执行
✅ GOOD: Async database session
✅ 推荐:异步数据库会话
@pytest.fixture
async def db_session() -> AsyncGenerator[AsyncSession, None]:
"""Provide async database session."""
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with async_session() as session:
yield session
await session.rollback() # Cleanupundefined@pytest.fixture
async def db_session() -> AsyncGenerator[AsyncSession, None]:
"""提供异步数据库会话。"""
engine = create_async_engine("sqlite+aiosqlite:///:memory:")
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with async_session() as session:
yield session
await session.rollback() # 清理undefinedStep 3: Use AsyncMock for Mocking Async Methods
步骤3:使用AsyncMock模拟异步方法
python
from unittest.mock import AsyncMockpython
from unittest.mock import AsyncMock✅ GOOD: AsyncMock for single async method
✅ 推荐:为单个异步方法创建AsyncMock
@pytest.fixture
def mock_gateway() -> AsyncMock:
"""Mock async gateway."""
mock = AsyncMock()
mock.fetch_orders.return_value = [order1, order2]
mock.close.return_value = None
return mock
@pytest.fixture
def mock_gateway() -> AsyncMock:
"""模拟异步网关。"""
mock = AsyncMock()
mock.fetch_orders.return_value = [order1, order2]
mock.close.return_value = None
return mock
✅ GOOD: AsyncMock for async generator
✅ 推荐:为异步生成器创建AsyncMock
@pytest.fixture
def mock_async_generator() -> AsyncMock:
"""Mock async generator."""
mock = AsyncMock()
async def fake_generator():
yield item1
yield item2
yield item3
mock.stream.return_value = fake_generator()
return mockundefined@pytest.fixture
def mock_async_generator() -> AsyncMock:
"""模拟异步生成器。"""
mock = AsyncMock()
async def fake_generator():
yield item1
yield item2
yield item3
mock.stream.return_value = fake_generator()
return mockundefinedStep 4: Test Async Use Cases
步骤4:测试异步用例
python
@pytest.mark.asyncio
async def test_extract_orders_use_case(
mock_gateway: AsyncMock,
mock_publisher: AsyncMock,
) -> None:
"""Test async use case with async mocks."""
use_case = ExtractOrdersUseCase(
gateway=mock_gateway,
publisher=mock_publisher
)
result = await use_case.execute()
assert result.orders_count == 2
# Verify async methods were awaited
mock_gateway.fetch_orders.assert_awaited_once()
assert mock_publisher.publish_order.await_count == 2python
@pytest.mark.asyncio
async def test_extract_orders_use_case(
mock_gateway: AsyncMock,
mock_publisher: AsyncMock,
) -> None:
"""使用异步mock测试异步用例。"""
use_case = ExtractOrdersUseCase(
gateway=mock_gateway,
publisher=mock_publisher
)
result = await use_case.execute()
assert result.orders_count == 2
# 验证异步方法已被调用
mock_gateway.fetch_orders.assert_awaited_once()
assert mock_publisher.publish_order.await_count == 2Step 5: Test FastAPI Endpoints with AsyncClient
步骤5:使用AsyncClient测试FastAPI接口
python
from httpx import AsyncClient, ASGITransport
@pytest.fixture
async def test_client() -> AsyncGenerator[AsyncClient, None]:
"""Async HTTP client for FastAPI testing."""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
@pytest.mark.asyncio
async def test_get_endpoint(test_client: AsyncClient) -> None:
"""Test FastAPI GET endpoint."""
response = await test_client.get("/top-products?count=10")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) <= 10
@pytest.mark.asyncio
async def test_post_endpoint(test_client: AsyncClient) -> None:
"""Test FastAPI POST endpoint."""
payload = {"name": "Test", "price": 99.99}
response = await test_client.post("/products", json=payload)
assert response.status_code == 201
data = response.json()
assert data["id"] is not Nonepython
from httpx import AsyncClient, ASGITransport
@pytest.fixture
async def test_client() -> AsyncGenerator[AsyncClient, None]:
"""用于FastAPI测试的异步HTTP客户端。"""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
@pytest.mark.asyncio
async def test_get_endpoint(test_client: AsyncClient) -> None:
"""测试FastAPI GET接口。"""
response = await test_client.get("/top-products?count=10")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) <= 10
@pytest.mark.asyncio
async def test_post_endpoint(test_client: AsyncClient) -> None:
"""测试FastAPI POST接口。"""
payload = {"name": "Test", "price": 99.99}
response = await test_client.post("/products", json=payload)
assert response.status_code == 201
data = response.json()
assert data["id"] is not NoneStep 6: Test Async Context Managers
步骤6:测试异步上下文管理器
python
from contextlib import asynccontextmanager
@asynccontextmanager
async def database_transaction():
"""Async context manager for transactions."""
session = Session()
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
@pytest.mark.asyncio
async def test_transaction_success() -> None:
"""Test successful transaction."""
async with database_transaction() as session:
await session.execute("INSERT INTO orders VALUES (1, 'test')")
# Verify commit was called
@pytest.mark.asyncio
async def test_transaction_rollback() -> None:
"""Test transaction rollback on error."""
with pytest.raises(ValueError):
async with database_transaction() as session:
await session.execute("INSERT INTO orders VALUES (1, 'test')")
raise ValueError("Simulated error")
# Verify rollback was calledpython
from contextlib import asynccontextmanager
@asynccontextmanager
async def database_transaction():
"""用于事务的异步上下文管理器。"""
session = Session()
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
@pytest.mark.asyncio
async def test_transaction_success() -> None:
"""测试成功的事务。"""
async with database_transaction() as session:
await session.execute("INSERT INTO orders VALUES (1, 'test')")
# 验证commit已被调用
@pytest.mark.asyncio
async def test_transaction_rollback() -> None:
"""测试出错时的事务回滚。"""
with pytest.raises(ValueError):
async with database_transaction() as session:
await session.execute("INSERT INTO orders VALUES (1, 'test')")
raise ValueError("模拟错误")
# 验证rollback已被调用Step 7: Test Async Generators
步骤7:测试异步生成器
python
from typing import AsyncIterator
async def fetch_orders_stream() -> AsyncIterator[dict]:
"""Async generator yielding orders."""
for i in range(10):
yield {"id": str(i), "total": 100.0 * i}
@pytest.mark.asyncio
async def test_async_generator() -> None:
"""Test async generator consumption."""
orders = []
async for order in fetch_orders_stream():
orders.append(order)
assert len(orders) == 10
assert orders[0]["id"] == "0"
assert orders[9]["total"] == 900.0
@pytest.mark.asyncio
async def test_async_generator_with_early_termination() -> None:
"""Test breaking out of async generator."""
orders = []
async for order in fetch_orders_stream():
orders.append(order)
if len(orders) >= 5:
break
assert len(orders) == 5python
from typing import AsyncIterator
async def fetch_orders_stream() -> AsyncIterator[dict]:
"""生成订单的异步生成器。"""
for i in range(10):
yield {"id": str(i), "total": 100.0 * i}
@pytest.mark.asyncio
async def test_async_generator() -> None:
"""测试异步生成器的消费。"""
orders = []
async for order in fetch_orders_stream():
orders.append(order)
assert len(orders) == 10
assert orders[0]["id"] == "0"
assert orders[9]["total"] == 900.0
@pytest.mark.asyncio
async def test_async_generator_with_early_termination() -> None:
"""测试提前终止异步生成器。"""
orders = []
async for order in fetch_orders_stream():
orders.append(order)
if len(orders) >= 5:
break
assert len(orders) == 5Step 8: Test Kafka Async Clients
步骤8:测试Kafka异步客户端
python
from unittest.mock import AsyncMock
@pytest.fixture
def mock_kafka_producer() -> AsyncMock:
"""Mock confluent-kafka async producer."""
mock = AsyncMock()
mock.produce.return_value = None
mock.flush.return_value = 0 # All delivered
mock.close.return_value = None
return mock
@pytest.fixture
def mock_kafka_consumer() -> AsyncMock:
"""Mock Kafka consumer with async iteration."""
mock = AsyncMock()
async def fake_consume():
yield {"key": "order_1", "value": {"id": "1"}}
yield {"key": "order_2", "value": {"id": "2"}}
mock.consume.return_value = fake_consume()
return mock
@pytest.mark.asyncio
async def test_kafka_producer_integration(
mock_kafka_producer: AsyncMock
) -> None:
"""Test async Kafka producer."""
producer = KafkaProducerAdapter(mock_kafka_producer)
await producer.publish("test_topic", "key", {"data": "value"})
mock_kafka_producer.produce.assert_called_once()
mock_kafka_producer.flush.assert_called_once()
@pytest.mark.asyncio
async def test_kafka_consumer_integration(
mock_kafka_consumer: AsyncMock
) -> None:
"""Test async Kafka consumer."""
consumer = KafkaConsumerAdapter(mock_kafka_consumer)
messages = []
async for message in consumer.consume():
messages.append(message)
assert len(messages) == 2python
from unittest.mock import AsyncMock
@pytest.fixture
def mock_kafka_producer() -> AsyncMock:
"""模拟confluent-kafka异步生产者。"""
mock = AsyncMock()
mock.produce.return_value = None
mock.flush.return_value = 0 # 所有消息已投递
mock.close.return_value = None
return mock
@pytest.fixture
def mock_kafka_consumer() -> AsyncMock:
"""支持异步迭代的Kafka消费者模拟。"""
mock = AsyncMock()
async def fake_consume():
yield {"key": "order_1", "value": {"id": "1"}}
yield {"key": "order_2", "value": {"id": "2"}}
mock.consume.return_value = fake_consume()
return mock
@pytest.mark.asyncio
async def test_kafka_producer_integration(
mock_kafka_producer: AsyncMock
) -> None:
"""测试异步Kafka生产者。"""
producer = KafkaProducerAdapter(mock_kafka_producer)
await producer.publish("test_topic", "key", {"data": "value"})
mock_kafka_producer.produce.assert_called_once()
mock_kafka_producer.flush.assert_called_once()
@pytest.mark.asyncio
async def test_kafka_consumer_integration(
mock_kafka_consumer: AsyncMock
) -> None:
"""测试异步Kafka消费者。"""
consumer = KafkaConsumerAdapter(mock_kafka_consumer)
messages = []
async for message in consumer.consume():
messages.append(message)
assert len(messages) == 2Step 9: Handle Event Loop Cleanup
步骤9:处理事件循环清理
python
import asyncio
@pytest.fixture
async def cleanup_tasks():
"""Ensure all async tasks are cleaned up."""
yield
# Cancel any pending tasks
pending = asyncio.all_tasks()
for task in pending:
task.cancel()
# Wait for cancellation to complete
await asyncio.gather(*pending, return_exceptions=True)python
import asyncio
@pytest.fixture
async def cleanup_tasks():
"""确保所有异步任务被清理。"""
yield
# 取消所有待处理任务
pending = asyncio.all_tasks()
for task in pending:
task.cancel()
# 等待取消操作完成
await asyncio.gather(*pending, return_exceptions=True)Step 10: Test with Timeouts for Long-Running Async Operations
步骤10:为长时间运行的异步操作设置超时测试
python
import pytest
@pytest.mark.asyncio
@pytest.mark.timeout(5) # 5-second timeout
async def test_fetch_with_timeout() -> None:
"""Test that long-running operation completes in time."""
result = await fetch_data()
assert result is not None
@pytest.mark.asyncio
async def test_with_asyncio_wait_for() -> None:
"""Test with asyncio.wait_for for timeout."""
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(long_running_op(), timeout=0.1)python
import pytest
@pytest.mark.asyncio
@pytest.mark.timeout(5) # 5秒超时
async def test_fetch_with_timeout() -> None:
"""测试长时间运行的操作是否在规定时间内完成。"""
result = await fetch_data()
assert result is not None
@pytest.mark.asyncio
async def test_with_asyncio_wait_for() -> None:
"""使用asyncio.wait_for进行超时测试。"""
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(long_running_op(), timeout=0.1)Examples
示例
Example 1: Complete FastAPI Test
示例1:完整的FastAPI测试
python
from httpx import AsyncClient, ASGITransport
from unittest.mock import AsyncMock
import pytest
from app.reporting.infrastructure.fastapi_app import app
@pytest.fixture
async def test_client() -> AsyncGenerator[AsyncClient, None]:
"""Async HTTP client for FastAPI testing."""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
@pytest.fixture
def mock_query_use_case(monkeypatch: Any) -> AsyncMock:
"""Mock use case dependency in FastAPI."""
mock = AsyncMock()
from app.reporting.infrastructure import dependencies
monkeypatch.setattr(
dependencies,
"get_query_use_case",
lambda: mock
)
return mock
class TestTopProductsEndpoint:
"""Test reporting API endpoint."""
@pytest.mark.asyncio
async def test_get_top_products_success(
self,
test_client: AsyncClient,
mock_query_use_case: AsyncMock,
) -> None:
"""Test successful response."""
mock_query_use_case.execute.return_value = [
ProductRanking(title="Laptop", rank=Rank(1), cnt_bought=100),
ProductRanking(title="Mouse", rank=Rank(2), cnt_bought=50),
]
response = await test_client.get("/top-products?count=10")
assert response.status_code == 200
data = response.json()
assert len(data) == 2
@pytest.mark.asyncio
async def test_get_top_products_unavailable(
self,
test_client: AsyncClient,
mock_query_use_case: AsyncMock,
) -> None:
"""Test 503 when data not available."""
from app.reporting.application.exceptions import DataNotAvailableException
mock_query_use_case.execute.side_effect = DataNotAvailableException(
"ClickHouse not ready"
)
response = await test_client.get("/top-products?count=10")
assert response.status_code == 503python
from httpx import AsyncClient, ASGITransport
from unittest.mock import AsyncMock
import pytest
from app.reporting.infrastructure.fastapi_app import app
@pytest.fixture
async def test_client() -> AsyncGenerator[AsyncClient, None]:
"""用于FastAPI测试的异步HTTP客户端。"""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
yield client
@pytest.fixture
def mock_query_use_case(monkeypatch: Any) -> AsyncMock:
"""模拟FastAPI中的用例依赖。"""
mock = AsyncMock()
from app.reporting.infrastructure import dependencies
monkeypatch.setattr(
dependencies,
"get_query_use_case",
lambda: mock
)
return mock
class TestTopProductsEndpoint:
"""测试报表API接口。"""
@pytest.mark.asyncio
async def test_get_top_products_success(
self,
test_client: AsyncClient,
mock_query_use_case: AsyncMock,
) -> None:
"""测试成功响应。"""
mock_query_use_case.execute.return_value = [
ProductRanking(title="Laptop", rank=Rank(1), cnt_bought=100),
ProductRanking(title="Mouse", rank=Rank(2), cnt_bought=50),
]
response = await test_client.get("/top-products?count=10")
assert response.status_code == 200
data = response.json()
assert len(data) == 2
@pytest.mark.asyncio
async def test_get_top_products_unavailable(
self,
test_client: AsyncClient,
mock_query_use_case: AsyncMock,
) -> None:
"""测试数据不可用时返回503。"""
from app.reporting.application.exceptions import DataNotAvailableException
mock_query_use_case.execute.side_effect = DataNotAvailableException(
"ClickHouse not ready"
)
response = await test_client.get("/top-products?count=10")
assert response.status_code == 503Example 2: Testing Async Use Case with Streaming
示例2:测试带流处理的异步用例
python
@pytest.mark.asyncio
async def test_extract_orders_with_stream(
mock_gateway: AsyncMock,
mock_publisher: AsyncMock,
) -> None:
"""Test use case that processes streaming orders."""
# Mock async generator
async def fake_orders():
for i in range(100):
yield create_test_order(order_id=str(i))
mock_gateway.fetch_orders.return_value = fake_orders()
use_case = ExtractOrdersUseCase(
gateway=mock_gateway,
publisher=mock_publisher
)
result = await use_case.execute()
assert result.orders_count == 100
assert mock_publisher.publish_order.call_count == 100python
@pytest.mark.asyncio
async def test_extract_orders_with_stream(
mock_gateway: AsyncMock,
mock_publisher: AsyncMock,
) -> None:
"""测试处理流式订单的用例。"""
# 模拟异步生成器
async def fake_orders():
for i in range(100):
yield create_test_order(order_id=str(i))
mock_gateway.fetch_orders.return_value = fake_orders()
use_case = ExtractOrdersUseCase(
gateway=mock_gateway,
publisher=mock_publisher
)
result = await use_case.execute()
assert result.orders_count == 100
assert mock_publisher.publish_order.call_count == 100Example 3: Async Fixture with Parametrization
示例3:带参数化的异步Fixture
python
@pytest.fixture(params=["http://localhost:8000", "https://api.example.com"])
async def api_client(request: pytest.FixtureRequest) -> AsyncGenerator[AsyncClient, None]:
"""Parametrized async client for different endpoints."""
async with AsyncClient(base_url=request.param) as client:
yield client
@pytest.mark.asyncio
async def test_against_multiple_endpoints(api_client: AsyncClient) -> None:
"""Test runs against each endpoint."""
response = await api_client.get("/health")
assert response.status_code == 200python
@pytest.fixture(params=["http://localhost:8000", "https://api.example.com"])
async def api_client(request: pytest.FixtureRequest) -> AsyncGenerator[AsyncClient, None]:
"""针对不同端点的参数化异步客户端。"""
async with AsyncClient(base_url=request.param) as client:
yield client
@pytest.mark.asyncio
async def test_against_multiple_endpoints(api_client: AsyncClient) -> None:
"""测试在每个端点上运行。"""
response = await api_client.get("/health")
assert response.status_code == 200Requirements
依赖要求
- Python 3.11+
- pytest >= 7.0
- pytest-asyncio >= 0.20.0
- httpx (for AsyncClient)
- asyncio (standard library)
- Python 3.11+
- pytest >= 7.0
- pytest-asyncio >= 0.20.0
- httpx(用于AsyncClient)
- asyncio(标准库)
See Also
相关链接
- pytest-mocking-strategy - Async mocking with AsyncMock
- pytest-configuration - pytest setup
- PYTHON_UNIT_TESTING_BEST_PRACTICES.md - Section: "Async Testing Patterns"
- PROJECT_UNIT_TESTING_STRATEGY.md - Section: "Async Testing Strategy"
- pytest-mocking-strategy - 使用AsyncMock进行异步模拟
- pytest-configuration - pytest配置
- PYTHON_UNIT_TESTING_BEST_PRACTICES.md - 章节:"异步测试模式"
- PROJECT_UNIT_TESTING_STRATEGY.md - 章节:"异步测试策略"