cdc

CDC (Change Data Capture) - Internal Feature Map

Overview

CDC tracks INSERT/UPDATE/DELETE changes on database tables by writing change records into a dedicated CDC table (turso_cdc by default). It is per-connection, enabled via PRAGMA, and operates at the bytecode generation (translate) layer. The sync engine consumes CDC records to push local changes to the remote.

Architecture Diagram

User SQL (INSERT/UPDATE/DELETE/DDL)
        |
        v
  ┌─────────────────────────────────────────────────┐
  │  Translate layer (core/translate/)              │
  │  ┌───────────────────────────────────────────┐  │
  │  │ prepare_cdc_if_necessary()                │  │
  │  │   - checks CaptureDataChangesInfo         │  │
  │  │   - opens CDC table cursor (OpenWrite)    │  │
  │  │   - skips if target == CDC table itself   │  │
  │  └───────────────────────────────────────────┘  │
  │  ┌───────────────────────────────────────────┐  │
  │  │ emit_cdc_insns()                          │  │
  │  │   - writes (change_id, change_time,       │  │
  │  │     change_type, table_name, id,          │  │
  │  │     before, after, updates) into CDC tbl  │  │
  │  └───────────────────────────────────────────┘  │
  │  + emit_cdc_full_record()                       │
  │  + emit_cdc_patch_record()                      │
  └─────────────────────────────────────────────────┘
        |
        v
  CDC table (turso_cdc or custom name)
        |
        v
  ┌─────────────────────────────────────────────────┐
  │  Sync engine (sync/engine/)                     │
  │  DatabaseTape reads CDC table → DatabaseChange  │
  │  → apply/revert → push to remote                 │
  └─────────────────────────────────────────────────┘

Core Data Types

CaptureDataChangesMode + CaptureDataChangesInfo (core/lib.rs)

CDC behavior is controlled by two types:
rust
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
    V1 = 1,
    V2 = 2,
}

const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;

enum CaptureDataChangesMode {
    Id,          // capture only rowid
    Before,      // capture before-image
    After,       // capture after-image
    Full,        // before + after + updates
}

struct CaptureDataChangesInfo {
    mode: CaptureDataChangesMode,
    table: String,                  // CDC table name
    version: Option<CdcVersion>,    // schema version (V1 or V2)
}
The connection stores Option<CaptureDataChangesInfo>; None means CDC is off.
Key methods on CdcVersion:
  • has_commit_record(): self >= V2, gates COMMIT record emission
  • Display / FromStr: round-trips between "v1" and V1, and between "v2" and V2
Key methods on CaptureDataChangesInfo:
  • parse(value: &str, version: Option<CdcVersion>): parses the PRAGMA argument "<mode>[,<table_name>]", returns None for "off"
  • cdc_version(): returns CdcVersion (panics if version is None). Single accessor replacing the old is_v1() / is_v2() / version() methods.
  • has_before() / has_after() / has_updates(): mode capability checks
  • mode_name(): returns the mode as a string
Convenience trait CaptureDataChangesExt on Option<CaptureDataChangesInfo> provides:
  • has_before() / has_after() / has_updates(): delegates to the inner value, returns false for None
  • table(): returns Option<&str>, None when CDC is off
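The version gating and PRAGMA argument parsing described above can be sketched as follows. This is a minimal illustration, not the code in core/lib.rs: the Mode enum and parse_pragma_arg() are hypothetical stand-ins, the mode strings other than "off" and "full" are assumed from the variant names, and defaulting the table to turso_cdc is inferred from the default table name.

```rust
use std::fmt;
use std::str::FromStr;

#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
    V1 = 1,
    V2 = 2,
}

impl CdcVersion {
    /// COMMIT records exist only from v2 onward; derived Ord compares by variant order.
    fn has_commit_record(self) -> bool {
        self >= CdcVersion::V2
    }
}

impl fmt::Display for CdcVersion {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            CdcVersion::V1 => "v1",
            CdcVersion::V2 => "v2",
        })
    }
}

impl FromStr for CdcVersion {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "v1" => Ok(CdcVersion::V1),
            "v2" => Ok(CdcVersion::V2),
            other => Err(format!("unknown CDC version: {other}")),
        }
    }
}

/// Hypothetical simplified mode enum (mirrors CaptureDataChangesMode).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Id,
    Before,
    After,
    Full,
}

/// Hypothetical stand-in for CaptureDataChangesInfo::parse().
/// Accepts "<mode>[,<table_name>]" and returns Ok(None) for "off".
fn parse_pragma_arg(value: &str) -> Result<Option<(Mode, String)>, String> {
    let (mode, table) = match value.split_once(',') {
        Some((m, t)) => (m.trim(), t.trim()),
        // Assumption: fall back to the default CDC table name.
        None => (value.trim(), "turso_cdc"),
    };
    let mode = match mode {
        "off" => return Ok(None),
        "id" => Mode::Id,
        "before" => Mode::Before,
        "after" => Mode::After,
        "full" => Mode::Full,
        other => return Err(format!("unknown CDC mode: {other}")),
    };
    Ok(Some((mode, table.to_string())))
}
```

Deriving Ord on the enum is what lets has_commit_record() stay a single comparison as further versions are added.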

CDC Table Schema v1

Default table name: turso_cdc (constant TURSO_CDC_DEFAULT_TABLE_NAME)
sql
CREATE TABLE turso_cdc (
    change_id   INTEGER PRIMARY KEY AUTOINCREMENT,
    change_time INTEGER,        -- unixepoch()
    change_type INTEGER,        -- 1=INSERT, 0=UPDATE, -1=DELETE
    table_name  TEXT,
    id          <untyped>,      -- rowid of changed row
    before      BLOB,           -- binary record (before-image)
    after       BLOB,           -- binary record (after-image)
    updates     BLOB            -- binary record of per-column changes
);

CDC Table Schema v2 (current)

sql
CREATE TABLE turso_cdc (
    change_id    INTEGER PRIMARY KEY AUTOINCREMENT,
    change_time  INTEGER,        -- unixepoch()
    change_type  INTEGER,        -- 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
    table_name   TEXT,
    id           <untyped>,      -- rowid of changed row
    before       BLOB,           -- binary record (before-image)
    after        BLOB,           -- binary record (after-image)
    updates      BLOB,           -- binary record of per-column changes
    change_txn_id INTEGER        -- transaction ID (groups rows into transactions)
);
v2 adds:
  • change_txn_id column: groups CDC rows by transaction. Assigned via the conn_txn_id(candidate) opcode, which gets or sets a per-connection transaction ID.
  • change_type=2 (COMMIT) records: mark transaction boundaries. Emitted once per statement in autocommit mode, or on explicit COMMIT.
The CDC table is created at runtime by the InitCdcVersion opcode via CREATE TABLE IF NOT EXISTS.

CDC Version Table

When CDC is first enabled, a version tracking table is created:
sql
CREATE TABLE turso_cdc_version (
    table_name TEXT PRIMARY KEY,
    version TEXT NOT NULL
);
Current version: CDC_VERSION_CURRENT = CdcVersion::V2 (defined in core/lib.rs, re-exported from core/translate/pragma.rs)

Version Detection in InitCdcVersion

The InitCdcVersion opcode detects v1 vs v2 by checking whether the CDC table already exists before creating it:
  • If CDC table already exists but has no version row → v1 (pre-existing table from before version tracking)
  • If CDC table doesn't exist → create with current version (v2)
  • If version row already exists → use that version as-is
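The three rules above amount to a small decision function. The sketch below is illustrative only: detect_version() and its parameters are hypothetical names, and the real opcode derives these inputs from queries against the schema and the turso_cdc_version table.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CdcVersion {
    V1,
    V2,
}

const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;

/// Hypothetical helper mirroring the detection rules.
/// `existing_version_row`: version already recorded in turso_cdc_version, if any.
/// `table_existed`: whether the CDC table existed before the opcode tried to create it.
fn detect_version(existing_version_row: Option<CdcVersion>, table_existed: bool) -> CdcVersion {
    match (existing_version_row, table_existed) {
        // A recorded version always wins and is used as-is.
        (Some(version), _) => version,
        // Table predates version tracking, so it must use the v1 schema.
        (None, true) => CdcVersion::V1,
        // Fresh table: created with the current schema.
        (None, false) => CDC_VERSION_CURRENT,
    }
}
```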

DatabaseChange (sync/engine/src/types.rs:229-249)

Sync engine's Rust representation of a CDC row. Has into_apply() and into_revert() methods for forward/backward replay.

OperationMode (core/translate/emitter.rs)

Used by emit_cdc_insns() to determine the change_type value:
  • INSERT → 1
  • UPDATE / SELECT → 0
  • DELETE → -1
  • COMMIT → 2 (v2 only, emitted by emit_cdc_commit_insns)
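For readers decoding CDC rows by hand, the mapping above can be written out in both directions. This is a sketch: the variant names and both function names are assumptions for illustration, and only the integer values come from the list above.

```rust
/// Hypothetical mirror of OperationMode; variant names are assumed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum OperationMode {
    Insert,
    Update,
    Select,
    Delete,
    Commit,
}

/// Value stored in the change_type column for each operation.
fn change_type(mode: OperationMode) -> i64 {
    match mode {
        OperationMode::Insert => 1,
        OperationMode::Update | OperationMode::Select => 0,
        OperationMode::Delete => -1,
        // Only written to v2 CDC tables.
        OperationMode::Commit => 2,
    }
}

/// Reverse lookup for a consumer reading the CDC table.
fn operation_name(change_type: i64) -> Option<&'static str> {
    match change_type {
        1 => Some("INSERT"),
        0 => Some("UPDATE"),
        -1 => Some("DELETE"),
        2 => Some("COMMIT"),
        _ => None,
    }
}
```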

Entry Points

1. PRAGMA — Enable/Disable CDC

Set (core/translate/pragma.rs):
  • Checks MVCC is not enabled (CDC and MVCC are mutually exclusive)
  • Parses mode string via CaptureDataChangesInfo::parse() with CDC_VERSION_CURRENT
  • Emits a single InitCdcVersion opcode: all CDC setup (table creation, version tracking, state change) happens at execution time
Get, read current mode (core/translate/pragma.rs):
  • Returns 3 columns: mode, table, version
  • When off: returns ("off", NULL, NULL)
  • When active: returns (mode_name, table, version)
Pragma registration (core/pragma.rs): UnstableCaptureDataChangesConn with columns ["mode", "table", "version"]

2. Connection State

Field (core/connection.rs): capture_data_changes: RwLock<Option<CaptureDataChangesInfo>>
Getter: get_capture_data_changes_info(), returns a read guard
Setter: set_capture_data_changes_info(opts: Option<CaptureDataChangesInfo>)
Default: initialized as None (CDC off)

3. ProgramBuilder Integration

Field (core/vdbe/builder.rs): capture_data_changes_info: Option<CaptureDataChangesInfo>
Accessor: capture_data_changes_info(), returns &Option<CaptureDataChangesInfo>
Passed from: core/translate/mod.rs, read from the connection when creating the builder

4. PrepareContext

Field (core/vdbe/mod.rs): capture_data_changes: Option<CaptureDataChangesInfo>
Set from: PrepareContext::from_connection(), which clones from connection.get_capture_data_changes_info()

5. InitCdcVersion Opcode: core/vdbe/execute.rs

Always emitted by PRAGMA SET. Handles all CDC setup at execution time:
  1. For "off": stores None in state.pending_cdc_info, returns early
  2. Checks if CDC table already exists (for v1 backward compatibility)
  3. Creates CDC table (CREATE TABLE IF NOT EXISTS <cdc_table_name> ...) using the v2 schema with the change_txn_id column
  4. Creates version table (CREATE TABLE IF NOT EXISTS turso_cdc_version ...)
  5. Inserts version row: if CDC table pre-existed → "v1", otherwise → current version ("v2"). Uses INSERT OR IGNORE to preserve existing version rows.
  6. Reads back actual version from the table
  7. Stores computed CaptureDataChangesInfo in state.pending_cdc_info
The connection's CDC state is not applied in the opcode. Instead, pending_cdc_info is applied in halt() only after the transaction commits successfully. This ensures atomicity: if any step fails and the transaction rolls back, the connection's CDC state remains unchanged.
All table creation is done via nested conn.prepare() / run_ignore_rows() calls rather than bytecode emission, because the PRAGMA plan can't contain DML against tables that don't exist yet in the schema.
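The deferred-apply pattern can be illustrated with a small model. Everything here is a simplified assumption: CdcInfo, Connection, and ProgramState are toy types, and the nested Option (outer layer meaning "a change was staged", inner layer meaning "on or off") is just one possible way to represent the pending value.

```rust
#[derive(Debug, Clone, PartialEq)]
struct CdcInfo {
    mode: String,
    table: String,
}

/// Toy connection holding only the CDC state.
#[derive(Default)]
struct Connection {
    cdc: Option<CdcInfo>,
}

/// Toy program state. Outer Option: was a change staged by this program?
/// Inner Option: the staged value (None means "turn CDC off").
#[derive(Default)]
struct ProgramState {
    pending_cdc_info: Option<Option<CdcInfo>>,
}

impl ProgramState {
    /// Models the opcode: stage the new state, do not touch the connection.
    fn init_cdc_version(&mut self, requested: Option<CdcInfo>) {
        self.pending_cdc_info = Some(requested);
    }

    /// Models halt(): apply the staged state only if the transaction committed.
    fn halt(&mut self, conn: &mut Connection, committed: bool) {
        if let Some(pending) = self.pending_cdc_info.take() {
            if committed {
                conn.cdc = pending;
            }
        }
    }
}
```

With this shape, a rolled-back PRAGMA drops the staged value and leaves the connection exactly as it was.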

Bytecode Emission (core/translate/emitter.rs)

These are the core CDC code generation functions:
| Function | Purpose |
| --- | --- |
| prepare_cdc_if_necessary() | Opens CDC table cursor if CDC is active and target != CDC table |
| emit_cdc_full_record() | Reads all columns from cursor into a MakeRecord (for before/after image) |
| emit_cdc_patch_record() | Builds record from in-flight register values (for after-image of INSERT/UPDATE) |
| emit_cdc_insns() | Writes a single CDC row per changed row (INSERT/UPDATE/DELETE). Called per-row inside DML loops. |
| emit_cdc_commit_insns() | Writes a COMMIT record (change_type=2) into CDC table (v2 only). Raw emission, no autocommit check. |
| emit_cdc_autocommit_commit() | End-of-statement COMMIT emission. Checks is_autocommit() at runtime and only emits COMMIT if in autocommit mode. v2 only. |

COMMIT Emission Strategy (v2)

Per-row call sites use emit_cdc_insns() (no COMMIT). End-of-statement sites call emit_cdc_autocommit_commit(), which checks is_autocommit() at runtime:
  • Autocommit mode: emits a COMMIT record after the statement completes
  • Explicit transaction (BEGIN...COMMIT): skips per-statement COMMIT; the explicit COMMIT statement emits the COMMIT record via emit_cdc_commit_insns()
This ensures multi-row statements like INSERT INTO t VALUES (1),(2),(3) produce one COMMIT at the end, not one per row.
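A small simulation makes the resulting record sequence concrete. It models only the change_type values a v2 CDC table would receive for a list of statements; the Stmt enum and cdc_change_types() are hypothetical and ignore everything else about execution, including whether an explicit COMMIT with no preceding changes writes a record.

```rust
/// Hypothetical statement model: only what matters for COMMIT placement.
enum Stmt {
    Begin,
    /// INSERT touching the given number of rows.
    Insert(usize),
    Commit,
}

/// Returns the change_type values appended to the CDC table, in order.
fn cdc_change_types(stmts: &[Stmt]) -> Vec<i64> {
    let mut autocommit = true;
    let mut out: Vec<i64> = Vec::new();
    for stmt in stmts {
        match stmt {
            Stmt::Begin => autocommit = false,
            Stmt::Insert(rows) => {
                // Per-row emission: one INSERT record (change_type=1) per row.
                out.extend(std::iter::repeat(1i64).take(*rows));
                // End-of-statement emission: COMMIT only in autocommit mode.
                if autocommit {
                    out.push(2);
                }
            }
            Stmt::Commit => {
                // Explicit COMMIT always writes the COMMIT record in this model.
                out.push(2);
                autocommit = true;
            }
        }
    }
    out
}
```

A three-row INSERT in autocommit mode and three single-row INSERTs inside BEGIN...COMMIT both yield three change rows followed by a single COMMIT record.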

Integration Points — Where CDC Records Are Emitted

INSERT: core/translate/insert.rs

  • Per-row: emit_cdc_insns() after insert, and before delete for REPLACE/conflict
  • End-of-statement: emit_cdc_autocommit_commit() in emit_epilogue() after the insert loop

UPDATE: core/translate/emitter.rs

  • Per-row: captures the before-image, captures the after-image via a patch record, then calls emit_cdc_insns()
  • End-of-statement: emit_cdc_autocommit_commit() after the update loop

DELETE: core/translate/emitter.rs

  • Per-row: captures the before-image and calls emit_cdc_insns()
  • End-of-statement: emit_cdc_autocommit_commit() after the delete loop

UPSERT (ON CONFLICT DO UPDATE): core/translate/upsert.rs

  • Per-row: emit_cdc_insns() for all three cases: pure insert, update after conflict, replace
  • No separate end-of-statement COMMIT: upsert shares INSERT's epilogue

Schema Changes (DDL): core/translate/schema.rs

  • CREATE TABLE: emit_cdc_insns() (insert into sqlite_schema) + emit_cdc_autocommit_commit()
  • DROP TABLE: emit_cdc_insns() per-row in metadata loop + emit_cdc_autocommit_commit() after loop
  • CREATE INDEX: emit_cdc_insns() + emit_cdc_autocommit_commit() (core/translate/schema.rs)
  • DROP INDEX: emit_cdc_insns() per-row + emit_cdc_autocommit_commit() after loop (core/translate/index.rs)
DDL in explicit transactions (BEGIN; CREATE TABLE t(x); COMMIT) does NOT emit per-statement COMMIT: the autocommit check prevents it.

ALTER TABLE: core/translate/update.rs

  • Sets cdc_update_alter_statement on the update plan when the CDC mode captures updates

Views/Triggers — Explicitly excluded

  • core/translate/view.rs: passes None for CDC cursor
  • core/translate/trigger.rs: passes None for CDC cursor

Subqueries — No CDC

  • core/translate/subquery.rs: cdc_cursor_id: None

Helper Functions (for reading CDC data)

table_columns_json_array(table_name) (core/function.rs, core/vdbe/execute.rs)

Returns JSON array of column names for a table. Used to interpret binary records.

bin_record_json_object(columns_json, blob) (core/function.rs, core/vdbe/execute.rs)

Decodes a binary record (from before / after / updates columns) into a JSON object using column names.

Sync Engine Integration

The sync engine is the primary consumer of CDC data.

DatabaseTape: sync/engine/src/database_tape.rs

  • CDC config: DEFAULT_CDC_TABLE_NAME = "turso_cdc", DEFAULT_CDC_MODE = "full"
  • PRAGMA name: CDC_PRAGMA_NAME = "unstable_capture_data_changes_conn"
  • Initialization: connect() sets CDC pragma and caches cdc_version from turso_cdc_version table. Must be called before iterate_changes().
  • Version caching: cdc_version: RwLock<Option<CdcVersion>>, set by connect(), read by iterate_changes(). Panics if not set.
  • Iterator: DatabaseChangesIterator reads CDC table in batches, emits DatabaseTapeOperation. For v2, real COMMIT records from the table are emitted. For v1, a synthetic Commit is appended at end of batch. ignore_schema_changes: true (default) filters out sqlite_schema row changes but not COMMIT records.
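The iterator's version-dependent handling of commits can be sketched as a pure function over one batch. The types below (CdcRow, TapeOp, batch_to_ops) are hypothetical simplifications of DatabaseChangesIterator and DatabaseTapeOperation, and skipping the synthetic Commit for an empty v1 batch is an assumption.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CdcVersion {
    V1,
    V2,
}

/// Hypothetical, trimmed-down CDC row: only the columns this sketch needs.
struct CdcRow {
    change_id: i64,
    change_type: i64,
    table_name: Option<String>,
}

/// Hypothetical stand-in for DatabaseTapeOperation.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TapeOp {
    RowChange { change_id: i64, table: String },
    Commit,
}

fn batch_to_ops(version: CdcVersion, batch: &[CdcRow], ignore_schema_changes: bool) -> Vec<TapeOp> {
    let mut ops = Vec::new();
    for row in batch {
        if row.change_type == 2 {
            // Real COMMIT records (v2 tables) pass through and are never filtered.
            ops.push(TapeOp::Commit);
        } else if ignore_schema_changes && row.table_name.as_deref() == Some("sqlite_schema") {
            // Schema row changes are dropped when the filter is on.
            continue;
        } else {
            ops.push(TapeOp::RowChange {
                change_id: row.change_id,
                table: row.table_name.clone().unwrap_or_default(),
            });
        }
    }
    // v1 tables have no COMMIT rows, so close the batch with a synthetic one.
    if version == CdcVersion::V1 && !batch.is_empty() {
        ops.push(TapeOp::Commit);
    }
    ops
}
```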

Sync Operations: sync/engine/src/database_sync_operations.rs

  • Change counting: SELECT COUNT(*) FROM turso_cdc WHERE change_id > ?

Sync Engine: sync/engine/src/database_sync_engine.rs

  • Initialization: open_db() calls main_tape.connect(coro) to ensure CDC is set up and version is cached before any iterate_changes() calls.
  • During apply_changes, checks if CDC table existed, re-creates it after sync

Replay Generator: sync/engine/src/database_replay_generator.rs

  • Requires updates column to be populated (full mode)

Bindings CDC Surface

All bindings expose cdc_operations as part of sync stats:
| Binding | File |
| --- | --- |
| Python | bindings/python/src/turso_sync.rs |
| JavaScript | bindings/javascript/sync/src/lib.rs |
| JS (generator) | bindings/javascript/sync/src/generator.rs |
| Go | bindings/go/bindings_sync.go |
| React Native | bindings/react-native/src/types.ts |
| SDK Kit (C header) | sync/sdk-kit/turso_sync.h |
| SDK Kit (Rust) | sync/sdk-kit/src/bindings.rs |

Tests

  • Integration tests: tests/integration/functions/test_cdc.rs covers all modes, CRUD, transactions, schema changes, version table, backward compatibility. Registered in tests/integration/functions/mod.rs.
  • Sync engine tests: sync/engine/src/database_tape.rs covers CDC table reads, tape iteration, replay of schema changes.
  • JS binding tests: bindings/javascript/sync/packages/{wasm,native}/promise.test.ts
Run: cargo test -- test_cdc (integration) or cargo test -p turso_sync_engine -- database_tape (sync engine).

User-facing Documentation

  • CLI manual page: cli/manuals/cdc.md, accessible via .manual cdc in the REPL
  • Database manual: docs/manual.md, CDC section linked in TOC

Key Design Decisions

  1. Per-connection, not per-database. Each connection has its own CDC mode and can target different tables.
  2. Bytecode-level implementation. CDC instructions are emitted alongside the actual DML bytecode during translation — no runtime hooks or triggers.
  3. Self-exclusion. Changes to the CDC table and turso_cdc_version table are never captured (checked in prepare_cdc_if_necessary).
  4. Schema changes tracked. DDL operations are recorded as changes to sqlite_schema table.
  5. Binary record format. Before/after/updates columns use SQLite's MakeRecord format (same as B-tree payload).
  6. Transaction-aware. CDC writes happen within the same transaction as the DML, so rollback naturally discards CDC entries.
  7. Version tracking. CDC schema version is recorded in turso_cdc_version table and carried in CaptureDataChangesInfo.version for future schema evolution.
  8. Atomic PRAGMA. Connection CDC state is deferred via pending_cdc_info in ProgramState and applied only at Halt. If the PRAGMA's disk writes fail and the transaction rolls back, the connection state stays unchanged.
  9. Per-statement COMMIT (v2). COMMIT records are emitted once per statement (not per row), using emit_cdc_autocommit_commit() which checks is_autocommit() at runtime. In explicit transactions, only the final COMMIT emits a COMMIT CDC record.
  10. Backward-compatible version detection. Pre-existing v1 CDC tables (without turso_cdc_version) are detected by checking table existence before creation. Existing tables get CdcVersion::V1 inserted into the version table.
  11. Typed version enum. CdcVersion enum with #[repr(u8)] and Ord / PartialOrd enables feature gating via integer comparison (has_commit_record() = self >= V2). Display / FromStr handles database round-trip.
  12. CDC and MVCC mutual exclusion. Enabling CDC when MVCC is active (or vice versa) returns an error. Checked at PRAGMA set time and journal mode switch time.