tokio-troubleshooting
Tokio Troubleshooting
This skill provides techniques for debugging and troubleshooting async applications built with Tokio.
Using tokio-console for Runtime Inspection
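Monitor the async runtime in real time:
toml
# In Cargo.toml
[dependencies]
console-subscriber = "0.2"
rust
// In main.rs
// Note: tokio-console needs Tokio's `tracing` feature and a build with
// RUSTFLAGS="--cfg tokio_unstable".
fn main() {
    // Install the console subscriber before building the runtime.
    console_subscriber::init();
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            run_application().await
        });
}
Run the console in a separate terminal:
bash
tokio-console
Key metrics to monitor: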
- Task spawn rate and total tasks
- Poll duration per task
- Idle vs. busy time
- Waker operations
- Resource utilization
Identifying issues:
- Long poll durations: CPU-intensive work in async context
- Many wakers: Potential contention or inefficient polling
- Growing task count: Task leak or unbounded spawning
- High idle time: Not enough work or blocking operations
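When tokio-console is not available, the "long poll duration" symptom can be approximated by hand. The following is a minimal sketch, not part of tokio-console: `PollTimer` and the tiny `block_on` executor are hypothetical names introduced here, and only the standard library is used so the example runs without extra crates.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::{Duration, Instant};

/// Hypothetical wrapper that records the longest single `poll` call
/// of the wrapped future.
struct PollTimer<F> {
    inner: Pin<Box<F>>,
    longest_poll: Duration,
}

impl<F: Future> PollTimer<F> {
    fn new(inner: F) -> Self {
        Self { inner: Box::pin(inner), longest_poll: Duration::ZERO }
    }
}

impl<F: Future> Future for PollTimer<F> {
    // Yields the inner output together with the longest observed poll.
    type Output = (F::Output, Duration);

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let started = Instant::now();
        let result = self.inner.as_mut().poll(cx);
        let elapsed = started.elapsed();
        if elapsed > self.longest_poll {
            self.longest_poll = elapsed;
        }
        let longest = self.longest_poll;
        result.map(|out| (out, longest))
    }
}

/// Waker that does nothing; enough for the busy-polling executor below.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, for demonstration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

fn main() {
    let (value, longest) = block_on(PollTimer::new(async {
        // Deliberately blocking call inside an async block.
        std::thread::sleep(Duration::from_millis(50));
        42
    }));
    println!("value = {value}, longest poll = {longest:?}");
}
```

In a Tokio application the same wrapper could presumably be passed to `tokio::spawn` in place of the bare future, with the measured duration logged whenever it exceeds a threshold of your choosing.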
Debugging Deadlocks and Hangs
Detect and resolve deadlock situations:
Common Deadlock Pattern
rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};

// BAD: Potential deadlock
async fn deadlock_example() {
    let mutex1 = Arc::new(Mutex::new(()));
    let mutex2 = Arc::new(Mutex::new(()));
    let m1 = mutex1.clone();
    let m2 = mutex2.clone();
    // The spawned task locks mutex1, then mutex2.
    tokio::spawn(async move {
        let _g1 = m1.lock().await;
        tokio::time::sleep(Duration::from_millis(10)).await;
        let _g2 = m2.lock().await; // May deadlock
    });
    // This task locks in the opposite order: mutex2, then mutex1.
    let _g2 = mutex2.lock().await;
    tokio::time::sleep(Duration::from_millis(10)).await;
    let _g1 = mutex1.lock().await; // May deadlock
}
// GOOD: Consistent lock ordering
async fn no_deadlock_example() {
    let mutex1 = Arc::new(Mutex::new(()));
    let mutex2 = Arc::new(Mutex::new(()));
    // Always acquire locks in same order
    let _g1 = mutex1.lock().await;
    let _g2 = mutex2.lock().await;
}
// BETTER: Avoid nested locks
// `Message` and `process_message` are application-defined.
async fn best_example(message: Message) {
    // Use message passing instead
    let (tx, mut rx) = mpsc::channel(10);
    tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            process_message(msg).await;
        }
    });
    tx.send(message).await.unwrap();
}
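One way to make the "consistent lock ordering" rule hard to violate is to route every two-lock acquisition through a single helper. The sketch below is an illustration under assumptions: `lock_both` is a hypothetical helper, and it uses `std::sync::Mutex` with threads so that it runs without extra crates. The same ordering idea should carry over to `tokio::sync::Mutex` with `.lock().await`.

```rust
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;

/// Hypothetical helper: always locks the mutex at the lower address first,
/// so every caller agrees on the order regardless of argument order.
/// Returns the guards in argument order (guard for `a`, guard for `b`).
fn lock_both<'a>(
    a: &'a Mutex<u32>,
    b: &'a Mutex<u32>,
) -> (MutexGuard<'a, u32>, MutexGuard<'a, u32>) {
    assert!(!std::ptr::eq(a, b), "lock_both needs two distinct mutexes");
    let a_first = (a as *const Mutex<u32> as usize) < (b as *const Mutex<u32> as usize);
    if a_first {
        let ga = a.lock().unwrap();
        let gb = b.lock().unwrap();
        (ga, gb)
    } else {
        let gb = b.lock().unwrap();
        let ga = a.lock().unwrap();
        (ga, gb)
    }
}

/// Two threads request the locks in opposite argument order.
fn run_demo() -> (u32, u32) {
    let m1 = Arc::new(Mutex::new(0u32));
    let m2 = Arc::new(Mutex::new(0u32));

    let (a1, a2) = (Arc::clone(&m1), Arc::clone(&m2));
    let t1 = thread::spawn(move || {
        for _ in 0..1000 {
            let (mut x, mut y) = lock_both(&a1, &a2);
            *x += 1;
            *y += 1;
        }
    });

    let (b1, b2) = (Arc::clone(&m1), Arc::clone(&m2));
    let t2 = thread::spawn(move || {
        for _ in 0..1000 {
            // Arguments reversed on purpose.
            let (mut y, mut x) = lock_both(&b2, &b1);
            *x += 1;
            *y += 1;
        }
    });

    t1.join().unwrap();
    t2.join().unwrap();
    let totals = (*m1.lock().unwrap(), *m2.lock().unwrap());
    totals
}

fn main() {
    let (first, second) = run_demo();
    println!("first = {first}, second = {second}");
}
```

Ordering by address is only one possible convention; a documented fixed order or an explicit rank per lock would serve the same purpose.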
Detecting Hangs with Timeouts
rust
use tokio::time::{timeout, Duration};

async fn detect_hang() {
    match timeout(Duration::from_secs(5), potentially_hanging_operation()).await {
        Ok(result) => println!("Completed: {:?}", result),
        // An error here means the deadline passed before the operation finished.
        Err(_) => {
            eprintln!("Operation timed out - potential hang detected");
            // Log stack traces, metrics, etc.
        }
    }
}
Deadlock Detection with try_lock
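rust
use std::time::Duration;
use tokio::sync::Mutex;

// `State` is an application-defined type that implements `Clone`.
async fn try_with_timeout(mutex: &Mutex<State>) -> Option<State> {
    // Retry for roughly 100 ms (10 attempts, 10 ms apart).
    for _ in 0..10 {
        if let Ok(guard) = mutex.try_lock() {
            return Some(guard.clone());
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    // Failure only shows the lock stayed held; it may be heavy contention rather than a deadlock.
    eprintln!("Failed to acquire lock - possible deadlock");
    None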
}
Memory Leak Detection
Identify and fix memory leaks:
Task Leaks
rust
use std::time::Duration;
use tokio::sync::broadcast;

// BAD: Unbounded spawning of tasks that never complete
async fn leaking_tasks() {
    loop {
        tokio::spawn(async {
            loop {
                // Never exits
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });
    }
}
// GOOD: A bounded number of tasks, each with an exit condition
async fn proper_tasks(shutdown: broadcast::Receiver<()>) {
    // The worker count (4) is illustrative.
    for _ in 0..4 {
        let mut shutdown_rx = shutdown.resubscribe();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    // Stop on a shutdown message or when the sender is dropped.
                    _ = shutdown_rx.recv() => break,
                    _ = tokio::time::sleep(Duration::from_secs(1)) => {
                        // Work
                    }
                }
            }
        });
    }
}
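A growing task count is easier to spot when the application tracks it itself. The sketch below shows one possible approach: `TaskGuard` is a hypothetical RAII type that increments a shared counter when a task starts and decrements it when the task ends for any reason. It uses plain threads as a stand-in for spawned tasks so that it runs with the standard library alone.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical guard: counts one live task for as long as it exists.
/// Dropping it (normal exit, panic unwinding, or cancellation of the
/// owning future) decrements the counter again.
struct TaskGuard {
    live: Arc<AtomicUsize>,
}

impl TaskGuard {
    fn new(live: &Arc<AtomicUsize>) -> Self {
        live.fetch_add(1, Ordering::Relaxed);
        Self { live: Arc::clone(live) }
    }
}

impl Drop for TaskGuard {
    fn drop(&mut self) {
        self.live.fetch_sub(1, Ordering::Relaxed);
    }
}

fn main() {
    let live = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let live = Arc::clone(&live);
            std::thread::spawn(move || {
                // Create the guard first so the whole body is counted.
                let _guard = TaskGuard::new(&live);
                // ... task body ...
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    println!("live tasks after join: {}", live.load(Ordering::Relaxed));
}
```

Logging the counter periodically gives a cheap leak signal: a value that only ever rises under steady load suggests tasks without an exit condition.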
Arc Cycles
rust
use std::sync::{Arc, Weak};
// `Mutex` may be std::sync::Mutex or tokio::sync::Mutex; the cycle problem is the same.

// BAD: Reference cycle
struct Node {
    next: Option<Arc<Mutex<Node>>>,
    prev: Option<Arc<Mutex<Node>>>, // Creates cycle!
}
// GOOD: Use weak references (alternative definition of the same struct)
struct Node {
    next: Option<Arc<Mutex<Node>>>,
    prev: Option<Weak<Mutex<Node>>>, // Weak reference breaks cycle
}
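The effect of the `Weak` back-pointer can be checked with reference counts and a `Drop` implementation. The following is a small sketch using `std::sync::Mutex`; the `DROPPED` counter and `build_and_drop_pair` function are hypothetical names added for the demonstration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};

/// Counts how many `Node` values have actually been dropped.
static DROPPED: AtomicUsize = AtomicUsize::new(0);

struct Node {
    next: Option<Arc<Mutex<Node>>>,
    prev: Option<Weak<Mutex<Node>>>, // Weak: does not keep `prev` alive
}

impl Drop for Node {
    fn drop(&mut self) {
        DROPPED.fetch_add(1, Ordering::SeqCst);
    }
}

/// Builds a two-node doubly linked pair, releases it, and returns how many
/// nodes were freed.
fn build_and_drop_pair() -> usize {
    let before = DROPPED.load(Ordering::SeqCst);

    let first = Arc::new(Mutex::new(Node { next: None, prev: None }));
    let second = Arc::new(Mutex::new(Node {
        next: None,
        prev: Some(Arc::downgrade(&first)),
    }));
    first.lock().unwrap().next = Some(Arc::clone(&second));

    // The back-pointer adds a weak count, not a strong count.
    assert_eq!(Arc::strong_count(&first), 1);
    assert_eq!(Arc::weak_count(&first), 1);
    assert_eq!(Arc::strong_count(&second), 2);

    drop(first);
    drop(second);
    DROPPED.load(Ordering::SeqCst) - before
}

fn main() {
    println!("nodes freed: {}", build_and_drop_pair());
}
```

With a strong `Arc` in `prev`, neither strong count would reach zero after the two `drop` calls, and the nodes would never be freed.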
Monitoring Memory Usage
rust
use std::time::Duration;
// Assumes a sysinfo release that still exports `SystemExt` and reports memory in bytes.
use sysinfo::{System, SystemExt};

pub async fn memory_monitor() {
    let mut system = System::new_all();
    let mut interval = tokio::time::interval(Duration::from_secs(60));
    loop {
        interval.tick().await;
        system.refresh_memory();
        let used = system.used_memory();
        let total = system.total_memory();
        let percent = (used as f64 / total as f64) * 100.0;
        // Round to two decimals for logging.
        let percent_rounded = (percent * 100.0).round() / 100.0;
        tracing::info!(
            used_mb = used / 1024 / 1024,
            total_mb = total / 1024 / 1024,
            percent = percent_rounded,
            "Memory usage"
        );
        if percent > 80.0 {
            tracing::warn!("High memory usage detected");
        }
    }
}
Performance Profiling with Tracing
Instrument code for performance analysis:
rust
use tracing::{info_span, instrument, Instrument};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[instrument]
async fn process_request(id: u64) -> Result<Response, Error> {
    // Attach spans with `.instrument(...)`. A guard returned by `span.enter()`
    // must not be held across an `.await`, or the span timing becomes incorrect.
    let data = fetch_from_database(id)
        .instrument(info_span!("database_query"))
        .await?;
    let result = transform_data(data)
        .instrument(info_span!("transformation"))
        .await?;
    Ok(Response { result })
}
// Configure the subscriber: formatted output, filtered via RUST_LOG.
// Flame graphs need an additional layer (for example from the tracing-flame crate).
fn init_tracing() {
    let fmt_layer = tracing_subscriber::fmt::layer();
    let filter_layer = tracing_subscriber::EnvFilter::from_default_env();
    tracing_subscriber::registry()
        .with(filter_layer)
        .with(fmt_layer)
        .init();
}
Understanding Panic Messages
Common async panic patterns:
Panics in Spawned Tasks
rust
// Panic is isolated to the task
tokio::spawn(async {
    panic!("This won't crash the program");
});
// To observe a panic, await the JoinHandle
let handle = tokio::spawn(async {
    // Work that might panic
});
match handle.await {
    Ok(result) => println!("Success: {:?}", result),
    Err(e) if e.is_panic() => {
        eprintln!("Task panicked: {:?}", e);
        // Handle panic
    }
    // The only other JoinError case is cancellation (for example via abort()).
    Err(e) => eprintln!("Task cancelled: {:?}", e),
}
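To log why a task panicked, the panic payload has to be downcast to a string. Tokio's `JoinError::into_panic()` returns the payload as a `Box<dyn Any + Send>`; the sketch below shows one way to turn such a payload into a message. `panic_message` is a hypothetical helper, and `std::panic::catch_unwind` stands in for a panicking task so the example runs with the standard library alone.

```rust
use std::any::Any;
use std::panic;

/// Hypothetical helper: extracts a readable message from a panic payload.
/// `panic!` with a plain literal carries a `&'static str`; `panic!` with
/// format arguments carries a `String`.
fn panic_message(payload: &(dyn Any + Send)) -> String {
    if let Some(text) = payload.downcast_ref::<&'static str>() {
        (*text).to_string()
    } else if let Some(text) = payload.downcast_ref::<String>() {
        text.clone()
    } else {
        "non-string panic payload".to_string()
    }
}

fn main() {
    // `catch_unwind` plays the role of the runtime catching a task panic.
    let payload = panic::catch_unwind(|| {
        panic!("worker {} failed", 7);
    })
    .unwrap_err();

    println!("recovered message: {}", panic_message(&*payload));
}
```

In the `Err(e) if e.is_panic()` arm above, the payload would come from `e.into_panic()` instead of `catch_unwind`.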
Send + 'static Errors
rust
use std::rc::Rc;
use std::sync::Arc;

// ERROR: future cannot be sent between threads safely
async fn bad_example() {
    let rc = Rc::new(5); // Rc is !Send
    tokio::spawn(async move {
        println!("{}", rc); // Error!
    });
}
// FIX: Use Arc instead
async fn good_example() {
    let rc = Arc::new(5); // Arc is Send
    tokio::spawn(async move {
        println!("{}", rc); // OK
    });
}
// ERROR: async block may outlive the current function, but it borrows `data`
async fn lifetime_error() {
    let data = String::from("hello");
    tokio::spawn(async {
        println!("{}", data); // Error: the task may outlive `data`
    });
}
// FIX: Move ownership
async fn lifetime_fixed() {
    let data = String::from("hello");
    tokio::spawn(async move {
        println!("{}", data); // OK: data is moved
    });
}
Common Error Patterns and Solutions
Blocking in Async Context
rust
use std::time::Duration;

// PROBLEM: Detected with tokio-console (long poll time)
async fn blocking_example() {
    std::thread::sleep(Duration::from_secs(1)); // Blocks thread!
}
// SOLUTION
async fn non_blocking_example() {
    tokio::time::sleep(Duration::from_secs(1)).await; // Yields control
}
// For unavoidable blocking or CPU-heavy work
async fn necessary_blocking() {
    // Runs on Tokio's dedicated blocking thread pool.
    tokio::task::spawn_blocking(|| {
        expensive_cpu_work()
    }).await.unwrap();
}
Channel Closed Errors
rust
use tokio::sync::mpsc;

// PROBLEM: SendError because receiver dropped
async fn send_error_example() {
    let (tx, rx) = mpsc::channel(10);
    drop(rx); // Receiver dropped
    match tx.send(42).await {
        Ok(_) => println!("Sent"),
        Err(e) => eprintln!("Send failed: {}", e), // Channel closed
    }
}
// SOLUTION: Keep the receiver alive, and handle the error if it is gone
async fn handle_closed_channel() {
    // `rx` must be mutable because `recv` takes `&mut self`.
    let (tx, mut rx) = mpsc::channel(10);
    tokio::spawn(async move {
        // Receiver keeps channel open
        while let Some(msg) = rx.recv().await {
            process(msg).await;
        }
    });
    // Or handle the error
    if let Err(e) = tx.send(42).await {
        tracing::warn!("Channel closed: {}", e);
        // Cleanup or alternative action
    }
}
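A failed send does not have to lose the message: the error value hands the unsent item back. The sketch below illustrates this with `std::sync::mpsc` so it runs without extra crates; `send_or_recover` is a hypothetical helper. Tokio's `mpsc::error::SendError<T>` likewise wraps the unsent value in its public field `0`, so the same pattern should apply there.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};

/// Hypothetical helper: tries to send, and on a closed channel returns the
/// unsent value to the caller instead of dropping it.
fn send_or_recover<T>(tx: &SyncSender<T>, value: T) -> Result<(), T> {
    tx.send(value).map_err(|err| err.0)
}

fn main() {
    let (tx, rx) = sync_channel::<String>(10);
    drop(rx); // Simulate a receiver that has gone away.

    match send_or_recover(&tx, "job-1".to_string()) {
        Ok(()) => println!("sent"),
        Err(unsent) => println!("channel closed, keeping {unsent} for a fallback path"),
    }
}
```

Whether the recovered value is retried, persisted, or discarded is an application decision; the point is only that the choice stays available.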
Task Cancellation
rust
// PROBLEM: Task cancelled unexpectedly
let handle = tokio::spawn(async {
    // Long-running work
});
handle.abort(); // Cancels the task at its next .await; no cleanup code runs
// SOLUTION: Handle cancellation gracefully by listening for a shutdown signal
let handle = tokio::spawn(async {
    let result = tokio::select! {
        result = do_work() => result,
        // Ctrl-C serves as the shutdown signal here.
        _ = tokio::signal::ctrl_c() => {
            cleanup().await;
            return Err(Error::Cancelled);
        }
    };
    result
});
Testing Async Code Effectively
Write reliable async tests:
rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

#[tokio::test]
async fn test_with_timeout() {
    tokio::time::timeout(
        Duration::from_secs(5),
        async {
            let result = my_async_function().await;
            assert!(result.is_ok());
        }
    )
    .await
    .expect("Test timed out");
}
#[tokio::test]
async fn test_concurrent_access() {
    let shared = Arc::new(Mutex::new(0));
    let handles: Vec<_> = (0..10)
        .map(|_| {
            let shared = shared.clone();
            tokio::spawn(async move {
                let mut lock = shared.lock().await;
                *lock += 1;
            })
        })
        .collect();
    for handle in handles {
        handle.await.unwrap();
    }
    assert_eq!(*shared.lock().await, 10);
}
// Test with mocked time (requires Tokio's `test-util` feature)
#[tokio::test(start_paused = true)]
async fn test_with_time_control() {
    let start = tokio::time::Instant::now();
    tokio::time::sleep(Duration::from_secs(100)).await;
    // The clock is paused and auto-advances, so the sleep returns without
    // waiting in real time; compare against the wall clock to verify that.
    assert!(start.into_std().elapsed() < Duration::from_secs(1));
}
Debugging Checklist
When troubleshooting async issues:
- Use tokio-console to monitor runtime behavior
- Check for blocking operations with tracing
- Verify all locks are released properly
- Look for task leaks (growing task count)
- Monitor memory usage over time
- Add timeouts to detect hangs
- Check for channel closure errors
- Verify Send + 'static bounds are satisfied
- Use try_lock to detect potential deadlocks
- Profile with tracing for performance bottlenecks
- Test with tokio-test for time-based code
- Check for Arc cycles and break them with Weak references
Helpful Tools
- tokio-console: Real-time async runtime monitoring
- tracing: Structured logging and profiling
- cargo-flamegraph: Generate flame graphs
- valgrind/heaptrack: Memory profiling
- perf: CPU profiling on Linux
- Instruments: Profiling on macOS
Best Practices
- Always use tokio-console in development
- Add tracing spans to critical code paths
- Use timeouts liberally to detect hangs
- Monitor task count for leaks
- Profile before optimizing - measure first
- Test with real concurrency - don't just test happy paths
- Handle cancellation gracefully in all tasks
- Use structured logging for debugging
- Avoid nested locks - prefer message passing
- Document lock ordering when necessary