confluent-cloud-cdc-tableflow
Confluent Cloud CDC to Tableflow Pipeline
Build production-ready Change Data Capture pipelines that stream database changes through Confluent Cloud to Iceberg or Delta Lake tables using Debezium, Flink, and Tableflow.
Overview
This skill automates the setup of a complete CDC pipeline:
Database → Debezium CDC Connector → Kafka + Schema Registry → Flink (decode & transform) → Tableflow → Iceberg/Delta Tables
Supported Databases (Fully-Managed Debezium Connectors Only)
- Microsoft SQL Server CDC Source V2
- MySQL CDC Source V2
- PostgreSQL CDC Source V2
- Oracle XStream CDC Source
- DynamoDB CDC Source
Key Components
- Debezium CDC Source Connector: Captures database changes as events
- Schema Registry: Manages Avro/JSON/Protobuf schemas (default: JSON_SR)
- Confluent Cloud Flink: Decodes Debezium envelopes and transforms data
- Tableflow: Native Confluent Cloud feature that materializes Kafka topics as Iceberg or Delta tables
Critical Architecture Rules
1. NEVER enable Tableflow directly on CDC source topics.
Always use the Flink decode pattern: CDC Source Topic → Flink INSERT → Target Topic (`changelog.mode = 'upsert'`) → Tableflow.
CDC connectors with `tombstones.on.delete=true` produce null-value Kafka records (tombstones) on DELETE operations. If Tableflow is enabled directly on the CDC source topic, it uses APPEND mode by default and suspends as soon as it encounters a tombstone: "Tableflow will be suspended because we detected a Kafka record with a null value."
The Flink decode layer solves this by interpreting Debezium CDC semantics natively: it translates DELETEs into proper retract/tombstone messages that upsert-mode Tableflow handles correctly.
Do NOT use `after.state.only=true` as a shortcut to bypass the Flink decode step. While it strips the Debezium envelope, tombstone records from DELETEs still break APPEND-mode Tableflow. Additionally, `OracleXStreamSource` does not support the `after.state.only` configuration option at all.
2. Tableflow changelog mode is IMMUTABLE after first materialization.
Tableflow caches the changelog mode (APPEND or UPSERT) when it first materializes data. Once set, it cannot be changed, even by altering the Kafka topic's `changelog.mode` property or by deleting and recreating the Tableflow topic. The S3 `table_path` is keyed by Kafka topic name, so recreating a Tableflow topic reuses the same S3 path and cached state.
Attempting to change the mode causes: "The changelog mode for this topic has been modified since table materialization began." Flip-flopping the mode further corrupts state with: "Incompatible schema evolution detected."
To change changelog mode, you must delete the Tableflow topic, delete the underlying Kafka topic, and recreate both from scratch. This is why it's critical to create target topics with `'changelog.mode' = 'upsert'` from the start.
3. Pipeline cleanup order matters.
When resetting a CDC-to-Tableflow pipeline, delete resources in this order:
- Tableflow topics (on target topics)
- Flink INSERT statements
- Flink target tables (DROP TABLE)
- Target Kafka topics
- CDC connectors
- CDC source Kafka topics (including dbhistory/schema-changes topics)
- All associated schemas from Schema Registry (both `-key` and `-value` subjects)
Never delete CDC source Kafka topics while the connector is still running: the connector cannot recover or re-snapshot and must be fully recreated.
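The cleanup order can be written down as a dry-run plan and reviewed before anything is deleted. The sketch below only prints the steps; `print_cleanup_plan` and its four arguments are hypothetical names chosen for illustration, and no Confluent CLI command is invoked.

```bash
# Dry-run sketch: print the teardown order for one CDC-to-Tableflow pipeline.
# print_cleanup_plan is a hypothetical helper; it deletes nothing.
print_cleanup_plan() {
  target_topic=$1    # Flink target table / topic with Tableflow enabled
  insert_stmt=$2     # name of the Flink INSERT statement
  connector=$3       # CDC connector name
  source_prefix=$4   # topic.prefix used by the connector

  echo "1. Disable Tableflow on ${target_topic}"
  echo "2. Delete Flink statement ${insert_stmt}"
  echo "3. DROP TABLE ${target_topic} in Flink"
  echo "4. Delete Kafka topic ${target_topic}"
  echo "5. Delete connector ${connector}"
  echo "6. Delete source topics under ${source_prefix} (plus dbhistory/schema-changes topics)"
  echo "7. Delete the matching -key and -value subjects from Schema Registry"
}

print_cleanup_plan target_customers cdc-decode-customers cdc-pipeline-skill-postgres-20260323 postgres-cdc
```

Printing the plan first makes it easy to confirm that the connector (step 5) is removed before its source topics (step 6).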
Important Clarifications
- Tableflow is NOT a connector. It is a native topic-level feature enabled via the Tableflow API or Confluent Cloud UI.
- Confluent Cloud Flink auto-discovers CDC tables. You do NOT need to manually create source tables; topics with Schema Registry schemas are automatically available as Flink tables.
- Topics without SR schemas can still be handled: register a JSON schema (partial is fine), use schema inference, or use Flink's raw BYTES with JSON functions. See "Handling Topics Without Schema Registry" in `references/connector-configs.md`.
- All SR-backed formats work identically: `JSON_SR`, `AVRO`, and `PROTOBUF` all support Flink auto-discovery and Tableflow. Choose based on throughput needs vs. debuggability.
- Managed connectors use `output.data.format`, not `key.converter`/`value.converter` classes.
Workflow Phases
Phase 0: Tool Selection & MCP Server Validation (CRITICAL)
Default: Use Confluent MCP Server. The MCP server is the preferred method for all Confluent Cloud operations. Only fall back to the Confluent CLI (`confluent` command) and REST APIs if the MCP server is not installed or unavailable.
0.1 Verify MCP Server Availability
Check for `mcp__confluent__*` tools (list-environments, create-connector, create-flink-statement, create-tableflow-topic, list-schemas, list-topics, consume-messages, search-topics-by-name).
If MCP is not available, fall back to the Confluent CLI (`confluent` command) and REST APIs for all operations. The CLI fallback should mirror the same workflow phases but use CLI commands instead of MCP tool calls.
CLI Fallback Examples:
```bash
# Environment & cluster discovery
confluent environment list
confluent kafka cluster list --environment <env-id>

# Connector operations
confluent connect cluster create --config-file connector-config.json --cluster <cluster-id> --environment <env-id>
confluent connect cluster describe <connector-id>
confluent connect cluster list --cluster <cluster-id> --environment <env-id>

# Flink operations
confluent flink compute-pool create <pool-name> --cloud <cloud> --region <region> --environment <env-id>
confluent flink statement create <statement-name> --sql "<SQL>" --compute-pool <pool-id> --environment <env-id>
confluent flink statement describe <statement-name> --environment <env-id>
confluent flink statement delete <statement-name> --environment <env-id>

# Topic & schema operations
confluent kafka topic list --cluster <cluster-id> --environment <env-id>
confluent schema-registry subject list --environment <env-id>

# Tableflow operations
confluent tableflow topic enable <topic-name> --cluster <cluster-id> --environment <env-id> --storage-type MANAGED --table-formats ICEBERG
confluent tableflow topic list --cluster <cluster-id> --environment <env-id>
confluent tableflow topic describe <topic-name> --cluster <cluster-id> --environment <env-id>
confluent tableflow topic disable <topic-name> --cluster <cluster-id> --environment <env-id>
```
**REST API Fallback:** If neither MCP nor CLI is available, use the Confluent Cloud REST APIs directly. All calls use HTTP Basic Auth with a **Cloud API Key** (not a Kafka API key). See `references/rest-api.md` for endpoint patterns and examples.
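HTTP Basic Auth means the Cloud API key and secret are joined with a colon and base64-encoded into an `Authorization` header. A minimal sketch, assuming the credentials are held in the hypothetical variables `CLOUD_API_KEY` and `CLOUD_API_SECRET`; the helper name `basic_auth_header` is also an assumption, and the environments endpoint in the comment is shown only as an illustration, with `references/rest-api.md` remaining the source for the endpoint patterns this skill uses.

```bash
# Build the Authorization header for HTTP Basic Auth.
basic_auth_header() {
  # $1 = Cloud API key, $2 = Cloud API secret
  # tr removes the line wrapping that some base64 implementations add.
  token=$(printf '%s:%s' "$1" "$2" | base64 | tr -d '\n')
  printf 'Authorization: Basic %s\n' "$token"
}

# Example call (not executed here):
#   curl -s -H "$(basic_auth_header "$CLOUD_API_KEY" "$CLOUD_API_SECRET")" \
#     https://api.confluent.cloud/org/v2/environments
```

`curl -u "<key>:<secret>"` produces the same header; the explicit form is useful when the request is built by another tool.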
0.2 Gather Confluent Cloud Details from the User
Ask the user to provide the following Confluent Cloud details:
| Detail | Example | Used For |
|---|---|---|
| Environment ID | | All MCP calls |
| Kafka Cluster ID | | All MCP calls |
| Flink Compute Pool ID | | Flink statements |
| Flink Catalog Name | | Flink statements |
| Flink Database Name | | Flink statements |
Credentials: Generate a `cdc-credentials.properties` file with placeholders for: Kafka API Key/Secret (cluster-scoped), Database Username/Password. Have the user populate it in their editor and add it to `.gitignore`. If the user prefers Claude not read the file, fall back to CLI: generate `connector-config.json` with placeholders, have the user fill it in, then run `confluent connect cluster create --config-file connector-config.json`.
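The credentials step can be sketched as a small helper that writes the placeholder file and makes sure Git ignores it. Only the file name `cdc-credentials.properties` comes from the text above; the helper name `init_credentials_file` and the property keys are illustrative assumptions and should be adapted to the connector template in use.

```bash
# Create a placeholder credentials file and add it to .gitignore.
# The key names below are hypothetical placeholders.
init_credentials_file() {
  dir=${1:-.}
  file="$dir/cdc-credentials.properties"

  # Never overwrite a file the user may already have filled in.
  if [ ! -e "$file" ]; then
    cat > "$file" <<'EOF'
kafka.api.key=<KAFKA_API_KEY>
kafka.api.secret=<KAFKA_API_SECRET>
database.user=<DB_USERNAME>
database.password=<DB_PASSWORD>
EOF
    chmod 600 "$file"
  fi

  # Append the ignore rule only once.
  touch "$dir/.gitignore"
  grep -qxF 'cdc-credentials.properties' "$dir/.gitignore" ||
    echo 'cdc-credentials.properties' >> "$dir/.gitignore"
}
```

Calling the helper twice is safe: the file is left alone once it exists and the ignore rule is not duplicated.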
0.3 Verify MCP Cluster Targeting
Quick verification:
- Run `mcp__confluent__list-topics` to confirm the MCP server is connected to the expected cluster
- Run `mcp__confluent__list-schemas` to verify Schema Registry is accessible
Schema Registry is shared at the environment level across all clusters.
Phase 1: Discovery & Validation
1.1 Check Existing Setup
Use MCP to check what already exists:
```
mcp__confluent__list-connectors(environmentId, clusterId) → Existing CDC connectors
mcp__confluent__list-flink-statements(environmentId, computePoolId) → Existing Flink jobs
mcp__confluent__list-tableflow-topics(environmentId, clusterId) → Existing Tableflow topics
```
Ask the user:
- "Do you have any CDC connectors already running?"
- "Do you have a Flink compute pool you'd like to use, or should I create one?"
- "Is your database already configured for CDC?"
1.2 Validate Topic Prefix Uniqueness
Before proceeding, validate that the chosen `topic.prefix` won't collide with existing topics:
```
mcp__confluent__search-topics-by-name(topicName: "<proposed-prefix>", environmentId, clusterId)
```
Or via CLI:
```bash
confluent kafka topic list --cluster <cluster-id> --environment <env-id> | grep -E "^[[:space:]]*<proposed-prefix>"
```
If any existing topics share the proposed prefix, warn the user and recommend a unique prefix. A prefix collision silently merges CDC data with unrelated topics, which can corrupt both pipelines.
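The collision check itself is a literal prefix match over topic names. A minimal sketch, assuming the names arrive on stdin as one bare topic name per line; `prefix_collisions` is a hypothetical helper, and how the list is obtained (MCP, CLI, or REST) is left to the caller.

```bash
# Print every topic name that starts with the proposed prefix.
# Reads topic names from stdin, one per line.
prefix_collisions() {
  # index() matches literally, so dots and dashes in the prefix
  # are not interpreted as regular-expression syntax.
  awk -v p="$1" 'index($0, p) == 1'
}

# Example with an inline topic list:
printf '%s\n' orders postgres-cdc.public.customers inventory.items |
  prefix_collisions postgres-cdc   # prints postgres-cdc.public.customers
```

An empty result means the prefix is free; any output should trigger the warning described above.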
1.3 Check Schema Registry Compatibility
Default `BACKWARD` compatibility can halt CDC connectors when database columns are dropped. Set `FULL_TRANSITIVE` for CDC subjects after the connector creates them:
```bash
confluent schema-registry config update --subject "<topic-prefix>.<schema>.<table>-value" --compatibility FULL_TRANSITIVE --environment <env-id>
```
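When several tables are captured, the same update has to run once per value subject. The sketch below derives each subject from the `<topic-prefix>.<schema>.<table>-value` pattern and prints the commands instead of running them; `value_subject` is a hypothetical helper and the two table names are illustrative.

```bash
# Derive the Schema Registry value subject for a captured table.
value_subject() {
  # $1 = topic prefix, $2 = schema.table
  printf '%s.%s-value\n' "$1" "$2"
}

# Print (rather than run) one config update per table so the plan can be reviewed first.
for table in public.customers public.orders; do
  echo confluent schema-registry config update \
    --subject "$(value_subject postgres-cdc "$table")" \
    --compatibility FULL_TRANSITIVE --environment "<env-id>"
done
```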
1.4 Gather Required Information
Database Configuration:
- Database type (SQL Server, MySQL, PostgreSQL, Oracle, or DynamoDB)
- Connection details (hostname, port, database name)
- Credentials (populated by the user in the credentials file)
- Specific tables to capture (format: `schema.table`)
Schema Format: Ask the user: `JSON_SR` (default, human-readable), `AVRO` (smaller payloads, high-throughput), or `PROTOBUF` (strongly typed). All work identically with Flink auto-discovery and Tableflow. Never use plain `JSON`; it breaks both. See `references/connector-configs.md` for a detailed comparison.
Existing Topics Without SR: See "Handling Topics Without Schema Registry" in `references/connector-configs.md` for options (register JSON schema, schema inference, or Flink raw BYTES).
Tableflow Destination:
- Target format: Iceberg or Delta Lake
- Storage: Managed (recommended, Confluent manages S3) or BYOB (user's own S3 bucket, requires Provider Integration ID)
Naming Convention:
- Default: `cdc-pipeline-skill-{database-type}-{YYYYMMDD}`
- Example: `cdc-pipeline-skill-postgres-20260323`
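The default name can be generated rather than typed. A small sketch of the naming convention; `default_pipeline_name` is a hypothetical helper, and the optional second argument exists only so the date can be pinned.

```bash
# Build the default pipeline name: cdc-pipeline-skill-{database-type}-{YYYYMMDD}.
default_pipeline_name() {
  db_type=$1
  # Fall back to today's date when no date is supplied.
  day=${2:-$(date +%Y%m%d)}
  printf 'cdc-pipeline-skill-%s-%s\n' "$db_type" "$day"
}

default_pipeline_name postgres 20260323   # prints cdc-pipeline-skill-postgres-20260323
```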
1.5 Validate Database Prerequisites
Each database requires specific CDC setup. Read `references/database-prerequisites.md` for details:
- PostgreSQL: WAL level = logical, replication slots, publication
- MySQL: binlog format = ROW, GTID mode
- SQL Server: CDC enabled on database and tables, SQL Server Agent running
- Oracle XStream: GoldenGate replication enabled (`enable_goldengate_replication=TRUE`), ARCHIVELOG mode, supplemental logging, XStream admin user with `DBMS_XSTREAM_AUTH` privileges, XStream outbound server created via `DBMS_XSTREAM_ADM.CREATE_OUTBOUND`, connector user with XStream connect privilege. Full prereqs: https://docs.confluent.io/cloud/current/connectors/cc-oracle-xstream-cdc-source/prereqs-validation.html
- DynamoDB: Streams enabled with NEW_AND_OLD_IMAGES
If the database isn't properly configured, guide the user through setup before proceeding.
Oracle XStream important limitations:
- Only supports non-CDB architecture on Amazon RDS for Oracle
- Does NOT support Oracle Autonomous Databases or Oracle Standby (Data Guard)
- Does NOT support Downstream Capture
- `after.state.only` is NOT supported by `OracleXStreamSource`
- Requires a valid Confluent license for XStream Out
Phase 2: Planning
Generate the complete configuration plan and present it to the user for approval.
2.1 Connector Configuration
Based on the database type, generate the connector configuration using the appropriate template from `references/connector-configs.md`. The templates include all required fields (`name`, `connector.class`, `topic.prefix`, `kafka.api.key`, `output.data.format`, `decimal.handling.mode`, etc.) and database-specific settings.
Set the schema format based on user preference (default `JSON_SR`):
```json
{
  "output.data.format": "JSON_SR",
  "output.key.format": "JSON_SR"
}
```
Replace `JSON_SR` with `AVRO` or `PROTOBUF` if the user requested a different format. Both key and value formats should match. All other connector settings remain the same regardless of format choice.
Topic Naming Pattern: `{topic.prefix}.{schema}.{table}`
Example with `topic.prefix = "postgres-cdc"`: `postgres-cdc.public.customers`
2.2 Flink SQL Statements
Note: The examples below use a `customers` table with illustrative column names. Substitute the user's actual table name, columns, and types based on the schema discovered from their CDC topic.
In Confluent Cloud Flink, the CDC source table is auto-discovered from the Kafka topic. You only need to:
- Create a target table (for plain JSON_SR output to Tableflow):
```sql
CREATE TABLE `target_customers` (
  `id` INT NOT NULL,
  `name` STRING,
  `email` STRING,
  `created_at` TIMESTAMP_LTZ(3),
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'changelog.mode' = 'upsert'
);
```
- Create an INSERT statement (continuous job to decode and transform):
```sql
INSERT INTO `target_customers`
SELECT
  `id`,
  `name`,
  `email`,
  TO_TIMESTAMP_LTZ(`created_at` / 1000, 3)
FROM `postgres-cdc.public.customers`;
```
IMPORTANT Cloud Flink differences:
- Do NOT specify `'connector'`, `'value.format'`, `'properties.bootstrap.servers'`, or Schema Registry URLs in CREATE TABLE; Cloud Flink handles all of this automatically
- Do NOT create source tables for CDC topics; they are auto-discovered
- Do NOT reference `after.*` fields or filter by `op`; Flink interprets CDC changelog semantics natively
- Use `TIMESTAMP_LTZ(3)` for Debezium timestamps, not `TIMESTAMP(3)`
DynamoDB CDC is different from SQL CDC in Flink. The auto-discovered table has columns `id` (key) and `val` (a complex ROW type containing the CDC envelope with `op`, `before.document`, `after.document`). Flink does NOT auto-decode this envelope like it does for SQL Debezium connectors. You must manually extract fields:
```sql
CREATE TABLE `target_dynamodb` (
  `id` STRING NOT NULL,
  `document` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH ('changelog.mode' = 'upsert');

INSERT INTO `target_dynamodb`
SELECT `id`, `val`.`after`.`document`
FROM `dynamodb-cdc-source-topic`;
```
The `document` field is a JSON string of the DynamoDB item in DynamoDB's native type format (e.g., `{"name":{"S":"Alice"},"age":{"N":"30"}}`).
Debezium Type Conversions: See `references/flink-sql-patterns.md` for the full type mapping table. Key conversions: use `TO_TIMESTAMP_LTZ(col / 1000, 3)` for MicroTimestamp, `TO_TIMESTAMP_LTZ(col, 3)` for Timestamp, and ensure `decimal.handling.mode=string` is set on the connector (BYTES default is unusable in Flink).
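The `/ 1000` in the MicroTimestamp conversion is a unit change from microseconds to milliseconds, which is the unit `TO_TIMESTAMP_LTZ(..., 3)` reads. A worked example in shell integer arithmetic, using an illustrative epoch value that does not come from any real table:

```bash
# Debezium MicroTimestamp: microseconds since the Unix epoch (illustrative value).
micros=1700000000123456

# Integer division by 1000 drops the sub-millisecond digits.
millis=$(( micros / 1000 ))
echo "$millis"   # prints 1700000000123
```

A Debezium Timestamp column is already in milliseconds, which is why it is passed to `TO_TIMESTAMP_LTZ(col, 3)` without the division.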
2.3 Tableflow Configuration
Tableflow is a native topic-level feature, not a connector. It is enabled per-topic.
Storage Options:
- Managed (recommended): Confluent manages the S3 storage. No credentials needed.
- BYOB (Bring Your Own Bucket): User provides their S3 bucket. Requires a Provider Integration ID set up in Confluent Cloud (Settings → Provider Integrations).
Table Formats: Iceberg (recommended) or Delta Lake
2.4 Present the Plan
Show the user:
- Connector configuration (with sensitive fields masked)
- Flink compute pool to use
- Flink SQL statements (target table + INSERT)
- Tableflow config (storage type, format)
- Expected topic names
Wait for explicit confirmation before proceeding.
Phase 3: Execution
Execute step-by-step using MCP tools, checking status after each component.
3.1 Create CDC Source Connector
Build the connector configuration using the template for the user's database type from `references/connector-configs.md`. Each template includes all required fields, including the `name` field.
Using MCP:
```
mcp__confluent__create-connector(
  connectorName: "<connector-name>",
  environmentId: "<env-id>",
  clusterId: "<cluster-id>",
  connectorConfig: { <config from references/connector-configs.md> }
)
```
Verify: Managed connectors take 2-5 minutes to provision. Poll `mcp__confluent__read-connector`: `tasks: []` means still provisioning; `tasks: [{...}]` means ready. Then verify schemas with `mcp__confluent__list-schemas(subjectPrefix: "postgres-cdc")`. If no schemas appear after 5 minutes with tasks assigned, check database connectivity. Use the Confluent Cloud UI for connector error logs (MCP doesn't expose them).
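The readiness rule (an empty `tasks` array means still provisioning) can be sketched as a check over a connector status document. This assumes the status is available as JSON with a top-level `"tasks"` array, which is an assumption about the response shape rather than a documented contract; `connector_ready` is a hypothetical helper that avoids `jq` by stripping whitespace first.

```bash
# Return success when a connector status document lists at least one task.
# Reads JSON on stdin.
connector_ready() {
  compact=$(tr -d ' \t\r\n')
  case $compact in
    *'"tasks":[]'*) return 1 ;;   # empty array: still provisioning
    *'"tasks":['*)  return 0 ;;   # at least one task assigned
    *)              return 1 ;;   # no tasks field at all
  esac
}

# Example:
if printf '{"tasks": [{"id": 0}]}' | connector_ready; then
  echo "connector ready"
fi
```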
3.2 Create Flink Compute Pool (if needed)
If the user doesn't have an existing Flink compute pool, create one before executing SQL:
```bash
confluent flink compute-pool create <pool-name> --cloud <cloud-provider> --region <region> --environment <env-id>
```
Use the same cloud provider and region as the Kafka cluster. Wait for the pool status to be `RUNNING` before proceeding.
3.3 Execute Flink SQL
Step 1: Verify CDC table is auto-discovered:
```
mcp__confluent__create-flink-statement(
  statementName: "show-tables-check",
  statement: "SHOW TABLES;",
  environmentId: "<env-id>",
  computePoolId: "<pool-id>",
  catalogName: "<environment-display-name>",
  databaseName: "<cluster-display-name>"
)
```
Then read results:
```
mcp__confluent__read-flink-statement(statementName: "show-tables-check", environmentId: "<env-id>")
```
Look for the CDC topic table (e.g., `postgres-cdc.public.customers`). If not present, the connector hasn't produced data yet; wait and retry.
Step 2: Create target table:
```
mcp__confluent__create-flink-statement(
  statementName: "cdc-create-target-customers",
  statement: "CREATE TABLE `target_customers` (...) WITH ('changelog.mode' = 'upsert');",
  environmentId, computePoolId, catalogName, databaseName
)
```
Step 3: Create INSERT job:
```
mcp__confluent__create-flink-statement(
  statementName: "cdc-decode-customers",
  statement: "INSERT INTO `target_customers` SELECT ... FROM `postgres-cdc.public.customers`;",
  environmentId, computePoolId, catalogName, databaseName
)
```
The INSERT creates a continuous Flink job. Verify it transitions to RUNNING (not FAILED):
```
mcp__confluent__read-flink-statement(statementName: "cdc-decode-customers", environmentId)
```
Common INSERT failures:
- "Table does not exist" → CDC source table not yet auto-discovered; wait for connector
- "Incompatible types for sink column" → Type mismatch; check Debezium type mappings above
- "Unsupported format" → Remove any explicit format properties from CREATE TABLE
Advisory warnings (can be ignored):
- "Primary key does not match upsert key": expected for CDC decode patterns
- "Highly state-intensive operators without TTL": advisory; set TTL if needed for production
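The failure list works as a lookup from error text to next action. A small sketch of that lookup; `insert_failure_hint` is a hypothetical helper, and the matched substrings are the three messages listed above.

```bash
# Map a Flink INSERT error message to the remediation from the failure list.
insert_failure_hint() {
  case $1 in
    *"Table does not exist"*)
      echo "Wait for the connector; the CDC source table is not auto-discovered yet." ;;
    *"Incompatible types for sink column"*)
      echo "Check the Debezium type mappings for the mismatched column." ;;
    *"Unsupported format"*)
      echo "Remove explicit format properties from CREATE TABLE." ;;
    *)
      echo "Unrecognized error; inspect the statement details." ;;
  esac
}

insert_failure_hint "SQL validation failed. Unsupported format"
```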
3.4 Enable Tableflow
Using MCP:
mcp__confluent__create-tableflow-topic(
tableflowTopicConfig: {
"display_name": "target_customers",
"storage": { "kind": "Managed", "bucket_name": "managed", "provider_integration_id": "managed" },
"table_formats": ["ICEBERG"],
"config": { "record_failure_strategy": "SUSPEND", "retention_ms": "6048000000" }
}
)KNOWN LIMITATION: The MCP tool does NOT accept or parameters. It defaults to the cluster configured in the MCP server. If the MCP server points to a different cluster than where the target topic exists, this will fail with "topic not found". Use the CLI or UI as a workaround.
create-tableflow-topicenvironmentIdclusterIdUsing CLI:
bash
undefined使用MCP:
mcp__confluent__create-tableflow-topic(
tableflowTopicConfig: {
"display_name": "target_customers",
"storage": { "kind": "Managed", "bucket_name": "managed", "provider_integration_id": "managed" },
"table_formats": ["ICEBERG"],
"config": { "record_failure_strategy": "SUSPEND", "retention_ms": "6048000000" }
}
)已知限制: MCP的工具不接受或参数。它默认使用MCP服务器配置的集群。如果MCP服务器指向的集群与目标主题所在集群不同,会失败并显示“topic not found”。可使用CLI或UI作为解决方法。
create-tableflow-topicenvironmentIdclusterId使用CLI:
bash
undefinedManaged storage (Confluent manages S3)
托管存储(Confluent管理S3)
confluent tableflow topic enable target_customers
--cluster <cluster-id>
--environment <env-id>
--storage-type MANAGED
--table-formats ICEBERG
--cluster <cluster-id>
--environment <env-id>
--storage-type MANAGED
--table-formats ICEBERG
confluent tableflow topic enable target_customers
--cluster <cluster-id>
--environment <env-id>
--storage-type MANAGED
--table-formats ICEBERG
--cluster <cluster-id>
--environment <env-id>
--storage-type MANAGED
--table-formats ICEBERG
BYOB / BYOS (user's own S3 bucket)
BYOB / BYOS(用户自有S3存储桶)
confluent tableflow topic enable target_customers
--cluster <cluster-id>
--environment <env-id>
--storage-type BYOS
--provider-integration <provider-integration-id>
--bucket-name <bucket-name>
--table-formats ICEBERG
--cluster <cluster-id>
--environment <env-id>
--storage-type BYOS
--provider-integration <provider-integration-id>
--bucket-name <bucket-name>
--table-formats ICEBERG
confluent tableflow topic enable target_customers
--cluster <cluster-id>
--environment <env-id>
--storage-type BYOS
--provider-integration <provider-integration-id>
--bucket-name <bucket-name>
--table-formats ICEBERG
--cluster <cluster-id>
--environment <env-id>
--storage-type BYOS
--provider-integration <provider-integration-id>
--bucket-name <bucket-name>
--table-formats ICEBERG

# Azure Data Lake Storage Gen2
confluent tableflow topic enable target_customers \
  --cluster <cluster-id> \
  --environment <env-id> \
  --storage-type AzureDataLakeStorageGen2 \
  --provider-integration <provider-integration-id> \
  --storage-account-name <account-name> \
  --container-name <container-name> \
  --table-formats DELTA

Use `--table-formats DELTA` for Delta Lake instead of Iceberg.

**Verify Tableflow is enabled:**

mcp__confluent__list-tableflow-topics(environmentId, clusterId)

Or via CLI:
```bash
confluent tableflow topic describe target_customers --cluster <cluster-id> --environment <env-id>
confluent tableflow topic list --cluster <cluster-id> --environment <env-id>
```

Status will transition from `PENDING` → `ACTIVE`.
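
Because the move from `PENDING` to `ACTIVE` is not instant, a small polling loop may save repeated manual checks. The sketch below is a hypothetical helper (`wait_for_status` is not part of the Confluent CLI) that reruns any command until its output contains a wanted word. It assumes the `describe` output includes the literal status text, which is worth confirming against the installed CLI version.

```bash
# Hypothetical helper: rerun a command until its output contains a wanted word.
# Usage: wait_for_status <word> <max-attempts> <delay-seconds> <command...>
wait_for_status() {
  local want="$1" attempts="$2" delay="$3" i=1
  shift 3
  while [ "$i" -le "$attempts" ]; do
    # Run the supplied command; succeed as soon as the whole word shows up.
    if "$@" 2>/dev/null | grep -qw -- "$want"; then
      echo "status reached: $want (attempt $i)"
      return 0
    fi
    # Sleep between attempts, but not after the last one.
    if [ "$i" -lt "$attempts" ]; then
      sleep "$delay"
    fi
    i=$((i + 1))
  done
  echo "timed out waiting for: $want" >&2
  return 1
}

# Example (kept as a comment because the placeholders must be filled in first):
# wait_for_status ACTIVE 30 10 \
#   confluent tableflow topic describe target_customers \
#   --cluster <cluster-id> --environment <env-id>
```

Passing the command as arguments keeps the helper independent of any particular CLI, so the same loop could also wrap a connector or Flink status check.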
Phase 4: Verification & Troubleshooting
4.1 Verify End-to-End Pipeline
Large Table Snapshots: If the connector was created with `snapshot.mode: initial` on a large table, verification may take hours. To distinguish a running snapshot from a broken pipeline:
- Confirm the connector has tasks assigned (`read-connector` → `tasks` is non-empty)
- Confirm schemas are registered (`list-schemas` → key/value schemas exist); this means the snapshot has started producing
- Monitor the source topic message count in the Confluent Cloud UI; a steady stream means progress

If you used `snapshot.mode: schema_only` for initial validation, insert a test row in the source database to trigger a CDC event and verify the full pipeline. See `references/troubleshooting.md` for detailed snapshot troubleshooting.

Check each component:
| Check | MCP Tool | What to Look For |
|---|---|---|
| Connector running | | |
| Schemas registered | | Key and value schemas for CDC topic |
| CDC table in Flink | | CDC topic appears as table |
| Flink job running | | No error in response |
| Target topic has data | | Messages appear (note: consumer starts at latest offset) |
| Tableflow enabled | | Status is PENDING or ACTIVE |
Consume from target topic to verify decoded data:
mcp__confluent__consume-messages(
topicNames: ["target_customers"],
value: { "useSchemaRegistry": true },
key: { "useSchemaRegistry": true },
maxMessages: 5,
timeoutMs: 15000
)

Note: The consumer starts at the latest offset. If the initial snapshot has already completed, you may see 0 messages until a new database change occurs.
Test real-time CDC by inserting a row in the source database (adapt table name and columns to match the user's actual schema):
sql
INSERT INTO public.customers (name, email, created_at)
VALUES ('Test User', 'test@example.com', NOW());

4.2 Troubleshooting
For detailed troubleshooting, see `references/troubleshooting.md`.

Quick reference for pipeline-blocking issues:
| Symptom | Likely Cause | Fix |
|---|---|---|
| Connector tasks stay empty | Still provisioning | Wait 2-5 minutes, retry |
| No schemas after 5 min | DB connectivity or credentials | Check host, port, user, password; verify DB CDC config |
| SHOW TABLES missing CDC table | Connector not producing yet | Verify schemas exist first, then wait |
| INSERT: "Incompatible types" | Debezium type mismatch | Use TIMESTAMP_LTZ(3) + TO_TIMESTAMP_LTZ; see |
| Tableflow: "topic not found" | MCP cluster mismatch | Use CLI: |
| Topic exists but not in SHOW TABLES | No schema in SR | Register a JSON schema in SR or use schema inference; see |
Phase 5: Documentation
After successful setup, provide the user with:
- Pipeline Summary Table: All component names, IDs, and statuses
- Topic Names: Source CDC topic and target topic (with schema format used)
- Monitoring: Check connector, Flink job, and Tableflow status in Confluent Cloud UI
- Test Command: SQL INSERT to verify real-time CDC
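
The pipeline summary can be printed as a plain aligned table. The following is a minimal sketch: `print_summary`, the `component|name|status` row format, and the sample rows (including the `RUNNING` status strings) are hypothetical placeholders to be replaced with the real names, IDs, and statuses.

```bash
# Hypothetical helper: print "component|name|status" rows as an aligned table.
print_summary() {
  local row component rest name status
  printf '%-12s %-28s %s\n' "COMPONENT" "NAME" "STATUS"
  for row in "$@"; do
    # Split the row on the first two "|" separators.
    component=${row%%|*}
    rest=${row#*|}
    name=${rest%%|*}
    status=${rest#*|}
    printf '%-12s %-28s %s\n' "$component" "$name" "$status"
  done
}

# Placeholder values only; substitute the real pipeline components.
print_summary \
  "Connector|<connector-name>|RUNNING" \
  "Flink job|<statement-name>|RUNNING" \
  "Tableflow|target_customers|ACTIVE"
```

Keeping each row as a single delimited string makes it easy to append further components, such as the source and target topic names.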
References
- Database Prerequisites: `references/database-prerequisites.md`
- Connector Configurations: `references/connector-configs.md`
- Flink SQL Patterns: `references/flink-sql-patterns.md`
- Troubleshooting Guide: `references/troubleshooting.md`
- REST API Reference: `references/rest-api.md`
- Confluent Cloud Flink Docs: https://docs.confluent.io/cloud/current/flink/overview.html
- Tableflow Docs: https://docs.confluent.io/cloud/current/topics/tableflow/overview.html
- Debezium CDC Docs: https://debezium.io/documentation/
- Confluent MCP Server: https://github.com/confluentinc/mcp-confluent