rust-async-patterns

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Rust Async Patterns

Rust 异步编程模式

Master asynchronous programming in Rust using async/await syntax, tokio runtime, and the futures ecosystem for concurrent I/O operations.
掌握使用async/await语法、tokio运行时和futures生态系统在Rust中进行并发I/O操作的异步编程方法。

Async/Await Basics

Async/Await 基础

Basic async function:
rust
async fn fetch_data() -> String {
    String::from("data")
}

#[tokio::main]
async fn main() {
    let data = fetch_data().await;
    println!("{}", data);
}
Cargo.toml setup:
toml
[dependencies]
tokio = { version = "1", features = ["full"] }
基础异步函数:
rust
async fn fetch_data() -> String {
    String::from("data")
}

#[tokio::main]
async fn main() {
    let data = fetch_data().await;
    println!("{}", data);
}
Cargo.toml 配置:
toml
[dependencies]
tokio = { version = "1", features = ["full"] }

Tokio Runtime

Tokio 运行时

Different runtime configurations:
rust
// Multi-threaded runtime (default)
#[tokio::main]
async fn main() {
    // Code here
}

// Single-threaded runtime
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Code here
}

// Manual runtime creation
use tokio::runtime::Runtime;

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        println!("Running async code");
    });
}
不同的运行时配置:
rust
// 多线程运行时(默认)
#[tokio::main]
async fn main() {
    // 代码写在这里
}

// 单线程运行时
#[tokio::main(flavor = "current_thread")]
async fn main() {
    // 代码写在这里
}

// 手动创建运行时
use tokio::runtime::Runtime;

fn main() {
    let rt = Runtime::new().unwrap();
    rt.block_on(async {
        println!("Running async code");
    });
}

Spawning Tasks

生成任务

Creating concurrent tasks:
rust
use tokio::task;

#[tokio::main]
async fn main() {
    let task1 = task::spawn(async {
        println!("Task 1");
        42
    });

    let task2 = task::spawn(async {
        println!("Task 2");
        100
    });

    let result1 = task1.await.unwrap();
    let result2 = task2.await.unwrap();

    println!("Results: {}, {}", result1, result2);
}
Spawning with move:
rust
#[tokio::main]
async fn main() {
    let data = String::from("hello");

    let handle = task::spawn(async move {
        println!("{}", data);
    });

    handle.await.unwrap();
}
创建并发任务:
rust
use tokio::task;

#[tokio::main]
async fn main() {
    let task1 = task::spawn(async {
        println!("Task 1");
        42
    });

    let task2 = task::spawn(async {
        println!("Task 2");
        100
    });

    let result1 = task1.await.unwrap();
    let result2 = task2.await.unwrap();

    println!("Results: {}, {}", result1, result2);
}
使用move生成任务:
rust
#[tokio::main]
async fn main() {
    let data = String::from("hello");

    let handle = task::spawn(async move {
        println!("{}", data);
    });

    handle.await.unwrap();
}

Async HTTP with reqwest

基于Reqwest的异步HTTP请求

Install reqwest:
toml
[dependencies]
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
Making HTTP requests:
rust
use reqwest;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let response = reqwest::get("https://api.github.com/users/rust-lang")
        .await?
        .text()
        .await?;

    println!("{}", response);
    Ok(())
}
Concurrent requests:
rust
use reqwest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let urls = vec![
        "https://api.github.com/users/rust-lang",
        "https://api.github.com/users/tokio-rs",
    ];

    let mut handles = vec![];

    for url in urls {
        let handle = tokio::spawn(async move {
            reqwest::get(url).await?.text().await
        });
        handles.push(handle);
    }

    for handle in handles {
        let response = handle.await??;
        println!("{}", response);
    }

    Ok(())
}
安装reqwest:
toml
[dependencies]
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
发起HTTP请求:
rust
use reqwest;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let response = reqwest::get("https://api.github.com/users/rust-lang")
        .await?
        .text()
        .await?;

    println!("{}", response);
    Ok(())
}
并发请求:
rust
use reqwest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let urls = vec![
        "https://api.github.com/users/rust-lang",
        "https://api.github.com/users/tokio-rs",
    ];

    let mut handles = vec![];

    for url in urls {
        let handle = tokio::spawn(async move {
            reqwest::get(url).await?.text().await
        });
        handles.push(handle);
    }

    for handle in handles {
        let response = handle.await??;
        println!("{}", response);
    }

    Ok(())
}

Select and Join

Select与Join

tokio::select! for racing futures:
rust
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = sleep(Duration::from_secs(1)) => {
            println!("Timer finished first");
        }
        _ = async_operation() => {
            println!("Operation finished first");
        }
    }
}

async fn async_operation() {
    sleep(Duration::from_secs(2)).await;
}
tokio::join! for concurrent execution:
rust
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (r1, r2, r3) = tokio::join!(
        async { sleep(Duration::from_secs(1)).await; 1 },
        async { sleep(Duration::from_secs(1)).await; 2 },
        async { sleep(Duration::from_secs(1)).await; 3 },
    );

    println!("Results: {}, {}, {}", r1, r2, r3);
}
使用tokio::select!实现Future竞速:
rust
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = sleep(Duration::from_secs(1)) => {
            println!("Timer finished first");
        }
        _ = async_operation() => {
            println!("Operation finished first");
        }
    }
}

async fn async_operation() {
    sleep(Duration::from_secs(2)).await;
}
使用tokio::join!实现并发执行:
rust
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (r1, r2, r3) = tokio::join!(
        async { sleep(Duration::from_secs(1)).await; 1 },
        async { sleep(Duration::from_secs(1)).await; 2 },
        async { sleep(Duration::from_secs(1)).await; 3 },
    );

    println!("Results: {}, {}, {}", r1, r2, r3);
}

Channels

通道

mpsc channel for message passing:
rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    tokio::spawn(async move {
        for i in 0..10 {
            tx.send(i).await.unwrap();
        }
    });

    while let Some(value) = rx.recv().await {
        println!("Received: {}", value);
    }
}
Multiple producers:
rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    for i in 0..3 {
        let tx = tx.clone();
        tokio::spawn(async move {
            tx.send(format!("Message from {}", i)).await.unwrap();
        });
    }

    drop(tx); // Close channel when all senders dropped

    while let Some(msg) = rx.recv().await {
        println!("{}", msg);
    }
}
oneshot channel:
rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        tx.send("result").unwrap();
    });

    let result = rx.await.unwrap();
    println!("{}", result);
}
用于消息传递的mpsc通道:
rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    tokio::spawn(async move {
        for i in 0..10 {
            tx.send(i).await.unwrap();
        }
    });

    while let Some(value) = rx.recv().await {
        println!("Received: {}", value);
    }
}
多个生产者:
rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    for i in 0..3 {
        let tx = tx.clone();
        tokio::spawn(async move {
            tx.send(format!("Message from {}", i)).await.unwrap();
        });
    }

    drop(tx); // 当所有发送者都被销毁时关闭通道

    while let Some(msg) = rx.recv().await {
        println!("{}", msg);
    }
}
oneshot通道:
rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        tx.send("result").unwrap();
    });

    let result = rx.await.unwrap();
    println!("{}", result);
}

Synchronization Primitives

同步原语

Mutex for shared state:
rust
use tokio::sync::Mutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = tokio::spawn(async move {
            let mut num = counter.lock().await;
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await.unwrap();
    }

    println!("Result: {}", *counter.lock().await);
}
RwLock for read-write access:
rust
use tokio::sync::RwLock;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let data = Arc::new(RwLock::new(vec![1, 2, 3]));

    // Multiple readers
    let data1 = Arc::clone(&data);
    let data2 = Arc::clone(&data);

    let reader1 = tokio::spawn(async move {
        let d = data1.read().await;
        println!("Reader 1: {:?}", *d);
    });

    let reader2 = tokio::spawn(async move {
        let d = data2.read().await;
        println!("Reader 2: {:?}", *d);
    });

    // One writer
    let data3 = Arc::clone(&data);
    let writer = tokio::spawn(async move {
        let mut d = data3.write().await;
        d.push(4);
    });

    reader1.await.unwrap();
    reader2.await.unwrap();
    writer.await.unwrap();
}
Semaphore for limiting concurrency:
rust
use tokio::sync::Semaphore;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(3));
    let mut handles = vec![];

    for i in 0..10 {
        let permit = semaphore.clone();
        let handle = tokio::spawn(async move {
            let _permit = permit.acquire().await.unwrap();
            println!("Task {} acquired permit", i);
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            println!("Task {} releasing permit", i);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await.unwrap();
    }
}
用于共享状态的Mutex:
rust
use tokio::sync::Mutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = tokio::spawn(async move {
            let mut num = counter.lock().await;
            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await.unwrap();
    }

    println!("Result: {}", *counter.lock().await);
}
用于读写访问的RwLock:
rust
use tokio::sync::RwLock;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let data = Arc::new(RwLock::new(vec![1, 2, 3]));

    // 多个读取者
    let data1 = Arc::clone(&data);
    let data2 = Arc::clone(&data);

    let reader1 = tokio::spawn(async move {
        let d = data1.read().await;
        println!("Reader 1: {:?}", *d);
    });

    let reader2 = tokio::spawn(async move {
        let d = data2.read().await;
        println!("Reader 2: {:?}", *d);
    });

    // 一个写入者
    let data3 = Arc::clone(&data);
    let writer = tokio::spawn(async move {
        let mut d = data3.write().await;
        d.push(4);
    });

    reader1.await.unwrap();
    reader2.await.unwrap();
    writer.await.unwrap();
}
用于限制并发的Semaphore:
rust
use tokio::sync::Semaphore;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(3));
    let mut handles = vec![];

    for i in 0..10 {
        let permit = semaphore.clone();
        let handle = tokio::spawn(async move {
            let _permit = permit.acquire().await.unwrap();
            println!("Task {} acquired permit", i);
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            println!("Task {} releasing permit", i);
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await.unwrap();
    }
}

Streams

Using async streams:
rust
use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(vec![1, 2, 3, 4, 5]);

    while let Some(value) = stream.next().await {
        println!("{}", value);
    }
}
Creating custom streams:
rust
use tokio_stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};

struct Counter {
    count: usize,
    max: usize,
}

impl Stream for Counter {
    type Item = usize;

    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {
        if self.count < self.max {
            let current = self.count;
            self.count += 1;
            Poll::Ready(Some(current))
        } else {
            Poll::Ready(None)
        }
    }
}

#[tokio::main]
async fn main() {
    let mut counter = Counter { count: 0, max: 5 };

    while let Some(value) = counter.next().await {
        println!("{}", value);
    }
}
使用异步流:
rust
use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(vec![1, 2, 3, 4, 5]);

    while let Some(value) = stream.next().await {
        println!("{}", value);
    }
}
创建自定义流:
rust
use tokio_stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};

struct Counter {
    count: usize,
    max: usize,
}

impl Stream for Counter {
    type Item = usize;

    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {
        if self.count < self.max {
            let current = self.count;
            self.count += 1;
            Poll::Ready(Some(current))
        } else {
            Poll::Ready(None)
        }
    }
}

#[tokio::main]
async fn main() {
    let mut counter = Counter { count: 0, max: 5 };

    while let Some(value) = counter.next().await {
        println!("{}", value);
    }
}

Timeouts and Intervals

超时与间隔

Using timeout:
rust
use tokio::time::{timeout, Duration};

async fn slow_operation() -> String {
    tokio::time::sleep(Duration::from_secs(5)).await;
    String::from("done")
}

#[tokio::main]
async fn main() {
    match timeout(Duration::from_secs(2), slow_operation()).await {
        Ok(result) => println!("Success: {}", result),
        Err(_) => println!("Operation timed out"),
    }
}
Using intervals:
rust
use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() {
    let mut interval = interval(Duration::from_secs(1));

    for _ in 0..5 {
        interval.tick().await;
        println!("Tick");
    }
}
使用timeout:
rust
use tokio::time::{timeout, Duration};

async fn slow_operation() -> String {
    tokio::time::sleep(Duration::from_secs(5)).await;
    String::from("done")
}

#[tokio::main]
async fn main() {
    match timeout(Duration::from_secs(2), slow_operation()).await {
        Ok(result) => println!("Success: {}", result),
        Err(_) => println!("Operation timed out"),
    }
}
使用interval:
rust
use tokio::time::{interval, Duration};

#[tokio::main]
async fn main() {
    let mut interval = interval(Duration::from_secs(1));

    for _ in 0..5 {
        interval.tick().await;
        println!("Tick");
    }
}

Error Handling

错误处理

Propagating errors with ?:
rust
use reqwest;

async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
    let response = reqwest::get(url).await?;
    let body = response.text().await?;
    Ok(body)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let content = fetch_url("https://example.com").await?;
    println!("{}", content);
    Ok(())
}
使用?传播错误:
rust
use reqwest;

async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
    let response = reqwest::get(url).await?;
    let body = response.text().await?;
    Ok(body)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let content = fetch_url("https://example.com").await?;
    println!("{}", content);
    Ok(())
}

Blocking Operations

阻塞操作

Running CPU-intensive tasks:
rust
use tokio::task;

fn blocking_operation() -> u64 {
    // CPU-intensive work
    (0..1_000_000).sum()
}

#[tokio::main]
async fn main() {
    let result = task::spawn_blocking(|| {
        blocking_operation()
    }).await.unwrap();

    println!("Result: {}", result);
}
运行CPU密集型任务:
rust
use tokio::task;

fn blocking_operation() -> u64 {
    // CPU密集型工作
    (0..1_000_000).sum()
}

#[tokio::main]
async fn main() {
    let result = task::spawn_blocking(|| {
        blocking_operation()
    }).await.unwrap();

    println!("Result: {}", result);
}

Async Traits

异步Trait

Using async-trait crate:
toml
[dependencies]
async-trait = "0.1"
rust
use async_trait::async_trait;

#[async_trait]
trait Repository {
    async fn find(&self, id: u64) -> Option<String>;
    async fn save(&self, data: String) -> Result<(), String>;
}

struct UserRepository;

#[async_trait]
impl Repository for UserRepository {
    async fn find(&self, id: u64) -> Option<String> {
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        Some(format!("User {}", id))
    }

    async fn save(&self, data: String) -> Result<(), String> {
        println!("Saving: {}", data);
        Ok(())
    }
}
使用async-trait crate:
toml
[dependencies]
async-trait = "0.1"
rust
use async_trait::async_trait;

#[async_trait]
trait Repository {
    async fn find(&self, id: u64) -> Option<String>;
    async fn save(&self, data: String) -> Result<(), String>;
}

struct UserRepository;

#[async_trait]
impl Repository for UserRepository {
    async fn find(&self, id: u64) -> Option<String> {
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        Some(format!("User {}", id))
    }

    async fn save(&self, data: String) -> Result<(), String> {
        println!("Saving: {}", data);
        Ok(())
    }
}

When to Use This Skill

何时使用该技能

Use rust-async-patterns when you need to:
  • Build async web servers or clients
  • Handle concurrent I/O operations efficiently
  • Make multiple HTTP requests concurrently
  • Implement producer-consumer patterns
  • Work with async streams of data
  • Manage shared state across async tasks
  • Control concurrency limits
  • Handle timeouts and cancellation
  • Build event-driven systems
  • Process data asynchronously
当你需要以下场景时,可以使用rust-async-patterns:
  • 构建异步Web服务器或客户端
  • 高效处理并发I/O操作
  • 并发发起多个HTTP请求
  • 实现生产者-消费者模式
  • 处理异步数据流
  • 管理异步任务间的共享状态
  • 控制并发限制
  • 处理超时和取消操作
  • 构建事件驱动系统
  • 异步处理数据

Best Practices

最佳实践

  • Use tokio::spawn for CPU-independent tasks
  • Use spawn_blocking for CPU-intensive work
  • Prefer channels over shared state with locks
  • Use select! for racing futures
  • Use join! for concurrent independent operations
  • Set appropriate timeouts for network operations
  • Use Semaphore to limit concurrent operations
  • Avoid holding locks across await points
  • Use Arc for shared ownership in async context
  • Handle errors properly with Result
  • 对于CPU无关的任务,使用tokio::spawn
  • 对于CPU密集型工作,使用spawn_blocking
  • 优先使用通道而非带锁的共享状态
  • 使用select!实现Future竞速
  • 使用join!执行独立的并发操作
  • 为网络操作设置合适的超时时间
  • 使用Semaphore限制并发操作数
  • 避免在await点持有锁
  • 在异步上下文中使用Arc实现共享所有权
  • 使用Result正确处理错误

Common Pitfalls

常见陷阱

  • Holding std::sync::Mutex across await (use tokio::sync::Mutex)
  • Not using move with closures in spawn
  • Forgetting to await futures
  • Blocking the runtime with CPU-intensive work
  • Creating too many tasks without limits
  • Not handling cancellation properly
  • Using the wrong channel type
  • Deadlocks with improper lock ordering
  • Not configuring runtime appropriately
  • Ignoring errors in spawned tasks
  • 在await点持有std::sync::Mutex(应使用tokio::sync::Mutex)
  • 在spawn中使用闭包时忘记使用move
  • 忘记await Future
  • 用CPU密集型工作阻塞运行时
  • 无限制创建过多任务
  • 未正确处理取消操作
  • 使用错误的通道类型
  • 因锁顺序不当导致死锁
  • 未正确配置运行时
  • 忽略生成任务中的错误

Resources

参考资源