integration-testing
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseIntegration Testing
集成测试
Overview
概述
Integration testing validates that different components, modules, or services work correctly together. Unlike unit tests that isolate single functions, integration tests verify the interactions between multiple parts of your system including databases, APIs, external services, and infrastructure.
集成测试用于验证不同组件、模块或服务能否协同正常工作。与隔离单个函数的单元测试不同,集成测试会验证系统中多个部分之间的交互,包括数据库、API、外部服务和基础设施。
When to Use
适用场景
- Testing API endpoints with real database connections
- Verifying service-to-service communication
- Validating data flow across multiple layers
- Testing repository/DAO layer with actual databases
- Checking authentication and authorization flows
- Verifying message queue consumers and producers
- Testing third-party service integrations
- 使用真实数据库连接测试API端点
- 验证服务间的通信
- 验证跨多层的数据流向
- 结合实际数据库测试仓库/DAO层
- 检查认证与授权流程
- 验证消息队列的消费者与生产者
- 测试第三方服务集成
Instructions
操作指南
1. API Integration Testing
1. API集成测试
Express/Node.js with Jest and Supertest
基于Express/Node.js与Jest、Supertest的实现
javascript
// test/api/users.integration.test.js
const request = require('supertest');
const app = require('../../src/app');
const { setupTestDB, teardownTestDB } = require('../helpers/db');
describe('User API Integration Tests', () => {
beforeAll(async () => {
await setupTestDB();
});
afterAll(async () => {
await teardownTestDB();
});
beforeEach(async () => {
await clearUsers();
});
describe('POST /api/users', () => {
it('should create a new user with valid data', async () => {
const userData = {
email: 'test@example.com',
name: 'Test User',
password: 'SecurePass123!'
};
const response = await request(app)
.post('/api/users')
.send(userData)
.expect(201);
expect(response.body).toMatchObject({
id: expect.any(String),
email: userData.email,
name: userData.name
});
expect(response.body.password).toBeUndefined();
// Verify in database
const user = await User.findById(response.body.id);
expect(user).toBeTruthy();
expect(user.email).toBe(userData.email);
});
it('should reject duplicate email addresses', async () => {
const userData = { email: 'test@example.com', name: 'Test', password: 'pass' };
await request(app).post('/api/users').send(userData).expect(201);
const response = await request(app)
.post('/api/users')
.send(userData)
.expect(409);
expect(response.body.error).toMatch(/email.*exists/i);
});
});
describe('GET /api/users/:id', () => {
it('should retrieve user with associated orders', async () => {
const user = await createTestUser();
await createTestOrder({ userId: user.id, total: 99.99 });
const response = await request(app)
.get(`/api/users/${user.id}`)
.set('Authorization', `Bearer ${user.token}`)
.expect(200);
expect(response.body).toMatchObject({
id: user.id,
orders: expect.arrayContaining([
expect.objectContaining({ total: 99.99 })
])
});
});
});
});javascript
// test/api/users.integration.test.js
const request = require('supertest');
const app = require('../../src/app');
const { setupTestDB, teardownTestDB } = require('../helpers/db');
describe('User API Integration Tests', () => {
beforeAll(async () => {
await setupTestDB();
});
afterAll(async () => {
await teardownTestDB();
});
beforeEach(async () => {
await clearUsers();
});
describe('POST /api/users', () => {
it('should create a new user with valid data', async () => {
const userData = {
email: 'test@example.com',
name: 'Test User',
password: 'SecurePass123!'
};
const response = await request(app)
.post('/api/users')
.send(userData)
.expect(201);
expect(response.body).toMatchObject({
id: expect.any(String),
email: userData.email,
name: userData.name
});
expect(response.body.password).toBeUndefined();
// Verify in database
const user = await User.findById(response.body.id);
expect(user).toBeTruthy();
expect(user.email).toBe(userData.email);
});
it('should reject duplicate email addresses', async () => {
const userData = { email: 'test@example.com', name: 'Test', password: 'pass' };
await request(app).post('/api/users').send(userData).expect(201);
const response = await request(app)
.post('/api/users')
.send(userData)
.expect(409);
expect(response.body.error).toMatch(/email.*exists/i);
});
});
describe('GET /api/users/:id', () => {
it('should retrieve user with associated orders', async () => {
const user = await createTestUser();
await createTestOrder({ userId: user.id, total: 99.99 });
const response = await request(app)
.get(`/api/users/${user.id}`)
.set('Authorization', `Bearer ${user.token}`)
.expect(200);
expect(response.body).toMatchObject({
id: user.id,
orders: expect.arrayContaining([
expect.objectContaining({ total: 99.99 })
])
});
});
});
});FastAPI/Python with pytest
基于FastAPI/Python与pytest的实现
python
undefinedpython
undefinedtests/integration/test_user_api.py
tests/integration/test_user_api.py
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.main import app
from app.models import User
from tests.conftest import test_db
@pytest.mark.asyncio
class TestUserAPI:
async def test_create_user_integration(
self,
client: AsyncClient,
db: AsyncSession
):
"""Test user creation with database persistence."""
user_data = {
"email": "test@example.com",
"name": "Test User",
"password": "SecurePass123!"
}
response = await client.post("/api/users", json=user_data)
assert response.status_code == 201
data = response.json()
assert data["email"] == user_data["email"]
assert "password" not in data
# Verify in database
result = await db.execute(
select(User).where(User.email == user_data["email"])
)
user = result.scalar_one()
assert user is not None
assert user.name == user_data["name"]
async def test_user_with_relationships(
self,
client: AsyncClient,
db: AsyncSession
):
"""Test retrieving user with related data."""
# Setup: Create user with orders
user = await create_test_user(db)
await create_test_order(db, user_id=user.id, total=99.99)
# Test: Fetch user with orders
response = await client.get(
f"/api/users/{user.id}",
headers={"Authorization": f"Bearer {user.token}"}
)
assert response.status_code == 200
data = response.json()
assert data["id"] == user.id
assert len(data["orders"]) == 1
assert data["orders"][0]["total"] == 99.99undefinedimport pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from app.main import app
from app.models import User
from tests.conftest import test_db
@pytest.mark.asyncio
class TestUserAPI:
async def test_create_user_integration(
self,
client: AsyncClient,
db: AsyncSession
):
"""Test user creation with database persistence."""
user_data = {
"email": "test@example.com",
"name": "Test User",
"password": "SecurePass123!"
}
response = await client.post("/api/users", json=user_data)
assert response.status_code == 201
data = response.json()
assert data["email"] == user_data["email"]
assert "password" not in data
# Verify in database
result = await db.execute(
select(User).where(User.email == user_data["email"])
)
user = result.scalar_one()
assert user is not None
assert user.name == user_data["name"]
async def test_user_with_relationships(
self,
client: AsyncClient,
db: AsyncSession
):
"""Test retrieving user with related data."""
# Setup: Create user with orders
user = await create_test_user(db)
await create_test_order(db, user_id=user.id, total=99.99)
# Test: Fetch user with orders
response = await client.get(
f"/api/users/{user.id}",
headers={"Authorization": f"Bearer {user.token}"}
)
assert response.status_code == 200
data = response.json()
assert data["id"] == user.id
assert len(data["orders"]) == 1
assert data["orders"][0]["total"] == 99.99undefined2. Database Integration Testing
2. 数据库集成测试
Spring Boot with JUnit
基于Spring Boot与JUnit的实现
java
// src/test/java/com/example/integration/UserRepositoryIntegrationTest.java
@SpringBootTest
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@TestPropertySource(locations = "classpath:application-test.properties")
class UserRepositoryIntegrationTest {
@Autowired
private UserRepository userRepository;
@Autowired
private OrderRepository orderRepository;
@Autowired
private TestEntityManager entityManager;
@BeforeEach
void setUp() {
orderRepository.deleteAll();
userRepository.deleteAll();
}
@Test
@Transactional
void testSaveUserWithOrders() {
// Given
User user = new User();
user.setEmail("test@example.com");
user.setName("Test User");
Order order1 = new Order();
order1.setTotal(new BigDecimal("99.99"));
order1.setUser(user);
Order order2 = new Order();
order2.setTotal(new BigDecimal("49.99"));
order2.setUser(user);
user.setOrders(Arrays.asList(order1, order2));
// When
User savedUser = userRepository.save(user);
entityManager.flush();
entityManager.clear();
// Then
User foundUser = userRepository.findById(savedUser.getId())
.orElseThrow();
assertThat(foundUser.getEmail()).isEqualTo("test@example.com");
assertThat(foundUser.getOrders()).hasSize(2);
assertThat(foundUser.getOrders())
.extracting(Order::getTotal)
.containsExactlyInAnyOrder(
new BigDecimal("99.99"),
new BigDecimal("49.99")
);
}
@Test
void testCustomQueryWithJoins() {
// Given
User user = createTestUser("test@example.com");
createTestOrder(user, new BigDecimal("150.00"));
// When
List<User> highValueUsers = userRepository
.findUsersWithOrdersAbove(new BigDecimal("100.00"));
// Then
assertThat(highValueUsers).hasSize(1);
assertThat(highValueUsers.get(0).getEmail())
.isEqualTo("test@example.com");
}
}java
// src/test/java/com/example/integration/UserRepositoryIntegrationTest.java
@SpringBootTest
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@TestPropertySource(locations = "classpath:application-test.properties")
class UserRepositoryIntegrationTest {
@Autowired
private UserRepository userRepository;
@Autowired
private OrderRepository orderRepository;
@Autowired
private TestEntityManager entityManager;
@BeforeEach
void setUp() {
orderRepository.deleteAll();
userRepository.deleteAll();
}
@Test
@Transactional
void testSaveUserWithOrders() {
// Given
User user = new User();
user.setEmail("test@example.com");
user.setName("Test User");
Order order1 = new Order();
order1.setTotal(new BigDecimal("99.99"));
order1.setUser(user);
Order order2 = new Order();
order2.setTotal(new BigDecimal("49.99"));
order2.setUser(user);
user.setOrders(Arrays.asList(order1, order2));
// When
User savedUser = userRepository.save(user);
entityManager.flush();
entityManager.clear();
// Then
User foundUser = userRepository.findById(savedUser.getId())
.orElseThrow();
assertThat(foundUser.getEmail()).isEqualTo("test@example.com");
assertThat(foundUser.getOrders()).hasSize(2);
assertThat(foundUser.getOrders())
.extracting(Order::getTotal)
.containsExactlyInAnyOrder(
new BigDecimal("99.99"),
new BigDecimal("49.99")
);
}
@Test
void testCustomQueryWithJoins() {
// Given
User user = createTestUser("test@example.com");
createTestOrder(user, new BigDecimal("150.00"));
// When
List<User> highValueUsers = userRepository
.findUsersWithOrdersAbove(new BigDecimal("100.00"));
// Then
assertThat(highValueUsers).hasSize(1);
assertThat(highValueUsers.get(0).getEmail())
.isEqualTo("test@example.com");
}
}3. External Service Integration
3. 外部服务集成测试
Testing with Test Containers
基于Test Containers的测试
javascript
// test/integration/payment-service.test.js
const { GenericContainer } = require('testcontainers');
const PaymentService = require('../../src/services/payment');
describe('Payment Service Integration', () => {
let container;
let paymentService;
beforeAll(async () => {
// Start PostgreSQL container
container = await new GenericContainer('postgres:14')
.withEnvironment({
POSTGRES_DB: 'test',
POSTGRES_USER: 'test',
POSTGRES_PASSWORD: 'test'
})
.withExposedPorts(5432)
.start();
const connectionString = `postgresql://test:test@${container.getHost()}:${container.getMappedPort(5432)}/test`;
paymentService = new PaymentService(connectionString);
await paymentService.initialize();
}, 60000);
afterAll(async () => {
await paymentService.close();
await container.stop();
});
test('should process payment and update database', async () => {
const payment = {
orderId: 'order-123',
amount: 99.99,
currency: 'USD',
paymentMethod: 'credit_card'
};
const result = await paymentService.processPayment(payment);
expect(result.status).toBe('completed');
expect(result.transactionId).toBeDefined();
// Verify in database
const stored = await paymentService.getPayment(result.id);
expect(stored.orderId).toBe('order-123');
expect(stored.status).toBe('completed');
});
});javascript
// test/integration/payment-service.test.js
const { GenericContainer } = require('testcontainers');
const PaymentService = require('../../src/services/payment');
describe('Payment Service Integration', () => {
let container;
let paymentService;
beforeAll(async () => {
// Start PostgreSQL container
container = await new GenericContainer('postgres:14')
.withEnvironment({
POSTGRES_DB: 'test',
POSTGRES_USER: 'test',
POSTGRES_PASSWORD: 'test'
})
.withExposedPorts(5432)
.start();
const connectionString = `postgresql://test:test@${container.getHost()}:${container.getMappedPort(5432)}/test`;
paymentService = new PaymentService(connectionString);
await paymentService.initialize();
}, 60000);
afterAll(async () => {
await paymentService.close();
await container.stop();
});
test('should process payment and update database', async () => {
const payment = {
orderId: 'order-123',
amount: 99.99,
currency: 'USD',
paymentMethod: 'credit_card'
};
const result = await paymentService.processPayment(payment);
expect(result.status).toBe('completed');
expect(result.transactionId).toBeDefined();
// Verify in database
const stored = await paymentService.getPayment(result.id);
expect(stored.orderId).toBe('order-123');
expect(stored.status).toBe('completed');
});
});4. Message Queue Integration
4. 消息队列集成测试
python
undefinedpython
undefinedtests/integration/test_message_queue.py
tests/integration/test_message_queue.py
import pytest
from unittest.mock import patch
import json
from app.queue import MessageQueue
from app.workers import OrderProcessor
@pytest.mark.integration
class TestMessageQueueIntegration:
@pytest.fixture
async def queue(self):
"""Create test message queue."""
queue = MessageQueue(url=TEST_RABBITMQ_URL)
await queue.connect()
yield queue
await queue.close()
async def test_publish_and_consume_message(self, queue):
"""Test full message lifecycle."""
received_messages = []
async def message_handler(message):
received_messages.append(message)
# Subscribe to queue
await queue.subscribe('orders', message_handler)
# Publish message
order_data = {
'order_id': '123',
'customer': 'test@example.com',
'total': 99.99
}
await queue.publish('orders', order_data)
# Wait for message processing
await asyncio.sleep(0.5)
assert len(received_messages) == 1
assert received_messages[0]['order_id'] == '123'
async def test_order_processing_workflow(self, queue, db):
"""Test complete order processing through queue."""
processor = OrderProcessor(queue, db)
await processor.start()
# Publish order
order = await create_test_order(db, status='pending')
await queue.publish('orders.new', {'order_id': order.id})
# Wait for processing
await asyncio.sleep(1)
# Verify order was processed
await db.refresh(order)
assert order.status == 'processing'
assert order.processed_at is not Noneundefinedimport pytest
from unittest.mock import patch
import json
from app.queue import MessageQueue
from app.workers import OrderProcessor
@pytest.mark.integration
class TestMessageQueueIntegration:
@pytest.fixture
async def queue(self):
"""Create test message queue."""
queue = MessageQueue(url=TEST_RABBITMQ_URL)
await queue.connect()
yield queue
await queue.close()
async def test_publish_and_consume_message(self, queue):
"""Test full message lifecycle."""
received_messages = []
async def message_handler(message):
received_messages.append(message)
# Subscribe to queue
await queue.subscribe('orders', message_handler)
# Publish message
order_data = {
'order_id': '123',
'customer': 'test@example.com',
'total': 99.99
}
await queue.publish('orders', order_data)
# Wait for message processing
await asyncio.sleep(0.5)
assert len(received_messages) == 1
assert received_messages[0]['order_id'] == '123'
async def test_order_processing_workflow(self, queue, db):
"""Test complete order processing through queue."""
processor = OrderProcessor(queue, db)
await processor.start()
# Publish order
order = await create_test_order(db, status='pending')
await queue.publish('orders.new', {'order_id': order.id})
# Wait for processing
await asyncio.sleep(1)
# Verify order was processed
await db.refresh(order)
assert order.status == 'processing'
assert order.processed_at is not NoneundefinedTesting Patterns
测试模式
Test Data Management
测试数据管理
python
undefinedpython
undefinedconftest.py - Shared fixtures
conftest.py - Shared fixtures
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
@pytest.fixture(scope="session")
async def engine():
"""Create test database engine."""
engine = create_async_engine(TEST_DATABASE_URL)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await engine.dispose()
@pytest.fixture
async def db(engine):
"""Create database session for each test."""
async with AsyncSession(engine) as session:
yield session
await session.rollback()
@pytest.fixture
async def client(db):
"""Create test HTTP client."""
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
undefinedimport pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
@pytest.fixture(scope="session")
async def engine():
"""Create test database engine."""
engine = create_async_engine(TEST_DATABASE_URL)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await engine.dispose()
@pytest.fixture
async def db(engine):
"""Create database session for each test."""
async with AsyncSession(engine) as session:
yield session
await session.rollback()
@pytest.fixture
async def client(db):
"""Create test HTTP client."""
async with AsyncClient(app=app, base_url="http://test") as client:
yield client
undefinedBest Practices
最佳实践
✅ DO
✅ 建议
- Use real databases in integration tests (in-memory or containers)
- Test actual HTTP requests, not mocked responses
- Verify database state after operations
- Test transaction boundaries and rollbacks
- Include authentication/authorization in tests
- Test error scenarios and edge cases
- Use test containers for isolated environments
- Clean up data between tests
- 在集成测试中使用真实数据库(内存数据库或容器化数据库)
- 测试实际HTTP请求,而非模拟响应
- 操作后验证数据库状态
- 测试事务边界与回滚逻辑
- 在测试中包含认证/授权流程
- 测试错误场景与边缘情况
- 使用测试容器搭建隔离环境
- 在测试间清理数据
❌ DON'T
❌ 不建议
- Mock database connections in integration tests
- Skip testing error paths
- Leave test data in databases
- Use production databases for testing
- Ignore transaction management
- Test only happy paths
- Share state between tests
- Hardcode URLs or credentials
- 在集成测试中模拟数据库连接
- 跳过错误路径测试
- 遗留测试数据在数据库中
- 使用生产数据库进行测试
- 忽略事务管理
- 仅测试正常流程
- 在测试间共享状态
- 硬编码URL或凭证
Tools
工具
- Node.js: Supertest, Jest, Testcontainers
- Python: pytest, httpx, pytest-asyncio, Testcontainers
- Java: Spring Test, TestContainers, RestAssured
- Database: Testcontainers, in-memory DBs (H2, SQLite)
- Mocking Services: WireMock, MockServer, Localstack
- Node.js:Supertest、Jest、Testcontainers
- Python:pytest、httpx、pytest-asyncio、Testcontainers
- Java:Spring Test、TestContainers、RestAssured
- 数据库:Testcontainers、内存数据库(H2、SQLite)
- 服务模拟:WireMock、MockServer、Localstack
Common Patterns
常见模式
javascript
// Test helper for database setup
class TestDatabase {
static async setup() {
await db.migrate.latest();
}
static async teardown() {
await db.destroy();
}
static async clear() {
const tables = ['orders', 'users', 'products'];
for (const table of tables) {
await db(table).truncate();
}
}
}
// Factory pattern for test data
class TestDataFactory {
static async createUser(overrides = {}) {
const defaults = {
email: `user-${Date.now()}@test.com`,
name: 'Test User',
role: 'customer'
};
return await User.create({ ...defaults, ...overrides });
}
static async createOrder(userId, overrides = {}) {
const defaults = {
userId,
status: 'pending',
total: 99.99
};
return await Order.create({ ...defaults, ...overrides });
}
}javascript
// Test helper for database setup
class TestDatabase {
static async setup() {
await db.migrate.latest();
}
static async teardown() {
await db.destroy();
}
static async clear() {
const tables = ['orders', 'users', 'products'];
for (const table of tables) {
await db(table).truncate();
}
}
}
// Factory pattern for test data
class TestDataFactory {
static async createUser(overrides = {}) {
const defaults = {
email: `user-${Date.now()}@test.com`,
name: 'Test User',
role: 'customer'
};
return await User.create({ ...defaults, ...overrides });
}
static async createOrder(userId, overrides = {}) {
const defaults = {
userId,
status: 'pending',
total: 99.99
};
return await Order.create({ ...defaults, ...overrides });
}
}Examples
相关示例
See also: test-data-generation, mocking-stubbing, continuous-testing skills for related testing patterns.
另可参考:测试数据生成、模拟与存根、持续测试等相关测试模式。