confluent-cloud-cdc-tableflow

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Confluent Cloud CDC to Tableflow Pipeline

Confluent Cloud CDC 到 Tableflow 管道

Build production-ready Change Data Capture pipelines that stream database changes through Confluent Cloud to Iceberg or Delta Lake tables using Debezium, Flink, and Tableflow.
使用Debezium、Flink和Tableflow构建生产级变更数据捕获(CDC)管道,将数据库变更通过Confluent Cloud流式传输到Iceberg或Delta Lake表中。

Overview

概述

This skill automates the setup of a complete CDC pipeline:
Database → Debezium CDC Connector → Kafka + Schema Registry → Flink (decode & transform) → Tableflow → Iceberg/Delta Tables
本技能可自动搭建完整的CDC管道:
数据库 → Debezium CDC连接器 → Kafka + Schema Registry → Flink(解码与转换) → Tableflow → Iceberg/Delta表

Supported Databases (Fully-Managed Debezium Connectors Only)

支持的数据库(仅全托管Debezium连接器)

  • Microsoft SQL Server CDC Source V2
  • MySQL CDC Source V2
  • PostgreSQL CDC Source V2
  • Oracle XStream CDC Source
  • DynamoDB CDC Source
  • Microsoft SQL Server CDC Source V2
  • MySQL CDC Source V2
  • PostgreSQL CDC Source V2
  • Oracle XStream CDC Source
  • DynamoDB CDC Source

Key Components

核心组件

  1. Debezium CDC Source Connector: Captures database changes as events
  2. Schema Registry: Manages Avro/JSON/Protobuf schemas (default: JSON_SR)
  3. Confluent Cloud Flink: Decodes Debezium envelopes and transforms data
  4. Tableflow: Native Confluent Cloud feature that materializes Kafka topics as Iceberg or Delta tables
  1. Debezium CDC源连接器:捕获数据库变更作为事件
  2. Schema Registry:管理Avro/JSON/Protobuf Schema(默认:JSON_SR)
  3. Confluent Cloud Flink:解码Debezium信封并转换数据
  4. Tableflow:Confluent Cloud原生功能,将Kafka主题物化为Iceberg或Delta表

Critical Architecture Rules

关键架构规则

1. NEVER enable Tableflow directly on CDC source topics.
Always use the Flink decode pattern: CDC Source Topic → Flink INSERT → Target Topic (
changelog.mode = 'upsert'
) → Tableflow.
CDC connectors with
tombstones.on.delete=true
produce null-value Kafka records (tombstones) on DELETE operations. If Tableflow is enabled directly on the CDC source topic, it will use APPEND mode by default and immediately suspend when it encounters a tombstone: "Tableflow will be suspended because we detected a Kafka record with a null value."
The Flink decode layer solves this by interpreting Debezium CDC semantics natively — it translates DELETEs into proper retract/tombstone messages that upsert-mode Tableflow handles correctly.
Do NOT use
after.state.only=true
as a shortcut to bypass the Flink decode step. While it strips the Debezium envelope, tombstone records from DELETEs still break APPEND-mode Tableflow. Additionally,
OracleXStreamSource
does not support the
after.state.only
configuration option at all.
2. Tableflow changelog mode is IMMUTABLE after first materialization.
Tableflow caches the changelog mode (APPEND or UPSERT) when it first materializes data. Once set, it cannot be changed — even by altering the Kafka topic's
changelog.mode
property or by deleting and recreating the Tableflow topic. The S3
table_path
is keyed by Kafka topic name, so recreating a Tableflow topic reuses the same S3 path and cached state.
Attempting to change the mode causes: "The changelog mode for this topic has been modified since table materialization began." Flip-flopping the mode further corrupts state with: "Incompatible schema evolution detected."
To change changelog mode, you must delete the Tableflow topic, delete the underlying Kafka topic, and recreate both from scratch. This is why it's critical to create target topics with
'changelog.mode' = 'upsert'
from the start.
3. Pipeline cleanup order matters.
When resetting a CDC-to-Tableflow pipeline, delete resources in this order:
  1. Tableflow topics (on target topics)
  2. Flink INSERT statements
  3. Flink target tables (DROP TABLE)
  4. Target Kafka topics
  5. CDC connectors
  6. CDC source Kafka topics (including dbhistory/schema-changes topics)
  7. All associated schemas from Schema Registry (both
    -key
    and
    -value
    subjects)
Never delete CDC source Kafka topics while the connector is still running — the connector cannot recover or re-snapshot and must be fully recreated.
1. 切勿直接在CDC源主题上启用Tableflow。
务必使用Flink解码模式:CDC源主题 → Flink INSERT → 目标主题(
changelog.mode = 'upsert'
) → Tableflow。
启用
tombstones.on.delete=true
的CDC连接器会在执行DELETE操作时生成值为null的Kafka记录(墓碑记录)。如果直接在CDC源主题上启用Tableflow,它默认会使用APPEND模式,遇到墓碑记录时会立即暂停:“Tableflow将被暂停,因为我们检测到一条值为null的Kafka记录。”
Flink解码层通过原生解析Debezium CDC语义解决了这个问题——它将DELETE操作转换为Tableflow可正确处理的撤回/墓碑消息。
请勿使用
after.state.only=true
作为绕过Flink解码步骤的捷径
。虽然它会剥离Debezium信封,但DELETE操作产生的墓碑记录仍会破坏APPEND模式的Tableflow。此外,
OracleXStreamSource
完全不支持
after.state.only
配置项。
2. Tableflow变更日志模式在首次物化后不可修改。
Tableflow会在首次物化数据时缓存变更日志模式(APPEND或UPSERT)。一旦设置,即使修改Kafka主题的
changelog.mode
属性,或删除并重新创建Tableflow主题,该模式也无法更改。S3的
table_path
以Kafka主题名称为键,因此重新创建Tableflow主题会复用相同的S3路径和缓存状态。
尝试修改模式会导致:“自表物化开始以来,此主题的变更日志模式已被修改。”反复切换模式会进一步破坏状态:“检测到不兼容的Schema演进。”
要变更日志模式,必须删除Tableflow主题、底层Kafka主题,然后从头重新创建两者。这就是为什么从一开始就必须创建带有
'changelog.mode' = 'upsert'
的目标主题至关重要。
3. 管道清理顺序至关重要。
重置CDC到Tableflow管道时,请按以下顺序删除资源:
  1. Tableflow主题(目标主题上的)
  2. Flink INSERT语句
  3. Flink目标表(DROP TABLE)
  4. 目标Kafka主题
  5. CDC连接器
  6. CDC源Kafka主题(包括dbhistory/schema-changes主题)
  7. Schema Registry中所有关联的Schema(包括
    -key
    -value
    主题)
切勿在连接器仍运行时删除CDC源Kafka主题——连接器无法恢复或重新生成快照,必须完全重新创建。

Important Clarifications

重要说明

  • Tableflow is NOT a connector. It is a native topic-level feature enabled via the Tableflow API or Confluent Cloud UI.
  • Confluent Cloud Flink auto-discovers CDC tables. You do NOT need to manually create source tables — topics with Schema Registry schemas are automatically available as Flink tables.
  • Topics without SR schemas can still be handled — register a JSON schema (partial is fine), use schema inference, or use Flink's raw BYTES with JSON functions. See
    references/connector-configs.md
    "Handling Topics Without Schema Registry".
  • All SR-backed formats work identically
    JSON_SR
    ,
    AVRO
    , and
    PROTOBUF
    all support Flink auto-discovery and Tableflow. Choose based on throughput needs vs. debuggability.
  • Managed connectors use
    output.data.format
    , not
    key.converter
    /
    value.converter
    classes.
  • Tableflow不是连接器。它是通过Tableflow API或Confluent Cloud UI启用的原生主题级功能。
  • Confluent Cloud Flink会自动发现CDC表。无需手动创建源表——带有Schema Registry Schema的主题会自动作为Flink表可用。
  • 无SR Schema的主题仍可处理——注册JSON Schema(部分Schema即可)、使用Schema推断,或使用Flink的原始BYTES结合JSON函数。详见
    references/connector-configs.md
    中的“处理无Schema Registry的主题”章节。
  • 所有基于SR的格式工作方式完全相同——
    JSON_SR
    AVRO
    PROTOBUF
    均支持Flink自动发现和Tableflow。可根据吞吐量需求与可调试性进行选择。
  • 托管连接器使用
    output.data.format
    ,而非
    key.converter
    /
    value.converter
    类。

Workflow Phases

工作流阶段

Phase 0: Tool Selection & MCP Server Validation (CRITICAL)

阶段0:工具选择与MCP服务器验证(关键)

Default: Use Confluent MCP Server. The MCP server is the preferred method for all Confluent Cloud operations. Only fall back to the Confluent CLI (
confluent
command) and REST APIs if the MCP server is not installed or unavailable.
默认:使用Confluent MCP Server。 MCP服务器是所有Confluent Cloud操作的首选方法。仅当MCP服务器未安装或不可用时,才退回到Confluent CLI(
confluent
命令)和REST API。

0.1 Verify MCP Server Availability

0.1 验证MCP服务器可用性

Check for
mcp__confluent__*
tools (list-environments, create-connector, create-flink-statement, create-tableflow-topic, list-schemas, list-topics, consume-messages, search-topics-by-name).
If MCP is not available, fall back to the Confluent CLI (
confluent
command) and REST APIs for all operations. The CLI fallback should mirror the same workflow phases but use CLI commands instead of MCP tool calls.
CLI Fallback Examples:
bash
undefined
检查是否存在
mcp__confluent__*
工具(list-environments、create-connector、create-flink-statement、create-tableflow-topic、list-schemas、list-topics、consume-messages、search-topics-by-name)。
如果MCP不可用,则退回到Confluent CLI(
confluent
命令)和REST API执行所有操作。CLI回退应遵循相同的工作流阶段,但使用CLI命令而非MCP工具调用。
CLI回退示例:
bash
undefined

Environment & cluster discovery

环境与集群发现

confluent environment list confluent kafka cluster list --environment <env-id>
confluent environment list confluent kafka cluster list --environment <env-id>

Connector operations

连接器操作

confluent connect cluster create --config-file connector-config.json --cluster <cluster-id> --environment <env-id> confluent connect cluster describe <connector-id> confluent connect cluster list --cluster <cluster-id> --environment <env-id>
confluent connect cluster create --config-file connector-config.json --cluster <cluster-id> --environment <env-id> confluent connect cluster describe <connector-id> confluent connect cluster list --cluster <cluster-id> --environment <env-id>

Flink operations

Flink操作

confluent flink compute-pool create <pool-name> --cloud <cloud> --region <region> --environment <env-id> confluent flink statement create <statement-name> --sql "<SQL>" --compute-pool <pool-id> --environment <env-id> confluent flink statement describe <statement-name> --environment <env-id> confluent flink statement delete <statement-name> --environment <env-id>
confluent flink compute-pool create <pool-name> --cloud <cloud> --region <region> --environment <env-id> confluent flink statement create <statement-name> --sql "<SQL>" --compute-pool <pool-id> --environment <env-id> confluent flink statement describe <statement-name> --environment <env-id> confluent flink statement delete <statement-name> --environment <env-id>

Topic & schema operations

主题与Schema操作

confluent kafka topic list --cluster <cluster-id> --environment <env-id> confluent schema-registry subject list --environment <env-id>
confluent kafka topic list --cluster <cluster-id> --environment <env-id> confluent schema-registry subject list --environment <env-id>

Tableflow operations

Tableflow操作

confluent tableflow topic enable <topic-name> --cluster <cluster-id> --environment <env-id> --storage-type MANAGED --table-formats ICEBERG confluent tableflow topic list --cluster <cluster-id> --environment <env-id> confluent tableflow topic describe <topic-name> --cluster <cluster-id> --environment <env-id> confluent tableflow topic disable <topic-name> --cluster <cluster-id> --environment <env-id>

**REST API Fallback:** If neither MCP nor CLI is available, use the Confluent Cloud REST APIs directly. All calls use HTTP Basic Auth with a **Cloud API Key** (not a Kafka API key). See `references/rest-api.md` for endpoint patterns and examples.
confluent tableflow topic enable <topic-name> --cluster <cluster-id> --environment <env-id> --storage-type MANAGED --table-formats ICEBERG confluent tableflow topic list --cluster <cluster-id> --environment <env-id> confluent tableflow topic describe <topic-name> --cluster <cluster-id> --environment <env-id> confluent tableflow topic disable <topic-name> --cluster <cluster-id> --environment <env-id>

**REST API回退:** 如果MCP和CLI均不可用,则直接使用Confluent Cloud REST API。所有调用均使用HTTP基本认证,搭配**Cloud API密钥**(非Kafka API密钥)。详见`references/rest-api.md`中的端点模式和示例。

0.2 Gather Confluent Cloud Details from the User

0.2 从用户处收集Confluent Cloud详细信息

Ask the user to provide the following Confluent Cloud details:
DetailExampleUsed For
Environment ID
env-0ypxv6
environmentId
in all MCP calls
Kafka Cluster ID
lkc-qo5k36
clusterId
in all MCP calls
Flink Compute Pool ID
lfcp-3v39xw
computePoolId
in Flink statements
Flink Catalog Name
my_environment
catalogName
in Flink statements
Flink Database Name
cluster_0
databaseName
in Flink statements
Credentials: Generate a
cdc-credentials.properties
file with placeholders for: Kafka API Key/Secret (cluster-scoped), Database Username/Password. Have the user populate it in their editor and add it to
.gitignore
. If the user prefers Claude not read the file, fall back to CLI: generate
connector-config.json
with placeholders, user fills it in, then
confluent connect cluster create --config-file connector-config.json
.
请用户提供以下Confluent Cloud详细信息:
详情示例用途
环境ID
env-0ypxv6
所有MCP调用中的
environmentId
Kafka集群ID
lkc-qo5k36
所有MCP调用中的
clusterId
Flink计算池ID
lfcp-3v39xw
Flink语句中的
computePoolId
Flink Catalog名称
my_environment
Flink语句中的
catalogName
Flink数据库名称
cluster_0
Flink语句中的
databaseName
凭据: 生成一个
cdc-credentials.properties
文件,包含Kafka API密钥/密钥(集群范围)、数据库用户名/密码的占位符。请用户在编辑器中填充该文件并添加到
.gitignore
。如果用户不希望Claude读取该文件,则退回到CLI:生成带有占位符的
connector-config.json
,用户填充后执行
confluent connect cluster create --config-file connector-config.json

0.3 Verify MCP Cluster Targeting

0.3 验证MCP集群目标

Quick verification:
  1. Run
    mcp__confluent__list-topics
    to confirm the MCP server is connected to the expected cluster
  2. Run
    mcp__confluent__list-schemas
    to verify Schema Registry is accessible
Schema Registry is shared at the environment level across all clusters.
快速验证:
  1. 运行
    mcp__confluent__list-topics
    确认MCP服务器已连接到预期集群
  2. 运行
    mcp__confluent__list-schemas
    验证Schema Registry可访问
Schema Registry在环境级别跨所有集群共享。

Phase 1: Discovery & Validation

阶段1:发现与验证

1.1 Check Existing Setup

1.1 检查现有设置

Use MCP to check what already exists:
mcp__confluent__list-connectors(environmentId, clusterId)  →  Existing CDC connectors
mcp__confluent__list-flink-statements(environmentId, computePoolId)  →  Existing Flink jobs
mcp__confluent__list-tableflow-topics(environmentId, clusterId)  →  Existing Tableflow topics
Ask the user:
  • "Do you have any CDC connectors already running?"
  • "Do you have a Flink compute pool you'd like to use, or should I create one?"
  • "Is your database already configured for CDC?"
使用MCP检查已存在的资源:
mcp__confluent__list-connectors(environmentId, clusterId)  →  现有CDC连接器
mcp__confluent__list-flink-statements(environmentId, computePoolId)  →  现有Flink作业
mcp__confluent__list-tableflow-topics(environmentId, clusterId)  →  现有Tableflow主题
询问用户:
  • "您是否已有运行中的CDC连接器?"
  • "您是否有想要使用的Flink计算池,还是需要我创建一个?"
  • "您的数据库是否已配置好CDC?"

1.2 Validate Topic Prefix Uniqueness

1.2 验证主题前缀唯一性

Before proceeding, validate that the chosen
topic.prefix
won't collide with existing topics:
mcp__confluent__search-topics-by-name(topicName: "<proposed-prefix>", environmentId, clusterId)
Or via CLI:
bash
confluent kafka topic list --cluster <cluster-id> --environment <env-id> | grep "^<proposed-prefix>"
If any existing topics share the proposed prefix, warn the user and recommend a unique prefix. A prefix collision silently merges CDC data with unrelated topics, which can corrupt both pipelines.
在继续之前,验证所选的
topic.prefix
不会与现有主题冲突:
mcp__confluent__search-topics-by-name(topicName: "<拟议前缀>", environmentId, clusterId)
或通过CLI:
bash
confluent kafka topic list --cluster <cluster-id> --environment <env-id> | grep "^<拟议前缀>"
如果任何现有主题与拟议前缀重复,请警告用户并建议使用唯一前缀。前缀冲突会静默地将CDC数据与无关主题合并,可能损坏两个管道。

1.3 Check Schema Registry Compatibility

1.3 检查Schema Registry兼容性

Default
BACKWARD
compatibility can halt CDC connectors when database columns are dropped. Set
FULL_TRANSITIVE
for CDC subjects after the connector creates them:
bash
confluent schema-registry config update --subject "<topic-prefix>.<schema>.<table>-value" --compatibility FULL_TRANSITIVE --environment <env-id>
默认的
BACKWARD
兼容性会在删除数据库列时暂停CDC连接器。在连接器创建CDC主题后,将其兼容性设置为
FULL_TRANSITIVE
bash
confluent schema-registry config update --subject "<topic-prefix>.<schema>.<table>-value" --compatibility FULL_TRANSITIVE --environment <env-id>

1.4 Gather Required Information

1.4 收集必要信息

Database Configuration:
  • Database type (SQL Server, MySQL, PostgreSQL, Oracle, or DynamoDB)
  • Connection details (hostname, port, database name)
  • Credentials (populated by the user in the credentials file)
  • Specific tables to capture (format:
    schema.table
    )
Schema Format: Ask the user:
JSON_SR
(default, human-readable),
AVRO
(smaller payloads, high-throughput), or
PROTOBUF
(strongly typed). All work identically with Flink auto-discovery and Tableflow. Never use plain
JSON
— it breaks both. See
references/connector-configs.md
for detailed comparison.
Existing Topics Without SR: See
references/connector-configs.md
"Handling Topics Without Schema Registry" for options (register JSON schema, schema inference, or Flink raw BYTES).
Tableflow Destination:
  • Target format: Iceberg or Delta Lake
  • Storage: Managed (recommended, Confluent manages S3) or BYOB (user's own S3 bucket, requires Provider Integration ID)
Naming Convention:
  • Default:
    cdc-pipeline-skill-{database-type}-{YYYYMMDD}
  • Example:
    cdc-pipeline-skill-postgres-20260323
数据库配置:
  • 数据库类型(SQL Server、MySQL、PostgreSQL、Oracle或DynamoDB)
  • 连接详情(主机名、端口、数据库名称)
  • 凭据(用户在凭据文件中填充)
  • 要捕获的特定表(格式:
    schema.table
Schema格式: 询问用户:
JSON_SR
(默认,人类可读)、
AVRO
(有效负载更小,高吞吐量)或
PROTOBUF
(强类型)。所有格式均与Flink自动发现和Tableflow兼容。切勿使用纯
JSON
——这会破坏两者。详见
references/connector-configs.md
中的详细对比。
无SR的现有主题: 详见
references/connector-configs.md
中的“处理无Schema Registry的主题”章节,选项包括注册JSON Schema、Schema推断或Flink原始BYTES。
Tableflow目标:
  • 目标格式:Iceberg或Delta Lake
  • 存储:托管(推荐,Confluent管理S3)或BYOB(用户自有S3存储桶,需要提供商集成ID)
命名约定:
  • 默认:
    cdc-pipeline-skill-{database-type}-{YYYYMMDD}
  • 示例:
    cdc-pipeline-skill-postgres-20260323

1.5 Validate Database Prerequisites

1.5 验证数据库先决条件

Each database requires specific CDC setup. Read
references/database-prerequisites.md
for details:
  • PostgreSQL: WAL level = logical, replication slots, publication
  • MySQL: binlog format = ROW, GTID mode
  • SQL Server: CDC enabled on database and tables, SQL Server Agent running
  • Oracle XStream: GoldenGate replication enabled (
    enable_goldengate_replication=TRUE
    ), ARCHIVELOG mode, supplemental logging, XStream admin user with
    DBMS_XSTREAM_AUTH
    privileges, XStream outbound server created via
    DBMS_XSTREAM_ADM.CREATE_OUTBOUND
    , connector user with XStream connect privilege. Full prereqs: https://docs.confluent.io/cloud/current/connectors/cc-oracle-xstream-cdc-source/prereqs-validation.html
  • DynamoDB: Streams enabled with NEW_AND_OLD_IMAGES
If the database isn't properly configured, guide the user through setup before proceeding.
Oracle XStream important limitations:
  • Only supports non-CDB architecture on Amazon RDS for Oracle
  • Does NOT support Oracle Autonomous Databases or Oracle Standby (Data Guard)
  • Does NOT support Downstream Capture
  • after.state.only
    is NOT supported by
    OracleXStreamSource
  • Requires a valid Confluent license for XStream Out
每种数据库都需要特定的CDC设置。详见
references/database-prerequisites.md
  • PostgreSQL:WAL级别=logical,复制槽,发布
  • MySQL:binlog格式=ROW,GTID模式
  • SQL Server:数据库和表上启用CDC,SQL Server Agent运行
  • Oracle XStream:启用GoldenGate复制(
    enable_goldengate_replication=TRUE
    ),ARCHIVELOG模式,补充日志,拥有
    DBMS_XSTREAM_AUTH
    权限的XStream管理员用户,通过
    DBMS_XSTREAM_ADM.CREATE_OUTBOUND
    创建XStream出站服务器,拥有XStream连接权限的连接器用户。完整先决条件:https://docs.confluent.io/cloud/current/connectors/cc-oracle-xstream-cdc-source/prereqs-validation.html
  • DynamoDB:启用流,模式为NEW_AND_OLD_IMAGES
如果数据库未正确配置,请引导用户完成设置后再继续。
Oracle XStream重要限制:
  • 仅支持Amazon RDS for Oracle上的非CDB架构
  • 不支持Oracle自治数据库或Oracle Standby(Data Guard)
  • 不支持下游捕获
  • OracleXStreamSource
    不支持
    after.state.only
  • 需要有效的Confluent XStream Out许可证

Phase 2: Planning

阶段2:规划

Generate the complete configuration plan and present it to the user for approval.
生成完整的配置计划并提交给用户审批。

2.1 Connector Configuration

2.1 连接器配置

Based on the database type, generate the connector configuration using the appropriate template from
references/connector-configs.md
. The templates include all required fields (
name
,
connector.class
,
topic.prefix
,
kafka.api.key
,
output.data.format
,
decimal.handling.mode
, etc.) and database-specific settings.
Set the schema format based on user preference (default
JSON_SR
):
json
{
  "output.data.format": "JSON_SR",
  "output.key.format": "JSON_SR"
}
Replace
JSON_SR
with
AVRO
or
PROTOBUF
if the user requested a different format. Both key and value formats should match. All other connector settings remain the same regardless of format choice.
Topic Naming Pattern:
{topic.prefix}.{schema}.{table}
Example with
topic.prefix = "postgres-cdc"
:
postgres-cdc.public.customers
根据数据库类型,使用
references/connector-configs.md
中的相应模板生成连接器配置。模板包含所有必填字段(
name
connector.class
topic.prefix
kafka.api.key
output.data.format
decimal.handling.mode
等)和数据库特定设置。
根据用户偏好设置Schema格式(默认
JSON_SR
):
json
{
  "output.data.format": "JSON_SR",
  "output.key.format": "JSON_SR"
}
如果用户要求其他格式,将
JSON_SR
替换为
AVRO
PROTOBUF
。键和值格式应匹配。无论格式选择如何,所有其他连接器设置保持不变。
主题命名模式:
{topic.prefix}.{schema}.{table}
示例(
topic.prefix = "postgres-cdc"
):
postgres-cdc.public.customers

2.2 Flink SQL Statements

2.2 Flink SQL语句

In Confluent Cloud Flink, the CDC source table is auto-discovered from the Kafka topic. You only need to:
Note: The examples below use a
customers
table with illustrative column names. Substitute the user's actual table name, columns, and types based on the schema discovered from their CDC topic.
  1. Create a target table (for plain JSON_SR output to Tableflow):
sql
CREATE TABLE `target_customers` (
  `id` INT NOT NULL,
  `name` STRING,
  `email` STRING,
  `created_at` TIMESTAMP_LTZ(3),
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'changelog.mode' = 'upsert'
);
  1. Create an INSERT statement (continuous job to decode and transform):
sql
INSERT INTO `target_customers`
SELECT
  `id`,
  `name`,
  `email`,
  TO_TIMESTAMP_LTZ(`created_at` / 1000, 3)
FROM `postgres-cdc.public.customers`;
IMPORTANT Cloud Flink differences:
  • Do NOT specify
    'connector'
    ,
    'value.format'
    ,
    'properties.bootstrap.servers'
    , or Schema Registry URLs in CREATE TABLE — Cloud Flink handles all of this automatically
  • Do NOT create source tables for CDC topics — they are auto-discovered
  • Do NOT reference
    after.*
    fields or filter by
    op
    — Flink interprets CDC changelog semantics natively
  • Use
    TIMESTAMP_LTZ(3)
    for Debezium timestamps, not
    TIMESTAMP(3)
DynamoDB CDC is different from SQL CDC in Flink. The auto-discovered table has columns
id
(key) and
val
(a complex ROW type containing the CDC envelope with
op
,
before.document
,
after.document
). Flink does NOT auto-decode this envelope like it does for SQL Debezium connectors. You must manually extract fields:
sql
CREATE TABLE `target_dynamodb` (
  `id` STRING NOT NULL,
  `document` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH ('changelog.mode' = 'upsert');

INSERT INTO `target_dynamodb`
SELECT `id`, `val`.`after`.`document`
FROM `dynamodb-cdc-source-topic`;
The
document
field is a JSON string of the DynamoDB item in DynamoDB's native type format (e.g.,
{"name":{"S":"Alice"},"age":{"N":"30"}}
).
Debezium Type Conversions: See
references/flink-sql-patterns.md
for the full type mapping table. Key conversions: use
TO_TIMESTAMP_LTZ(col / 1000, 3)
for MicroTimestamp,
TO_TIMESTAMP_LTZ(col, 3)
for Timestamp, and ensure
decimal.handling.mode=string
is set on the connector (BYTES default is unusable in Flink).
在Confluent Cloud Flink中,CDC源表会从Kafka主题自动发现。您只需:
注意: 以下示例使用
customers
表和示例列名。请根据用户CDC主题的实际表名、列和类型进行替换。
  1. 创建目标表(用于输出纯JSON_SR到Tableflow):
sql
CREATE TABLE `target_customers` (
  `id` INT NOT NULL,
  `name` STRING,
  `email` STRING,
  `created_at` TIMESTAMP_LTZ(3),
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'changelog.mode' = 'upsert'
);
  1. 创建INSERT语句(持续作业,用于解码和转换):
sql
INSERT INTO `target_customers`
SELECT
  `id`,
  `name`,
  `email`,
  TO_TIMESTAMP_LTZ(`created_at` / 1000, 3)
FROM `postgres-cdc.public.customers`;
Confluent Cloud Flink重要差异:
  • 在CREATE TABLE中无需指定
    'connector'
    'value.format'
    'properties.bootstrap.servers'
    或Schema Registry URL——Cloud Flink会自动处理所有这些
  • 无需为CDC主题创建源表——它们会被自动发现
  • 无需引用
    after.*
    字段或按
    op
    过滤——Flink会原生解析CDC变更日志语义
  • 对于Debezium时间戳,使用
    TIMESTAMP_LTZ(3)
    而非
    TIMESTAMP(3)
Flink中的DynamoDB CDC与SQL CDC不同。自动发现的表包含
id
(键)和
val
(复杂ROW类型,包含带有
op
before.document
after.document
的CDC信封)列。Flink不会像处理SQL Debezium连接器那样自动解码此信封。您必须手动提取字段:
sql
CREATE TABLE `target_dynamodb` (
  `id` STRING NOT NULL,
  `document` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH ('changelog.mode' = 'upsert');

INSERT INTO `target_dynamodb`
SELECT `id`, `val`.`after`.`document`
FROM `dynamodb-cdc-source-topic`;
document
字段是DynamoDB项的JSON字符串,采用DynamoDB原生类型格式(例如
{"name":{"S":"Alice"},"age":{"N":"30"}}
)。
Debezium类型转换: 详见
references/flink-sql-patterns.md
中的完整类型映射表。关键转换:对于MicroTimestamp使用
TO_TIMESTAMP_LTZ(col / 1000, 3)
,对于Timestamp使用
TO_TIMESTAMP_LTZ(col, 3)
,并确保连接器上设置
decimal.handling.mode=string
(默认BYTES在Flink中无法使用)。

2.3 Tableflow Configuration

2.3 Tableflow配置

Tableflow is a native topic-level feature, not a connector. It is enabled per-topic.
Storage Options:
  • Managed (recommended): Confluent manages the S3 storage. No credentials needed.
  • BYOB (Bring Your Own Bucket): User provides their S3 bucket. Requires a Provider Integration ID set up in Confluent Cloud (Settings → Provider Integrations).
Table Formats: Iceberg (recommended) or Delta Lake
Tableflow是原生主题级功能,而非连接器。它按主题启用。
存储选项:
  • 托管(推荐):Confluent管理S3存储,无需凭据。
  • BYOB(自带存储桶):用户提供自己的S3存储桶,需要在Confluent Cloud中设置提供商集成ID(设置 → 提供商集成)。
表格式: Iceberg(推荐)或Delta Lake

2.4 Present the Plan

2.4 提交计划

Show the user:
  1. Connector configuration (with sensitive fields masked)
  2. Flink compute pool to use
  3. Flink SQL statements (target table + INSERT)
  4. Tableflow config (storage type, format)
  5. Expected topic names
Wait for explicit confirmation before proceeding.
向用户展示:
  1. 连接器配置(敏感字段已掩码)
  2. 要使用的Flink计算池
  3. Flink SQL语句(目标表 + INSERT)
  4. Tableflow配置(存储类型、格式)
  5. 预期主题名称
等待用户明确确认后再继续。

Phase 3: Execution

阶段3:执行

Execute step-by-step using MCP tools, checking status after each component.
使用MCP工具逐步执行,每个组件完成后检查状态。

3.1 Create CDC Source Connector

3.1 创建CDC源连接器

Build the connector configuration using the template for the user's database type from
references/connector-configs.md
. Each template includes all required fields, including the
name
field.
Using MCP:
mcp__confluent__create-connector(
  connectorName: "<connector-name>",
  environmentId: "<env-id>",
  clusterId: "<cluster-id>",
  connectorConfig: { <config from references/connector-configs.md> }
)
Verify: Managed connectors take 2-5 minutes to provision. Poll
mcp__confluent__read-connector
tasks: []
means still provisioning;
tasks: [{...}]
means ready. Then verify schemas with
mcp__confluent__list-schemas(subjectPrefix: "postgres-cdc")
. If no schemas after 5 min with tasks assigned, check database connectivity. Use Confluent Cloud UI for connector error logs (MCP doesn't expose them).
使用
references/connector-configs.md
中用户数据库类型的模板构建连接器配置。每个模板包含所有必填字段,包括
name
字段。
使用MCP:
mcp__confluent__create-connector(
  connectorName: "<连接器名称>",
  environmentId: "<env-id>",
  clusterId: "<cluster-id>",
  connectorConfig: { <来自references/connector-configs.md的配置> }
)
验证: 托管连接器需要2-5分钟来配置。轮询
mcp__confluent__read-connector
——
tasks: []
表示仍在配置中;
tasks: [{...}]
表示已就绪。然后使用
mcp__confluent__list-schemas(subjectPrefix: "postgres-cdc")
验证Schema。如果5分钟后已分配任务但仍无Schema,请检查数据库连接。使用Confluent Cloud UI查看连接器错误日志(MCP不暴露这些日志)。

3.2 Create Flink Compute Pool (if needed)

3.2 创建Flink计算池(如需要)

If the user doesn't have an existing Flink compute pool, create one before executing SQL:
confluent flink compute-pool create <pool-name> --cloud <cloud-provider> --region <region> --environment <env-id>
Use the same cloud provider and region as the Kafka cluster. Wait for the pool status to be
RUNNING
before proceeding.
如果用户没有现有Flink计算池,在执行SQL前创建一个:
confluent flink compute-pool create <pool-name> --cloud <云提供商> --region <区域> --environment <env-id>
使用与Kafka集群相同的云提供商和区域。等待池状态变为
RUNNING
后再继续。

3.3 Execute Flink SQL

3.3 执行Flink SQL

Step 1: Verify CDC table is auto-discovered:
mcp__confluent__create-flink-statement(
  statementName: "show-tables-check",
  statement: "SHOW TABLES;",
  environmentId: "<env-id>",
  computePoolId: "<pool-id>",
  catalogName: "<environment-display-name>",
  databaseName: "<cluster-display-name>"
)
Then read results:
mcp__confluent__read-flink-statement(statementName: "show-tables-check", environmentId: "<env-id>")
Look for the CDC topic table (e.g.,
postgres-cdc.public.customers
). If not present, the connector hasn't produced data yet — wait and retry.
Step 2: Create target table:
mcp__confluent__create-flink-statement(
  statementName: "cdc-create-target-customers",
  statement: "CREATE TABLE `target_customers` (...) WITH ('changelog.mode' = 'upsert');",
  environmentId, computePoolId, catalogName, databaseName
)
Step 3: Create INSERT job:
mcp__confluent__create-flink-statement(
  statementName: "cdc-decode-customers",
  statement: "INSERT INTO `target_customers` SELECT ... FROM `postgres-cdc.public.customers`;",
  environmentId, computePoolId, catalogName, databaseName
)
The INSERT creates a continuous Flink job. Verify it transitions to RUNNING (not FAILED):
mcp__confluent__read-flink-statement(statementName: "cdc-decode-customers", environmentId)
Common INSERT failures:
  • "Table does not exist" → CDC source table not yet auto-discovered; wait for connector
  • "Incompatible types for sink column" → Type mismatch; check Debezium type mappings above
  • "Unsupported format" → Remove any explicit format properties from CREATE TABLE
Advisory warnings (can be ignored):
  • "Primary key does not match upsert key" — Expected for CDC decode patterns
  • "Highly state-intensive operators without TTL" — Advisory; set TTL if needed for production
步骤1:验证CDC表已自动发现:
mcp__confluent__create-flink-statement(
  statementName: "show-tables-check",
  statement: "SHOW TABLES;",
  environmentId: "<env-id>",
  computePoolId: "<pool-id>",
  catalogName: "<环境显示名称>",
  databaseName: "<集群显示名称>"
)
然后读取结果:
mcp__confluent__read-flink-statement(statementName: "show-tables-check", environmentId: "<env-id>")
查找CDC主题表(例如
postgres-cdc.public.customers
)。如果不存在,说明连接器尚未生成数据——等待并重试。
步骤2:创建目标表:
mcp__confluent__create-flink-statement(
  statementName: "cdc-create-target-customers",
  statement: "CREATE TABLE `target_customers` (...) WITH ('changelog.mode' = 'upsert');",
  environmentId, computePoolId, catalogName, databaseName
)
步骤3:创建INSERT作业:
mcp__confluent__create-flink-statement(
  statementName: "cdc-decode-customers",
  statement: "INSERT INTO `target_customers` SELECT ... FROM `postgres-cdc.public.customers`;",
  environmentId, computePoolId, catalogName, databaseName
)
INSERT会创建一个持续运行的Flink作业。验证它是否转换为RUNNING状态(而非FAILED):
mcp__confluent__read-flink-statement(statementName: "cdc-decode-customers", environmentId)
常见INSERT失败原因:
  • "Table does not exist" → CDC源表尚未自动发现;等待连接器
  • "Incompatible types for sink column" → 类型不匹配;检查上述Debezium类型映射
  • "Unsupported format" → 从CREATE TABLE中移除任何显式格式属性
可忽略的警告:
  • "Primary key does not match upsert key" — CDC解码模式的预期警告
  • "Highly state-intensive operators without TTL" — 建议性警告;生产环境中如有需要可设置TTL

3.4 Enable Tableflow

3.4 启用Tableflow

Using MCP:
mcp__confluent__create-tableflow-topic(
  tableflowTopicConfig: {
    "display_name": "target_customers",
    "storage": { "kind": "Managed", "bucket_name": "managed", "provider_integration_id": "managed" },
    "table_formats": ["ICEBERG"],
    "config": { "record_failure_strategy": "SUSPEND", "retention_ms": "6048000000" }
  }
)
KNOWN LIMITATION: The MCP
create-tableflow-topic
tool does NOT accept
environmentId
or
clusterId
parameters. It defaults to the cluster configured in the MCP server. If the MCP server points to a different cluster than where the target topic exists, this will fail with "topic not found". Use the CLI or UI as a workaround.
Using CLI:
bash
undefined
使用MCP:
mcp__confluent__create-tableflow-topic(
  tableflowTopicConfig: {
    "display_name": "target_customers",
    "storage": { "kind": "Managed", "bucket_name": "managed", "provider_integration_id": "managed" },
    "table_formats": ["ICEBERG"],
    "config": { "record_failure_strategy": "SUSPEND", "retention_ms": "6048000000" }
  }
)
已知限制: MCP的
create-tableflow-topic
工具不接受
environmentId
clusterId
参数。它默认使用MCP服务器配置的集群。如果MCP服务器指向的集群与目标主题所在集群不同,会失败并显示“topic not found”。可使用CLI或UI作为解决方法。
使用CLI:
bash
undefined

Managed storage (Confluent manages S3)

托管存储(Confluent管理S3)

confluent tableflow topic enable target_customers
--cluster <cluster-id>
--environment <env-id>
--storage-type MANAGED
--table-formats ICEBERG
confluent tableflow topic enable target_customers
--cluster <cluster-id>
--environment <env-id>
--storage-type MANAGED
--table-formats ICEBERG

BYOB / BYOS (user's own S3 bucket)

BYOB / BYOS(用户自有S3存储桶)

confluent tableflow topic enable target_customers
--cluster <cluster-id>
--environment <env-id>
--storage-type BYOS
--provider-integration <provider-integration-id>
--bucket-name <bucket-name>
--table-formats ICEBERG
confluent tableflow topic enable target_customers
--cluster <cluster-id>
--environment <env-id>
--storage-type BYOS
--provider-integration <provider-integration-id>
--bucket-name <bucket-name>
--table-formats ICEBERG

Azure Data Lake Storage Gen2

Azure Data Lake Storage Gen2

confluent tableflow topic enable target_customers
--cluster <cluster-id>
--environment <env-id>
--storage-type AzureDataLakeStorageGen2
--provider-integration <provider-integration-id>
--storage-account-name <account-name>
--container-name <container-name>
--table-formats DELTA

Use `--table-formats DELTA` for Delta Lake instead of Iceberg.

**Verify Tableflow is enabled:**
mcp__confluent__list-tableflow-topics(environmentId, clusterId)

Or via CLI:
```bash
confluent tableflow topic describe target_customers --cluster <cluster-id> --environment <env-id>
confluent tableflow topic list --cluster <cluster-id> --environment <env-id>
Status will transition from
PENDING
ACTIVE
.
confluent tableflow topic enable target_customers
--cluster <cluster-id>
--environment <env-id>
--storage-type AzureDataLakeStorageGen2
--provider-integration <provider-integration-id>
--storage-account-name <account-name>
--container-name <container-name>
--table-formats DELTA

使用`--table-formats DELTA`替代Iceberg以使用Delta Lake。

**验证Tableflow已启用:**
mcp__confluent__list-tableflow-topics(environmentId, clusterId)

或通过CLI:
```bash
confluent tableflow topic describe target_customers --cluster <cluster-id> --environment <env-id>
confluent tableflow topic list --cluster <cluster-id> --environment <env-id>
状态会从
PENDING
转换为
ACTIVE

Phase 4: Verification & Troubleshooting

阶段4:验证与故障排除

4.1 Verify End-to-End Pipeline

4.1 验证端到端管道

Large Table Snapshots: If the connector was created with
snapshot.mode: initial
on a large table, verification may take hours. To distinguish a running snapshot from a broken pipeline:
  1. Confirm connector has tasks assigned (
    read-connector
    tasks
    is non-empty)
  2. Confirm schemas are registered (
    list-schemas
    → key/value schemas exist) — this means the snapshot has started producing
  3. Monitor the source topic message count in Confluent Cloud UI — a steady stream means progress
If you used
snapshot.mode: schema_only
for initial validation, insert a test row in the source database to trigger a CDC event and verify the full pipeline. See
references/troubleshooting.md
for detailed snapshot troubleshooting.
Check each component:
CheckMCP ToolWhat to Look For
Connector running
read-connector
tasks
array is non-empty
Schemas registered
list-schemas(subjectPrefix)
Key and value schemas for CDC topic
CDC table in Flink
create-flink-statement("SHOW TABLES")
CDC topic appears as table
Flink job running
read-flink-statement
No error in response
Target topic has data
consume-messages(topicNames)
Messages appear (note: consumer starts at latest offset)
Tableflow enabled
list-tableflow-topics
Status is PENDING or ACTIVE
Consume from target topic to verify decoded data:
mcp__confluent__consume-messages(
  topicNames: ["target_customers"],
  value: { "useSchemaRegistry": true },
  key: { "useSchemaRegistry": true },
  maxMessages: 5,
  timeoutMs: 15000
)
Note: The consumer starts at the latest offset. If the initial snapshot already completed, you may see 0 messages until a new database change occurs.
Test real-time CDC by inserting a row in the source database (adapt table name and columns to match the user's actual schema):
sql
INSERT INTO public.customers (name, email, created_at)
VALUES ('Test User', 'test@example.com', NOW());
大表快照: 如果连接器在大表上以
snapshot.mode: initial
创建,验证可能需要数小时。要区分正在运行的快照与损坏的管道:
  1. 确认连接器已分配任务(
    read-connector
    tasks
    非空)
  2. 确认Schema已注册(
    list-schemas
    → 存在键/值Schema)——这意味着快照已开始生成数据
  3. 在Confluent Cloud UI中监控源主题消息计数——稳定的数据流表示正在进行中
如果使用
snapshot.mode: schema_only
进行初始验证,请在源数据库中插入测试行以触发CDC事件并验证完整管道。详见
references/troubleshooting.md
中的详细快照故障排除。
检查每个组件:
检查项MCP工具检查要点
连接器运行状态
read-connector
tasks
数组非空
Schema已注册
list-schemas(subjectPrefix)
CDC主题的键和值Schema存在
Flink中的CDC表
create-flink-statement("SHOW TABLES")
CDC主题显示为表
Flink作业运行状态
read-flink-statement
响应中无错误
目标主题有数据
consume-messages(topicNames)
出现消息(注意:消费者从最新偏移量开始)
Tableflow已启用
list-tableflow-topics
状态为PENDING或ACTIVE
从目标主题消费以验证解码后的数据:
mcp__confluent__consume-messages(
  topicNames: ["target_customers"],
  value: { "useSchemaRegistry": true },
  key: { "useSchemaRegistry": true },
  maxMessages: 5,
  timeoutMs: 15000
)
注意:消费者从最新偏移量开始。如果初始快照已完成,在新的数据库变更发生前可能会看到0条消息。
通过在源数据库中插入行测试实时CDC(根据用户实际Schema调整表名和列):
sql
INSERT INTO public.customers (name, email, created_at)
VALUES ('测试用户', 'test@example.com', NOW());

4.2 Troubleshooting

4.2 故障排除

For detailed troubleshooting, see
references/troubleshooting.md
.
Quick reference — pipeline-blocking issues:
SymptomLikely CauseFix
Connector tasks stay emptyStill provisioningWait 2-5 minutes, retry
No schemas after 5 minDB connectivity or credentialsCheck host, port, user, password; verify DB CDC config
SHOW TABLES missing CDC tableConnector not producing yetVerify schemas exist first, then wait
INSERT: "Incompatible types"Debezium type mismatchUse TIMESTAMP_LTZ(3) + TO_TIMESTAMP_LTZ; see
references/flink-sql-patterns.md
Tableflow: "topic not found"MCP cluster mismatchUse CLI:
confluent tableflow topic enable
or Confluent Cloud UI
Topic exists but not in SHOW TABLESNo schema in SRRegister a JSON schema in SR or use schema inference; see
references/connector-configs.md
详细故障排除请见
references/troubleshooting.md
快速参考——管道阻塞问题:
症状可能原因解决方法
连接器任务保持为空仍在配置中等待2-5分钟,重试
5分钟后仍无Schema数据库连接或凭据问题检查主机、端口、用户、密码;验证数据库CDC配置
SHOW TABLES中缺少CDC表连接器尚未生成数据先验证Schema存在,然后等待
INSERT:"Incompatible types"Debezium类型不匹配使用TIMESTAMP_LTZ(3) + TO_TIMESTAMP_LTZ;详见
references/flink-sql-patterns.md
Tableflow:"topic not found"MCP集群不匹配使用CLI:
confluent tableflow topic enable
或Confluent Cloud UI
主题存在但未在SHOW TABLES中显示SR中无Schema在SR中注册JSON Schema或使用Schema推断;详见
references/connector-configs.md

Phase 5: Documentation

阶段5:文档

After successful setup, provide the user with:
  1. Pipeline Summary Table: All component names, IDs, and statuses
  2. Topic Names: Source CDC topic and target topic (with schema format used)
  3. Monitoring: Check connector, Flink job, and Tableflow status in Confluent Cloud UI
  4. Test Command: SQL INSERT to verify real-time CDC
成功搭建后,向用户提供:
  1. 管道汇总表:所有组件名称、ID和状态
  2. 主题名称:源CDC主题和目标主题(使用的Schema格式)
  3. 监控:在Confluent Cloud UI中检查连接器、Flink作业和Tableflow状态
  4. 测试命令:用于验证实时CDC的SQL INSERT语句

References

参考资料