databricks-zerobus-ingest
Zerobus Ingest
Build clients that ingest data directly into Databricks Delta tables via the Zerobus gRPC API.
Status: Public Preview (currently free; Databricks plans to introduce charges in the future)
Documentation:
What Is Zerobus Ingest?
Zerobus Ingest is a serverless connector that enables direct, record-by-record data ingestion into Delta tables via gRPC. It eliminates the need for message bus infrastructure (Kafka, Kinesis, Event Hub) for lakehouse-bound data. The service validates schemas, materializes data to target tables, and sends durability acknowledgments back to the client.
Core pattern: SDK init -> create stream -> ingest records -> handle ACKs -> flush -> close
Quick Decision: What Are You Building?
| Scenario | Language | Serialization | Reference |
|---|---|---|---|
| Quick prototype / test harness | Python | JSON | 2-python-client.md |
| Production Python producer | Python | Protobuf | 2-python-client.md + 4-protobuf-schema.md |
| JVM microservice | Java | Protobuf | 3-multilanguage-clients.md |
| Go service | Go | JSON or Protobuf | 3-multilanguage-clients.md |
| Node.js / TypeScript app | TypeScript | JSON | 3-multilanguage-clients.md |
| High-performance system service | Rust | JSON or Protobuf | 3-multilanguage-clients.md |
| Schema generation from UC table | Any | Protobuf | 4-protobuf-schema.md |
| Retry / reconnection logic | Any | Any | 5-operations-and-limits.md |
If not specified, default to Python.
Common Libraries
These libraries are essential for ZeroBus data ingestion:
- databricks-sdk>=0.85.0: Databricks workspace client for authentication and metadata
- databricks-zerobus-ingest-sdk>=0.2.0: ZeroBus SDK for high-performance streaming ingestion
- grpcio-tools
These are typically NOT pre-installed on Databricks. Install them using tool execute_databricks_command:
- code: "%pip install databricks-sdk>=VERSION databricks-zerobus-ingest-sdk>=VERSION"

Save the returned cluster_id and context_id for subsequent calls.

Smart Installation Approach
```python
# Check the protobuf runtime version first, then install a compatible grpcio-tools
import google.protobuf

runtime_version = google.protobuf.__version__
print(f"Runtime protobuf version: {runtime_version}")

if runtime_version.startswith("5.26") or runtime_version.startswith("5.29"):
    %pip install grpcio-tools==1.62.0
else:
    %pip install grpcio-tools  # Use the latest for newer protobuf versions
```
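In a plain Python script, where `%pip` magics are unavailable, the same decision can be expressed as a small helper. This is a sketch: `grpcio_tools_requirement` is an illustrative name, and the pinning rule simply mirrors the notebook check above.

```python
from importlib.metadata import PackageNotFoundError, version

def grpcio_tools_requirement(protobuf_version: str) -> str:
    """Return the grpcio-tools pip requirement compatible with the
    given protobuf runtime version (mirrors the notebook check)."""
    if protobuf_version.startswith(("5.26", "5.29")):
        return "grpcio-tools==1.62.0"
    return "grpcio-tools"  # latest works for newer protobuf versions

try:
    # Pin based on the protobuf runtime installed in this environment
    print(grpcio_tools_requirement(version("protobuf")))
except PackageNotFoundError:
    print("protobuf not installed in this environment")
```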
---
Prerequisites
Never execute this skill without confirming that the following objects are valid:
- A Unity Catalog managed Delta table to ingest into
- A service principal ID and secret with SELECT and MODIFY on the target table
- The Zerobus server endpoint for your workspace region
- The Zerobus Ingest SDK installed for your target language
See 1-setup-and-authentication.md for complete setup instructions.
Minimal Python Example (JSON)
```python
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

sdk = ZerobusSdk(server_endpoint, workspace_url)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
table_props = TableProperties(table_name)
stream = sdk.create_stream(client_id, client_secret, table_props, options)

try:
    record = {"device_name": "sensor-1", "temp": 22, "humidity": 55}
    offset = stream.ingest_record_offset(record)
    stream.wait_for_offset(offset)
finally:
    stream.close()
```
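The same flow extends naturally to batches: because ACKs are cumulative, waiting on the final offset confirms every earlier record too. This is a sketch assuming the sync SDK surface shown above (`ingest_record_offset`, `wait_for_offset`); `ingest_batch` is an illustrative helper, not an SDK API.

```python
def ingest_batch(stream, records):
    """Ingest records one by one, then wait once on the final offset.

    ACKs indicate all records up to an offset have been durably
    written, so a single wait on the last offset covers the batch.
    """
    last_offset = None
    for record in records:
        last_offset = stream.ingest_record_offset(record)
    if last_offset is not None:
        stream.wait_for_offset(last_offset)
    return last_offset
```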
Detailed guides
| Topic | File | When to Read |
|---|---|---|
| Setup & Auth | 1-setup-and-authentication.md | Endpoint formats, service principals, SDK install |
| Python Client | 2-python-client.md | Sync/async Python, JSON and Protobuf flows, reusable client class |
| Multi-Language | 3-multilanguage-clients.md | Java, Go, TypeScript, Rust SDK examples |
| Protobuf Schema | 4-protobuf-schema.md | Generate .proto from UC table, compile, type mappings |
| Operations & Limits | 5-operations-and-limits.md | ACK handling, retries, reconnection, throughput limits, constraints |
You must always follow all the steps in the Workflow.
Workflow
- Display the plan of your execution
- Determine the type of client
- Get the schema: always use 4-protobuf-schema.md; execute using the MCP tool run_python_file_on_databricks
- Write Python code to a local file in the project (e.g., scripts/zerobus_ingest.py), following the instructions in the relevant guide to ingest with Zerobus
- Execute on Databricks using the MCP tool run_python_file_on_databricks
- If execution fails: edit the local file to fix the error, then re-execute
- Reuse the context for follow-up executions by passing the returned cluster_id and context_id
Important
- Never install local packages
- Always validate the MCP server requirements before execution
Context Reuse Pattern
The first execution auto-selects a running cluster and creates an execution context. Reuse this context for follow-up calls - it's much faster (~1s vs ~15s) and shares variables/imports:
First execution - use tool run_python_file_on_databricks:
- file_path: "scripts/zerobus_ingest.py"

Returns:
{ success, output, error, cluster_id, context_id, ... }

Save cluster_id and context_id for follow-up calls.

If execution fails:
- Read the error from the result
- Edit the local Python file to fix the issue
- Re-execute with the same context using tool run_python_file_on_databricks:
  - file_path: "scripts/zerobus_ingest.py"
  - cluster_id: "<saved_cluster_id>"
  - context_id: "<saved_context_id>"

Follow-up executions reuse the context (faster, shares state):
- file_path: "scripts/validate_ingestion.py"
- cluster_id: "<saved_cluster_id>"
- context_id: "<saved_context_id>"
Handling Failures
When execution fails:
- Read the error from the result
- Edit the local Python file to fix the issue
- Re-execute using the same cluster_id and context_id (faster, keeps installed libraries)
- If the context is corrupted, omit context_id to create a fresh one
Installing Libraries
Databricks provides Spark, pandas, numpy, and common data libraries by default. Only install a library if you get an import error.
Use tool execute_databricks_command:
- code: "%pip install databricks-zerobus-ingest-sdk>=0.2.0"
- cluster_id: "<cluster_id>"
- context_id: "<context_id>"

The library is immediately available in the same context.
Note: Keeping the same context_id means installed libraries persist across calls.
🚨 Critical Learning: Timestamp Format Fix
ZeroBus requires timestamp fields to be Unix integer timestamps, NOT string timestamps.
Timestamp values must be generated in microseconds for Databricks.
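A minimal conversion sketch (the function name is illustrative; naive datetimes are assumed to be UTC):

```python
from datetime import datetime, timezone

def to_unix_micros(dt: datetime) -> int:
    """Convert a datetime to a Unix integer timestamp in microseconds,
    the format Zerobus expects for timestamp fields (not strings)."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # assumption: naive datetimes are UTC
    return int(dt.timestamp() * 1_000_000)

print(to_unix_micros(datetime(2024, 1, 1, tzinfo=timezone.utc)))
```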
Key Concepts
- gRPC + Protobuf: Zerobus uses gRPC as its transport protocol. Any application that can communicate via gRPC and construct Protobuf messages can produce to Zerobus.
- JSON or Protobuf serialization: JSON for quick starts; Protobuf for type safety, forward compatibility, and performance.
- At-least-once delivery: The connector provides at-least-once guarantees. Design consumers to handle duplicates.
- Durability ACKs: Each ingested record returns an offset. Use wait_for_offset(offset) to confirm a durable write; an ACK indicates all records up to that offset have been durably written.
- No table management: Zerobus does not create or alter tables. You must pre-create your target table and manage schema evolution yourself.
- Single-AZ durability: The service runs in a single availability zone. Plan for potential zone outages.
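Because of the at-least-once guarantee above, producers often attach an idempotency key so downstream consumers can deduplicate. A sketch, assuming your target table has columns for these fields (`event_id` and `event_ts_us` are illustrative names):

```python
import time
import uuid

def make_record(payload: dict) -> dict:
    """Wrap a payload with a deduplication key and an event timestamp
    in Unix microseconds (the integer format noted above)."""
    return {
        **payload,
        "event_id": str(uuid.uuid4()),                # idempotency key for downstream dedup
        "event_ts_us": int(time.time() * 1_000_000),  # Unix microseconds
    }
```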
Common Issues
| Issue | Solution |
|---|---|
| Connection refused | Verify server endpoint format matches your cloud (AWS vs Azure). Check firewall allowlists. |
| Authentication failed | Confirm service principal client_id/secret. Verify GRANT statements on the target table. |
| Schema mismatch | Ensure record fields match the target table schema exactly. Regenerate .proto if table changed. |
| Stream closed unexpectedly | Implement retry with exponential backoff and stream reinitialization. See 5-operations-and-limits.md. |
| Throughput limits hit | Max 100 MB/s and 15,000 rows/s per stream. Open multiple streams or contact Databricks. |
| Region not supported | Check supported regions in 5-operations-and-limits.md. |
| Table not found | Ensure table is a managed Delta table in a supported region with correct three-part name. |
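For "Stream closed unexpectedly", a generic backoff-and-reinitialize loop looks roughly like the sketch below. `create_stream` and `send` are caller-supplied callables (illustrative names, not SDK APIs); 5-operations-and-limits.md remains the authoritative reference.

```python
import random
import time

def with_retries(create_stream, send, max_attempts=5, base_delay=1.0):
    """Retry a stream operation with exponential backoff plus jitter,
    reinitializing the stream on every attempt and always closing it."""
    for attempt in range(max_attempts):
        stream = None
        try:
            stream = create_stream()
            return send(stream)
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error
            # Exponential backoff with jitter proportional to base_delay
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, base_delay))
        finally:
            if stream is not None:
                stream.close()
```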
Related Skills
- databricks-python-sdk - General SDK patterns and WorkspaceClient for table/schema management
- databricks-spark-declarative-pipelines - Downstream pipeline processing of ingested data
- databricks-unity-catalog - Managing catalogs, schemas, and tables that Zerobus writes to
- databricks-synthetic-data-generation - Generate test data to feed into Zerobus producers
- databricks-config - Profile and authentication setup