kafka-schema-registry
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseKafka Schema Registry Skill
Kafka Schema Registry 技能
Scan a project to identify Kafka applications, extract schemas, generate Terraform for Schema Registry registration, and produce a comprehensive analysis report.
扫描项目以识别Kafka应用、提取Schema、生成用于Schema Registry注册的Terraform配置,并产出全面的分析报告。
When to Use
使用场景
Invoke this skill when:
- A user asks to analyze a project for Kafka usage in order to add event schemas or integrate Schema Registry
- A user wants to extract schemas from Kafka producers
- A user wants Terraform to register schemas to Schema Registry
- A user wants to audit Kafka producer/consumer configurations
在以下场景调用此技能:
- 用户要求分析项目的Kafka使用情况,以添加事件Schema或集成Schema Registry
- 用户希望从Kafka生产者中提取Schema
- 用户需要用于向Schema Registry注册Schema的Terraform配置
- 用户希望审核Kafka生产者/消费者配置
Deliverables
交付成果
This skill produces 3 outputs in the target project:
- — Full analysis report with findings, risks, and upgrade recommendations
schema-report.md - — Extracted schema files (Avro, JSON Schema, Protobuf) with PII tagging
schemas/ - — Terraform configs using Confluent provider to register schemas
terraform/
此技能会在目标项目中生成3类输出:
- — 包含发现结果、风险点和升级建议的完整分析报告
schema-report.md - — 带有PII标记的提取Schema文件(Avro、JSON Schema、Protobuf格式)
schemas/ - — 使用Confluent Provider的Terraform配置文件,用于注册Schema
terraform/
Optional: Code Migration Assistance
可选:代码迁移协助
If the user asks for their application code to be updated to integrate Schema Registry, use the Code Migration Reference to update the code with proper Schema Registry integration patterns.
如果用户要求更新应用代码以集成Schema Registry,请参考代码迁移文档,采用标准的Schema Registry集成模式更新代码。
High-Level Workflow
高层级工作流程
Phase 0: Initialize
阶段0:初始化
- Check for existing and
schema.yamldirectory manuallyschemas/ - Note any existing schema infrastructure in the report
- 手动检查是否存在和
schema.yaml目录schemas/ - 在报告中记录已有的Schema基础设施
Phase 1: Project Scan & Kafka Detection
阶段1:项目扫描与Kafka检测
- Find build files — Search for ,
pom.xml,build.gradle,requirements.txt, etc.package.json - Detect Kafka dependencies — Look for ,
spring-kafka,confluent-kafka, etc.kafkajs - Find producers & consumers — Grep for ,
KafkaTemplate,Producer(, etc.producer.send - Extract topic names — From string literals, config properties, YAML files
- Identify serializers — Find ,
value.serializer, custom serializersKafkaAvroSerializer - Build app catalog — Compile findings: app name, language, role, topics, serializer, category
Detailed patterns: Detection Patterns Reference
App catalog structure:
yaml
app_name: module name
language: Java | Python | .NET | Go | Node/TS
role: producer | consumer | both
topics: [list of topics]
serializer_class: value.serializer used
custom_serializer: true | false
schema_format: AVRO | JSON | PROTOBUF | UNKNOWN
sr_integrated: true | false
category: A | B | C | D | E # REQUIREDMulti-schema topic detection:
- If multiple data models produce to the same topic, create a wrapper schema with /union/
oneOfoneof - Generate Terraform with blocks
schema_reference - Flag prominently in report
- 查找构建文件 — 搜索、
pom.xml、build.gradle、requirements.txt等文件package.json - 检测Kafka依赖 — 查找、
spring-kafka、confluent-kafka等依赖kafkajs - 定位生产者与消费者 — 通过Grep查找、
KafkaTemplate、Producer(等关键字producer.send - 提取主题名称 — 从字符串常量、配置属性、YAML文件中提取
- 识别序列化器 — 查找、
value.serializer及自定义序列化器KafkaAvroSerializer - 构建应用目录 — 整理发现结果:应用名称、开发语言、角色、主题、序列化器、分类
详细检测规则: 检测模式参考文档
应用目录结构:
yaml
app_name: 模块名称
language: Java | Python | .NET | Go | Node/TS
role: producer | consumer | both
topics: [主题列表]
serializer_class: 使用的value.serializer
custom_serializer: true | false
schema_format: AVRO | JSON | PROTOBUF | UNKNOWN
sr_integrated: true | false
category: A | B | C | D | E # 必填多Schema主题检测:
- 如果多个数据模型向同一主题发送数据,创建包含/union/
oneOf的包装Schemaoneof - 生成带有块的Terraform配置
schema_reference - 在报告中突出标记
Phase 2: Risk Detection
阶段2:风险检测
Search for:
- — Uncontrolled schema evolution (Category C)
auto.register.schemas=true - — Eases migration when set
use.latest.version - Custom serializers — Bypass SR entirely (Category E)
Record file path, line number, and affected topics for each occurrence.
Patterns: Detection Patterns Reference
搜索以下风险点:
- — 不受控的Schema演进(C类)
auto.register.schemas=true - — 设置后可简化迁移
use.latest.version - 自定义序列化器 — 完全绕过SR(E类)
记录每个风险点的文件路径、行号及受影响的主题。
检测规则: 检测模式参考文档
Phase 3: Schema Inference
阶段3:Schema推断
For each producer:
- Check for existing schema files — ,
**/*.avsc,**/*.proto**/*.schema.json - Infer from data models — Java classes, Pydantic models, TypeScript interfaces, Go structs
- Infer from inline data — HashMap, dict literals, map[string]any, plain objects, JSON strings
- Convert to schemas — Map language types to JSON Schema / Avro / Protobuf
- Tag PII fields — Scan field names for ,
email,ssn,phone, etc.address
PII tagging: Add (, , , ) to detected fields.
confluent:tagsPIIPRIVATESENSITIVEPHIDetailed inference patterns: Schema Inference Reference
针对每个生产者:
- 检查已有Schema文件 — 查找、
**/*.avsc、**/*.proto文件**/*.schema.json - 从数据模型推断 — Java类、Pydantic模型、TypeScript接口、Go结构体
- 从内联数据推断 — HashMap、字典常量、map[string]any、普通对象、JSON字符串
- 转换为Schema格式 — 将语言类型映射为JSON Schema / Avro / Protobuf
- 标记PII字段 — 扫描字段名称,识别、
email、ssn、phone等关键字address
PII标记规则: 为检测到的字段添加标签(、、、)。
confluent:tagsPIIPRIVATESENSITIVEPHI详细推断规则: Schema推断参考文档
Phase 4: Categorize Producers
阶段4:生产者分类
Classify each producer:
| Category | Criteria |
|---|---|
| A: Compliant | Confluent serializer + SR + no auto.register |
| A→Header | Already on SR, migrating to headers |
| B: Schema in code, no SR | Data models exist, but no SR integration |
| C: Auto-register | |
| D: No schema | Raw strings/bytes, no data model |
| E: Custom serializer | Custom |
CRITICAL: Use exact phrase "Category X" in:
- App catalog field
- Applications Discovered table
- Report section headers
- Terraform comments
- Risk sections
Details: Categorization Reference
对每个生产者进行分类:
| 分类 | 判定标准 |
|---|---|
| A: 合规 | 使用Confluent序列化器 + 已集成SR + 未开启auto.register |
| A→Header | 已接入SR,正在迁移至Header模式 |
| B: 代码中存在Schema,未集成SR | 存在数据模型,但未集成Schema Registry |
| C: 自动注册 | |
| D: 无Schema | 使用原始字符串/字节,无数据模型 |
| E: 自定义序列化器 | 使用自定义 |
重要要求: 在以下场景必须使用精确的「分类X」表述:
- 应用目录字段
- 已发现应用表格
- 报告章节标题
- Terraform注释
- 风险部分
详细说明: 分类参考文档
Phase 5: Create Schema Files
阶段5:创建Schema文件
Directory structure:
schemas/
├── avro/
│ └── {topic}-value.avsc
├── json/
│ └── {topic}-value.json
└── proto/
└── {topic}-value.protoFile naming: MUST use kebab-case (lowercase with hyphens):
- Value:
{topic}-value.{ext} - Key:
{topic}-key.{ext} - Examples: ,
order-events-value.avscuser-notifications-value.json
Initialize: Create .
schema.yamlValidate: Call if available.
schema_lint(path: schemas/, fix: true)目录结构:
schemas/
├── avro/
│ └── {topic}-value.avsc
├── json/
│ └── {topic}-value.json
└── proto/
└── {topic}-value.proto文件命名规则: 必须使用短横线命名法(小写字母加连字符):
- 值Schema:
{topic}-value.{ext} - 键Schema:
{topic}-key.{ext} - 示例:、
order-events-value.avscuser-notifications-value.json
初始化操作: 创建文件。
schema.yaml验证操作: 如果可用,调用工具。
schema_lint(path: schemas/, fix: true)Phase 6: Generate Terraform
阶段6:生成Terraform配置
File structure (MANDATORY separate files):
terraform/
├── providers.tf # Provider config
├── variables.tf # Variable definitions
├── tags.tf # confluent_tag resources (if PII exists)
├── schemas.tf # Active schemas (A, B, E)
├── flagged-auto-register.tf # Category C only (commented out)
├── outputs.tf # Output values
└── import.sh # Import scriptCRITICAL:
- = Categories A, B, E — NOT commented out
schemas.tf - = Category C ONLY — MUST be commented out
flagged-auto-register.tf - = MUST exist if ANY schema uses
tags.tfconfluent:tags - Each schema resource MUST have comment block: Topic, App, Source, Category
Templates: Terraform Templates Reference
文件结构(必须拆分到独立文件):
terraform/
├── providers.tf # Provider配置
├── variables.tf # 变量定义
├── tags.tf # confluent_tag资源(如果存在PII)
├── schemas.tf # 活跃Schema(A、B、E类)
├── flagged-auto-register.tf # 仅包含C类(需注释)
├── outputs.tf # 输出值
└── import.sh # 导入脚本重要要求:
- = 包含A、B、E类Schema — 不能注释
schemas.tf - = 仅包含C类Schema — 必须注释
flagged-auto-register.tf - = 只要有Schema使用
tags.tf就必须存在confluent:tags - 每个Schema资源必须包含注释块:主题、应用、来源、分类
模板参考: Terraform模板参考文档
Phase 7: Generate Report
阶段7:生成报告
Create with:
schema-report.md- Executive Summary (metrics + category breakdown)
- Applications Discovered table (EXACT format, Category column MANDATORY)
- RISKS (auto-register, custom serializers)
- Producer Upgrade Recommendations (per app, with "Category X" in heading)
- Migration Rollout Ordering (by category)
- PII Fields Detected
- Terraform Resources Generated
- Next Steps checklist
CRITICAL formatting requirements:
- Applications Discovered = markdown table, NOT narrative sections
- Every app section MUST say "Category X" explicitly
- Terraform comment blocks required for every resource
Template: Report Template Reference
创建,包含以下内容:
schema-report.md- 执行摘要(指标 + 分类分布)
- 已发现应用表格(格式固定,必须包含分类列)
- 风险点(自动注册、自定义序列化器)
- 生产者升级建议(按应用分类,标题需包含「分类X」)
- 迁移部署顺序(按分类排序)
- 检测到的PII字段
- 生成的Terraform资源
- 后续步骤清单
重要格式要求:
- 已发现应用必须用Markdown表格展示,不能用叙述性段落
- 每个应用章节必须明确标注「分类X」
- 每个Terraform资源必须包含注释块
模板参考: 报告模板参考文档
Migration Rollout by Category
按分类的迁移部署顺序
- Category B (JSON, no SR): Producers first → consumers
- Category A→Header (already on SR): Verify consumer versions → producers only
- Category C (auto-register): Register via Terraform → disable auto-register → producers fetch latest
- Category E (custom serializers): Consumers first (composite deserializer) → producers
Details: Categorization Reference
- B类(JSON格式,未集成SR):先升级生产者 → 再升级消费者
- A→Header类(已接入SR):验证消费者版本 → 仅升级生产者
- C类(自动注册):通过Terraform注册Schema → 关闭自动注册 → 生产者拉取最新版本
- E类(自定义序列化器):先升级消费者(使用复合反序列化器)→ 再升级生产者
详细说明: 分类参考文档
Edge Cases
边缘场景处理
- Monorepos: Treat each service/module with Kafka deps as separate app
- Multi-topic producers: Generate one schema resource per topic
- Shared schemas: One schema file, multiple Terraform resources reference it
- No topic names: If loaded from env vars, use placeholders with TODO
- Test code: Skip test directories unless they contain only schema definitions
- Multiple serializers: Create separate schema files per format
- 单体仓库: 将每个包含Kafka依赖的服务/模块视为独立应用
- 多主题生产者: 为每个主题生成一个Schema资源
- 共享Schema: 一个Schema文件,多个Terraform资源引用它
- 无主题名称: 如果从环境变量加载,使用占位符并标记TODO
- 测试代码: 跳过测试目录,除非目录中仅包含Schema定义
- 多序列化器: 为每种格式创建独立的Schema文件
Output Organization
输出目录结构
{project_root}/
├── schema-report.md # Analysis report
├── schemas/
│ ├── schema.yaml # Schema project config
│ ├── avro/
│ │ └── {topic}-value.avsc
│ ├── json/
│ │ └── {topic}-value.json
│ └── proto/
│ └── {topic}-value.proto
└── terraform/
├── providers.tf
├── variables.tf
├── tags.tf # PII/PRIVATE/SENSITIVE tags
├── schemas.tf # Active schemas (depends_on tags)
├── flagged-auto-register.tf # Commented-out Category C
├── outputs.tf
└── import.sh # Import existing schemas{project_root}/
├── schema-report.md # 分析报告
├── schemas/
│ ├── schema.yaml # Schema项目配置
│ ├── avro/
│ │ └── {topic}-value.avsc
│ ├── json/
│ │ └── {topic}-value.json
│ └── proto/
│ └── {topic}-value.proto
└── terraform/
├── providers.tf
├── variables.tf
├── tags.tf # PII/PRIVATE/SENSITIVE标签
├── schemas.tf # 活跃Schema(依赖tags)
├── flagged-auto-register.tf # 已注释的C类Schema
├── outputs.tf
└── import.sh # 导入现有Schema的脚本Reference Documentation
参考文档
- Detection Patterns — Patterns for finding Kafka apps, dependencies, producers, consumers, serializers
- Schema Inference — Extract schemas from data models, inline data, PII tagging
- Categorization — Category definitions, rollout order, client version requirements
- Terraform Templates — File structure, templates, naming conventions
- Report Template — Required sections, formatting rules, validation checklist
- Code Migration — Serializer/deserializer implementation patterns for Python, Java, JavaScript, Go, and .NET
- 检测模式 — 查找Kafka应用、依赖、生产者、消费者、序列化器的规则
- Schema推断 — 从数据模型、内联数据提取Schema及PII标记规则
- 分类规则 — 分类定义、部署顺序、客户端版本要求
- Terraform模板 — 文件结构、模板、命名规范
- 报告模板 — 必填章节、格式规则、验证清单
- 代码迁移 — Python、Java、JavaScript、Go及.NET的序列化/反序列化实现模式
Execution Approach
执行方法
- Use Glob to find build files and schema files
- Use Grep for pattern detection (dependencies, producers, serializers, risks)
- Use Read to inspect source files and data models
- Use Write to create schema files, Terraform configs, and report
No need to use Agent tool — this skill is self-contained and uses direct tool calls.
- 使用Glob查找构建文件和Schema文件
- 使用Grep进行模式检测(依赖、生产者、序列化器、风险点)
- 使用Read读取源文件和数据模型
- 使用Write创建Schema文件、Terraform配置和报告
无需使用Agent工具 — 此技能为独立技能,直接调用工具即可。