tokio-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Tokio Patterns

Tokio 模式

This skill provides common patterns and idioms for building robust async applications with Tokio.
本内容介绍了使用Tokio构建可靠异步应用的常用模式与惯用法。

Worker Pool Pattern

工作池模式

Limit concurrent task execution using a semaphore:
rust
use tokio::sync::Semaphore;
use std::sync::Arc;

pub struct WorkerPool {
    semaphore: Arc<Semaphore>,
}

impl WorkerPool {
    pub fn new(size: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(size)),
        }
    }

    pub async fn execute<F, T>(&self, f: F) -> T
    where
        F: Future<Output = T>,
    {
        let _permit = self.semaphore.acquire().await.unwrap();
        f.await
    }
}

// Usage
let pool = WorkerPool::new(10);
let results = futures::future::join_all(
    (0..100).map(|i| pool.execute(process_item(i)))
).await;
使用信号量限制并发任务执行:
rust
use tokio::sync::Semaphore;
use std::sync::Arc;

pub struct WorkerPool {
    semaphore: Arc<Semaphore>,
}

impl WorkerPool {
    pub fn new(size: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(size)),
        }
    }

    pub async fn execute<F, T>(&self, f: F) -> T
    where
        F: Future<Output = T>,
    {
        let _permit = self.semaphore.acquire().await.unwrap();
        f.await
    }
}

// Usage
let pool = WorkerPool::new(10);
let results = futures::future::join_all(
    (0..100).map(|i| pool.execute(process_item(i)))
).await;

Request-Response Pattern

请求-响应模式

Use oneshot channels for request-response communication:
rust
use tokio::sync::{mpsc, oneshot};

pub enum Command {
    Get { key: String, respond_to: oneshot::Sender<Option<String>> },
    Set { key: String, value: String },
}

pub async fn actor(mut rx: mpsc::Receiver<Command>) {
    let mut store = HashMap::new();

    while let Some(cmd) = rx.recv().await {
        match cmd {
            Command::Get { key, respond_to } => {
                let value = store.get(&key).cloned();
                let _ = respond_to.send(value);
            }
            Command::Set { key, value } => {
                store.insert(key, value);
            }
        }
    }
}

// Client usage
let (tx, rx) = mpsc::channel(32);
tokio::spawn(actor(rx));

let (respond_to, response) = oneshot::channel();
tx.send(Command::Get { key: "foo".into(), respond_to }).await.unwrap();
let value = response.await.unwrap();
使用oneshot通道实现请求-响应通信:
rust
use tokio::sync::{mpsc, oneshot};

pub enum Command {
    Get { key: String, respond_to: oneshot::Sender<Option<String>> },
    Set { key: String, value: String },
}

pub async fn actor(mut rx: mpsc::Receiver<Command>) {
    let mut store = HashMap::new();

    while let Some(cmd) = rx.recv().await {
        match cmd {
            Command::Get { key, respond_to } => {
                let value = store.get(&key).cloned();
                let _ = respond_to.send(value);
            }
            Command::Set { key, value } => {
                store.insert(key, value);
            }
        }
    }
}

// Client usage
let (tx, rx) = mpsc::channel(32);
tokio::spawn(actor(rx));

let (respond_to, response) = oneshot::channel();
tx.send(Command::Get { key: "foo".into(), respond_to }).await.unwrap();
let value = response.await.unwrap();

Pub/Sub with Channels

基于通道的发布/订阅

Use broadcast channels for pub/sub messaging:
rust
use tokio::sync::broadcast;

pub struct PubSub<T: Clone> {
    tx: broadcast::Sender<T>,
}

impl<T: Clone> PubSub<T> {
    pub fn new(capacity: usize) -> Self {
        let (tx, _) = broadcast::channel(capacity);
        Self { tx }
    }

    pub fn subscribe(&self) -> broadcast::Receiver<T> {
        self.tx.subscribe()
    }

    pub fn publish(&self, message: T) -> Result<usize, broadcast::error::SendError<T>> {
        self.tx.send(message)
    }
}

// Usage
let pubsub = PubSub::new(100);

// Subscriber 1
let mut rx1 = pubsub.subscribe();
tokio::spawn(async move {
    while let Ok(msg) = rx1.recv().await {
        println!("Subscriber 1: {:?}", msg);
    }
});

// Subscriber 2
let mut rx2 = pubsub.subscribe();
tokio::spawn(async move {
    while let Ok(msg) = rx2.recv().await {
        println!("Subscriber 2: {:?}", msg);
    }
});

// Publisher
pubsub.publish("Hello".to_string()).unwrap();
使用广播通道实现发布/订阅消息传递:
rust
use tokio::sync::broadcast;

pub struct PubSub<T: Clone> {
    tx: broadcast::Sender<T>,
}

impl<T: Clone> PubSub<T> {
    pub fn new(capacity: usize) -> Self {
        let (tx, _) = broadcast::channel(capacity);
        Self { tx }
    }

    pub fn subscribe(&self) -> broadcast::Receiver<T> {
        self.tx.subscribe()
    }

    pub fn publish(&self, message: T) -> Result<usize, broadcast::error::SendError<T>> {
        self.tx.send(message)
    }
}

// Usage
let pubsub = PubSub::new(100);

// Subscriber 1
let mut rx1 = pubsub.subscribe();
tokio::spawn(async move {
    while let Ok(msg) = rx1.recv().await {
        println!("Subscriber 1: {:?}", msg);
    }
});

// Subscriber 2
let mut rx2 = pubsub.subscribe();
tokio::spawn(async move {
    while let Ok(msg) = rx2.recv().await {
        println!("Subscriber 2: {:?}", msg);
    }
});

// Publisher
pubsub.publish("Hello".to_string()).unwrap();

Timeout Pattern

超时模式

Wrap operations with timeouts:
rust
use tokio::time::{timeout, Duration};

pub async fn with_timeout<F, T>(duration: Duration, future: F) -> Result<T, TimeoutError>
where
    F: Future<Output = Result<T, Error>>,
{
    match timeout(duration, future).await {
        Ok(Ok(result)) => Ok(result),
        Ok(Err(e)) => Err(TimeoutError::Inner(e)),
        Err(_) => Err(TimeoutError::Elapsed),
    }
}

// Usage
let result = with_timeout(
    Duration::from_secs(5),
    fetch_data()
).await?;
为操作添加超时包装:
rust
use tokio::time::{timeout, Duration};

pub async fn with_timeout<F, T>(duration: Duration, future: F) -> Result<T, TimeoutError>
where
    F: Future<Output = Result<T, Error>>,
{
    match timeout(duration, future).await {
        Ok(Ok(result)) => Ok(result),
        Ok(Err(e)) => Err(TimeoutError::Inner(e)),
        Err(_) => Err(TimeoutError::Elapsed),
    }
}

// Usage
let result = with_timeout(
    Duration::from_secs(5),
    fetch_data()
).await?;

Retry with Exponential Backoff

指数退避重试

Retry failed operations with backoff:
rust
use tokio::time::{sleep, Duration};

pub async fn retry_with_backoff<F, T, E>(
    mut operation: F,
    max_retries: u32,
    initial_backoff: Duration,
) -> Result<T, E>
where
    F: FnMut() -> Pin<Box<dyn Future<Output = Result<T, E>>>>,
{
    let mut retries = 0;
    let mut backoff = initial_backoff;

    loop {
        match operation().await {
            Ok(result) => return Ok(result),
            Err(e) if retries < max_retries => {
                retries += 1;
                sleep(backoff).await;
                backoff *= 2; // Exponential backoff
            }
            Err(e) => return Err(e),
        }
    }
}

// Usage
let result = retry_with_backoff(
    || Box::pin(fetch_data()),
    3,
    Duration::from_millis(100)
).await?;
使用退避策略重试失败的操作:
rust
use tokio::time::{sleep, Duration};

pub async fn retry_with_backoff<F, T, E>(
    mut operation: F,
    max_retries: u32,
    initial_backoff: Duration,
) -> Result<T, E>
where
    F: FnMut() -> Pin<Box<dyn Future<Output = Result<T, E>>>>,
{
    let mut retries = 0;
    let mut backoff = initial_backoff;

    loop {
        match operation().await {
            Ok(result) => return Ok(result),
            Err(e) if retries < max_retries => {
                retries += 1;
                sleep(backoff).await;
                backoff *= 2; // Exponential backoff
            }
            Err(e) => return Err(e),
        }
    }
}

// Usage
let result = retry_with_backoff(
    || Box::pin(fetch_data()),
    3,
    Duration::from_millis(100)
).await?;

Graceful Shutdown

优雅关机

Coordinate graceful shutdown across components:
rust
use tokio::sync::broadcast;
use tokio::select;

pub struct ShutdownCoordinator {
    tx: broadcast::Sender<()>,
}

impl ShutdownCoordinator {
    pub fn new() -> Self {
        let (tx, _) = broadcast::channel(1);
        Self { tx }
    }

    pub fn subscribe(&self) -> broadcast::Receiver<()> {
        self.tx.subscribe()
    }

    pub fn shutdown(&self) {
        let _ = self.tx.send(());
    }
}

// Worker pattern
pub async fn worker(mut shutdown: broadcast::Receiver<()>) {
    loop {
        select! {
            _ = shutdown.recv() => {
                // Cleanup
                break;
            }
            result = do_work() => {
                // Process result
            }
        }
    }
}

// Main
let coordinator = ShutdownCoordinator::new();

let shutdown_rx1 = coordinator.subscribe();
let h1 = tokio::spawn(worker(shutdown_rx1));

let shutdown_rx2 = coordinator.subscribe();
let h2 = tokio::spawn(worker(shutdown_rx2));

// Wait for signal
tokio::signal::ctrl_c().await.unwrap();
coordinator.shutdown();

// Wait for workers
let _ = tokio::join!(h1, h2);
协调各个组件实现优雅关机:
rust
use tokio::sync::broadcast;
use tokio::select;

pub struct ShutdownCoordinator {
    tx: broadcast::Sender<()>,
}

impl ShutdownCoordinator {
    pub fn new() -> Self {
        let (tx, _) = broadcast::channel(1);
        Self { tx }
    }

    pub fn subscribe(&self) -> broadcast::Receiver<()> {
        self.tx.subscribe()
    }

    pub fn shutdown(&self) {
        let _ = self.tx.send(());
    }
}

// Worker pattern
pub async fn worker(mut shutdown: broadcast::Receiver<()>) {
    loop {
        select! {
            _ = shutdown.recv() => {
                // Cleanup
                break;
            }
            result = do_work() => {
                // Process result
            }
        }
    }
}

// Main
let coordinator = ShutdownCoordinator::new();

let shutdown_rx1 = coordinator.subscribe();
let h1 = tokio::spawn(worker(shutdown_rx1));

let shutdown_rx2 = coordinator.subscribe();
let h2 = tokio::spawn(worker(shutdown_rx2));

// Wait for signal
tokio::signal::ctrl_c().await.unwrap();
coordinator.shutdown();

// Wait for workers
let _ = tokio::join!(h1, h2);

Async Initialization

异步初始化

Lazy async initialization with
OnceCell
:
rust
use tokio::sync::OnceCell;

pub struct Service {
    connection: OnceCell<Connection>,
}

impl Service {
    pub fn new() -> Self {
        Self {
            connection: OnceCell::new(),
        }
    }

    async fn get_connection(&self) -> &Connection {
        self.connection
            .get_or_init(|| async {
                Connection::connect().await.unwrap()
            })
            .await
    }

    pub async fn query(&self, sql: &str) -> Result<Vec<Row>> {
        let conn = self.get_connection().await;
        conn.query(sql).await
    }
}
使用
OnceCell
实现延迟异步初始化:
rust
use tokio::sync::OnceCell;

pub struct Service {
    connection: OnceCell<Connection>,
}

impl Service {
    pub fn new() -> Self {
        Self {
            connection: OnceCell::new(),
        }
    }

    async fn get_connection(&self) -> &Connection {
        self.connection
            .get_or_init(|| async {
                Connection::connect().await.unwrap()
            })
            .await
    }

    pub async fn query(&self, sql: &str) -> Result<Vec<Row>> {
        let conn = self.get_connection().await;
        conn.query(sql).await
    }
}

Resource Cleanup with Drop

使用Drop进行资源清理

Ensure cleanup even on task cancellation:
rust
pub struct Resource {
    handle: SomeHandle,
}

impl Resource {
    pub async fn new() -> Self {
        Self {
            handle: acquire_resource().await,
        }
    }

    pub async fn use_resource(&self) -> Result<()> {
        // Use the resource
        Ok(())
    }
}

impl Drop for Resource {
    fn drop(&mut self) {
        // Synchronous cleanup
        // For async cleanup, use a separate shutdown method
        self.handle.close();
    }
}

// For async cleanup
impl Resource {
    pub async fn shutdown(self) {
        // Async cleanup
        self.handle.close_async().await;
    }
}
确保即使任务被取消也能清理资源:
rust
pub struct Resource {
    handle: SomeHandle,
}

impl Resource {
    pub async fn new() -> Self {
        Self {
            handle: acquire_resource().await,
        }
    }

    pub async fn use_resource(&self) -> Result<()> {
        // Use the resource
        Ok(())
    }
}

impl Drop for Resource {
    fn drop(&mut self) {
        // Synchronous cleanup
        // For async cleanup, use a separate shutdown method
        self.handle.close();
    }
}

// For async cleanup
impl Resource {
    pub async fn shutdown(self) {
        // Async cleanup
        self.handle.close_async().await;
    }
}

Select Multiple Futures

选择多个Future

Use
select!
to race multiple operations:
rust
use tokio::select;

pub async fn select_example() {
    let mut rx1 = channel1();
    let mut rx2 = channel2();

    loop {
        select! {
            msg = rx1.recv() => {
                if let Some(msg) = msg {
                    handle_channel1(msg).await;
                } else {
                    break;
                }
            }
            msg = rx2.recv() => {
                if let Some(msg) = msg {
                    handle_channel2(msg).await;
                } else {
                    break;
                }
            }
            _ = tokio::time::sleep(Duration::from_secs(60)) => {
                check_timeout().await;
            }
        }
    }
}
使用
select!
实现多个操作的竞争:
rust
use tokio::select;

pub async fn select_example() {
    let mut rx1 = channel1();
    let mut rx2 = channel2();

    loop {
        select! {
            msg = rx1.recv() => {
                if let Some(msg) = msg {
                    handle_channel1(msg).await;
                } else {
                    break;
                }
            }
            msg = rx2.recv() => {
                if let Some(msg) = msg {
                    handle_channel2(msg).await;
                } else {
                    break;
                }
            }
            _ = tokio::time::sleep(Duration::from_secs(60)) => {
                check_timeout().await;
            }
        }
    }
}

Cancellation Token Pattern

取消令牌模式

Use
tokio_util::sync::CancellationToken
for cooperative cancellation:
rust
use tokio_util::sync::CancellationToken;

pub async fn worker(token: CancellationToken) {
    loop {
        tokio::select! {
            _ = token.cancelled() => {
                // Cleanup
                break;
            }
            _ = do_work() => {
                // Continue
            }
        }
    }
}

// Hierarchical cancellation
let parent_token = CancellationToken::new();
let child_token = parent_token.child_token();

tokio::spawn(worker(child_token));

// Cancel all
parent_token.cancel();
使用
tokio_util::sync::CancellationToken
实现协作式取消:
rust
use tokio_util::sync::CancellationToken;

pub async fn worker(token: CancellationToken) {
    loop {
        tokio::select! {
            _ = token.cancelled() => {
                // Cleanup
                break;
            }
            _ = do_work() => {
                // Continue
            }
        }
    }
}

// Hierarchical cancellation
let parent_token = CancellationToken::new();
let child_token = parent_token.child_token();

tokio::spawn(worker(child_token));

// Cancel all
parent_token.cancel();

Best Practices

最佳实践

  1. Use semaphores for limiting concurrent operations
  2. Implement graceful shutdown in all long-running tasks
  3. Add timeouts to external operations
  4. Use channels for inter-task communication
  5. Handle cancellation properly in all tasks
  6. Clean up resources in Drop or explicit shutdown methods
  7. Use appropriate channel types for different patterns
  8. Implement retries for transient failures
  9. Use select! for coordinating multiple async operations
  10. Document lifetime and ownership patterns clearly
  1. 使用信号量限制并发操作
  2. 实现优雅关机在所有长期运行的任务中
  3. 为外部操作添加超时
  4. 使用通道进行任务间通信
  5. 妥善处理取消在所有任务中
  6. 在Drop或显式关机方法中清理资源
  7. 为不同模式选择合适的通道类型
  8. 为瞬时故障实现重试
  9. **使用select!**协调多个异步操作
  10. 清晰记录生命周期和所有权模式