async-io-model

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Async I/O Model Guide

异步I/O模型指南

Turso uses cooperative yielding with explicit state machines instead of Rust async/await.
Turso使用带显式状态机的协作式让出,而非Rust的async/await。

Core Types

核心类型

rust
pub enum IOCompletions {
    Single(Completion),
}

#[must_use]
pub enum IOResult<T> {
    Done(T),      // Operation complete, here's the result
    IO(IOCompletions),  // Need I/O, call me again after completions finish
}
Functions returning
IOResult
must be called repeatedly until
Done
.
rust
pub enum IOCompletions {
    Single(Completion),
}

#[must_use]
pub enum IOResult<T> {
    Done(T),      // 操作完成,返回结果
    IO(IOCompletions),  // 需要执行I/O,完成后再次调用
}
返回
IOResult
的函数必须被重复调用,直到返回
Done

Completion and CompletionGroup

Completion与CompletionGroup

A
Completion
tracks a single I/O operation:
rust
pub struct Completion { /* ... */ }

impl Completion {
    pub fn finished(&self) -> bool;
    pub fn succeeded(&self) -> bool;
    pub fn get_error(&self) -> Option<CompletionError>;
}
To wait for multiple I/O operations, use
CompletionGroup
:
rust
let mut group = CompletionGroup::new(|_| {});

// Add individual completions
group.add(&completion1);
group.add(&completion2);

// Build into single completion that finishes when all complete
let combined = group.build();
io_yield_one!(combined);
CompletionGroup
features:
  • Aggregates multiple completions into one
  • Calls callback when all complete (or any errors)
  • Can nest groups (add a group's completion to another group)
  • Cancellable via
    group.cancel()
Completion
用于跟踪单个I/O操作:
rust
pub struct Completion { /* ... */ }

impl Completion {
    pub fn finished(&self) -> bool;
    pub fn succeeded(&self) -> bool;
    pub fn get_error(&self) -> Option<CompletionError>;
}
若要等待多个I/O操作完成,请使用
CompletionGroup
rust
let mut group = CompletionGroup::new(|_| {});

// 添加单个completion
group.add(&completion1);
group.add(&completion2);

// 构建为单个completion,所有操作完成时触发
let combined = group.build();
io_yield_one!(combined);
CompletionGroup
特性:
  • 将多个completion聚合为一个
  • 所有操作完成(或出现任何错误)时调用回调函数
  • 支持嵌套组(将一个组的completion添加到另一个组)
  • 可通过
    group.cancel()
    取消

Helper Macros

辅助宏

return_if_io!

return_if_io!

Unwraps
IOResult
, propagates IO variant up the call stack:
rust
let result = return_if_io!(some_io_operation());
// Only reaches here if operation returned Done
解析
IOResult
,将IO变体向上传播到调用栈:
rust
let result = return_if_io!(some_io_operation());
// 仅当操作返回Done时才会执行到此处

io_yield_one!

io_yield_one!

Yields a single completion:
rust
io_yield_one!(completion);  // Returns Ok(IOResult::IO(Single(completion)))
让出单个completion:
rust
io_yield_one!(completion);  // 返回Ok(IOResult::IO(Single(completion)))

State Machine Pattern

状态机模式

Operations that may yield use explicit state enums:
rust
enum MyOperationState {
    Start,
    WaitingForRead { page: PageRef },
    Processing { data: Vec<u8> },
    Done,
}
The function loops, matching on state and transitioning:
rust
fn my_operation(&mut self) -> Result<IOResult<Output>> {
    loop {
        match &mut self.state {
            MyOperationState::Start => {
                let (page, completion) = start_read();
                self.state = MyOperationState::WaitingForRead { page };
                io_yield_one!(completion);
            }
            MyOperationState::WaitingForRead { page } => {
                let data = page.get_contents();
                self.state = MyOperationState::Processing { data: data.to_vec() };
                // No yield, continue loop
            }
            MyOperationState::Processing { data } => {
                let result = process(data);
                self.state = MyOperationState::Done;
                return Ok(IOResult::Done(result));
            }
            MyOperationState::Done => unreachable!(),
        }
    }
}
可能会让出的操作使用显式状态枚举:
rust
enum MyOperationState {
    Start,
    WaitingForRead { page: PageRef },
    Processing { data: Vec<u8> },
    Done,
}
函数通过循环匹配状态并进行转换:
rust
fn my_operation(&mut self) -> Result<IOResult<Output>> {
    loop {
        match &mut self.state {
            MyOperationState::Start => {
                let (page, completion) = start_read();
                self.state = MyOperationState::WaitingForRead { page };
                io_yield_one!(completion);
            }
            MyOperationState::WaitingForRead { page } => {
                let data = page.get_contents();
                self.state = MyOperationState::Processing { data: data.to_vec() };
                // 不让出,继续循环
            }
            MyOperationState::Processing { data } => {
                let result = process(data);
                self.state = MyOperationState::Done;
                return Ok(IOResult::Done(result));
            }
            MyOperationState::Done => unreachable!(),
        }
    }
}

Re-Entrancy: The Critical Pitfall

重入:关键陷阱

State mutations before yield points cause bugs on re-entry.
在让出点之前修改状态会导致重入时出现bug。

Wrong

错误示例

rust
fn bad_example(&mut self) -> Result<IOResult<()>> {
    self.counter += 1;  // Mutates state
    return_if_io!(something_that_might_yield());  // If yields, re-entry will increment again!
    Ok(IOResult::Done(()))
}
If
something_that_might_yield()
returns
IO
, caller waits for completion, then calls
bad_example()
again.
counter
gets incremented twice (or more).
rust
fn bad_example(&mut self) -> Result<IOResult<()>> {
    self.counter += 1;  // 修改状态
    return_if_io!(something_that_might_yield());  // 如果让出,重入时会再次递增counter!
    Ok(IOResult::Done(()))
}
如果
something_that_might_yield()
返回
IO
,调用者会等待操作完成,然后再次调用
bad_example()
,导致
counter
被递增多次。

Correct: Mutate After Yield

正确做法:让出后修改状态

rust
fn good_example(&mut self) -> Result<IOResult<()>> {
    return_if_io!(something_that_might_yield());
    self.counter += 1;  // Only reached once, after IO completes
    Ok(IOResult::Done(()))
}
rust
fn good_example(&mut self) -> Result<IOResult<()>> {
    return_if_io!(something_that_might_yield());
    self.counter += 1;  // 仅在IO完成后执行一次
    Ok(IOResult::Done(()))
}

Correct: Use State Machine

正确做法:使用状态机

rust
enum State { Start, AfterIO }

fn good_example(&mut self) -> Result<IOResult<()>> {
    loop {
        match self.state {
            State::Start => {
                // Don't mutate shared state here
                self.state = State::AfterIO;
                return_if_io!(something_that_might_yield());
            }
            State::AfterIO => {
                self.counter += 1;  // Safe: only entered once
                return Ok(IOResult::Done(()));
            }
        }
    }
}
rust
enum State { Start, AfterIO }

fn good_example(&mut self) -> Result<IOResult<()>> {
    loop {
        match self.state {
            State::Start => {
                // 此处不修改共享状态
                self.state = State::AfterIO;
                return_if_io!(something_that_might_yield());
            }
            State::AfterIO => {
                self.counter += 1;  // 安全:仅进入一次
                return Ok(IOResult::Done(()));
            }
        }
    }
}

Common Re-Entrancy Bugs

常见重入bug

PatternProblem
vec.push(x); return_if_io!(...)
Vec grows on each re-entry
idx += 1; return_if_io!(...)
Index advances multiple times
map.insert(k,v); return_if_io!(...)
Duplicate inserts or overwrites
flag = true; return_if_io!(...)
Usually ok, but check logic
模式问题
vec.push(x); return_if_io!(...)
每次重入时向量都会增长
idx += 1; return_if_io!(...)
索引被多次推进
map.insert(k,v); return_if_io!(...)
重复插入或覆盖
flag = true; return_if_io!(...)
通常无问题,但需检查逻辑

State Enum Design

状态枚举设计

Encode progress in state variants:
rust
// Good: index is part of state, preserved across yields
enum ProcessState {
    Start,
    ProcessingItem { idx: usize, items: Vec<Item> },
    Done,
}

// Loop advances idx only when transitioning states
ProcessingItem { idx, items } => {
    return_if_io!(process_item(&items[idx]));
    if idx + 1 < items.len() {
        self.state = ProcessingItem { idx: idx + 1, items };
    } else {
        self.state = Done;
    }
}
在状态变体中编码进度:
rust
// 良好设计:索引作为状态的一部分,在让出期间被保留
enum ProcessState {
    Start,
    ProcessingItem { idx: usize, items: Vec<Item> },
    Done,
}

// 仅在转换状态时推进索引
ProcessingItem { idx, items } => {
    return_if_io!(process_item(&items[idx]));
    if idx + 1 < items.len() {
        self.state = ProcessingItem { idx: idx + 1, items };
    } else {
        self.state = Done;
    }
}

Turso Implementation

Turso实现

Key files:
  • core/types.rs
    -
    IOResult
    ,
    IOCompletions
    ,
    return_if_io!
    ,
    return_and_restore_if_io!
  • core/io/completions.rs
    -
    Completion
    ,
    CompletionGroup
  • core/util.rs
    -
    io_yield_one!
    macro
  • core/state_machine.rs
    - Generic
    StateMachine
    wrapper
  • core/storage/btree.rs
    - Many state machine examples
  • core/storage/pager.rs
    -
    CompletionGroup
    usage examples
关键文件:
  • core/types.rs
    -
    IOResult
    IOCompletions
    return_if_io!
    return_and_restore_if_io!
  • core/io/completions.rs
    -
    Completion
    CompletionGroup
  • core/util.rs
    -
    io_yield_one!
  • core/state_machine.rs
    - 通用
    StateMachine
    包装器
  • core/storage/btree.rs
    - 大量状态机示例
  • core/storage/pager.rs
    -
    CompletionGroup
    使用示例

Testing Async Code

异步代码测试

Re-entrancy bugs often only manifest under specific IO timing. Use:
  • Deterministic simulation (
    testing/simulator/
    )
  • Whopper concurrent DST (
    testing/concurrent-simulator/
    )
  • Fault injection to force yields at different points
重入bug通常仅在特定IO时序下显现,请使用:
  • 确定性模拟(
    testing/simulator/
  • Whopper并发DST(
    testing/concurrent-simulator/
  • 故障注入,强制在不同点让出

References

参考资料

  • docs/manual.md
    section on I/O
  • docs/manual.md
    中关于I/O的章节