pytest-async-testing

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Pytest Async Testing

Pytest异步测试

Purpose

目的

Async testing requires special handling of event loops, context managers, and async fixtures. This skill provides production-ready patterns for testing all async scenarios in modern Python.
异步测试需要对事件循环、上下文管理器和异步fixtures进行特殊处理。本技能为现代Python中所有异步场景的测试提供了可用于生产环境的模式。

Quick Start

快速开始

Configure pytest for auto-detection of async tests:
toml
undefined
配置pytest以自动检测异步测试:
toml
undefined

pyproject.toml

pyproject.toml

[tool.pytest.ini_options] asyncio_mode = "auto" # Auto-detect async tests asyncio_default_fixture_loop_scope = "function" # Max isolation per test

Then write async tests naturally:

```python
@pytest.mark.asyncio
async def test_async_function() -> None:
    """Async test with full type safety."""
    result = await fetch_data()
    assert result is not None
[tool.pytest.ini_options] asyncio_mode = "auto" # 自动检测异步测试 asyncio_default_fixture_loop_scope = "function" # 每个测试最大程度隔离

然后自然编写异步测试:

```python
@pytest.mark.asyncio
async def test_async_function() -> None:
    """具备完整类型安全的异步测试。"""
    result = await fetch_data()
    assert result is not None

With pytest-asyncio auto-mode, @pytest.mark.asyncio is optional:

在pytest-asyncio自动模式下,@pytest.mark.asyncio是可选的:

async def test_auto_detected() -> None: """Async test without decorator.""" result = await fetch_data() assert result is not None
undefined
async def test_auto_detected() -> None: """无需装饰器的异步测试。""" result = await fetch_data() assert result is not None
undefined

Instructions

操作步骤

Step 1: Configure pytest-asyncio in pyproject.toml

步骤1:在pyproject.toml中配置pytest-asyncio

toml
[tool.pytest.ini_options]
toml
[tool.pytest.ini_options]

Auto-detect async tests (don't require @pytest.mark.asyncio)

自动检测异步测试(无需@pytest.mark.asyncio)

asyncio_mode = "auto"
asyncio_mode = "auto"

Function scope: New event loop per test (maximum isolation)

函数作用域:每个测试使用新的事件循环(最大隔离性)

asyncio_default_fixture_loop_scope = "function"
asyncio_default_fixture_loop_scope = "function"

Alternative scopes for performance:

为提升性能可选择其他作用域:

asyncio_default_fixture_loop_scope = "module" # Shared per module

asyncio_default_fixture_loop_scope = "module" # 每个模块共享

asyncio_default_fixture_loop_scope = "session" # Single for all tests

asyncio_default_fixture_loop_scope = "session" # 所有测试共用一个

undefined
undefined

Step 2: Create Async Fixtures

步骤2:创建异步Fixtures

python
from typing import AsyncGenerator
import pytest
python
from typing import AsyncGenerator
import pytest

✅ GOOD: Async fixture with proper type hints

✅ 推荐:带有正确类型提示的异步fixture

@pytest.fixture async def async_client() -> AsyncGenerator[AsyncClient, None]: """Provide async HTTP client.""" async with AsyncClient(app=app, base_url="http://test") as client: yield client # Cleanup happens after yield
@pytest.fixture async def async_client() -> AsyncGenerator[AsyncClient, None]: """提供异步HTTP客户端。""" async with AsyncClient(app=app, base_url="http://test") as client: yield client # 清理操作在yield之后执行

✅ GOOD: Async database session

✅ 推荐:异步数据库会话

@pytest.fixture async def db_session() -> AsyncGenerator[AsyncSession, None]: """Provide async database session.""" engine = create_async_engine("sqlite+aiosqlite:///:memory:") async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with async_session() as session:
    yield session
    await session.rollback()  # Cleanup
undefined
@pytest.fixture async def db_session() -> AsyncGenerator[AsyncSession, None]: """提供异步数据库会话。""" engine = create_async_engine("sqlite+aiosqlite:///:memory:") async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with async_session() as session:
    yield session
    await session.rollback()  # 清理
undefined

Step 3: Use AsyncMock for Mocking Async Methods

步骤3:使用AsyncMock模拟异步方法

python
from unittest.mock import AsyncMock
python
from unittest.mock import AsyncMock

✅ GOOD: AsyncMock for single async method

✅ 推荐:为单个异步方法创建AsyncMock

@pytest.fixture def mock_gateway() -> AsyncMock: """Mock async gateway.""" mock = AsyncMock() mock.fetch_orders.return_value = [order1, order2] mock.close.return_value = None return mock
@pytest.fixture def mock_gateway() -> AsyncMock: """模拟异步网关。""" mock = AsyncMock() mock.fetch_orders.return_value = [order1, order2] mock.close.return_value = None return mock

✅ GOOD: AsyncMock for async generator

✅ 推荐:为异步生成器创建AsyncMock

@pytest.fixture def mock_async_generator() -> AsyncMock: """Mock async generator.""" mock = AsyncMock()
async def fake_generator():
    yield item1
    yield item2
    yield item3

mock.stream.return_value = fake_generator()
return mock
undefined
@pytest.fixture def mock_async_generator() -> AsyncMock: """模拟异步生成器。""" mock = AsyncMock()
async def fake_generator():
    yield item1
    yield item2
    yield item3

mock.stream.return_value = fake_generator()
return mock
undefined

Step 4: Test Async Use Cases

步骤4:测试异步用例

python
@pytest.mark.asyncio
async def test_extract_orders_use_case(
    mock_gateway: AsyncMock,
    mock_publisher: AsyncMock,
) -> None:
    """Test async use case with async mocks."""
    use_case = ExtractOrdersUseCase(
        gateway=mock_gateway,
        publisher=mock_publisher
    )

    result = await use_case.execute()

    assert result.orders_count == 2
    # Verify async methods were awaited
    mock_gateway.fetch_orders.assert_awaited_once()
    assert mock_publisher.publish_order.await_count == 2
python
@pytest.mark.asyncio
async def test_extract_orders_use_case(
    mock_gateway: AsyncMock,
    mock_publisher: AsyncMock,
) -> None:
    """使用异步mock测试异步用例。"""
    use_case = ExtractOrdersUseCase(
        gateway=mock_gateway,
        publisher=mock_publisher
    )

    result = await use_case.execute()

    assert result.orders_count == 2
    # 验证异步方法已被调用
    mock_gateway.fetch_orders.assert_awaited_once()
    assert mock_publisher.publish_order.await_count == 2

Step 5: Test FastAPI Endpoints with AsyncClient

步骤5:使用AsyncClient测试FastAPI接口

python
from httpx import AsyncClient, ASGITransport

@pytest.fixture
async def test_client() -> AsyncGenerator[AsyncClient, None]:
    """Async HTTP client for FastAPI testing."""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client

@pytest.mark.asyncio
async def test_get_endpoint(test_client: AsyncClient) -> None:
    """Test FastAPI GET endpoint."""
    response = await test_client.get("/top-products?count=10")

    assert response.status_code == 200
    data = response.json()
    assert isinstance(data, list)
    assert len(data) <= 10

@pytest.mark.asyncio
async def test_post_endpoint(test_client: AsyncClient) -> None:
    """Test FastAPI POST endpoint."""
    payload = {"name": "Test", "price": 99.99}
    response = await test_client.post("/products", json=payload)

    assert response.status_code == 201
    data = response.json()
    assert data["id"] is not None
python
from httpx import AsyncClient, ASGITransport

@pytest.fixture
async def test_client() -> AsyncGenerator[AsyncClient, None]:
    """用于FastAPI测试的异步HTTP客户端。"""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client

@pytest.mark.asyncio
async def test_get_endpoint(test_client: AsyncClient) -> None:
    """测试FastAPI GET接口。"""
    response = await test_client.get("/top-products?count=10")

    assert response.status_code == 200
    data = response.json()
    assert isinstance(data, list)
    assert len(data) <= 10

@pytest.mark.asyncio
async def test_post_endpoint(test_client: AsyncClient) -> None:
    """测试FastAPI POST接口。"""
    payload = {"name": "Test", "price": 99.99}
    response = await test_client.post("/products", json=payload)

    assert response.status_code == 201
    data = response.json()
    assert data["id"] is not None

Step 6: Test Async Context Managers

步骤6:测试异步上下文管理器

python
from contextlib import asynccontextmanager

@asynccontextmanager
async def database_transaction():
    """Async context manager for transactions."""
    session = Session()
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()

@pytest.mark.asyncio
async def test_transaction_success() -> None:
    """Test successful transaction."""
    async with database_transaction() as session:
        await session.execute("INSERT INTO orders VALUES (1, 'test')")

    # Verify commit was called

@pytest.mark.asyncio
async def test_transaction_rollback() -> None:
    """Test transaction rollback on error."""
    with pytest.raises(ValueError):
        async with database_transaction() as session:
            await session.execute("INSERT INTO orders VALUES (1, 'test')")
            raise ValueError("Simulated error")

    # Verify rollback was called
python
from contextlib import asynccontextmanager

@asynccontextmanager
async def database_transaction():
    """用于事务的异步上下文管理器。"""
    session = Session()
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()

@pytest.mark.asyncio
async def test_transaction_success() -> None:
    """测试成功的事务。"""
    async with database_transaction() as session:
        await session.execute("INSERT INTO orders VALUES (1, 'test')")

    # 验证commit已被调用

@pytest.mark.asyncio
async def test_transaction_rollback() -> None:
    """测试出错时的事务回滚。"""
    with pytest.raises(ValueError):
        async with database_transaction() as session:
            await session.execute("INSERT INTO orders VALUES (1, 'test')")
            raise ValueError("模拟错误")

    # 验证rollback已被调用

Step 7: Test Async Generators

步骤7:测试异步生成器

python
from typing import AsyncIterator

async def fetch_orders_stream() -> AsyncIterator[dict]:
    """Async generator yielding orders."""
    for i in range(10):
        yield {"id": str(i), "total": 100.0 * i}

@pytest.mark.asyncio
async def test_async_generator() -> None:
    """Test async generator consumption."""
    orders = []

    async for order in fetch_orders_stream():
        orders.append(order)

    assert len(orders) == 10
    assert orders[0]["id"] == "0"
    assert orders[9]["total"] == 900.0

@pytest.mark.asyncio
async def test_async_generator_with_early_termination() -> None:
    """Test breaking out of async generator."""
    orders = []

    async for order in fetch_orders_stream():
        orders.append(order)
        if len(orders) >= 5:
            break

    assert len(orders) == 5
python
from typing import AsyncIterator

async def fetch_orders_stream() -> AsyncIterator[dict]:
    """生成订单的异步生成器。"""
    for i in range(10):
        yield {"id": str(i), "total": 100.0 * i}

@pytest.mark.asyncio
async def test_async_generator() -> None:
    """测试异步生成器的消费。"""
    orders = []

    async for order in fetch_orders_stream():
        orders.append(order)

    assert len(orders) == 10
    assert orders[0]["id"] == "0"
    assert orders[9]["total"] == 900.0

@pytest.mark.asyncio
async def test_async_generator_with_early_termination() -> None:
    """测试提前终止异步生成器。"""
    orders = []

    async for order in fetch_orders_stream():
        orders.append(order)
        if len(orders) >= 5:
            break

    assert len(orders) == 5

Step 8: Test Kafka Async Clients

步骤8:测试Kafka异步客户端

python
from unittest.mock import AsyncMock

@pytest.fixture
def mock_kafka_producer() -> AsyncMock:
    """Mock confluent-kafka async producer."""
    mock = AsyncMock()
    mock.produce.return_value = None
    mock.flush.return_value = 0  # All delivered
    mock.close.return_value = None
    return mock

@pytest.fixture
def mock_kafka_consumer() -> AsyncMock:
    """Mock Kafka consumer with async iteration."""
    mock = AsyncMock()

    async def fake_consume():
        yield {"key": "order_1", "value": {"id": "1"}}
        yield {"key": "order_2", "value": {"id": "2"}}

    mock.consume.return_value = fake_consume()
    return mock

@pytest.mark.asyncio
async def test_kafka_producer_integration(
    mock_kafka_producer: AsyncMock
) -> None:
    """Test async Kafka producer."""
    producer = KafkaProducerAdapter(mock_kafka_producer)

    await producer.publish("test_topic", "key", {"data": "value"})

    mock_kafka_producer.produce.assert_called_once()
    mock_kafka_producer.flush.assert_called_once()

@pytest.mark.asyncio
async def test_kafka_consumer_integration(
    mock_kafka_consumer: AsyncMock
) -> None:
    """Test async Kafka consumer."""
    consumer = KafkaConsumerAdapter(mock_kafka_consumer)

    messages = []
    async for message in consumer.consume():
        messages.append(message)

    assert len(messages) == 2
python
from unittest.mock import AsyncMock

@pytest.fixture
def mock_kafka_producer() -> AsyncMock:
    """模拟confluent-kafka异步生产者。"""
    mock = AsyncMock()
    mock.produce.return_value = None
    mock.flush.return_value = 0  # 所有消息已投递
    mock.close.return_value = None
    return mock

@pytest.fixture
def mock_kafka_consumer() -> AsyncMock:
    """支持异步迭代的Kafka消费者模拟。"""
    mock = AsyncMock()

    async def fake_consume():
        yield {"key": "order_1", "value": {"id": "1"}}
        yield {"key": "order_2", "value": {"id": "2"}}

    mock.consume.return_value = fake_consume()
    return mock

@pytest.mark.asyncio
async def test_kafka_producer_integration(
    mock_kafka_producer: AsyncMock
) -> None:
    """测试异步Kafka生产者。"""
    producer = KafkaProducerAdapter(mock_kafka_producer)

    await producer.publish("test_topic", "key", {"data": "value"})

    mock_kafka_producer.produce.assert_called_once()
    mock_kafka_producer.flush.assert_called_once()

@pytest.mark.asyncio
async def test_kafka_consumer_integration(
    mock_kafka_consumer: AsyncMock
) -> None:
    """测试异步Kafka消费者。"""
    consumer = KafkaConsumerAdapter(mock_kafka_consumer)

    messages = []
    async for message in consumer.consume():
        messages.append(message)

    assert len(messages) == 2

Step 9: Handle Event Loop Cleanup

步骤9:处理事件循环清理

python
import asyncio

@pytest.fixture
async def cleanup_tasks():
    """Ensure all async tasks are cleaned up."""
    yield

    # Cancel any pending tasks
    pending = asyncio.all_tasks()
    for task in pending:
        task.cancel()

    # Wait for cancellation to complete
    await asyncio.gather(*pending, return_exceptions=True)
python
import asyncio

@pytest.fixture
async def cleanup_tasks():
    """确保所有异步任务被清理。"""
    yield

    # 取消所有待处理任务
    pending = asyncio.all_tasks()
    for task in pending:
        task.cancel()

    # 等待取消操作完成
    await asyncio.gather(*pending, return_exceptions=True)

Step 10: Test with Timeouts for Long-Running Async Operations

步骤10:为长时间运行的异步操作设置超时测试

python
import pytest

@pytest.mark.asyncio
@pytest.mark.timeout(5)  # 5-second timeout
async def test_fetch_with_timeout() -> None:
    """Test that long-running operation completes in time."""
    result = await fetch_data()
    assert result is not None

@pytest.mark.asyncio
async def test_with_asyncio_wait_for() -> None:
    """Test with asyncio.wait_for for timeout."""
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(long_running_op(), timeout=0.1)
python
import pytest

@pytest.mark.asyncio
@pytest.mark.timeout(5)  # 5秒超时
async def test_fetch_with_timeout() -> None:
    """测试长时间运行的操作是否在规定时间内完成。"""
    result = await fetch_data()
    assert result is not None

@pytest.mark.asyncio
async def test_with_asyncio_wait_for() -> None:
    """使用asyncio.wait_for进行超时测试。"""
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(long_running_op(), timeout=0.1)

Examples

示例

Example 1: Complete FastAPI Test

示例1:完整的FastAPI测试

python
from httpx import AsyncClient, ASGITransport
from unittest.mock import AsyncMock
import pytest

from app.reporting.infrastructure.fastapi_app import app

@pytest.fixture
async def test_client() -> AsyncGenerator[AsyncClient, None]:
    """Async HTTP client for FastAPI testing."""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client

@pytest.fixture
def mock_query_use_case(monkeypatch: Any) -> AsyncMock:
    """Mock use case dependency in FastAPI."""
    mock = AsyncMock()

    from app.reporting.infrastructure import dependencies
    monkeypatch.setattr(
        dependencies,
        "get_query_use_case",
        lambda: mock
    )

    return mock

class TestTopProductsEndpoint:
    """Test reporting API endpoint."""

    @pytest.mark.asyncio
    async def test_get_top_products_success(
        self,
        test_client: AsyncClient,
        mock_query_use_case: AsyncMock,
    ) -> None:
        """Test successful response."""
        mock_query_use_case.execute.return_value = [
            ProductRanking(title="Laptop", rank=Rank(1), cnt_bought=100),
            ProductRanking(title="Mouse", rank=Rank(2), cnt_bought=50),
        ]

        response = await test_client.get("/top-products?count=10")

        assert response.status_code == 200
        data = response.json()
        assert len(data) == 2

    @pytest.mark.asyncio
    async def test_get_top_products_unavailable(
        self,
        test_client: AsyncClient,
        mock_query_use_case: AsyncMock,
    ) -> None:
        """Test 503 when data not available."""
        from app.reporting.application.exceptions import DataNotAvailableException

        mock_query_use_case.execute.side_effect = DataNotAvailableException(
            "ClickHouse not ready"
        )

        response = await test_client.get("/top-products?count=10")

        assert response.status_code == 503
python
from httpx import AsyncClient, ASGITransport
from unittest.mock import AsyncMock
import pytest

from app.reporting.infrastructure.fastapi_app import app

@pytest.fixture
async def test_client() -> AsyncGenerator[AsyncClient, None]:
    """用于FastAPI测试的异步HTTP客户端。"""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client

@pytest.fixture
def mock_query_use_case(monkeypatch: Any) -> AsyncMock:
    """模拟FastAPI中的用例依赖。"""
    mock = AsyncMock()

    from app.reporting.infrastructure import dependencies
    monkeypatch.setattr(
        dependencies,
        "get_query_use_case",
        lambda: mock
    )

    return mock

class TestTopProductsEndpoint:
    """测试报表API接口。"""

    @pytest.mark.asyncio
    async def test_get_top_products_success(
        self,
        test_client: AsyncClient,
        mock_query_use_case: AsyncMock,
    ) -> None:
        """测试成功响应。"""
        mock_query_use_case.execute.return_value = [
            ProductRanking(title="Laptop", rank=Rank(1), cnt_bought=100),
            ProductRanking(title="Mouse", rank=Rank(2), cnt_bought=50),
        ]

        response = await test_client.get("/top-products?count=10")

        assert response.status_code == 200
        data = response.json()
        assert len(data) == 2

    @pytest.mark.asyncio
    async def test_get_top_products_unavailable(
        self,
        test_client: AsyncClient,
        mock_query_use_case: AsyncMock,
    ) -> None:
        """测试数据不可用时返回503。"""
        from app.reporting.application.exceptions import DataNotAvailableException

        mock_query_use_case.execute.side_effect = DataNotAvailableException(
            "ClickHouse not ready"
        )

        response = await test_client.get("/top-products?count=10")

        assert response.status_code == 503

Example 2: Testing Async Use Case with Streaming

示例2:测试带流处理的异步用例

python
@pytest.mark.asyncio
async def test_extract_orders_with_stream(
    mock_gateway: AsyncMock,
    mock_publisher: AsyncMock,
) -> None:
    """Test use case that processes streaming orders."""
    # Mock async generator
    async def fake_orders():
        for i in range(100):
            yield create_test_order(order_id=str(i))

    mock_gateway.fetch_orders.return_value = fake_orders()

    use_case = ExtractOrdersUseCase(
        gateway=mock_gateway,
        publisher=mock_publisher
    )

    result = await use_case.execute()

    assert result.orders_count == 100
    assert mock_publisher.publish_order.call_count == 100
python
@pytest.mark.asyncio
async def test_extract_orders_with_stream(
    mock_gateway: AsyncMock,
    mock_publisher: AsyncMock,
) -> None:
    """测试处理流式订单的用例。"""
    # 模拟异步生成器
    async def fake_orders():
        for i in range(100):
            yield create_test_order(order_id=str(i))

    mock_gateway.fetch_orders.return_value = fake_orders()

    use_case = ExtractOrdersUseCase(
        gateway=mock_gateway,
        publisher=mock_publisher
    )

    result = await use_case.execute()

    assert result.orders_count == 100
    assert mock_publisher.publish_order.call_count == 100

Example 3: Async Fixture with Parametrization

示例3:带参数化的异步Fixture

python
@pytest.fixture(params=["http://localhost:8000", "https://api.example.com"])
async def api_client(request: pytest.FixtureRequest) -> AsyncGenerator[AsyncClient, None]:
    """Parametrized async client for different endpoints."""
    async with AsyncClient(base_url=request.param) as client:
        yield client

@pytest.mark.asyncio
async def test_against_multiple_endpoints(api_client: AsyncClient) -> None:
    """Test runs against each endpoint."""
    response = await api_client.get("/health")
    assert response.status_code == 200
python
@pytest.fixture(params=["http://localhost:8000", "https://api.example.com"])
async def api_client(request: pytest.FixtureRequest) -> AsyncGenerator[AsyncClient, None]:
    """针对不同端点的参数化异步客户端。"""
    async with AsyncClient(base_url=request.param) as client:
        yield client

@pytest.mark.asyncio
async def test_against_multiple_endpoints(api_client: AsyncClient) -> None:
    """测试在每个端点上运行。"""
    response = await api_client.get("/health")
    assert response.status_code == 200

Requirements

依赖要求

  • Python 3.11+
  • pytest >= 7.0
  • pytest-asyncio >= 0.20.0
  • httpx (for AsyncClient)
  • asyncio (standard library)
  • Python 3.11+
  • pytest >= 7.0
  • pytest-asyncio >= 0.20.0
  • httpx(用于AsyncClient)
  • asyncio(标准库)

See Also

相关链接

  • pytest-mocking-strategy - Async mocking with AsyncMock
  • pytest-configuration - pytest setup
  • PYTHON_UNIT_TESTING_BEST_PRACTICES.md - Section: "Async Testing Patterns"
  • PROJECT_UNIT_TESTING_STRATEGY.md - Section: "Async Testing Strategy"
  • pytest-mocking-strategy - 使用AsyncMock进行异步模拟
  • pytest-configuration - pytest配置
  • PYTHON_UNIT_TESTING_BEST_PRACTICES.md - 章节:"异步测试模式"
  • PROJECT_UNIT_TESTING_STRATEGY.md - 章节:"异步测试策略"