Loading...
Loading...
Change Data Capture - architecture, entrypoints, bytecode emission, sync engine integration, tests
npx skill4agent add tursodatabase/turso cdcturso_cdcUser SQL (INSERT/UPDATE/DELETE/DDL)
|
v
┌─────────────────────────────────────────────────┐
│ Translate layer (core/translate/) │
│ ┌───────────────────────────────────────────┐ │
│ │ prepare_cdc_if_necessary() │ │
│ │ - checks CaptureDataChangesInfo │ │
│ │ - opens CDC table cursor (OpenWrite) │ │
│ │ - skips if target == CDC table itself │ │
│ └───────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────┐ │
│ │ emit_cdc_insns() │ │
│ │ - writes (change_id, change_time, │ │
│ │ change_type, table_name, id, │ │
│ │ before, after, updates) into CDC tbl │ │
│ └───────────────────────────────────────────┘ │
│ + emit_cdc_full_record() / emit_cdc_patch_record() │
└─────────────────────────────────────────────────┘
|
v
CDC table (turso_cdc or custom name)
|
v
┌─────────────────────────────────────────────────┐
│ Sync engine (sync/engine/) │
│ DatabaseTape reads CDC table → DatabaseChange │
│ → apply/revert → push to remote │
└─────────────────────────────────────────────────┘CaptureDataChangesModeCaptureDataChangesInfocore/lib.rs#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
V1 = 1,
V2 = 2,
}
const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;
enum CaptureDataChangesMode {
Id, // capture only rowid
Before, // capture before-image
After, // capture after-image
Full, // before + after + updates
}
struct CaptureDataChangesInfo {
mode: CaptureDataChangesMode,
table: String, // CDC table name
version: Option<CdcVersion>, // schema version (V1 or V2)
}Option<CaptureDataChangesInfo>NoneCdcVersionhas_commit_record()self >= V2DisplayFromStr"v1"V1"v2"V2CaptureDataChangesInfoparse(value: &str, version: Option<CdcVersion>)"<mode>[,<table_name>]"Nonecdc_version()CdcVersionis_v1()is_v2()version()has_before()has_after()has_updates()mode_name()CaptureDataChangesExtOption<CaptureDataChangesInfo>has_before()has_after()has_updates()table()Option<&str>turso_cdcTURSO_CDC_DEFAULT_TABLE_NAMECREATE TABLE turso_cdc (
change_id INTEGER PRIMARY KEY AUTOINCREMENT,
change_time INTEGER, -- unixepoch()
change_type INTEGER, -- 1=INSERT, 0=UPDATE, -1=DELETE
table_name TEXT,
id <untyped>, -- rowid of changed row
before BLOB, -- binary record (before-image)
after BLOB, -- binary record (after-image)
updates BLOB -- binary record of per-column changes
);CREATE TABLE turso_cdc (
change_id INTEGER PRIMARY KEY AUTOINCREMENT,
change_time INTEGER, -- unixepoch()
change_type INTEGER, -- 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
table_name TEXT,
id <untyped>, -- rowid of changed row
before BLOB, -- binary record (before-image)
after BLOB, -- binary record (after-image)
updates BLOB, -- binary record of per-column changes
change_txn_id INTEGER -- transaction ID (groups rows into transactions)
);change_txn_idconn_txn_id(candidate)change_type=2COMMITInitCdcVersionCREATE TABLE IF NOT EXISTSCREATE TABLE turso_cdc_version (
table_name TEXT PRIMARY KEY,
version TEXT NOT NULL
);CDC_VERSION_CURRENT = CdcVersion::V2core/lib.rscore/translate/pragma.rsInitCdcVersionDatabaseChangesync/engine/src/types.rs:229-249into_apply()into_revert()OperationModecore/translate/emitter.rsemit_cdc_insns()change_typeINSERTUPDATESELECTDELETECOMMITemit_cdc_commit_insnscore/translate/pragma.rsCaptureDataChangesInfo::parse()CDC_VERSION_CURRENTInitCdcVersioncore/translate/pragma.rsmodetableversion("off", NULL, NULL)(mode_name, table, version)core/pragma.rsUnstableCaptureDataChangesConn["mode", "table", "version"]core/connection.rscapture_data_changes: RwLock<Option<CaptureDataChangesInfo>>get_capture_data_changes_info()set_capture_data_changes_info(opts: Option<CaptureDataChangesInfo>)Nonecore/vdbe/builder.rscapture_data_changes_info: Option<CaptureDataChangesInfo>capture_data_changes_info()&Option<CaptureDataChangesInfo>core/translate/mod.rscore/vdbe/mod.rscapture_data_changes: Option<CaptureDataChangesInfo>PrepareContext::from_connection()connection.get_capture_data_changes_info()core/vdbe/execute.rsNonestate.pending_cdc_infoCREATE TABLE IF NOT EXISTS <cdc_table_name> ...change_txn_idCREATE TABLE IF NOT EXISTS turso_cdc_version ...INSERT OR IGNORECaptureDataChangesInfostate.pending_cdc_infopending_cdc_infohalt()conn.prepare()run_ignore_rows()| Function | Purpose |
|---|---|
| Opens CDC table cursor if CDC is active and target != CDC table |
| Reads all columns from cursor into a MakeRecord (for before/after image) |
| Builds record from in-flight register values (for after-image of INSERT/UPDATE) |
| Writes a single CDC row per changed row (INSERT/UPDATE/DELETE). Called per-row inside DML loops. |
| Writes a COMMIT record (change_type=2) into CDC table (v2 only). Raw emission, no autocommit check. |
| End-of-statement COMMIT emission. Checks |
emit_cdc_insns()emit_cdc_autocommit_commit()is_autocommit()BEGIN...COMMITCOMMITemit_cdc_commit_insns()INSERT INTO t VALUES (1),(2),(3)core/translate/insert.rsemit_cdc_insns()emit_cdc_autocommit_commit()emit_epilogue()core/translate/emitter.rsemit_cdc_insns()emit_cdc_autocommit_commit()core/translate/emitter.rsemit_cdc_insns()emit_cdc_autocommit_commit()core/translate/upsert.rsemit_cdc_insns()core/translate/schema.rsemit_cdc_insns()sqlite_schemaemit_cdc_autocommit_commit()emit_cdc_insns()emit_cdc_autocommit_commit()emit_cdc_insns()emit_cdc_autocommit_commit()core/translate/schema.rsemit_cdc_insns()emit_cdc_autocommit_commit()core/translate/index.rsBEGIN; CREATE TABLE t(x); COMMITcore/translate/update.rscdc_update_alter_statementcore/translate/view.rsNonecore/translate/trigger.rsNonecore/translate/subquery.rscdc_cursor_id: Nonetable_columns_json_array(table_name)core/function.rscore/vdbe/execute.rsbin_record_json_object(columns_json, blob)core/function.rscore/vdbe/execute.rsbeforeafterupdatessync/engine/src/database_tape.rsDEFAULT_CDC_TABLE_NAME = "turso_cdc"DEFAULT_CDC_MODE = "full"CDC_PRAGMA_NAME = "unstable_capture_data_changes_conn"connect()cdc_versionturso_cdc_versioniterate_changes()cdc_version: RwLock<Option<CdcVersion>>connect()iterate_changes()DatabaseChangesIteratorDatabaseTapeOperationignore_schema_changes: truesqlite_schemasync/engine/src/database_sync_operations.rsSELECT COUNT(*) FROM turso_cdc WHERE change_id > ?sync/engine/src/database_sync_engine.rsopen_db()main_tape.connect(coro)iterate_changes()apply_changessync/engine/src/database_replay_generator.rsupdatescdc_operations| Binding | File |
|---|---|
| Python | |
| JavaScript | |
| JS (generator) | |
| Go | |
| React Native | |
| SDK Kit (C header) | |
| SDK Kit (Rust) | |
tests/integration/functions/test_cdc.rstests/integration/functions/mod.rssync/engine/src/database_tape.rsbindings/javascript/sync/packages/{wasm,native}/promise.test.tscargo test -- test_cdccargo test -p turso_sync_engine -- database_tapecli/manuals/cdc.md.manual cdcdocs/manual.mdturso_cdc_versionprepare_cdc_if_necessarysqlite_schematurso_cdc_versionCaptureDataChangesInfo.versionpending_cdc_infoProgramStateemit_cdc_autocommit_commit()is_autocommit()COMMITturso_cdc_versionCdcVersion::V1CdcVersion#[repr(u8)]OrdPartialOrdhas_commit_record()self >= V2DisplayFromStr