async-programming
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseAsync Programming Skill
异步编程技能
File Organization
文件结构
- SKILL.md: Core principles, patterns, essential security (this file)
- references/security-examples.md: Race condition and resource safety examples
- references/advanced-patterns.md: Advanced async patterns and optimization
- SKILL.md: 核心原则、模式、关键安全内容(本文件)
- references/security-examples.md: 竞态条件与资源安全示例
- references/advanced-patterns.md: 高级异步模式与优化
Validation Gates
验证关卡
Gate 0.1: Domain Expertise Validation
关卡0.1:领域专业知识验证
- Status: PASSED
- Expertise Areas: asyncio, Tokio, race conditions, resource management, concurrent safety
- 状态: 已通过
- 专业领域: asyncio, Tokio, 竞态条件, 资源管理, 并发安全
Gate 0.2: Vulnerability Research
关卡0.2:漏洞研究
- Status: PASSED (3+ issues for MEDIUM-RISK)
- Research Date: 2025-11-20
- Issues: CVE-2024-12254 (asyncio memory), Redis race condition (CVE-2023-28858/9)
- 状态: 已通过(发现3个以上中等风险问题)
- 研究日期: 2025-11-20
- 问题: CVE-2024-12254(asyncio内存问题)、Redis竞态条件(CVE-2023-28858/9)
Gate 0.11: File Organization Decision
关卡0.11:文件结构决策
- Decision: Split structure (MEDIUM-RISK, ~400 lines main + references)
- 决策: 拆分结构(中等风险,主文件约400行+参考文件)
1. Overview
1. 概述
Risk Level: MEDIUM
Justification: Async programming introduces race conditions, resource leaks, and timing-based vulnerabilities. While not directly exposed to external attacks, improper async code can cause data corruption, deadlocks, and security-sensitive race conditions like double-spending or TOCTOU (time-of-check-time-of-use).
You are an expert in asynchronous programming patterns for Python (asyncio) and Rust (Tokio). You write concurrent code that is free from race conditions, properly manages resources, and handles errors gracefully.
风险等级: 中等
理由: 异步编程会引入竞态条件、资源泄漏和基于时序的漏洞。虽然不会直接暴露于外部攻击,但不当的异步代码可能导致数据损坏、死锁,以及诸如双花或TOCTOU(检查时间-使用时间)等安全敏感的竞态条件。
您是Python(asyncio)和Rust(Tokio)异步编程模式专家。您编写的并发代码无竞态条件、能正确管理资源,并能优雅处理错误。
Core Expertise Areas
核心专业领域
- Race condition identification and prevention
- Async resource management (connections, locks, files)
- Error handling in concurrent contexts
- Performance optimization for async workloads
- Graceful shutdown and cancellation
- 竞态条件识别与预防
- 异步资源管理(连接、锁、文件)
- 并发场景下的错误处理
- 异步工作负载的性能优化
- 优雅关闭与取消
2. Core Principles
2. 核心原则
- TDD First: Write async tests before implementation using pytest-asyncio
- Performance Aware: Use asyncio.gather, semaphores, and avoid blocking calls
- Identify Race Conditions: Recognize shared state accessed across await points
- Protect Shared State: Use locks, atomic operations, or message passing
- Manage Resources: Ensure cleanup happens even on cancellation
- Handle Errors: Don't let one task's failure corrupt others
- Avoid Deadlocks: Consistent lock ordering, timeouts on locks
- 先做TDD: 使用pytest-asyncio在实现前编写异步测试
- 关注性能: 使用asyncio.gather、信号量,避免阻塞调用
- 识别竞态条件: 识别跨await点访问的共享状态
- 保护共享状态: 使用锁、原子操作或消息传递
- 管理资源: 即使在取消操作时也要确保资源清理
- 处理错误: 避免单个任务的失败影响其他任务
- 避免死锁: 保持一致的锁顺序,为锁设置超时
Decision Framework
决策框架
| Situation | Approach |
|---|---|
| Shared mutable state | Use asyncio.Lock or RwLock |
| Database transaction | Use atomic operations, SELECT FOR UPDATE |
| Resource cleanup | Use async context managers |
| Task coordination | Use asyncio.Event, Queue, or Semaphore |
| Background tasks | Track tasks, handle cancellation |
| 场景 | 处理方式 |
|---|---|
| 可变共享状态 | 使用asyncio.Lock或RwLock |
| 数据库事务 | 使用原子操作、SELECT FOR UPDATE |
| 资源清理 | 使用异步上下文管理器 |
| 任务协调 | 使用asyncio.Event、Queue或Semaphore |
| 后台任务 | 跟踪任务,处理取消操作 |
3. Implementation Workflow (TDD)
3. 实现工作流(TDD)
Step 1: Write Failing Test First
步骤1:先编写失败的测试
python
import pytest
import asyncio
@pytest.mark.asyncio
async def test_concurrent_counter_safety():
"""Test counter maintains consistency under concurrent access."""
counter = SafeCounter() # Not implemented yet - will fail
async def increment_many():
for _ in range(100):
await counter.increment()
# Run 10 concurrent incrementers
await asyncio.gather(*[increment_many() for _ in range(10)])
# Must be exactly 1000 (no lost updates)
assert await counter.get() == 1000
@pytest.mark.asyncio
async def test_resource_cleanup_on_cancellation():
"""Test resources are cleaned up even when task is cancelled."""
cleanup_called = False
async def task_with_resource():
nonlocal cleanup_called
async with managed_resource() as resource: # Not implemented yet
await asyncio.sleep(10) # Long operation
cleanup_called = True
task = asyncio.create_task(task_with_resource())
await asyncio.sleep(0.1)
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task
assert cleanup_called # Cleanup must happenpython
import pytest
import asyncio
@pytest.mark.asyncio
async def test_concurrent_counter_safety():
"""Test counter maintains consistency under concurrent access."""
counter = SafeCounter() # Not implemented yet - will fail
async def increment_many():
for _ in range(100):
await counter.increment()
# Run 10 concurrent incrementers
await asyncio.gather(*[increment_many() for _ in range(10)])
# Must be exactly 1000 (no lost updates)
assert await counter.get() == 1000
@pytest.mark.asyncio
async def test_resource_cleanup_on_cancellation():
"""Test resources are cleaned up even when task is cancelled."""
cleanup_called = False
async def task_with_resource():
nonlocal cleanup_called
async with managed_resource() as resource: # Not implemented yet
await asyncio.sleep(10) # Long operation
cleanup_called = True
task = asyncio.create_task(task_with_resource())
await asyncio.sleep(0.1)
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task
assert cleanup_called # Cleanup must happenStep 2: Implement Minimum to Pass
步骤2:实现满足测试的最小代码
python
import asyncio
from contextlib import asynccontextmanager
class SafeCounter:
def __init__(self):
self._value = 0
self._lock = asyncio.Lock()
async def increment(self) -> int:
async with self._lock:
self._value += 1
return self._value
async def get(self) -> int:
async with self._lock:
return self._value
@asynccontextmanager
async def managed_resource():
resource = await acquire_resource()
try:
yield resource
finally:
await release_resource(resource) # Always runspython
import asyncio
from contextlib import asynccontextmanager
class SafeCounter:
def __init__(self):
self._value = 0
self._lock = asyncio.Lock()
async def increment(self) -> int:
async with self._lock:
self._value += 1
return self._value
async def get(self) -> int:
async with self._lock:
return self._value
@asynccontextmanager
async def managed_resource():
resource = await acquire_resource()
try:
yield resource
finally:
await release_resource(resource) # Always runsStep 3: Refactor Following Patterns
步骤3:遵循模式重构
Apply performance patterns, add timeouts, improve error handling.
应用性能模式,添加超时,改进错误处理。
Step 4: Run Full Verification
步骤4:运行完整验证
bash
undefinedbash
undefinedRun async tests
Run async tests
pytest tests/ -v --asyncio-mode=auto
pytest tests/ -v --asyncio-mode=auto
Check for blocking calls
Check for blocking calls
python -m asyncio debug
python -m asyncio debug
Run with concurrency stress test
Run with concurrency stress test
pytest tests/ -v -n auto --asyncio-mode=auto
---pytest tests/ -v -n auto --asyncio-mode=auto
---4. Performance Patterns
4. 性能模式
Pattern 1: asyncio.gather for Concurrency
模式1:使用asyncio.gather实现并发
python
undefinedpython
undefinedBAD - Sequential execution
BAD - Sequential execution
async def fetch_all_sequential(urls: list[str]) -> list[str]:
results = []
for url in urls:
result = await fetch(url) # Waits for each
results.append(result)
return results # Total time: sum of all fetches
async def fetch_all_sequential(urls: list[str]) -> list[str]:
results = []
for url in urls:
result = await fetch(url) # Waits for each
results.append(result)
return results # Total time: sum of all fetches
GOOD - Concurrent execution
GOOD - Concurrent execution
async def fetch_all_concurrent(urls: list[str]) -> list[str]:
return await asyncio.gather(*[fetch(url) for url in urls])
# Total time: max of all fetches
undefinedasync def fetch_all_concurrent(urls: list[str]) -> list[str]:
return await asyncio.gather(*[fetch(url) for url in urls])
# Total time: max of all fetches
undefinedPattern 2: Semaphores for Rate Limiting
模式2:使用信号量进行速率限制
python
undefinedpython
undefinedBAD - Unbounded concurrency (may overwhelm server)
BAD - Unbounded concurrency (may overwhelm server)
async def fetch_many(urls: list[str]):
return await asyncio.gather(*[fetch(url) for url in urls])
async def fetch_many(urls: list[str]):
return await asyncio.gather(*[fetch(url) for url in urls])
GOOD - Bounded concurrency with semaphore
GOOD - Bounded concurrency with semaphore
async def fetch_many_limited(urls: list[str], max_concurrent: int = 10):
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_with_limit(url: str):
async with semaphore:
return await fetch(url)
return await asyncio.gather(*[fetch_with_limit(url) for url in urls])undefinedasync def fetch_many_limited(urls: list[str], max_concurrent: int = 10):
semaphore = asyncio.Semaphore(max_concurrent)
async def fetch_with_limit(url: str):
async with semaphore:
return await fetch(url)
return await asyncio.gather(*[fetch_with_limit(url) for url in urls])undefinedPattern 3: Task Groups (Python 3.11+)
模式3:任务组(Python 3.11+)
python
undefinedpython
undefinedBAD - Manual task tracking
BAD - Manual task tracking
async def process_items_manual(items):
tasks = []
for item in items:
task = asyncio.create_task(process(item))
tasks.append(task)
return await asyncio.gather(*tasks)
async def process_items_manual(items):
tasks = []
for item in items:
task = asyncio.create_task(process(item))
tasks.append(task)
return await asyncio.gather(*tasks)
GOOD - Task groups with automatic cleanup
GOOD - Task groups with automatic cleanup
async def process_items_taskgroup(items):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(process(item)) for item in items]
return [task.result() for task in tasks]
# Automatic cancellation on any failure
undefinedasync def process_items_taskgroup(items):
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(process(item)) for item in items]
return [task.result() for task in tasks]
# Automatic cancellation on any failure
undefinedPattern 4: Efficient Event Loop Usage
模式4:高效使用事件循环
python
undefinedpython
undefinedBAD - Creating new event loop each time
BAD - Creating new event loop each time
def run_async_bad():
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(main())
finally:
loop.close()
def run_async_bad():
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(main())
finally:
loop.close()
GOOD - Reuse running loop or use asyncio.run
GOOD - Reuse running loop or use asyncio.run
def run_async_good():
return asyncio.run(main()) # Handles loop lifecycle
def run_async_good():
return asyncio.run(main()) # Handles loop lifecycle
GOOD - For library code, get existing loop
GOOD - For library code, get existing loop
async def library_function():
loop = asyncio.get_running_loop()
future = loop.create_future()
# Use the existing loop
undefinedasync def library_function():
loop = asyncio.get_running_loop()
future = loop.create_future()
# Use the existing loop
undefinedPattern 5: Avoiding Blocking Calls
模式5:避免阻塞调用
python
undefinedpython
undefinedBAD - Blocks event loop
BAD - Blocks event loop
async def process_file_bad(path: str):
with open(path) as f: # Blocking I/O
data = f.read()
result = hashlib.sha256(data).hexdigest() # CPU-bound blocks loop
return result
async def process_file_bad(path: str):
with open(path) as f: # Blocking I/O
data = f.read()
result = hashlib.sha256(data).hexdigest() # CPU-bound blocks loop
return result
GOOD - Non-blocking with aiofiles and executor
GOOD - Non-blocking with aiofiles and executor
import aiofiles
async def process_file_good(path: str):
async with aiofiles.open(path, 'rb') as f:
data = await f.read()
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None, lambda: hashlib.sha256(data).hexdigest()
)
return result
---import aiofiles
async def process_file_good(path: str):
async with aiofiles.open(path, 'rb') as f:
data = await f.read()
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None, lambda: hashlib.sha256(data).hexdigest()
)
return result
---5. Technical Foundation
5. 技术基础
Version Recommendations
版本推荐
| Component | Version | Notes |
|---|---|---|
| Python | 3.11+ | asyncio improvements, TaskGroup |
| Rust | 1.75+ | Stable async |
| Tokio | 1.35+ | Async runtime |
| aioredis | Use redis-py | Better maintenance |
| 组件 | 版本 | 说明 |
|---|---|---|
| Python | 3.11+ | asyncio改进,TaskGroup |
| Rust | 1.75+ | 稳定异步支持 |
| Tokio | 1.35+ | 异步运行时 |
| aioredis | 使用redis-py | 维护更完善 |
Key Libraries
核心库
python
undefinedpython
undefinedPython async ecosystem
Python async ecosystem
asyncio # Core async
aiohttp # HTTP client
asyncpg # PostgreSQL
aiofiles # File I/O
pytest-asyncio # Testing
---asyncio # Core async
aiohttp # HTTP client
asyncpg # PostgreSQL
aiofiles # File I/O
pytest-asyncio # Testing
---6. Implementation Patterns
6. 实现模式
Pattern 1: Protecting Shared State with Locks
模式1:使用锁保护共享状态
python
import asyncio
class SafeCounter:
"""Thread-safe counter for async contexts."""
def __init__(self):
self._value = 0
self._lock = asyncio.Lock()
async def increment(self) -> int:
async with self._lock:
self._value += 1
return self._value
async def get(self) -> int:
async with self._lock:
return self._valuepython
import asyncio
class SafeCounter:
"""Thread-safe counter for async contexts."""
def __init__(self):
self._value = 0
self._lock = asyncio.Lock()
async def increment(self) -> int:
async with self._lock:
self._value += 1
return self._value
async def get(self) -> int:
async with self._lock:
return self._valuePattern 2: Atomic Database Operations
模式2:原子数据库操作
python
from sqlalchemy.ext.asyncio import AsyncSession
async def transfer_safe(db: AsyncSession, from_id: int, to_id: int, amount: int):
"""Atomic transfer using row locks."""
async with db.begin():
stmt = (
select(Account)
.where(Account.id.in_([from_id, to_id]))
.with_for_update() # Lock rows
)
accounts = {a.id: a for a in (await db.execute(stmt)).scalars()}
if accounts[from_id].balance < amount:
raise ValueError("Insufficient funds")
accounts[from_id].balance -= amount
accounts[to_id].balance += amountpython
from sqlalchemy.ext.asyncio import AsyncSession
async def transfer_safe(db: AsyncSession, from_id: int, to_id: int, amount: int):
"""Atomic transfer using row locks."""
async with db.begin():
stmt = (
select(Account)
.where(Account.id.in_([from_id, to_id]))
.with_for_update() # Lock rows
)
accounts = {a.id: a for a in (await db.execute(stmt)).scalars()}
if accounts[from_id].balance < amount:
raise ValueError("Insufficient funds")
accounts[from_id].balance -= amount
accounts[to_id].balance += amountPattern 3: Resource Management with Context Managers
模式3:使用上下文管理器管理资源
python
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_connection():
"""Ensure connection cleanup even on cancellation."""
conn = await pool.acquire()
try:
yield conn
finally:
await pool.release(conn)python
from contextlib import asynccontextmanager
@asynccontextmanager
async def get_connection():
"""Ensure connection cleanup even on cancellation."""
conn = await pool.acquire()
try:
yield conn
finally:
await pool.release(conn)Pattern 4: Graceful Shutdown
模式4:优雅关闭
python
import asyncio, signal
class GracefulApp:
def __init__(self):
self.shutdown_event = asyncio.Event()
self.tasks: set[asyncio.Task] = set()
async def run(self):
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, self.shutdown_event.set)
self.tasks.add(asyncio.create_task(self.worker()))
await self.shutdown_event.wait()
for task in self.tasks:
task.cancel()
await asyncio.gather(*self.tasks, return_exceptions=True)python
import asyncio, signal
class GracefulApp:
def __init__(self):
self.shutdown_event = asyncio.Event()
self.tasks: set[asyncio.Task] = set()
async def run(self):
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, self.shutdown_event.set)
self.tasks.add(asyncio.create_task(self.worker()))
await self.shutdown_event.wait()
for task in self.tasks:
task.cancel()
await asyncio.gather(*self.tasks, return_exceptions=True)7. Security Standards
7. 安全标准
7.1 Common Async Vulnerabilities
7.1 常见异步漏洞
| Issue | Severity | Mitigation |
|---|---|---|
| Race Conditions | HIGH | Use locks or atomic ops |
| TOCTOU | HIGH | Atomic DB operations |
| Resource Leaks | MEDIUM | Context managers |
| CVE-2024-12254 | HIGH | Upgrade Python |
| Deadlocks | MEDIUM | Lock ordering, timeouts |
| 问题 | 严重程度 | 缓解措施 |
|---|---|---|
| 竞态条件 | 高 | 使用锁或原子操作 |
| TOCTOU | 高 | 原子数据库操作 |
| 资源泄漏 | 中 | 上下文管理器 |
| CVE-2024-12254 | 高 | 升级Python版本 |
| 死锁 | 中 | 锁顺序、超时设置 |
7.2 Race Condition Detection
7.2 竞态条件检测
python
undefinedpython
undefinedRACE CONDITION - read/await/write pattern
RACE CONDITION - read/await/write pattern
class UserSession:
async def update(self, key, value):
current = self.data.get(key, 0) # Read
await validate(value) # Await = context switch
self.data[key] = current + value # Write stale value
class UserSession:
async def update(self, key, value):
current = self.data.get(key, 0) # Read
await validate(value) # Await = context switch
self.data[key] = current + value # Write stale value
FIXED - validate outside lock, atomic update inside
FIXED - validate outside lock, atomic update inside
class SafeUserSession:
async def update(self, key, value):
await validate(value)
async with self._lock:
self.data[key] = self.data.get(key, 0) + value
---class SafeUserSession:
async def update(self, key, value):
await validate(value)
async with self._lock:
self.data[key] = self.data.get(key, 0) + value
---8. Common Mistakes & Anti-Patterns
8. 常见错误与反模式
Anti-Pattern 1: Unprotected Shared State
反模式1:未保护的共享状态
python
undefinedpython
undefinedNEVER - race condition on cache
NEVER - race condition on cache
async def get_or_fetch(self, key):
if key not in self.data:
self.data[key] = await fetch(key)
return self.data[key]
async def get_or_fetch(self, key):
if key not in self.data:
self.data[key] = await fetch(key)
return self.data[key]
ALWAYS - lock protection
ALWAYS - lock protection
async def get_or_fetch(self, key):
async with self._lock:
if key not in self.data:
self.data[key] = await fetch(key)
return self.data[key]
undefinedasync def get_or_fetch(self, key):
async with self._lock:
if key not in self.data:
self.data[key] = await fetch(key)
return self.data[key]
undefinedAnti-Pattern 2: Fire and Forget Tasks
反模式2:无跟踪的后台任务
python
undefinedpython
undefinedNEVER - task may be garbage collected
NEVER - task may be garbage collected
asyncio.create_task(background_work())
asyncio.create_task(background_work())
ALWAYS - track tasks
ALWAYS - track tasks
task = asyncio.create_task(background_work())
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)
undefinedtask = asyncio.create_task(background_work())
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)
undefinedAnti-Pattern 3: Blocking the Event Loop
反模式3:阻塞事件循环
python
undefinedpython
undefinedNEVER - blocks all async tasks
NEVER - blocks all async tasks
time.sleep(5)
time.sleep(5)
ALWAYS - use async
ALWAYS - use async
await asyncio.sleep(5)
result = await loop.run_in_executor(None, cpu_bound_func)
---await asyncio.sleep(5)
result = await loop.run_in_executor(None, cpu_bound_func)
---9. Pre-Implementation Checklist
9. 预实现检查清单
Phase 1: Before Writing Code
阶段1:编写代码前
- Write failing tests for race condition scenarios
- Write tests for resource cleanup on cancellation
- Identify all shared mutable state
- Plan lock hierarchy to avoid deadlocks
- Determine appropriate concurrency limits
- 为竞态条件场景编写失败的测试
- 为取消操作时的资源清理编写测试
- 识别所有可变共享状态
- 规划锁层级以避免死锁
- 确定合适的并发限制
Phase 2: During Implementation
阶段2:实现过程中
- Protect all shared state with locks
- Use async context managers for resources
- Use asyncio.gather for concurrent operations
- Apply semaphores for rate limiting
- Run executor for CPU-bound work
- Track all created tasks
- 使用锁保护所有共享状态
- 使用异步上下文管理器管理资源
- 使用asyncio.gather实现并发操作
- 使用信号量进行速率限制
- 使用执行器处理CPU密集型工作
- 跟踪所有创建的任务
Phase 3: Before Committing
阶段3:提交代码前
- All async tests pass:
pytest --asyncio-mode=auto - No blocking calls on event loop
- Timeouts on all external operations
- Graceful shutdown handles cancellation
- Race condition tests verify thread safety
- Lock ordering is consistent (no deadlock potential)
- 所有异步测试通过:
pytest --asyncio-mode=auto - 事件循环中无阻塞调用
- 所有外部操作设置超时
- 优雅关闭能处理取消操作
- 竞态条件测试验证线程安全
- 锁顺序一致(无死锁风险)
10. Summary
10. 总结
Your goal is to create async code that is:
- Test-Driven: Write async tests first with pytest-asyncio
- Race-Free: Protect shared state, use atomic operations
- Resource-Safe: Context managers, proper cleanup
- Performant: asyncio.gather, semaphores, avoid blocking
- Resilient: Handle errors, support cancellation
Key Performance Rules:
- Use for concurrent I/O operations
asyncio.gather - Apply semaphores to limit concurrent connections
- Use TaskGroup (Python 3.11+) for automatic cleanup
- Never block event loop - use for CPU work
run_in_executor - Reuse event loops, don't create new ones
Security Reminder:
- Every shared mutable state needs protection
- Database operations must be atomic (TOCTOU prevention)
- Always use async context managers for resources
- Track all tasks for graceful shutdown
- Test with concurrent load to find race conditions
您的目标是创建具备以下特性的异步代码:
- 测试驱动:使用pytest-asyncio先编写异步测试
- 无竞态:保护共享状态,使用原子操作
- 资源安全:使用上下文管理器,正确清理资源
- 高性能:使用asyncio.gather、信号量,避免阻塞
- 高韧性:处理错误,支持取消操作
核心性能规则:
- 使用处理并发I/O操作
asyncio.gather - 应用信号量限制并发连接数
- 使用TaskGroup(Python 3.11+)实现自动清理
- 绝不阻塞事件循环 - 使用处理CPU密集型工作
run_in_executor - 复用事件循环,不创建新的事件循环
安全提醒:
- 所有可变共享状态都需要保护
- 数据库操作必须是原子性的(预防TOCTOU)
- 始终使用异步上下文管理器管理资源
- 跟踪所有任务以实现优雅关闭
- 通过并发负载测试发现竞态条件