stress-testing

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Stress Testing

压力测试(Stress Testing)

Overview

概述

Stress testing pushes systems beyond normal operating capacity to identify breaking points, failure modes, and recovery behavior. It validates system stability under extreme conditions and helps determine maximum capacity before degradation or failure.
压力测试将系统推向超出正常运行容量的状态,以识别断点、故障模式和恢复行为。它验证系统在极端条件下的稳定性,并帮助确定性能下降或故障前的最大容量。

When to Use

适用场景

  • Finding system capacity limits
  • Identifying breaking points
  • Testing auto-scaling behavior
  • Validating error handling under load
  • Testing recovery after failures
  • Planning capacity requirements
  • Verifying graceful degradation
  • Testing spike traffic handling
  • 寻找系统容量极限
  • 识别系统断点
  • 测试自动扩缩容行为
  • 验证负载下的错误处理能力
  • 测试故障后的恢复能力
  • 规划容量需求
  • 验证优雅降级机制
  • 测试峰值流量处理能力

Test Types

测试类型

  • Stress Test: Gradually increase load until failure
  • Spike Test: Sudden large increase in load
  • Soak Test: Sustained high load over extended period
  • Capacity Test: Find maximum sustainable load
  • Volume Test: Large data volumes
  • Scalability Test: Performance at different scales
  • Stress Test(压力测试):逐步增加负载直至系统故障
  • Spike Test(峰值测试):负载突然大幅增加
  • Soak Test(浸泡测试/耐久测试):长时间维持高负载
  • Capacity Test(容量测试):找出系统可维持的最大负载
  • Volume Test(数据量测试):处理大规模数据量
  • Scalability Test(可扩展性测试):不同规模下的性能表现

Instructions

操作指南

1. k6 Stress Testing

1. k6 Stress Testing

javascript
// stress-test.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate } from 'k6/metrics';

const errorRate = new Rate('errors');

export const options = {
  stages: [
    // Stress testing: Progressive load increase
    { duration: '2m', target: 100 },    // Normal load
    { duration: '5m', target: 100 },    // Sustain normal
    { duration: '2m', target: 200 },    // Above normal
    { duration: '5m', target: 200 },    // Sustain above normal
    { duration: '2m', target: 300 },    // Breaking point approaching
    { duration: '5m', target: 300 },    // Sustain high load
    { duration: '2m', target: 400 },    // Beyond capacity
    { duration: '5m', target: 400 },    // System under stress
    { duration: '5m', target: 0 },      // Gradual recovery
  ],
  thresholds: {
    http_req_duration: ['p(99)<1000'],  // 99% under 1s during stress
    http_req_failed: ['rate<0.05'],     // Allow 5% error rate under stress
    errors: ['rate<0.1'],
  },
};

const BASE_URL = __ENV.BASE_URL || 'http://localhost:3000';

export function setup() {
  // Prepare test data
  const res = http.post(`${BASE_URL}/api/auth/login`, {
    email: 'stress-test@example.com',
    password: 'test123',
  });

  return { token: res.json('token') };
}

export default function (data) {
  const headers = {
    Authorization: `Bearer ${data.token}`,
    'Content-Type': 'application/json',
  };

  // Heavy database query
  const productsRes = http.get(
    `${BASE_URL}/api/products?page=1&limit=100`,
    { headers }
  );

  const productsCheck = check(productsRes, {
    'products loaded': (r) => r.status === 200,
    'has products': (r) => r.json('products').length > 0,
  });

  if (!productsCheck) {
    errorRate.add(1);
    console.error(`Products failed: ${productsRes.status} ${productsRes.body}`);
  }

  sleep(1);

  // Write operation - stress database
  const orderPayload = JSON.stringify({
    items: [
      { productId: Math.floor(Math.random() * 100), quantity: 2 },
    ],
  });

  const orderRes = http.post(`${BASE_URL}/api/orders`, orderPayload, {
    headers,
  });

  const orderCheck = check(orderRes, {
    'order created': (r) => r.status === 201 || r.status === 503,
    'response within 5s': (r) => r.timings.duration < 5000,
  });

  if (!orderCheck) {
    errorRate.add(1);
  }

  // Monitor degradation
  if (orderRes.status === 503) {
    console.log('Service unavailable - system at capacity');
  }

  sleep(1);
}

export function teardown(data) {
  // Log final metrics
  console.log('Stress test completed');
}
javascript
// stress-test.js
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate } from 'k6/metrics';

const errorRate = new Rate('errors');

export const options = {
  stages: [
    // Stress testing: Progressive load increase
    { duration: '2m', target: 100 },    // Normal load
    { duration: '5m', target: 100 },    // Sustain normal
    { duration: '2m', target: 200 },    // Above normal
    { duration: '5m', target: 200 },    // Sustain above normal
    { duration: '2m', target: 300 },    // Breaking point approaching
    { duration: '5m', target: 300 },    // Sustain high load
    { duration: '2m', target: 400 },    // Beyond capacity
    { duration: '5m', target: 400 },    // System under stress
    { duration: '5m', target: 0 },      // Gradual recovery
  ],
  thresholds: {
    http_req_duration: ['p(99)<1000'],  // 99% under 1s during stress
    http_req_failed: ['rate<0.05'],     // Allow 5% error rate under stress
    errors: ['rate<0.1'],
  },
};

const BASE_URL = __ENV.BASE_URL || 'http://localhost:3000';

export function setup() {
  // Prepare test data
  const res = http.post(`${BASE_URL}/api/auth/login`, {
    email: 'stress-test@example.com',
    password: 'test123',
  });

  return { token: res.json('token') };
}

export default function (data) {
  const headers = {
    Authorization: `Bearer ${data.token}`,
    'Content-Type': 'application/json',
  };

  // Heavy database query
  const productsRes = http.get(
    `${BASE_URL}/api/products?page=1&limit=100`,
    { headers }
  );

  const productsCheck = check(productsRes, {
    'products loaded': (r) => r.status === 200,
    'has products': (r) => r.json('products').length > 0,
  });

  if (!productsCheck) {
    errorRate.add(1);
    console.error(`Products failed: ${productsRes.status} ${productsRes.body}`);
  }

  sleep(1);

  // Write operation - stress database
  const orderPayload = JSON.stringify({
    items: [
      { productId: Math.floor(Math.random() * 100), quantity: 2 },
    ],
  });

  const orderRes = http.post(`${BASE_URL}/api/orders`, orderPayload, {
    headers,
  });

  const orderCheck = check(orderRes, {
    'order created': (r) => r.status === 201 || r.status === 503,
    'response within 5s': (r) => r.timings.duration < 5000,
  });

  if (!orderCheck) {
    errorRate.add(1);
  }

  // Monitor degradation
  if (orderRes.status === 503) {
    console.log('Service unavailable - system at capacity');
  }

  sleep(1);
}

export function teardown(data) {
  // Log final metrics
  console.log('Stress test completed');
}

2. Spike Testing

2. Spike Testing

javascript
// spike-test.js
import http from 'k6/http';
import { check } from 'k6';

export const options = {
  stages: [
    { duration: '30s', target: 10 },     // Normal baseline
    { duration: '1m', target: 10 },      // Stable baseline
    { duration: '10s', target: 1000 },   // SPIKE! 100x increase
    { duration: '3m', target: 1000 },    // Maintain spike
    { duration: '10s', target: 10 },     // Drop back
    { duration: '3m', target: 10 },      // Recovery period
  ],
  thresholds: {
    http_req_duration: ['p(95)<5000'],   // Allow degradation during spike
    http_req_failed: ['rate<0.1'],       // Allow 10% errors during spike
  },
};

export default function () {
  const res = http.get('http://api.example.com/health');

  check(res, {
    'system responsive': (r) => r.status === 200 || r.status === 429,
    'response received': (r) => r.body.length > 0,
  });
}
javascript
// spike-test.js
import http from 'k6/http';
import { check } from 'k6';

export const options = {
  stages: [
    { duration: '30s', target: 10 },     // Normal baseline
    { duration: '1m', target: 10 },      // Stable baseline
    { duration: '10s', target: 1000 },   // SPIKE! 100x increase
    { duration: '3m', target: 1000 },    // Maintain spike
    { duration: '10s', target: 10 },     // Drop back
    { duration: '3m', target: 10 },      // Recovery period
  ],
  thresholds: {
    http_req_duration: ['p(95)<5000'],   // Allow degradation during spike
    http_req_failed: ['rate<0.1'],       // Allow 10% errors during spike
  },
};

export default function () {
  const res = http.get('http://api.example.com/health');

  check(res, {
    'system responsive': (r) => r.status === 200 || r.status === 429,
    'response received': (r) => r.body.length > 0,
  });
}

3. Soak/Endurance Testing

3. Soak/Endurance Testing

python
undefined
python
undefined

soak_test.py

soak_test.py

import asyncio import aiohttp import time from datetime import datetime, timedelta import psutil import logging
logging.basicConfig(level=logging.INFO) logger = logging.getLogger(name)
class SoakTest: """Run sustained load test to detect memory leaks and degradation."""
def __init__(self, url, duration_hours=4, requests_per_second=50):
    self.url = url
    self.duration = timedelta(hours=duration_hours)
    self.rps = requests_per_second
    self.metrics = {
        'requests': 0,
        'errors': 0,
        'response_times': [],
        'memory_usage': [],
    }

async def make_request(self, session):
    """Make single request and record metrics."""
    start = time.time()
    try:
        async with session.get(self.url) as response:
            await response.read()
            duration = time.time() - start

            self.metrics['requests'] += 1
            self.metrics['response_times'].append(duration)

            if response.status >= 400:
                self.metrics['errors'] += 1
                logger.warning(f"Error: {response.status}")

    except Exception as e:
        self.metrics['errors'] += 1
        logger.error(f"Request failed: {e}")

async def worker(self, session):
    """Worker that makes requests at target rate."""
    while self.running:
        await self.make_request(session)
        await asyncio.sleep(1 / self.rps)

def monitor_resources(self):
    """Monitor system resources."""
    process = psutil.Process()
    return {
        'memory_mb': process.memory_info().rss / 1024 / 1024,
        'cpu_percent': process.cpu_percent(),
        'timestamp': datetime.now(),
    }

async def run(self):
    """Execute soak test."""
    start_time = datetime.now()
    end_time = start_time + self.duration
    self.running = True

    logger.info(f"Starting soak test for {self.duration}")
    logger.info(f"Target: {self.rps} req/s to {self.url}")

    async with aiohttp.ClientSession() as session:
        # Start workers
        workers = [
            asyncio.create_task(self.worker(session))
            for _ in range(10)  # 10 concurrent workers
        ]

        # Monitor resources periodically
        while datetime.now() < end_time:
            await asyncio.sleep(60)  # Check every minute

            resources = self.monitor_resources()
            self.metrics['memory_usage'].append(resources)

            # Log progress
            elapsed = (datetime.now() - start_time).total_seconds()
            error_rate = self.metrics['errors'] / max(self.metrics['requests'], 1)
            avg_response = sum(self.metrics['response_times'][-1000:]) / 1000

            logger.info(
                f"Elapsed: {elapsed:.0f}s | "
                f"Requests: {self.metrics['requests']} | "
                f"Error Rate: {error_rate:.2%} | "
                f"Avg Response: {avg_response:.3f}s | "
                f"Memory: {resources['memory_mb']:.1f}MB"
            )

            # Check for memory leak
            if len(self.metrics['memory_usage']) > 10:
                initial_mem = self.metrics['memory_usage'][0]['memory_mb']
                current_mem = resources['memory_mb']
                growth = current_mem - initial_mem

                if growth > 500:  # 500MB growth
                    logger.warning(f"Possible memory leak: +{growth:.1f}MB")

        # Stop workers
        self.running = False
        await asyncio.gather(*workers, return_exceptions=True)

    self.report()

def report(self):
    """Generate test report."""
    total_requests = self.metrics['requests']
    error_rate = self.metrics['errors'] / total_requests if total_requests > 0 else 0
    response_times = self.metrics['response_times']

    print("\n" + "="*60)
    print("SOAK TEST RESULTS")
    print("="*60)
    print(f"Total Requests: {total_requests:,}")
    print(f"Total Errors: {self.metrics['errors']:,}")
    print(f"Error Rate: {error_rate:.2%}")
    print(f"\nResponse Times:")
    print(f"  Min: {min(response_times):.3f}s")
    print(f"  Max: {max(response_times):.3f}s")
    print(f"  Mean: {sum(response_times)/len(response_times):.3f}s")
    print(f"  P95: {sorted(response_times)[int(len(response_times)*0.95)]:.3f}s")

    # Memory analysis
    if self.metrics['memory_usage']:
        initial_mem = self.metrics['memory_usage'][0]['memory_mb']
        final_mem = self.metrics['memory_usage'][-1]['memory_mb']
        growth = final_mem - initial_mem

        print(f"\nMemory Usage:")
        print(f"  Initial: {initial_mem:.1f}MB")
        print(f"  Final: {final_mem:.1f}MB")
        print(f"  Growth: {growth:.1f}MB ({growth/initial_mem*100:.1f}%)")

        if growth > 200:
            print("  ⚠️  Possible memory leak detected!")

    print("="*60)
import asyncio import aiohttp import time from datetime import datetime, timedelta import psutil import logging
logging.basicConfig(level=logging.INFO) logger = logging.getLogger(name)
class SoakTest: """Run sustained load test to detect memory leaks and degradation."""
def __init__(self, url, duration_hours=4, requests_per_second=50):
    self.url = url
    self.duration = timedelta(hours=duration_hours)
    self.rps = requests_per_second
    self.metrics = {
        'requests': 0,
        'errors': 0,
        'response_times': [],
        'memory_usage': [],
    }

async def make_request(self, session):
    """Make single request and record metrics."""
    start = time.time()
    try:
        async with session.get(self.url) as response:
            await response.read()
            duration = time.time() - start

            self.metrics['requests'] += 1
            self.metrics['response_times'].append(duration)

            if response.status >= 400:
                self.metrics['errors'] += 1
                logger.warning(f"Error: {response.status}")

    except Exception as e:
        self.metrics['errors'] += 1
        logger.error(f"Request failed: {e}")

async def worker(self, session):
    """Worker that makes requests at target rate."""
    while self.running:
        await self.make_request(session)
        await asyncio.sleep(1 / self.rps)

def monitor_resources(self):
    """Monitor system resources."""
    process = psutil.Process()
    return {
        'memory_mb': process.memory_info().rss / 1024 / 1024,
        'cpu_percent': process.cpu_percent(),
        'timestamp': datetime.now(),
    }

async def run(self):
    """Execute soak test."""
    start_time = datetime.now()
    end_time = start_time + self.duration
    self.running = True

    logger.info(f"Starting soak test for {self.duration}")
    logger.info(f"Target: {self.rps} req/s to {self.url}")

    async with aiohttp.ClientSession() as session:
        # Start workers
        workers = [
            asyncio.create_task(self.worker(session))
            for _ in range(10)  # 10 concurrent workers
        ]

        # Monitor resources periodically
        while datetime.now() < end_time:
            await asyncio.sleep(60)  # Check every minute

            resources = self.monitor_resources()
            self.metrics['memory_usage'].append(resources)

            # Log progress
            elapsed = (datetime.now() - start_time).total_seconds()
            error_rate = self.metrics['errors'] / max(self.metrics['requests'], 1)
            avg_response = sum(self.metrics['response_times'][-1000:]) / 1000

            logger.info(
                f"Elapsed: {elapsed:.0f}s | "
                f"Requests: {self.metrics['requests']} | "
                f"Error Rate: {error_rate:.2%} | "
                f"Avg Response: {avg_response:.3f}s | "
                f"Memory: {resources['memory_mb']:.1f}MB"
            )

            # Check for memory leak
            if len(self.metrics['memory_usage']) > 10:
                initial_mem = self.metrics['memory_usage'][0]['memory_mb']
                current_mem = resources['memory_mb']
                growth = current_mem - initial_mem

                if growth > 500:  # 500MB growth
                    logger.warning(f"Possible memory leak: +{growth:.1f}MB")

        # Stop workers
        self.running = False
        await asyncio.gather(*workers, return_exceptions=True)

    self.report()

def report(self):
    """Generate test report."""
    total_requests = self.metrics['requests']
    error_rate = self.metrics['errors'] / total_requests if total_requests > 0 else 0
    response_times = self.metrics['response_times']

    print("\n" + "="*60)
    print("SOAK TEST RESULTS")
    print("="*60)
    print(f"Total Requests: {total_requests:,}")
    print(f"Total Errors: {self.metrics['errors']:,}")
    print(f"Error Rate: {error_rate:.2%}")
    print(f"\nResponse Times:")
    print(f"  Min: {min(response_times):.3f}s")
    print(f"  Max: {max(response_times):.3f}s")
    print(f"  Mean: {sum(response_times)/len(response_times):.3f}s")
    print(f"  P95: {sorted(response_times)[int(len(response_times)*0.95)]:.3f}s")

    # Memory analysis
    if self.metrics['memory_usage']:
        initial_mem = self.metrics['memory_usage'][0]['memory_mb']
        final_mem = self.metrics['memory_usage'][-1]['memory_mb']
        growth = final_mem - initial_mem

        print(f"\nMemory Usage:")
        print(f"  Initial: {initial_mem:.1f}MB")
        print(f"  Final: {final_mem:.1f}MB")
        print(f"  Growth: {growth:.1f}MB ({growth/initial_mem*100:.1f}%)")

        if growth > 200:
            print("  ⚠️  Possible memory leak detected!")

    print("="*60)

Run soak test

Run soak test

if name == 'main': test = SoakTest( url='http://api.example.com/products', duration_hours=4, requests_per_second=50 ) asyncio.run(test.run())
undefined
if name == 'main': test = SoakTest( url='http://api.example.com/products', duration_hours=4, requests_per_second=50 ) asyncio.run(test.run())
undefined

4. JMeter Stress Test

4. JMeter Stress Test

xml
<!-- stress-test.jmx -->
<jmeterTestPlan>
  <ThreadGroup testname="Stress Test Thread Group">
    <!-- Ultimate Thread Group for advanced load patterns -->
    <elementProp name="ThreadGroup.main_controller">
      <!-- Stage 1: Ramp up to 100 users -->
      <collectionProp name="ultimatethreadgroupdata">
        <stringProp>100</stringProp>  <!-- Users -->
        <stringProp>60</stringProp>   <!-- Ramp-up (sec) -->
        <stringProp>300</stringProp>  <!-- Duration (sec) -->
      </collectionProp>

      <!-- Stage 2: Ramp up to 500 users -->
      <collectionProp name="ultimatethreadgroupdata">
        <stringProp>500</stringProp>
        <stringProp>120</stringProp>
        <stringProp>600</stringProp>
      </collectionProp>

      <!-- Stage 3: Ramp up to 1000 users (stress) -->
      <collectionProp name="ultimatethreadgroupdata">
        <stringProp>1000</stringProp>
        <stringProp>180</stringProp>
        <stringProp>600</stringProp>
      </collectionProp>
    </elementProp>

    <HTTPSamplerProxy testname="Heavy Query">
      <stringProp name="HTTPSampler.domain">api.example.com</stringProp>
      <stringProp name="HTTPSampler.path">/api/search?q=stress</stringProp>
      <stringProp name="HTTPSampler.method">GET</stringProp>
    </HTTPSamplerProxy>

    <!-- Monitor for errors and degradation -->
    <ResponseAssertion testname="Allow 503 During Stress">
      <stringProp name="Assertion.test_field">Assertion.response_code</stringProp>
      <stringProp name="Assertion.test_type">8</stringProp>
      <stringProp>200|503</stringProp>
    </ResponseAssertion>
  </ThreadGroup>
</jmeterTestPlan>
xml
<!-- stress-test.jmx -->
<jmeterTestPlan>
  <ThreadGroup testname="Stress Test Thread Group">
    <!-- Ultimate Thread Group for advanced load patterns -->
    <elementProp name="ThreadGroup.main_controller">
      <!-- Stage 1: Ramp up to 100 users -->
      <collectionProp name="ultimatethreadgroupdata">
        <stringProp>100</stringProp>  <!-- Users -->
        <stringProp>60</stringProp>   <!-- Ramp-up (sec) -->
        <stringProp>300</stringProp>  <!-- Duration (sec) -->
      </collectionProp>

      <!-- Stage 2: Ramp up to 500 users -->
      <collectionProp name="ultimatethreadgroupdata">
        <stringProp>500</stringProp>
        <stringProp>120</stringProp>
        <stringProp>600</stringProp>
      </collectionProp>

      <!-- Stage 3: Ramp up to 1000 users (stress) -->
      <collectionProp name="ultimatethreadgroupdata">
        <stringProp>1000</stringProp>
        <stringProp>180</stringProp>
        <stringProp>600</stringProp>
      </collectionProp>
    </elementProp>

    <HTTPSamplerProxy testname="Heavy Query">
      <stringProp name="HTTPSampler.domain">api.example.com</stringProp>
      <stringProp name="HTTPSampler.path">/api/search?q=stress</stringProp>
      <stringProp name="HTTPSampler.method">GET</stringProp>
    </HTTPSamplerProxy>

    <!-- Monitor for errors and degradation -->
    <ResponseAssertion testname="Allow 503 During Stress">
      <stringProp name="Assertion.test_field">Assertion.response_code</stringProp>
      <stringProp name="Assertion.test_type">8</stringProp>
      <stringProp>200|503</stringProp>
    </ResponseAssertion>
  </ThreadGroup>
</jmeterTestPlan>

5. Auto-Scaling Validation

5. Auto-Scaling Validation

typescript
// test-autoscaling.ts
import { test, expect } from '@playwright/test';
import axios from 'axios';

test.describe('Auto-scaling Stress Test', () => {
  test('system should scale up under load', async () => {
    const baseUrl = 'http://api.example.com';
    const cloudwatch = new AWS.CloudWatch();

    // Initial instance count
    const initialInstances = await getInstanceCount();
    console.log(`Initial instances: ${initialInstances}`);

    // Generate high load
    const requests = [];
    for (let i = 0; i < 1000; i++) {
      requests.push(
        axios.get(`${baseUrl}/api/heavy-operation`)
          .catch(err => ({ error: err.message }))
      );
    }

    // Wait for auto-scaling trigger
    await Promise.all(requests);
    await new Promise(resolve => setTimeout(resolve, 120000)); // 2 min

    // Check if scaled up
    const scaledInstances = await getInstanceCount();
    console.log(`Scaled instances: ${scaledInstances}`);

    expect(scaledInstances).toBeGreaterThan(initialInstances);

    // Verify metrics
    const cpuMetrics = await cloudwatch.getMetricStatistics({
      Namespace: 'AWS/EC2',
      MetricName: 'CPUUtilization',
      // ... metric params
    }).promise();

    expect(cpuMetrics.Datapoints.some(d => d.Average > 70)).toBe(true);
  });
});
typescript
// test-autoscaling.ts
import { test, expect } from '@playwright/test';
import axios from 'axios';

test.describe('Auto-scaling Stress Test', () => {
  test('system should scale up under load', async () => {
    const baseUrl = 'http://api.example.com';
    const cloudwatch = new AWS.CloudWatch();

    // Initial instance count
    const initialInstances = await getInstanceCount();
    console.log(`Initial instances: ${initialInstances}`);

    // Generate high load
    const requests = [];
    for (let i = 0; i < 1000; i++) {
      requests.push(
        axios.get(`${baseUrl}/api/heavy-operation`)
          .catch(err => ({ error: err.message }))
      );
    }

    // Wait for auto-scaling trigger
    await Promise.all(requests);
    await new Promise(resolve => setTimeout(resolve, 120000)); // 2 min

    // Check if scaled up
    const scaledInstances = await getInstanceCount();
    console.log(`Scaled instances: ${scaledInstances}`);

    expect(scaledInstances).toBeGreaterThan(initialInstances);

    // Verify metrics
    const cpuMetrics = await cloudwatch.getMetricStatistics({
      Namespace: 'AWS/EC2',
      MetricName: 'CPUUtilization',
      // ... metric params
    }).promise();

    expect(cpuMetrics.Datapoints.some(d => d.Average > 70)).toBe(true);
  });
});

6. Breaking Point Analysis

6. Breaking Point Analysis

python
undefined
python
undefined

find_breaking_point.py

find_breaking_point.py

import requests import threading import time from collections import defaultdict
class BreakingPointTest: """Find system breaking point by gradually increasing load."""
def __init__(self, url):
    self.url = url
    self.results = defaultdict(lambda: {'success': 0, 'errors': 0, 'times': []})
    self.running = True

def worker(self, vusers):
    """Worker thread that makes requests."""
    while self.running:
        start = time.time()
        try:
            response = requests.get(self.url, timeout=10)
            duration = time.time() - start

            if response.status_code == 200:
                self.results[vusers]['success'] += 1
                self.results[vusers]['times'].append(duration)
            else:
                self.results[vusers]['errors'] += 1

        except Exception as e:
            self.results[vusers]['errors'] += 1

        time.sleep(0.1)

def test_load_level(self, vusers, duration=60):
    """Test system with specific number of virtual users."""
    print(f"\nTesting with {vusers} concurrent users...")

    threads = []
    for _ in range(vusers):
        t = threading.Thread(target=self.worker, args=(vusers,))
        t.start()
        threads.append(t)

    time.sleep(duration)

    self.running = False
    for t in threads:
        t.join()

    self.running = True

    # Analyze results
    stats = self.results[vusers]
    total = stats['success'] + stats['errors']
    error_rate = stats['errors'] / total if total > 0 else 0
    avg_time = sum(stats['times']) / len(stats['times']) if stats['times'] else 0

    print(f"  Requests: {total}")
    print(f"  Success: {stats['success']}")
    print(f"  Errors: {stats['errors']}")
    print(f"  Error Rate: {error_rate:.1%}")
    print(f"  Avg Response: {avg_time:.3f}s")

    # System is breaking if error rate > 5% or avg response > 5s
    is_breaking = error_rate > 0.05 or avg_time > 5.0

    return not is_breaking

def find_breaking_point(self):
    """Binary search to find breaking point."""
    min_users = 10
    max_users = 1000
    breaking_point = None

    while min_users < max_users:
        mid = (min_users + max_users) // 2

        if self.test_load_level(mid):
            # System handles this load, try higher
            min_users = mid + 10
        else:
            # System breaking, found upper limit
            breaking_point = mid
            max_users = mid - 10

    print(f"\n{'='*60}")
    print(f"Breaking point: ~{breaking_point} concurrent users")
    print(f"{'='*60}")

    return breaking_point
import requests import threading import time from collections import defaultdict
class BreakingPointTest: """Find system breaking point by gradually increasing load."""
def __init__(self, url):
    self.url = url
    self.results = defaultdict(lambda: {'success': 0, 'errors': 0, 'times': []})
    self.running = True

def worker(self, vusers):
    """Worker thread that makes requests."""
    while self.running:
        start = time.time()
        try:
            response = requests.get(self.url, timeout=10)
            duration = time.time() - start

            if response.status_code == 200:
                self.results[vusers]['success'] += 1
                self.results[vusers]['times'].append(duration)
            else:
                self.results[vusers]['errors'] += 1

        except Exception as e:
            self.results[vusers]['errors'] += 1

        time.sleep(0.1)

def test_load_level(self, vusers, duration=60):
    """Test system with specific number of virtual users."""
    print(f"\nTesting with {vusers} concurrent users...")

    threads = []
    for _ in range(vusers):
        t = threading.Thread(target=self.worker, args=(vusers,))
        t.start()
        threads.append(t)

    time.sleep(duration)

    self.running = False
    for t in threads:
        t.join()

    self.running = True

    # Analyze results
    stats = self.results[vusers]
    total = stats['success'] + stats['errors']
    error_rate = stats['errors'] / total if total > 0 else 0
    avg_time = sum(stats['times']) / len(stats['times']) if stats['times'] else 0

    print(f"  Requests: {total}")
    print(f"  Success: {stats['success']}")
    print(f"  Errors: {stats['errors']}")
    print(f"  Error Rate: {error_rate:.1%}")
    print(f"  Avg Response: {avg_time:.3f}s")

    # System is breaking if error rate > 5% or avg response > 5s
    is_breaking = error_rate > 0.05 or avg_time > 5.0

    return not is_breaking

def find_breaking_point(self):
    """Binary search to find breaking point."""
    min_users = 10
    max_users = 1000
    breaking_point = None

    while min_users < max_users:
        mid = (min_users + max_users) // 2

        if self.test_load_level(mid):
            # System handles this load, try higher
            min_users = mid + 10
        else:
            # System breaking, found upper limit
            breaking_point = mid
            max_users = mid - 10

    print(f"\n{'='*60}")
    print(f"Breaking point: ~{breaking_point} concurrent users")
    print(f"{'='*60}")

    return breaking_point

Run

Run

test = BreakingPointTest('http://api.example.com/products') test.find_breaking_point()
undefined
test = BreakingPointTest('http://api.example.com/products') test.find_breaking_point()
undefined

Metrics to Monitor

需监控的指标

Application Metrics

应用指标

  • Response times (P50, P95, P99, Max)
  • Error rates and types
  • Throughput (req/s)
  • Queue depths
  • Circuit breaker trips
  • 响应时间(P50、P95、P99、最大值)
  • 错误率及错误类型
  • 吞吐量(请求/秒)
  • 队列深度
  • 断路器触发次数

System Metrics

系统指标

  • CPU utilization
  • Memory usage and leaks
  • Disk I/O
  • Network bandwidth
  • Thread/connection pools
  • CPU利用率
  • 内存使用情况及内存泄漏
  • 磁盘I/O
  • 网络带宽
  • 线程/连接池

Database Metrics

数据库指标

  • Query execution times
  • Connection pool usage
  • Lock contention
  • Cache hit rates
  • Replication lag
  • 查询执行时间
  • 连接池使用率
  • 锁竞争情况
  • 缓存命中率
  • 复制延迟

Best Practices

最佳实践

✅ DO

✅ 建议

  • Test in production-like environment
  • Monitor all system resources
  • Gradually increase load to find limits
  • Test recovery after stress
  • Document breaking points
  • Test auto-scaling behavior
  • Plan for graceful degradation
  • Monitor for memory leaks
  • 在类生产环境中进行测试
  • 监控所有系统资源
  • 逐步增加负载以找出极限
  • 测试压力后的恢复能力
  • 记录断点信息
  • 测试自动扩缩容行为
  • 规划优雅降级机制
  • 监控内存泄漏情况

❌ DON'T

❌ 避免

  • Test in production without safeguards
  • Skip recovery testing
  • Ignore warning signs (CPU, memory)
  • Test only success scenarios
  • Assume linear scalability
  • Forget database capacity
  • Skip monitoring third-party dependencies
  • Test without proper cleanup
  • 在无防护措施的生产环境中测试
  • 跳过恢复测试
  • 忽略警告信号(CPU、内存)
  • 仅测试成功场景
  • 假设线性可扩展性
  • 忽略数据库容量
  • 跳过对第三方依赖的监控
  • 测试后不进行适当清理

Tools

工具

  • Load Generation: k6, JMeter, Gatling, Locust, Artillery
  • Monitoring: Prometheus, Grafana, DataDog, New Relic
  • Cloud Metrics: CloudWatch, Azure Monitor, GCP Monitoring
  • Profiling: py-spy, async-profiler, clinic.js
  • 负载生成:k6、JMeter、Gatling、Locust、Artillery
  • 监控工具:Prometheus、Grafana、DataDog、New Relic
  • 云指标:CloudWatch、Azure Monitor、GCP Monitoring
  • 性能分析:py-spy、async-profiler、clinic.js

Examples

相关示例

See also: performance-testing, continuous-testing, api-versioning-strategy for comprehensive system testing.
另请参阅:performance-testing(性能测试)、continuous-testing(持续测试)、api-versioning-strategy(API版本控制策略)以获取全面的系统测试方案。