databricks-zerobus-ingest

Zerobus Ingest

Build clients that ingest data directly into Databricks Delta tables via the Zerobus gRPC API.
Status: Public Preview (currently free; Databricks plans to introduce charges in the future)
Documentation:


What Is Zerobus Ingest?


Zerobus Ingest is a serverless connector that enables direct, record-by-record data ingestion into Delta tables via gRPC. It eliminates the need for message bus infrastructure (Kafka, Kinesis, Event Hub) for lakehouse-bound data. The service validates schemas, materializes data to target tables, and sends durability acknowledgments back to the client.
Core pattern: SDK init -> create stream -> ingest records -> handle ACKs -> flush -> close


Quick Decision: What Are You Building?


| Scenario | Language | Serialization | Reference |
|---|---|---|---|
| Quick prototype / test harness | Python | JSON | 2-python-client.md |
| Production Python producer | Python | Protobuf | 2-python-client.md + 4-protobuf-schema.md |
| JVM microservice | Java | Protobuf | 3-multilanguage-clients.md |
| Go service | Go | JSON or Protobuf | 3-multilanguage-clients.md |
| Node.js / TypeScript app | TypeScript | JSON | 3-multilanguage-clients.md |
| High-performance system service | Rust | JSON or Protobuf | 3-multilanguage-clients.md |
| Schema generation from UC table | Any | Protobuf | 4-protobuf-schema.md |
| Retry / reconnection logic | Any | Any | 5-operations-and-limits.md |

If no language is specified, default to Python.


Common Libraries


These libraries are essential for Zerobus data ingestion:
  • databricks-sdk>=0.85.0: Databricks workspace client for authentication and metadata
  • databricks-zerobus-ingest-sdk>=0.2.0: Zerobus SDK for high-performance streaming ingestion
  • grpcio-tools: Protobuf/gRPC code generation

These are typically NOT pre-installed on Databricks. Install them using the execute_databricks_command tool:
  • code: "%pip install databricks-sdk>=VERSION databricks-zerobus-ingest-sdk>=VERSION"

Save the returned cluster_id and context_id for subsequent calls.

Smart Installation Approach
```python
# Check the protobuf runtime version first, then install a compatible grpcio-tools
import google.protobuf

runtime_version = google.protobuf.__version__
print(f"Runtime protobuf version: {runtime_version}")

if runtime_version.startswith("5.26") or runtime_version.startswith("5.29"):
    %pip install grpcio-tools==1.62.0
else:
    %pip install grpcio-tools  # Use latest for newer protobuf versions
```

---

Prerequisites


Never execute this skill without confirming that the following objects are valid:
  1. A Unity Catalog managed Delta table to ingest into
  2. A service principal ID and secret with MODIFY and SELECT on the target table
  3. The Zerobus server endpoint for your workspace region
  4. The Zerobus Ingest SDK installed for your target language

See 1-setup-and-authentication.md for complete setup instructions.
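As an illustration of item 2, the service principal needs SELECT and MODIFY on the target table. The helper below just builds the GRANT statements to run in a SQL warehouse or notebook; the table and principal names are hypothetical:

```python
def grant_statements(table_name: str, principal: str) -> list:
    """Build the GRANT statements a Zerobus service principal needs
    on the target table (SELECT and MODIFY)."""
    return [
        f"GRANT {priv} ON TABLE {table_name} TO `{principal}`"
        for priv in ("SELECT", "MODIFY")
    ]

for stmt in grant_statements("main.iot.events", "<service-principal-application-id>"):
    print(stmt)
```

Run the statements as the table owner or a metastore admin.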


Minimal Python Example (JSON)


```python
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

sdk = ZerobusSdk(server_endpoint, workspace_url)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
table_props = TableProperties(table_name)

stream = sdk.create_stream(client_id, client_secret, table_props, options)
try:
    record = {"device_name": "sensor-1", "temp": 22, "humidity": 55}
    offset = stream.ingest_record_offset(record)
    stream.wait_for_offset(offset)
finally:
    stream.close()
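Waiting for an ACK per record limits throughput. Because ACKs are cumulative, a batch can be confirmed by waiting only on its last offset. A sketch, assuming the same `stream` object and method names as the example above:

```python
def ingest_batch(stream, records):
    """Ingest a batch of records and block until the last one is durable.

    ACKs are cumulative: waiting on the final offset confirms that all
    earlier records in the batch were durably written as well.
    """
    last_offset = None
    for record in records:
        last_offset = stream.ingest_record_offset(record)
    if last_offset is not None:
        stream.wait_for_offset(last_offset)
    return last_offset
```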


Detailed guides


| Topic | File | When to Read |
|---|---|---|
| Setup & Auth | 1-setup-and-authentication.md | Endpoint formats, service principals, SDK install |
| Python Client | 2-python-client.md | Sync/async Python, JSON and Protobuf flows, reusable client class |
| Multi-Language | 3-multilanguage-clients.md | Java, Go, TypeScript, Rust SDK examples |
| Protobuf Schema | 4-protobuf-schema.md | Generate .proto from UC table, compile, type mappings |
| Operations & Limits | 5-operations-and-limits.md | ACK handling, retries, reconnection, throughput limits, constraints |

Always follow all the steps in the Workflow below.

Workflow


  1. Display your execution plan
  2. Determine the type of client
  3. Get the schema: always use 4-protobuf-schema.md, executed via the run_python_file_on_databricks MCP tool
  4. Write Python code to a local file in the project (e.g., scripts/zerobus_ingest.py), following the instructions in the relevant guide
  5. Execute on Databricks using the run_python_file_on_databricks MCP tool
  6. If execution fails, edit the local file to fix the error, then re-execute
  7. Reuse the context for follow-up executions by passing the returned cluster_id and context_id


Important


  • Never install local packages
  • Always validate MCP server requirement before execution


Context Reuse Pattern


The first execution auto-selects a running cluster and creates an execution context. Reuse this context for follow-up calls; it is much faster (~1 s vs ~15 s) and shares variables/imports.

First execution, using the run_python_file_on_databricks tool:
  • file_path: "scripts/zerobus_ingest.py"

Returns: { success, output, error, cluster_id, context_id, ... }

Save cluster_id and context_id for follow-up calls.

If execution fails:
  1. Read the error from the result
  2. Edit the local Python file to fix the issue
  3. Re-execute with the same context using the run_python_file_on_databricks tool:
    • file_path: "scripts/zerobus_ingest.py"
    • cluster_id: "<saved_cluster_id>"
    • context_id: "<saved_context_id>"

Follow-up executions reuse the context (faster, shares state):
  • file_path: "scripts/validate_ingestion.py"
  • cluster_id: "<saved_cluster_id>"
  • context_id: "<saved_context_id>"

Handling Failures


When execution fails:
  1. Read the error from the result
  2. Edit the local Python file to fix the issue
  3. Re-execute using the same cluster_id and context_id (faster, keeps installed libraries)
  4. If the context is corrupted, omit context_id to create a fresh one


Installing Libraries


Databricks provides Spark, pandas, numpy, and common data libraries by default. Only install a library if you get an import error.

Use the execute_databricks_command tool:
  • code: "%pip install databricks-zerobus-ingest-sdk>=0.2.0"
  • cluster_id: "<cluster_id>"
  • context_id: "<context_id>"

The library is immediately available in the same context.

Note: keeping the same context_id means installed libraries persist across calls.

🚨 Critical Learning: Timestamp Format Fix


Zerobus requires timestamp fields as Unix integer timestamps, NOT string timestamps. For Databricks, the timestamps must be generated in microseconds.
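For example, a TIMESTAMP column value should be produced as an integer count of microseconds since the Unix epoch, never as an ISO-8601 string (the field names below are illustrative):

```python
import time
from datetime import datetime, timezone

def to_unix_micros(dt: datetime) -> int:
    """Convert an aware datetime to integer microseconds since the Unix
    epoch, the format Zerobus expects for Databricks TIMESTAMP fields."""
    return int(dt.timestamp() * 1_000_000)

# Current time as an integer microsecond timestamp
record = {
    "device_name": "sensor-1",
    "event_time": int(time.time() * 1_000_000),
}
```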


Key Concepts


  • gRPC + Protobuf: Zerobus uses gRPC as its transport protocol. Any application that can communicate via gRPC and construct Protobuf messages can produce to Zerobus.
  • JSON or Protobuf serialization: JSON for quick starts; Protobuf for type safety, forward compatibility, and performance.
  • At-least-once delivery: The connector provides at-least-once guarantees. Design consumers to handle duplicates.
  • Durability ACKs: Each ingested record returns an offset. Use wait_for_offset(offset) to confirm a durable write. An ACK indicates that all records up to that offset have been durably written.
  • No table management: Zerobus does not create or alter tables. You must pre-create your target table and manage schema evolution yourself.
  • Single-AZ durability: The service runs in a single availability zone. Plan for potential zone outages.
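Because delivery is at-least-once, downstream consumers should tolerate duplicates. A minimal in-memory sketch of key-based dedup (the key field is illustrative; production pipelines would typically dedup with MERGE INTO or a watermark instead):

```python
def dedupe(records, key="event_id"):
    """Keep the first occurrence of each key, dropping at-least-once
    duplicates. Assumes every record carries a unique key field."""
    seen = set()
    unique = []
    for record in records:
        k = record[key]
        if k not in seen:
            seen.add(k)
            unique.append(record)
    return unique
```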


Common Issues


| Issue | Solution |
|---|---|
| Connection refused | Verify the server endpoint format matches your cloud (AWS vs Azure). Check firewall allowlists. |
| Authentication failed | Confirm the service principal client_id/secret. Verify GRANT statements on the target table. |
| Schema mismatch | Ensure record fields match the target table schema exactly. Regenerate the .proto if the table changed. |
| Stream closed unexpectedly | Implement retry with exponential backoff and stream reinitialization. See 5-operations-and-limits.md. |
| Throughput limits hit | Max 100 MB/s and 15,000 rows/s per stream. Open multiple streams or contact Databricks. |
| Region not supported | Check supported regions in 5-operations-and-limits.md. |
| Table not found | Ensure the table is a managed Delta table in a supported region with the correct three-part name. |


Related Skills


  • databricks-python-sdk - General SDK patterns and WorkspaceClient for table/schema management
  • databricks-spark-declarative-pipelines - Downstream pipeline processing of ingested data
  • databricks-unity-catalog - Managing catalogs, schemas, and tables that Zerobus writes to
  • databricks-synthetic-data-generation - Generate test data to feed into Zerobus producers
  • databricks-config - Profile and authentication setup

Resources
