Loading...
Loading...
Sets up async tests with proper fixtures and mocks using pytest-asyncio patterns. Use when testing async functions, creating async fixtures, mocking async services, or handling async context managers. Covers @pytest_asyncio.fixture, AsyncMock with side_effect, async generator fixtures (yield), and testing async context managers. Works with Python async/await patterns, pytest-asyncio, and unittest.mock.AsyncMock.
npx skill4agent add dawiddutoit/custom-claude test-setup-async# Testing an async function with mocked dependencies
import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_my_async_function():
mock_service = AsyncMock()
mock_service.fetch_data.return_value = {"result": "success"}
result = await my_async_function(mock_service)
assert result == {"result": "success"}
mock_service.fetch_data.assert_awaited_once()async defawaitasync withasync forasync withuv pip install pytest-asynciopyproject.toml[tool.pytest.ini_options]
asyncio_mode = "auto" # Auto-detect async testsimport pytest_asyncio
from collections.abc import AsyncGenerator
@pytest_asyncio.fixture
async def database_connection() -> AsyncGenerator[Connection, None]:
"""Create database connection with cleanup."""
conn = await create_connection()
yield conn
await conn.close()@pytest.fixture
def mock_service():
"""Create mock service (non-async fixture for mocks)."""
from unittest.mock import AsyncMock
mock = AsyncMock()
mock.fetch_data.return_value = {"data": "test"}
return mockmock_service = AsyncMock()
mock_service.process.return_value = "result"mock_service.process.side_effect = Exception("Connection failed")mock_service.fetch.side_effect = [
{"batch": 1},
{"batch": 2},
{"batch": 3}
]async def custom_behavior(arg):
if arg == "fail":
raise ValueError("Invalid")
return f"processed_{arg}"
mock_service.process.side_effect = custom_behaviorfrom unittest.mock import AsyncMock
@pytest.fixture
def mock_driver():
"""Mock Neo4j driver with async context manager."""
driver = MagicMock() # Driver itself is sync
# Create async session mock
mock_session = AsyncMock()
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=None)
# Configure session behavior
mock_result = AsyncMock()
mock_result.data = AsyncMock(return_value=[{"node": "data"}])
mock_session.run = AsyncMock(return_value=mock_result)
# Driver returns session
driver.session = MagicMock(return_value=mock_session)
return driver@pytest.mark.asyncio
async def test_with_session(mock_driver):
async with mock_driver.session() as session:
result = await session.run("MATCH (n) RETURN n")
data = await result.data()
assert data == [{"node": "data"}]
mock_driver.session.assert_called_once()mock_service.fetch_data.assert_awaited_once()
mock_service.fetch_data.assert_awaited_once_with(arg="value")
mock_service.fetch_data.assert_awaited() # At least onceassert mock_service.fetch_data.await_count == 3mock_service.error_handler.assert_not_awaited()import pytest_asyncio
from collections.abc import AsyncGenerator
from neo4j import AsyncDriver, AsyncGraphDatabase
@pytest_asyncio.fixture(scope="function")
async def neo4j_driver() -> AsyncGenerator[AsyncDriver, None]:
"""Create Neo4j driver with cleanup."""
driver = AsyncGraphDatabase.driver(
"bolt://localhost:7687",
auth=("neo4j", "password")
)
# Setup: Clear test data
async with driver.session() as session:
await session.run("MATCH (n:TestNode) DETACH DELETE n")
yield driver
# Cleanup: Close driver
await driver.close()import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_batch_processing():
mock_api = AsyncMock()
# First call succeeds, second fails, third succeeds
mock_api.fetch_batch.side_effect = [
{"batch": 1, "items": [1, 2, 3]},
Exception("Rate limit exceeded"),
{"batch": 2, "items": [4, 5, 6]}
]
# Process first batch
result1 = await mock_api.fetch_batch()
assert result1["batch"] == 1
# Second call raises exception
with pytest.raises(Exception, match="Rate limit"):
await mock_api.fetch_batch()
# Third call succeeds
result3 = await mock_api.fetch_batch()
assert result3["batch"] == 2
assert mock_api.fetch_batch.await_count == 3import pytest
import pytest_asyncio
from unittest.mock import AsyncMock
from project_watch_mcp.domain.values.service_result import ServiceResult
@pytest.fixture
def mock_embedding_service():
"""Create mock embedding service."""
service = AsyncMock()
service.get_embeddings.return_value = ServiceResult.success(
[[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]
)
return service
@pytest.fixture
def mock_repository():
"""Create mock code repository."""
repo = AsyncMock()
repo.store_chunks.return_value = ServiceResult.success(True)
return repo
@pytest.mark.asyncio
async def test_chunking_service(mock_embedding_service, mock_repository):
"""Test chunking service with async dependencies."""
from project_watch_mcp.application.services.chunking_service import ChunkingService
# Create service with mocked dependencies
service = ChunkingService(
embedding_service=mock_embedding_service,
repository=mock_repository,
settings=mock_settings
)
# Execute async operation
result = await service.process_file("test.py")
# Verify async calls
mock_embedding_service.get_embeddings.assert_awaited_once()
mock_repository.store_chunks.assert_awaited_once()
assert result.is_success()import pytest
from unittest.mock import AsyncMock
@pytest.mark.asyncio
async def test_error_handling():
"""Test service handles async errors correctly."""
mock_db = AsyncMock()
mock_db.execute_query.side_effect = Exception("Connection lost")
service = MyService(database=mock_db)
result = await service.fetch_data()
# Verify ServiceResult.failure() was returned
assert not result.is_success()
assert "Connection lost" in result.error
# Verify retry logic was triggered
assert mock_db.execute_query.await_count == 3 # 3 retriesimport pytest_asyncio
from collections.abc import AsyncGenerator
@pytest_asyncio.fixture
async def resource_pool() -> AsyncGenerator[ResourcePool, None]:
"""Create resource pool with async initialization and cleanup."""
pool = ResourcePool()
# Async initialization
await pool.initialize()
await pool.create_connections(count=5)
yield pool
# Async cleanup
await pool.drain()
await pool.shutdown()
@pytest.mark.asyncio
async def test_resource_acquisition(resource_pool):
"""Test acquiring resource from pool."""
async with resource_pool.acquire() as resource:
result = await resource.execute("SELECT 1")
assert result is not None✅ Async Test Configured
Configuration:
✅ pytest-asyncio installed
✅ pyproject.toml configured (asyncio_mode = "auto")
✅ AsyncMock imported from unittest.mock
Test Results:
✅ All async tests discovered (@pytest.mark.asyncio)
✅ Async fixtures working (setup/teardown with yield)
✅ AsyncMock assertions passing (assert_awaited_once)
✅ No "coroutine was never awaited" warnings
Example test output:
tests/unit/test_service.py::test_async_function PASSED
tests/unit/test_service.py::test_with_async_fixture PASSED❌ Test Failed: RuntimeWarning
Error: RuntimeWarning: coroutine 'AsyncMock' was never awaited
File "tests/unit/test_service.py", line 42
Root Cause:
- Used MagicMock instead of AsyncMock for async method
- Mock configured incorrectly
Fix:
# ❌ WRONG
mock_service.fetch = MagicMock(return_value="result")
# ✅ CORRECT
from unittest.mock import AsyncMock
mock_service.fetch = AsyncMock(return_value="result")pytest-asyncio>=0.21.0pytest>=7.0.0# pyproject.toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
markers = [
"asyncio: mark test as async",
]from unittest.mock import AsyncMock, MagicMock
mock = MagicMock()
mock.sync_method = MagicMock(return_value="sync_result")
mock.async_method = AsyncMock(return_value="async_result")
# Usage
result1 = mock.sync_method() # Sync call
result2 = await mock.async_method() # Async call# Function scope (default) - new instance per test
@pytest_asyncio.fixture(scope="function")
async def function_scoped_resource():
resource = await create_resource()
yield resource
await resource.cleanup()
# Session scope - shared across all tests
@pytest_asyncio.fixture(scope="session")
async def session_scoped_resource():
resource = await create_expensive_resource()
yield resource
await resource.cleanup()@pytest.mark.asyncio
@pytest.mark.parametrize("input_value,expected", [
("test1", "result1"),
("test2", "result2"),
("test3", "result3"),
])
async def test_with_parameters(input_value, expected):
result = await async_function(input_value)
assert result == expectedimport asyncio
import pytest
@pytest.mark.asyncio
async def test_concurrent_execution():
"""Test multiple async operations running concurrently."""
mock_service = AsyncMock()
mock_service.process.side_effect = [
asyncio.sleep(0.1, result="result1"),
asyncio.sleep(0.1, result="result2"),
asyncio.sleep(0.1, result="result3"),
]
# Execute concurrently
results = await asyncio.gather(
mock_service.process("item1"),
mock_service.process("item2"),
mock_service.process("item3"),
)
assert len(results) == 3
assert mock_service.process.await_count == 3async defawaitassert_awaited_once()assert_called_once()@pytest_asyncio.fixtureAsyncMockMagicMockAsyncMock()MagicMock()MagicMockAsyncMockAsyncMocktry/finally@pytest_asyncio.fixture| Metric | Without Skill | With Skill | Improvement |
|---|---|---|---|
| Test Setup Time | 30 min (research) | 5 min (template) | 83% faster |
| Mock Configuration Errors | 40% (wrong mock type) | 5% (correct patterns) | 88% reduction |
| "Coroutine never awaited" Warnings | 20% of tests | 0% (proper AsyncMock) | 100% elimination |
| Async Fixture Cleanup Issues | 30% (leaks) | 0% (proper yield) | 100% fix |
| Test Reliability | 70% (flaky async) | 98% (stable) | 40% improvement |
| Knowledge Transfer Time | 2 hours (learning) | 15 min (examples) | 88% faster |
| Metric | Target | Measurement |
|---|---|---|
| Zero Async Warnings | 100% | pytest output |
| Mock Configuration Accuracy | 100% | AsyncMock validation |
| Fixture Cleanup Success | 100% | Resource leak detection |
| Test Reliability | 98%+ pass rate | CI/CD metrics |
| Pattern Adoption | 90% of async tests | Code review |
# Ensure pytest-asyncio installed
uv pip list | grep pytest-asyncio
# Verify pyproject.toml configuration
cat pyproject.toml | grep asyncio_mode# Check test file has correct decorator
@pytest.mark.asyncio
async def test_name():
...# Verify AsyncMock used for async methods
from unittest.mock import AsyncMock
mock_service.async_method = AsyncMock(return_value=...)# Async fixtures use AsyncGenerator
from collections.abc import AsyncGenerator
@pytest_asyncio.fixture
async def resource() -> AsyncGenerator[Resource, None]:
resource = await create()
yield resource
await resource.cleanup()# Run tests with verbose output
uv run pytest tests/test_async.py -v
# Verify no warnings
# ✓ No "RuntimeWarning: coroutine was never awaited"
# ✓ All async assertions passscripts/python scripts/validate_async_tests.py tests/unit/ --severity warningpython scripts/convert_to_async.py test_file.py --dry-runpython scripts/generate_async_fixture.py neo4j_driver database