cdc
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseCDC (Change Data Capture) - Internal Feature Map
CDC (Change Data Capture) - 内部特性映射
Overview
概述
CDC tracks INSERT/UPDATE/DELETE changes on database tables by writing change records into a
dedicated CDC table ( by default). It is per-connection, enabled via PRAGMA, and
operates at the bytecode generation (translate) layer. The sync engine consumes CDC records
to push local changes to the remote.
turso_cdcCDC通过将变更记录写入专用的CDC表(默认名为),跟踪数据库表上的INSERT/UPDATE/DELETE变更。它基于连接级配置,通过PRAGMA指令启用,在字节码生成(translate)层运行。同步引擎会消费CDC记录,将本地变更推送到远程端。
turso_cdcArchitecture Diagram
架构图
User SQL (INSERT/UPDATE/DELETE/DDL)
|
v
┌─────────────────────────────────────────────────┐
│ Translate layer (core/translate/) │
│ ┌───────────────────────────────────────────┐ │
│ │ prepare_cdc_if_necessary() │ │
│ │ - checks CaptureDataChangesInfo │ │
│ │ - opens CDC table cursor (OpenWrite) │ │
│ │ - skips if target == CDC table itself │ │
│ └───────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────┐ │
│ │ emit_cdc_insns() │ │
│ │ - writes (change_id, change_time, │ │
│ │ change_type, table_name, id, │ │
│ │ before, after, updates) into CDC tbl │ │
│ └───────────────────────────────────────────┘ │
│ + emit_cdc_full_record() / emit_cdc_patch_record() │
└─────────────────────────────────────────────────┘
|
v
CDC table (turso_cdc or custom name)
|
v
┌─────────────────────────────────────────────────┐
│ Sync engine (sync/engine/) │
│ DatabaseTape reads CDC table → DatabaseChange │
│ → apply/revert → push to remote │
└─────────────────────────────────────────────────┘用户SQL (INSERT/UPDATE/DELETE/DDL)
|
v
┌─────────────────────────────────────────────────┐
│ Translate层 (core/translate/) │
│ ┌───────────────────────────────────────────┐ │
│ │ prepare_cdc_if_necessary() │ │
│ │ - 检查CaptureDataChangesInfo │ │
│ │ - 打开CDC表游标 (OpenWrite) │ │
│ │ - 若目标为CDC表本身则跳过 │ │
│ └───────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────┐ │
│ │ emit_cdc_insns() │ │
│ │ - 将(change_id, change_time, │ │
│ │ change_type, table_name, id, │ │
│ │ before, after, updates)写入CDC表 │ │
│ └───────────────────────────────────────────┘ │
│ + emit_cdc_full_record() / emit_cdc_patch_record() │
└─────────────────────────────────────────────────┘
|
v
CDC表 (turso_cdc 或自定义名称)
|
v
┌─────────────────────────────────────────────────┐
│ 同步引擎 (sync/engine/) │
│ DatabaseTape读取CDC表 → DatabaseChange │
│ → 应用/回滚 → 推送到远程端 │
└─────────────────────────────────────────────────┘Core Data Types
核心数据类型
CaptureDataChangesMode
+ CaptureDataChangesInfo
— core/lib.rs
CaptureDataChangesModeCaptureDataChangesInfocore/lib.rsCaptureDataChangesMode
+ CaptureDataChangesInfo
— core/lib.rs
CaptureDataChangesModeCaptureDataChangesInfocore/lib.rsCDC behavior is controlled by two types:
rust
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
V1 = 1,
V2 = 2,
}
const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;
enum CaptureDataChangesMode {
Id, // capture only rowid
Before, // capture before-image
After, // capture after-image
Full, // before + after + updates
}
struct CaptureDataChangesInfo {
mode: CaptureDataChangesMode,
table: String, // CDC table name
version: Option<CdcVersion>, // schema version (V1 or V2)
}The connection stores — means CDC is off.
Option<CaptureDataChangesInfo>NoneKey methods on :
CdcVersion- —
has_commit_record(), gates COMMIT record emissionself >= V2 - /
Display— round-tripsFromStr↔"v1",V1↔"v2"V2
Key methods on :
CaptureDataChangesInfo- — parses PRAGMA argument
parse(value: &str, version: Option<CdcVersion>), returns"<mode>[,<table_name>]"for "off"None - — returns
cdc_version()(panics if version is None). Single accessor replacing oldCdcVersion/is_v1()/is_v2()methods.version() - /
has_before()/has_after()— mode capability checkshas_updates() - — returns mode as string
mode_name()
Convenience trait on provides:
CaptureDataChangesExtOption<CaptureDataChangesInfo>- /
has_before()/has_after()— delegates to inner, returns false for Nonehas_updates() - — returns
table(), None when CDC is offOption<&str>
CDC的行为由两种类型控制:
rust
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
V1 = 1,
V2 = 2,
}
const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;
enum CaptureDataChangesMode {
Id, // 仅捕获rowid
Before, // 捕获变更前镜像
After, // 捕获变更后镜像
Full, // 变更前+变更后+更新内容
}
struct CaptureDataChangesInfo {
mode: CaptureDataChangesMode,
table: String, // CDC表名称
version: Option<CdcVersion>, // schema版本 (V1或V2)
}连接中存储 — 表示CDC已关闭。
Option<CaptureDataChangesInfo>NoneCdcVersion- —
has_commit_record(),控制COMMIT记录的生成self >= V2 - /
Display— 实现FromStr↔"v1"、V1↔"v2"的双向转换V2
CaptureDataChangesInfo- — 解析PRAGMA参数
parse(value: &str, version: Option<CdcVersion>),如果是"off"则返回"<mode>[,<table_name>]"None - — 返回
cdc_version()(若version为None则触发panic)。替代旧的CdcVersion/is_v1()/is_v2()方法的统一访问器version() - /
has_before()/has_after()— 模式能力检查has_updates() - — 返回模式的字符串形式
mode_name()
为提供的便捷特性包含:
Option<CaptureDataChangesInfo>CaptureDataChangesExt- /
has_before()/has_after()— 委托给内部实例,None时返回falsehas_updates() - — 返回
table(),CDC关闭时为NoneOption<&str>
CDC Table Schema v1
CDC表Schema v1
Default table name: (constant )
turso_cdcTURSO_CDC_DEFAULT_TABLE_NAMEsql
CREATE TABLE turso_cdc (
change_id INTEGER PRIMARY KEY AUTOINCREMENT,
change_time INTEGER, -- unixepoch()
change_type INTEGER, -- 1=INSERT, 0=UPDATE, -1=DELETE
table_name TEXT,
id <untyped>, -- rowid of changed row
before BLOB, -- binary record (before-image)
after BLOB, -- binary record (after-image)
updates BLOB -- binary record of per-column changes
);默认表名:(常量)
turso_cdcTURSO_CDC_DEFAULT_TABLE_NAMEsql
CREATE TABLE turso_cdc (
change_id INTEGER PRIMARY KEY AUTOINCREMENT,
change_time INTEGER, -- unixepoch()
change_type INTEGER, -- 1=INSERT, 0=UPDATE, -1=DELETE
table_name TEXT,
id <untyped>, -- 变更行的rowid
before BLOB, -- 二进制记录(变更前镜像)
after BLOB, -- 二进制记录(变更后镜像)
updates BLOB -- 二进制记录(逐列变更内容)
);CDC Table Schema v2 (current)
CDC表Schema v2(当前版本)
sql
CREATE TABLE turso_cdc (
change_id INTEGER PRIMARY KEY AUTOINCREMENT,
change_time INTEGER, -- unixepoch()
change_type INTEGER, -- 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
table_name TEXT,
id <untyped>, -- rowid of changed row
before BLOB, -- binary record (before-image)
after BLOB, -- binary record (after-image)
updates BLOB, -- binary record of per-column changes
change_txn_id INTEGER -- transaction ID (groups rows into transactions)
);v2 adds:
- column — groups CDC rows by transaction. Assigned via
change_txn_idopcode which get-or-sets a per-connection transaction ID.conn_txn_id(candidate) - (COMMIT) records — mark transaction boundaries. Emitted once per statement in autocommit mode, or on explicit
change_type=2.COMMIT
The CDC table is created at runtime by the opcode via .
InitCdcVersionCREATE TABLE IF NOT EXISTSsql
CREATE TABLE turso_cdc (
change_id INTEGER PRIMARY KEY AUTOINCREMENT,
change_time INTEGER, -- unixepoch()
change_type INTEGER, -- 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
table_name TEXT,
id <untyped>, -- 变更行的rowid
before BLOB, -- 二进制记录(变更前镜像)
after BLOB, -- 二进制记录(变更后镜像)
updates BLOB, -- 二进制记录(逐列变更内容)
change_txn_id INTEGER -- 事务ID(将CDC行按事务分组)
);v2新增:
- 列 — 按事务对CDC行进行分组。通过
change_txn_id操作码分配,该操作码获取或设置每个连接的事务ID。conn_txn_id(candidate) - (COMMIT)记录 — 标记事务边界。在自动提交模式下,每个语句执行后生成一次;或在显式
change_type=2时生成。COMMIT
CDC表在运行时由操作码通过语句创建。
InitCdcVersionCREATE TABLE IF NOT EXISTSCDC Version Table
CDC版本表
When CDC is first enabled, a version tracking table is created:
sql
CREATE TABLE turso_cdc_version (
table_name TEXT PRIMARY KEY,
version TEXT NOT NULL
);Current version: (defined in , re-exported from )
CDC_VERSION_CURRENT = CdcVersion::V2core/lib.rscore/translate/pragma.rs首次启用CDC时,会创建一个版本跟踪表:
sql
CREATE TABLE turso_cdc_version (
table_name TEXT PRIMARY KEY,
version TEXT NOT NULL
);当前版本:(定义于,从重导出)
CDC_VERSION_CURRENT = CdcVersion::V2core/lib.rscore/translate/pragma.rsVersion Detection in InitCdcVersion
InitCdcVersion中的版本检测
The opcode detects v1 vs v2 by checking whether the CDC table already exists before creating it:
InitCdcVersion- If CDC table already exists but has no version row → v1 (pre-existing table from before version tracking)
- If CDC table doesn't exist → create with current version (v2)
- If version row already exists → use that version as-is
InitCdcVersion- 若CDC表已存在但无版本行 → v1(版本跟踪功能添加前创建的表)
- 若CDC表不存在 → 使用当前版本(v2)创建
- 若版本行已存在 → 直接使用该版本
DatabaseChange
— sync/engine/src/types.rs:229-249
DatabaseChangesync/engine/src/types.rs:229-249DatabaseChange
— sync/engine/src/types.rs:229-249
DatabaseChangesync/engine/src/types.rs:229-249Sync engine's Rust representation of a CDC row. Has and methods
for forward/backward replay.
into_apply()into_revert()同步引擎中用于表示CDC行的Rust类型。包含和方法,用于正向/反向重放变更。
into_apply()into_revert()OperationMode
— core/translate/emitter.rs
OperationModecore/translate/emitter.rsOperationMode
— core/translate/emitter.rs
OperationModecore/translate/emitter.rsUsed by to determine value:
emit_cdc_insns()change_type- → 1
INSERT - /
UPDATE→ 0SELECT - → -1
DELETE - → 2 (v2 only, emitted by
COMMIT)emit_cdc_commit_insns
emit_cdc_insns()change_type- → 1
INSERT - /
UPDATE→ 0SELECT - → -1
DELETE - → 2(仅v2,由
COMMIT生成)emit_cdc_commit_insns
Entry Points
入口点
1. PRAGMA — Enable/Disable CDC
1. PRAGMA — 启用/禁用CDC
Set:
core/translate/pragma.rs- Checks MVCC is not enabled (CDC and MVCC are mutually exclusive)
- Parses mode string via with
CaptureDataChangesInfo::parse()CDC_VERSION_CURRENT - Emits a single opcode — all CDC setup (table creation, version tracking, state change) happens at execution time
InitCdcVersion
Get (read current mode):
core/translate/pragma.rs- Returns 3 columns: ,
mode,tableversion - When off: returns
("off", NULL, NULL) - When active: returns
(mode_name, table, version)
Pragma registration: — with columns
core/pragma.rsUnstableCaptureDataChangesConn["mode", "table", "version"]设置:
core/translate/pragma.rs- 检查MVCC是否未启用(CDC与MVCC互斥)
- 使用结合
CaptureDataChangesInfo::parse()解析模式字符串CDC_VERSION_CURRENT - 生成单个操作码——所有CDC设置(表创建、版本跟踪、状态变更)都在执行时完成
InitCdcVersion
获取(读取当前模式):
core/translate/pragma.rs- 返回3列:,
mode,tableversion - 关闭时:返回
("off", NULL, NULL) - 激活时:返回
(mode_name, table, version)
PRAGMA注册: — ,列名为
core/pragma.rsUnstableCaptureDataChangesConn["mode", "table", "version"]2. Connection State
2. 连接状态
Field: —
Getter: — returns read guard
Setter:
Default: initialized as (CDC off)
core/connection.rscapture_data_changes: RwLock<Option<CaptureDataChangesInfo>>get_capture_data_changes_info()set_capture_data_changes_info(opts: Option<CaptureDataChangesInfo>)None字段: —
获取器: — 返回读取锁
设置器:
默认值: 初始化为(CDC关闭)
core/connection.rscapture_data_changes: RwLock<Option<CaptureDataChangesInfo>>get_capture_data_changes_info()set_capture_data_changes_info(opts: Option<CaptureDataChangesInfo>)None3. ProgramBuilder Integration
3. ProgramBuilder集成
Field: —
Accessor: — returns
Passed from: — read from connection when creating builder
core/vdbe/builder.rscapture_data_changes_info: Option<CaptureDataChangesInfo>capture_data_changes_info()&Option<CaptureDataChangesInfo>core/translate/mod.rs字段: —
访问器: — 返回
来源: — 创建builder时从连接中读取
core/vdbe/builder.rscapture_data_changes_info: Option<CaptureDataChangesInfo>capture_data_changes_info()&Option<CaptureDataChangesInfo>core/translate/mod.rs4. PrepareContext
4. PrepareContext
Field: —
Set from: — clones from
core/vdbe/mod.rscapture_data_changes: Option<CaptureDataChangesInfo>PrepareContext::from_connection()connection.get_capture_data_changes_info()字段: —
设置来源: — 从克隆
core/vdbe/mod.rscapture_data_changes: Option<CaptureDataChangesInfo>PrepareContext::from_connection()connection.get_capture_data_changes_info()5. InitCdcVersion Opcode — core/vdbe/execute.rs
core/vdbe/execute.rs5. InitCdcVersion操作码 — core/vdbe/execute.rs
core/vdbe/execute.rsAlways emitted by PRAGMA SET. Handles all CDC setup at execution time:
- For "off": stores in
None, returns earlystate.pending_cdc_info - Checks if CDC table already exists (for v1 backward compatibility)
- Creates CDC table () — v2 schema with
CREATE TABLE IF NOT EXISTS <cdc_table_name> ...columnchange_txn_id - Creates version table ()
CREATE TABLE IF NOT EXISTS turso_cdc_version ... - Inserts version row: if CDC table pre-existed → "v1", otherwise → current version ("v2"). Uses to preserve existing version rows.
INSERT OR IGNORE - Reads back actual version from the table
- Stores computed in
CaptureDataChangesInfostate.pending_cdc_info
The connection's CDC state is not applied in the opcode. Instead, is applied in only after the transaction commits successfully. This ensures atomicity: if any step fails and the transaction rolls back, the connection's CDC state remains unchanged.
pending_cdc_infohalt()All table creation is done via nested / calls rather than bytecode emission, because the PRAGMA plan can't contain DML against tables that don't exist yet in the schema.
conn.prepare()run_ignore_rows()始终由PRAGMA SET生成。在执行时处理所有CDC设置:
- 若为"off":在中存储
state.pending_cdc_info,提前返回None - 检查CDC表是否已存在(用于v1向后兼容)
- 创建CDC表()——使用包含
CREATE TABLE IF NOT EXISTS <cdc_table_name> ...列的v2 schemachange_txn_id - 创建版本表()
CREATE TABLE IF NOT EXISTS turso_cdc_version ... - 插入版本行:若CDC表已存在 → "v1",否则 → 当前版本("v2")。使用保留已有的版本行
INSERT OR IGNORE - 从表中读取实际版本
- 将计算得到的存储在
CaptureDataChangesInfo中state.pending_cdc_info
连接的CDC状态不会在操作码中直接应用。相反,仅在事务成功提交后,在中应用。这确保了原子性:如果任何步骤失败导致事务回滚,连接的CDC状态保持不变。
pending_cdc_infohalt()所有表创建操作通过嵌套的/调用完成,而非生成字节码,因为PRAGMA计划无法包含针对schema中尚未存在的表的DML操作。
conn.prepare()run_ignore_rows()Bytecode Emission (core/translate/emitter.rs)
字节码生成 (core/translate/emitter.rs)
These are the core CDC code generation functions:
| Function | Purpose |
|---|---|
| Opens CDC table cursor if CDC is active and target != CDC table |
| Reads all columns from cursor into a MakeRecord (for before/after image) |
| Builds record from in-flight register values (for after-image of INSERT/UPDATE) |
| Writes a single CDC row per changed row (INSERT/UPDATE/DELETE). Called per-row inside DML loops. |
| Writes a COMMIT record (change_type=2) into CDC table (v2 only). Raw emission, no autocommit check. |
| End-of-statement COMMIT emission. Checks |
以下是核心CDC代码生成函数:
| 函数 | 用途 |
|---|---|
| 若CDC已激活且目标不是CDC表,则打开CDC表游标 |
| 从游标中读取所有列并生成MakeRecord(用于变更前/后镜像) |
| 基于寄存器中的实时值构建记录(用于INSERT/UPDATE的变更后镜像) |
| 为每个变更行写入一条CDC记录(INSERT/UPDATE/DELETE)。在DML循环内逐行调用。 |
| 向CDC表写入COMMIT记录(change_type=2)(仅v2)。直接生成字节码,不检查自动提交模式。 |
| 语句结束时生成COMMIT记录。在运行时检查 |
COMMIT Emission Strategy (v2)
COMMIT记录生成策略 (v2)
Per-row call sites use (no COMMIT). End-of-statement sites call which checks at runtime:
emit_cdc_insns()emit_cdc_autocommit_commit()is_autocommit()- Autocommit mode: emits a COMMIT record after the statement completes
- Explicit transaction (): skips per-statement COMMIT; the explicit
BEGIN...COMMITstatement emits the COMMIT record viaCOMMITemit_cdc_commit_insns()
This ensures multi-row statements like produce one COMMIT at the end, not one per row.
INSERT INTO t VALUES (1),(2),(3)逐行调用的位置使用(不生成COMMIT)。语句结束的位置调用,该函数在运行时检查:
emit_cdc_insns()emit_cdc_autocommit_commit()is_autocommit()- 自动提交模式: 语句完成后生成一条COMMIT记录
- 显式事务 (): 跳过每个语句的COMMIT生成;显式
BEGIN...COMMIT语句通过COMMIT生成COMMIT记录emit_cdc_commit_insns()
这确保了像这样的多行语句在结束时仅生成一条COMMIT记录,而非每行生成一条。
INSERT INTO t VALUES (1),(2),(3)Integration Points — Where CDC Records Are Emitted
集成点 — CDC记录的生成位置
INSERT — core/translate/insert.rs
core/translate/insert.rsINSERT — core/translate/insert.rs
core/translate/insert.rs- Per-row: after insert, and before delete for REPLACE/conflict
emit_cdc_insns() - End-of-statement: in
emit_cdc_autocommit_commit()after the insert loopemit_epilogue()
- 逐行: 插入后、REPLACE/冲突处理的删除前调用
emit_cdc_insns() - 语句结束: 在中的插入循环后调用
emit_epilogue()emit_cdc_autocommit_commit()
UPDATE — core/translate/emitter.rs
core/translate/emitter.rsUPDATE — core/translate/emitter.rs
core/translate/emitter.rs- Per-row: captures before-image, after-image via patch record, emits
emit_cdc_insns() - End-of-statement: after the update loop
emit_cdc_autocommit_commit()
- 逐行: 捕获变更前镜像、通过patch record捕获变更后镜像,调用
emit_cdc_insns() - 语句结束: 更新循环后调用
emit_cdc_autocommit_commit()
DELETE — core/translate/emitter.rs
core/translate/emitter.rsDELETE — core/translate/emitter.rs
core/translate/emitter.rs- Per-row: captures before-image and emits
emit_cdc_insns() - End-of-statement: after the delete loop
emit_cdc_autocommit_commit()
- 逐行: 捕获变更前镜像并调用
emit_cdc_insns() - 语句结束: 删除循环后调用
emit_cdc_autocommit_commit()
UPSERT (ON CONFLICT DO UPDATE) — core/translate/upsert.rs
core/translate/upsert.rsUPSERT (ON CONFLICT DO UPDATE) — core/translate/upsert.rs
core/translate/upsert.rs- Per-row: for all three cases: pure insert, update after conflict, replace
emit_cdc_insns() - No end-of-statement COMMIT — upsert shares INSERT's epilogue
- 逐行: 为三种情况调用:纯插入、冲突后更新、替换
emit_cdc_insns() - 无语句结束COMMIT生成 — upsert复用INSERT的收尾逻辑
Schema Changes (DDL) — core/translate/schema.rs
core/translate/schema.rs架构变更 (DDL) — core/translate/schema.rs
core/translate/schema.rs- CREATE TABLE: (insert into
emit_cdc_insns()) +sqlite_schemaemit_cdc_autocommit_commit() - DROP TABLE: per-row in metadata loop +
emit_cdc_insns()after loopemit_cdc_autocommit_commit() - CREATE INDEX: +
emit_cdc_insns()(emit_cdc_autocommit_commit())core/translate/schema.rs - DROP INDEX: per-row +
emit_cdc_insns()after loop (emit_cdc_autocommit_commit())core/translate/index.rs
DDL in explicit transactions () does NOT emit per-statement COMMIT — the autocommit check prevents it.
BEGIN; CREATE TABLE t(x); COMMIT- CREATE TABLE: (向
emit_cdc_insns()插入记录) +sqlite_schemaemit_cdc_autocommit_commit() - DROP TABLE: 在元数据循环中逐行调用+ 循环后调用
emit_cdc_insns()emit_cdc_autocommit_commit() - CREATE INDEX: +
emit_cdc_insns()(emit_cdc_autocommit_commit())core/translate/schema.rs - DROP INDEX: 逐行调用+ 循环后调用
emit_cdc_insns()(emit_cdc_autocommit_commit())core/translate/index.rs
显式事务中的DDL()不会生成每个语句的COMMIT记录——自动提交检查会阻止该行为。
BEGIN; CREATE TABLE t(x); COMMITALTER TABLE — core/translate/update.rs
core/translate/update.rsALTER TABLE — core/translate/update.rs
core/translate/update.rs- Sets on the update plan when CDC has updates mode
cdc_update_alter_statement
- 当CDC处于full模式时,在更新计划中设置
cdc_update_alter_statement
Views/Triggers — Explicitly excluded
视图/触发器 — 明确排除
- — passes
core/translate/view.rsfor CDC cursorNone - — passes
core/translate/trigger.rsfor CDC cursorNone
- — 传递
core/translate/view.rs作为CDC游标None - — 传递
core/translate/trigger.rs作为CDC游标None
Subqueries — No CDC
子查询 — 不生成CDC记录
- —
core/translate/subquery.rscdc_cursor_id: None
- —
core/translate/subquery.rscdc_cursor_id: None
Helper Functions (for reading CDC data)
辅助函数(用于读取CDC数据)
table_columns_json_array(table_name)
— core/function.rs
, core/vdbe/execute.rs
table_columns_json_array(table_name)core/function.rscore/vdbe/execute.rstable_columns_json_array(table_name)
— core/function.rs
, core/vdbe/execute.rs
table_columns_json_array(table_name)core/function.rscore/vdbe/execute.rsReturns JSON array of column names for a table. Used to interpret binary records.
返回表的列名JSON数组。用于解析二进制记录。
bin_record_json_object(columns_json, blob)
— core/function.rs
, core/vdbe/execute.rs
bin_record_json_object(columns_json, blob)core/function.rscore/vdbe/execute.rsbin_record_json_object(columns_json, blob)
— core/function.rs
, core/vdbe/execute.rs
bin_record_json_object(columns_json, blob)core/function.rscore/vdbe/execute.rsDecodes a binary record (from // columns) into a JSON object using column names.
beforeafterupdates使用列名将二进制记录(来自//列)解码为JSON对象。
beforeafterupdatesSync Engine Integration
同步引擎集成
The sync engine is the primary consumer of CDC data.
同步引擎是CDC数据的主要消费者。
DatabaseTape — sync/engine/src/database_tape.rs
sync/engine/src/database_tape.rsDatabaseTape — sync/engine/src/database_tape.rs
sync/engine/src/database_tape.rs- CDC config: ,
DEFAULT_CDC_TABLE_NAME = "turso_cdc"DEFAULT_CDC_MODE = "full" - PRAGMA name:
CDC_PRAGMA_NAME = "unstable_capture_data_changes_conn" - Initialization: sets CDC pragma and caches
connect()fromcdc_versiontable. Must be called beforeturso_cdc_version.iterate_changes() - Version caching: — set by
cdc_version: RwLock<Option<CdcVersion>>, read byconnect(). Panics if not set.iterate_changes() - Iterator: reads CDC table in batches, emits
DatabaseChangesIterator. For v2, real COMMIT records from the table are emitted. For v1, a synthetic Commit is appended at end of batch.DatabaseTapeOperation(default) filters outignore_schema_changes: truerow changes but not COMMIT records.sqlite_schema
- CDC配置: ,
DEFAULT_CDC_TABLE_NAME = "turso_cdc"DEFAULT_CDC_MODE = "full" - PRAGMA名称:
CDC_PRAGMA_NAME = "unstable_capture_data_changes_conn" - 初始化: 设置CDC pragma并从
connect()表缓存turso_cdc_version。必须在cdc_version之前调用。iterate_changes() - 版本缓存: — 由
cdc_version: RwLock<Option<CdcVersion>>设置,connect()读取。未设置时会触发panic。iterate_changes() - 迭代器: 批量读取CDC表,生成
DatabaseChangesIterator。对于v2,会生成表中的真实COMMIT记录;对于v1,会在批量结束时附加一条合成的Commit记录。默认DatabaseTapeOperation,会过滤掉ignore_schema_changes: true行的变更,但不会过滤COMMIT记录。sqlite_schema
Sync Operations — sync/engine/src/database_sync_operations.rs
sync/engine/src/database_sync_operations.rs同步操作 — sync/engine/src/database_sync_operations.rs
sync/engine/src/database_sync_operations.rs- Change counting:
SELECT COUNT(*) FROM turso_cdc WHERE change_id > ?
- 变更计数:
SELECT COUNT(*) FROM turso_cdc WHERE change_id > ?
Sync Engine — sync/engine/src/database_sync_engine.rs
sync/engine/src/database_sync_engine.rs同步引擎 — sync/engine/src/database_sync_engine.rs
sync/engine/src/database_sync_engine.rs- Initialization: calls
open_db()to ensure CDC is set up and version is cached before anymain_tape.connect(coro)calls.iterate_changes() - During , checks if CDC table existed, re-creates it after sync
apply_changes
- 初始化: 调用
open_db(),确保在任何main_tape.connect(coro)调用前完成CDC设置和版本缓存。iterate_changes() - 在期间,检查CDC表是否存在,同步后重新创建该表
apply_changes
Replay Generator — sync/engine/src/database_replay_generator.rs
sync/engine/src/database_replay_generator.rs重放生成器 — sync/engine/src/database_replay_generator.rs
sync/engine/src/database_replay_generator.rs- Requires column to be populated (full mode)
updates
- 需要列已填充(full模式)
updates
Bindings CDC Surface
绑定层的CDC接口
All bindings expose as part of sync stats:
cdc_operations| Binding | File |
|---|---|
| Python | |
| JavaScript | |
| JS (generator) | |
| Go | |
| React Native | |
| SDK Kit (C header) | |
| SDK Kit (Rust) | |
所有绑定层都将作为同步统计的一部分暴露:
cdc_operations| 绑定层 | 文件 |
|---|---|
| Python | |
| JavaScript | |
| JS (生成器) | |
| Go | |
| React Native | |
| SDK Kit (C头文件) | |
| SDK Kit (Rust) | |
Tests
测试
- Integration tests: — covers all modes, CRUD, transactions, schema changes, version table, backward compatibility. Registered in
tests/integration/functions/test_cdc.rs.tests/integration/functions/mod.rs - Sync engine tests: — CDC table reads, tape iteration, replay of schema changes.
sync/engine/src/database_tape.rs - JS binding tests:
bindings/javascript/sync/packages/{wasm,native}/promise.test.ts
Run: (integration) or (sync engine).
cargo test -- test_cdccargo test -p turso_sync_engine -- database_tape- 集成测试: — 覆盖所有模式、CRUD操作、事务、架构变更、版本表、向后兼容。注册于
tests/integration/functions/test_cdc.rs。tests/integration/functions/mod.rs - 同步引擎测试: — CDC表读取、磁带迭代、架构变更重放。
sync/engine/src/database_tape.rs - JS绑定测试:
bindings/javascript/sync/packages/{wasm,native}/promise.test.ts
运行方式:(集成测试)或(同步引擎测试)。
cargo test -- test_cdccargo test -p turso_sync_engine -- database_tapeUser-facing Documentation
用户文档
- CLI manual page: — accessible via
cli/manuals/cdc.mdin the REPL.manual cdc - Database manual: — CDC section linked in TOC
docs/manual.md
- CLI手册页: — 可通过REPL中的
cli/manuals/cdc.md访问.manual cdc - 数据库手册: — CDC章节已加入目录
docs/manual.md
Key Design Decisions
关键设计决策
- Per-connection, not per-database. Each connection has its own CDC mode and can target different tables.
- Bytecode-level implementation. CDC instructions are emitted alongside the actual DML bytecode during translation — no runtime hooks or triggers.
- Self-exclusion. Changes to the CDC table and table are never captured (checked in
turso_cdc_version).prepare_cdc_if_necessary - Schema changes tracked. DDL operations are recorded as changes to table.
sqlite_schema - Binary record format. Before/after/updates columns use SQLite's MakeRecord format (same as B-tree payload).
- Transaction-aware. CDC writes happen within the same transaction as the DML, so rollback naturally discards CDC entries.
- Version tracking. CDC schema version is recorded in table and carried in
turso_cdc_versionfor future schema evolution.CaptureDataChangesInfo.version - Atomic PRAGMA. Connection CDC state is deferred via in
pending_cdc_infoand applied only at Halt. If the PRAGMA's disk writes fail and the transaction rolls back, the connection state stays unchanged.ProgramState - Per-statement COMMIT (v2). COMMIT records are emitted once per statement (not per row), using which checks
emit_cdc_autocommit_commit()at runtime. In explicit transactions, only the finalis_autocommit()emits a COMMIT CDC record.COMMIT - Backward-compatible version detection. Pre-existing v1 CDC tables (without ) are detected by checking table existence before creation. Existing tables get
turso_cdc_versioninserted into the version table.CdcVersion::V1 - Typed version enum. enum with
CdcVersionand#[repr(u8)]/Ordenables feature gating via integer comparison (PartialOrd=has_commit_record()).self >= V2/Displayhandles database round-trip.FromStr - CDC and MVCC mutual exclusion. Enabling CDC when MVCC is active (or vice versa) returns an error. Checked at PRAGMA set time and journal mode switch time.
- 基于连接而非数据库: 每个连接有独立的CDC模式,可指向不同的表。
- 字节码层实现: CDC指令在翻译阶段与实际DML字节码一起生成——无需运行时钩子或触发器。
- 自排除: 永远不会捕获CDC表和表的变更(在
turso_cdc_version中检查)。prepare_cdc_if_necessary - 跟踪架构变更: DDL操作被记录为表的变更。
sqlite_schema - 二进制记录格式: 变更前/后/更新内容列使用SQLite的MakeRecord格式(与B-tree payload相同)。
- 事务感知: CDC写入操作与DML在同一事务中执行,因此回滚会自动丢弃CDC条目。
- 版本跟踪: CDC schema版本记录在表中,并携带在
turso_cdc_version中,以支持未来的schema演进。CaptureDataChangesInfo.version - 原子PRAGMA: 连接的CDC状态通过中的
ProgramState延迟应用,仅在Halt阶段生效。如果PRAGMA的磁盘写入失败导致事务回滚,连接状态保持不变。pending_cdc_info - 按语句生成COMMIT (v2): COMMIT记录按语句生成(而非按行),使用在运行时检查
emit_cdc_autocommit_commit()。在显式事务中,仅最终的is_autocommit()会生成CDC COMMIT记录。COMMIT - 向后兼容的版本检测: 通过在创建前检查CDC表是否存在,检测版本跟踪功能添加前创建的v1 CDC表。已存在的表会自动在版本表中插入。
CdcVersion::V1 - 类型化版本枚举: 枚举使用
CdcVersion和#[repr(u8)]/Ord,支持通过整数比较进行特性门控(PartialOrd=has_commit_record())。self >= V2/Display实现了与数据库的双向转换。FromStr - CDC与MVCC互斥: 当MVCC已激活时启用CDC(反之亦然)会返回错误。在PRAGMA设置时和日志模式切换时进行检查。