spark-declarative-pipelines
# Lakeflow Spark Declarative Pipelines (SDP)
IMPORTANT: If this is a new pipeline (one does not already exist), see Quick Start. Be sure to use whatever language user has specified only (Python or SQL). Be sure to use Databricks Asset Bundles for new projects.
## Critical Rules (always follow)

- MUST confirm the language as Python or SQL. Stick with that language unless told otherwise.
- MUST use the Quick Start below if not modifying an existing pipeline.
- MUST create serverless pipelines by default. Only use classic clusters if the user explicitly requires R language, Spark RDD APIs, or JAR libraries.
## Required Steps

Copy this checklist and verify each item:
- [ ] Language selected: Python or SQL
- [ ] Compute type decided: serverless or classic compute
- [ ] Decide on multiple catalogs or schemas vs. all in one default schema
- [ ] Consider what should be parameterized at the pipeline level to make deployment easy
- [ ] Consider [Multi-Schema Patterns](#multi-schema-patterns) below; ask if unclear on the best choice
- [ ] Consider [Modern Defaults](#modern-defaults) below; ask if unclear on the best choice

## Quick Start: Initialize New Pipeline Project
RECOMMENDED: Use `databricks pipelines init` to create production-ready Asset Bundle projects with multi-environment support.

### When to Use Bundle Initialization
Use bundle initialization for new pipeline projects to get a professional structure from the start.

Use the manual workflow for:
- Quick prototyping without multi-environment needs
- Existing manual projects you want to continue
- Learning/experimentation
### Step 1: Initialize Project
I will automatically run this command when you request a new pipeline:

```bash
databricks pipelines init
```

Interactive Prompts:
- Project name: e.g., `customer_orders_pipeline`
- Initial catalog: Unity Catalog name (e.g., `main`, `prod_catalog`)
- Personal schema per user?: `yes` for dev (each user gets their own schema), `no` for prod
- Language: SQL or Python (auto-detected from your request; see language detection below)

Generated Structure:

```
my_pipeline/
├── databricks.yml              # Multi-environment config (dev/prod)
├── resources/
│   └── *_etl.pipeline.yml      # Pipeline resource definition
└── src/
    └── *_etl/
        ├── explorations/       # Exploratory code in .ipynb
        └── transformations/    # Your .sql or .py files here
```

### Step 2: Customize Transformations
Replace the example code created by the init process with custom transformation files in `src/transformations/`, based on the provided requirements and the best-practice guidance from this skill.

For Python pipelines using cloudFiles: ask the user where to store Auto Loader schema metadata. Recommend: `/Volumes/{catalog}/{schema}/{pipeline_name}_metadata/schemas`

### Step 3: Deploy and Run
```bash
# Deploy to workspace (dev by default)
databricks bundle deploy

# Run pipeline
databricks bundle run my_pipeline_etl

# Deploy to production
databricks bundle deploy --target prod
```

## Quick Reference
| Concept | Details |
|---|---|
| Names | SDP = Spark Declarative Pipelines = LDP = Lakeflow Declarative Pipelines = Lakeflow Pipelines (all interchangeable) |
| Python Import | `from pyspark import pipelines as dp` |
| Primary Decorators | `@dp.table`, `@dp.materialized_view` |
| Temporary Views | `@dp.temporary_view` |
| Replaces | Delta Live Tables (DLT), which used the `dlt` module |
| Based On | Apache Spark 4.1+ (Databricks' modern data pipeline framework) |
| Docs | https://docs.databricks.com/aws/en/ldp/developer/python-dev |
## Detailed guides
Ingestion patterns: Use 1-ingestion-patterns.md when planning how to get new data into your Lakeflow pipeline; covers file formats, batch/streaming options, and tips for incremental and full loads. (Keywords: Auto Loader, Kafka, Event Hub, Kinesis, file formats)

Streaming pipeline patterns: See 2-streaming-patterns.md for designing pipelines with streaming data sources, change data detection, triggers, and windowing. (Keywords: deduplication, windowing, stateful operations, joins)

SCD query patterns: See 3-scd-query-patterns.md for querying Slowly Changing Dimensions Type 2 history tables, including current-state queries, point-in-time analysis, temporal joins, and change tracking. (Keywords: SCD Type 2 history tables, temporal joins, querying historical data)

Performance tuning: Use 4-performance-tuning.md for optimizing pipelines with Liquid Clustering, state management, and best practices for high-performance streaming workloads. (Keywords: Liquid Clustering, optimization, state management)

Python API reference: See 5-python-api.md for the modern `pyspark.pipelines` (dp) API reference and migration from legacy `dlt` API patterns. (Keywords: dp API, dlt API comparison)

DLT migration: Use 6-dlt-migration.md when migrating existing Delta Live Tables (DLT) pipelines to Spark Declarative Pipelines (SDP). (Keywords: migrating DLT pipelines to SDP)

Advanced configuration: See 7-advanced-configuration.md for advanced pipeline settings including development mode, continuous execution, notifications, Python dependencies, and custom cluster configurations. (Keywords: extra_settings parameter reference, examples)

Project initialization: Use 8-project-initialization.md for setting up new pipeline projects with `databricks pipelines init`, Asset Bundles, multi-environment deployments, and language detection logic. (Keywords: databricks pipelines init, Asset Bundles, language detection, migration guides)

AUTO CDC patterns: Use 9-auto_cdc.md for implementing Change Data Capture with AUTO CDC, including Slowly Changing Dimensions (SCD Type 1 and Type 2) for tracking changes and deduplication. (Keywords: AUTO CDC, Slowly Changing Dimension, SCD, SCD Type 1, SCD Type 2, change data capture, deduplication)
## Workflow

1. Determine the task type:
   - Setting up a new project? → Read 8-project-initialization.md first
   - Creating a new pipeline? → Read 1-ingestion-patterns.md
   - Creating a streaming table? → Read 2-streaming-patterns.md
   - Querying SCD history tables? → Read 3-scd-query-patterns.md
   - Implementing AUTO CDC or SCD? → Read 9-auto_cdc.md
   - Performance issues? → Read 4-performance-tuning.md
   - Using the Python API? → Read 5-python-api.md
   - Migrating from DLT? → Read 6-dlt-migration.md
   - Advanced configuration? → Read 7-advanced-configuration.md
   - Validating? → Read validation-checklist.md
2. Follow the instructions in the relevant guide
3. Repeat for the next task type
## Official Documentation

- Lakeflow Spark Declarative Pipelines Overview - main documentation hub
- SQL Language Reference - SQL syntax for streaming tables and materialized views
- Python Language Reference - `pyspark.pipelines` API
- Loading Data - Auto Loader, Kafka, Kinesis ingestion
- Change Data Capture (CDC) - AUTO CDC, SCD Type 1/2
## Medallion Architecture Pattern

### Bronze Layer (Raw)
- Raw data ingested from sources in original format
- Minimal transformations (append-only; add metadata like `_ingested_at`, `_source_file`)
- Single source of truth preserving data lineage

### Silver Layer (Validated)
- Cleaned and validated data
- Might deduplicate here with auto_cdc, but often wait until the final step for auto_cdc if possible
- Business logic applied (type casting, quality checks, filtering invalid records)
- Enterprise view of key business entities
- Enables self-service analytics and ML

### Gold Layer (Business-Ready)
- Aggregated, denormalized, project-specific tables
- Optimized for consumption (reporting, dashboards, BI tools)
- Fewer joins, read-optimized data models
- Kimball star schema tables: dim_<entity_name>, fact_<entity_name>
- Deduplication often happens here via Slowly Changing Dimensions (SCD), using auto_cdc. Sometimes it happens upstream in silver instead, such as when joining multiple tables or when business users plan to query the silver table.

### Typical Flow (can vary)

```
Bronze: read_files() or spark.readStream.format("cloudFiles") → streaming table
Silver: read bronze → filter/clean/validate → streaming table
Gold:   read silver → aggregate/denormalize → auto_cdc or materialized view
```

Sources:
- https://www.databricks.com/glossary/medallion-architecture
- https://docs.databricks.com/aws/en/lakehouse/medallion
- https://www.databricks.com/blog/2022/06/24/data-warehousing-modeling-techniques-and-their-implementation-on-the-databricks-lakehouse-platform.html

For medallion architecture (bronze/silver/gold), two approaches work:
- Flat with naming (template default): bronze_*.sql, silver_*.sql, gold_*.sql
- Subdirectories: bronze/orders.sql, silver/cleaned.sql, gold/summary.sql

Both work with the `transformations/**` glob pattern. Choose based on preference.

See 8-project-initialization.md for complete details on bundle initialization, migration, and troubleshooting.
## General SDP development guidance
### Step 1: Write Pipeline Files Locally
Create `.sql` or `.py` files in a local folder:

```
my_pipeline/
├── bronze/
│   ├── ingest_orders.sql       # SQL (default for most cases)
│   └── ingest_events.py        # Python (for complex logic)
├── silver/
│   └── clean_orders.sql
└── gold/
    └── daily_summary.sql
```

SQL Example (`bronze/ingest_orders.sql`):

```sql
CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM read_files(
  '/Volumes/catalog/schema/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);
```

Python Example (`bronze/ingest_events.py`):

```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, current_timestamp

# Get schema location from pipeline configuration
schema_location_base = spark.conf.get("schema_location_base")

@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_events")
        .load("/Volumes/catalog/schema/raw/events/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
    )
```
**IMPORTANT for Python pipelines**: When using `spark.readStream.format("cloudFiles")` for cloud storage ingestion with schema inference (no schema specified), you **must specify a schema location**.

**Always ask the user** where to store Auto Loader schema metadata. Recommend: `/Volumes/{catalog}/{schema}/{pipeline_name}_metadata/schemas`

Example: `/Volumes/my_catalog/pipeline_metadata/orders_pipeline_metadata/schemas`

**Never use the source data volume** - this causes permission conflicts. The schema location should be configured in the pipeline settings and accessed via `spark.conf.get("schema_location_base")`.
**Language Selection:**
**CRITICAL RULE**: If the user explicitly mentions "Python" in their request (e.g., "Python Spark Declarative Pipeline", "Python SDP", "use Python"), **ALWAYS use Python without asking**. The same applies to SQL - if they say "SQL pipeline", use SQL.
- **Explicit language request**: User says "Python" → Use Python. User says "SQL" → Use SQL. **Do not ask for clarification.**
- **Auto-detection** (only when no explicit language mentioned):
- **SQL indicators**: "sql files", "simple transformations", "aggregations", "materialized view", "CREATE OR REFRESH"
- **Python indicators**: ".py files", "UDF", "complex logic", "ML inference", "external API", "@dp.table", "pandas", "decorator"
- **Prompt for clarification** only when language intent is truly ambiguous (no explicit mention, mixed signals)
- **Default to SQL** only when ambiguous AND no Python indicators present
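The auto-detection heuristics above can be sketched as plain Python; the keyword lists below are illustrative assumptions, not the skill's actual detection logic:

```python
# Illustrative language-detection sketch; keyword lists are assumptions, not
# the skill's real rules.
SQL_HINTS = ("sql files", "aggregations", "materialized view", "create or refresh")
PYTHON_HINTS = (".py", "udf", "complex logic", "ml inference", "external api",
                "@dp.table", "pandas", "decorator")

def detect_language(request: str) -> str:
    """Return 'python', 'sql', or 'ask' for a pipeline request."""
    text = request.lower()
    if "python" in text:       # explicit mention wins; do not ask for clarification
        return "python"
    if "sql" in text:
        return "sql"
    has_py = any(h in text for h in PYTHON_HINTS)
    has_sql = any(h in text for h in SQL_HINTS)
    if has_py and has_sql:     # mixed signals → prompt for clarification
        return "ask"
    if has_py:
        return "python"
    return "sql"               # SQL indicators present, or the ambiguous default

print(detect_language("pipeline with pandas UDFs"))  # python
print(detect_language("simple aggregations"))        # sql
```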
See **[8-project-initialization.md](8-project-initialization.md)** for detailed language detection logic.

### Option 1: Pipelines with DABs
Use Asset Bundles and the pipelines CLI. See the Quick Start above and 8-project-initialization.md for complete details.
### Option 2: Manual Workflow (Advanced)
For rapid prototyping, experimentation, or when you prefer direct control without Asset Bundles, use the manual workflow with MCP tools.

Use MCP tools to create, run, and iterate on serverless SDP pipelines. The primary tool is `create_or_update_pipeline`, which handles the entire lifecycle.

IMPORTANT: Always create serverless pipelines (default). Only use classic clusters if the user explicitly asks for classic, pro, or advanced compute, or requires R language, Spark RDD APIs, or JAR libraries.

See 10-mcp-approach.md for a detailed guide.
## Best Practices (2026)
### Project Structure
- Default to `databricks pipelines init` for new projects (creates an Asset Bundle)
- Use Asset Bundles for multi-environment deployments (dev/staging/prod)
- Use a manual structure only for quick prototypes or legacy migration
- Medallion architecture: two approaches work with Asset Bundles:
  - Flat structure (template default): bronze_*.sql, silver_*.sql, gold_*.sql in transformations/
  - Subdirectories: transformations/bronze/, transformations/silver/, transformations/gold/
  - Both work with the `transformations/**` glob pattern; choose based on team preference
- See 8-project-initialization.md for project setup details
### Minimal pipeline config pointers

- Define parameters in your pipeline's configuration and access them in code with `spark.conf.get("key")`.
- In Databricks Asset Bundles, set these under `resources.pipelines.<pipeline>.configuration`; validate with `databricks bundle validate`.
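For example, a bundle resource fragment along these lines (the pipeline name and keys are hypothetical):

```yaml
resources:
  pipelines:
    my_pipeline_etl:
      name: my_pipeline_etl
      serverless: true
      configuration:
        source_catalog: shared_catalog
        silver_schema: silver
```

Each key then becomes readable in pipeline code via `spark.conf.get("source_catalog")`.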
### Modern Defaults

- CLUSTER BY (Liquid Clustering), not PARTITION BY - see 4-performance-tuning.md
- Raw `.sql`/`.py` files, not notebooks
- Serverless compute ONLY - do not use classic clusters unless explicitly required
- Unity Catalog (required for serverless)
- read_files() when using SQL for cloud storage ingestion - see 1-ingestion-patterns.md
## Multi-Schema Patterns

**Default: Single target schema per pipeline.** Each pipeline has one target `catalog` and `schema` where all tables are written.

### Option 1: Single Pipeline, Single Schema with Prefixes (Recommended)
Use one schema with table name prefixes to distinguish layers:

```python
# All tables write to: catalog.schema.bronze_*, silver_*, gold_*
@dp.table(name="bronze_orders")   # → catalog.schema.bronze_orders
@dp.table(name="silver_orders")   # → catalog.schema.silver_orders
@dp.table(name="gold_summary")    # → catalog.schema.gold_summary
```

**Advantages:**
- Simpler configuration (one pipeline)
- All tables in one schema for easy discovery

### Option 2: Separate Catalogs/Schemas via Pipeline Parameters
Use variables to specify a separate catalog and/or schema for different steps. Below are Python SDP examples that source variables from pipeline configuration via `spark.conf.get` and use the default catalog/schema for bronze.
#### Same catalog, separate schemas; bronze uses pipeline defaults

- Set your pipeline's default catalog and default schema to the bronze layer (for example, catalog=my_catalog, schema=bronze). When you omit catalog/schema in code, reads/writes go to these defaults.
- Use pipeline parameters for the other schemas and any source schema/path, retrieved in code with `spark.conf.get(...)`.

```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

# Pull variables from pipeline configuration parameters
silver_schema = spark.conf.get("silver_schema")    # e.g., "silver"
gold_schema = spark.conf.get("gold_schema")        # e.g., "gold"
landing_schema = spark.conf.get("landing_schema")  # e.g., "landing"

# Bronze → uses default catalog/schema (set to bronze in pipeline settings)
@dp.table(name="orders_bronze")
def orders_bronze():
    # Read from another schema in the same default catalog
    return spark.readStream.table(f"{landing_schema}.orders_raw")

# Silver → same catalog, schema from parameter
@dp.table(name=f"{silver_schema}.orders_clean")
def orders_clean():
    return (spark.read.table("orders_bronze")  # unqualified = default catalog/schema
            .filter(col("order_id").isNotNull()))

# Gold → same catalog, schema from parameter
@dp.materialized_view(name=f"{gold_schema}.orders_by_date")
def orders_by_date():
    return (spark.read.table(f"{silver_schema}.orders_clean")
            .groupBy("order_date")
            .count().withColumnRenamed("count", "order_count"))
```

- Using unqualified names for bronze ensures it lands in the pipeline's default catalog/schema; silver/gold are explicitly schema-qualified within the same catalog.

---
#### Custom catalog/schema per layer; bronze still uses pipeline defaults
- Keep bronze in the pipeline defaults (default catalog/schema set to your bronze layer). For silver/gold, use fully-qualified names with catalog and schema variables from pipeline configuration.

```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

# Pull variables from pipeline configuration parameters
silver_catalog = spark.conf.get("silver_catalog")    # e.g., "my_catalog"
silver_schema = spark.conf.get("silver_schema")      # e.g., "silver"
gold_catalog = spark.conf.get("gold_catalog")        # e.g., "my_catalog"
gold_schema = spark.conf.get("gold_schema")          # e.g., "gold"
landing_catalog = spark.conf.get("landing_catalog")  # optional, if source is in another catalog
landing_schema = spark.conf.get("landing_schema")

# Bronze → uses default catalog/schema (set to bronze)
@dp.table(name="orders_bronze")
def orders_bronze():
    # If source is in a specified catalog/schema:
    return spark.readStream.table(f"{landing_catalog}.{landing_schema}.orders_raw")

# Silver → custom catalog + schema via parameters
@dp.table(name=f"{silver_catalog}.{silver_schema}.orders_clean")
def orders_clean():
    # Read bronze by its unqualified name (defaults), or fully qualify if preferred
    return (spark.read.table("orders_bronze")
            .filter(col("order_id").isNotNull()))

# Gold → custom catalog + schema via parameters
@dp.materialized_view(name=f"{gold_catalog}.{gold_schema}.orders_by_date")
def orders_by_date():
    return (spark.read.table(f"{silver_catalog}.{silver_schema}.orders_clean")
            .groupBy("order_date")
            .count().withColumnRenamed("count", "order_count"))
```

- Multipart names in the decorator's name argument let you publish to explicit catalog.schema targets within one pipeline.
- Unqualified reads/writes use the pipeline defaults; use fully-qualified names when crossing catalogs or when you need explicit namespace control.

---

**Note:** The `@dp.table()` decorator does not currently support separate `schema=` or `catalog=` parameters. The name parameter is a string containing catalog.schema.table_name; it can omit the catalog and/or schema to use the pipeline's configured default target schema.
## Reading Tables in Python
Modern SDP Best Practice:
- Use `spark.read.table()` for batch reads
- Use `spark.readStream.table()` for streaming reads
- Don't use `dp.read()` or `dp.read_stream()` (old syntax, no longer documented)
- Don't use `dlt.read()` or `dlt.read_stream()` (legacy DLT API)

Key Point: SDP automatically tracks table dependencies from standard Spark DataFrame operations. No special read APIs are needed.
## Three-Tier Identifier Resolution

SDP supports three levels of table name qualification:

| Level | Syntax | When to Use |
|---|---|---|
| Unqualified | `table_name` | Reading tables within the same pipeline's target catalog/schema (recommended) |
| Partially-qualified | `schema.table_name` | Reading from a different schema in the same catalog |
| Fully-qualified | `catalog.schema.table_name` | Reading from external catalogs/schemas |
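As a plain-Python illustration of these resolution rules (the helper name and defaults are hypothetical; SDP performs this resolution internally):

```python
def resolve_table_name(name: str, default_catalog: str, default_schema: str) -> str:
    """Expand a 1-, 2-, or 3-part identifier against the pipeline's target defaults."""
    parts = name.split(".")
    if len(parts) == 1:   # unqualified → pipeline's target catalog and schema
        return f"{default_catalog}.{default_schema}.{name}"
    if len(parts) == 2:   # partially-qualified → same catalog, explicit schema
        return f"{default_catalog}.{name}"
    return name           # fully-qualified → used as-is

print(resolve_table_name("bronze_raw", "dev_catalog", "dev_schema"))
# dev_catalog.dev_schema.bronze_raw
```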
### Option 1: Unqualified Names (Recommended for Pipeline Tables)
Best practice for tables within the same pipeline. SDP resolves unqualified names to the pipeline's configured target catalog and schema. This makes code portable across environments (dev/prod).

```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

@dp.table(name="silver_clean")
def silver_clean():
    # Reads from the pipeline's target catalog/schema (e.g., dev_catalog.dev_schema.bronze_raw)
    return (
        spark.read.table("bronze_raw")
        .filter(F.col("valid") == True)
    )

@dp.table(name="silver_events")
def silver_events():
    # Streaming read from the same pipeline's bronze_events table
    return (
        spark.readStream.table("bronze_events")
        .withColumn("processed_at", F.current_timestamp())
    )
```

### Option 2: Pipeline Parameters (For External Sources)
Use `spark.conf.get()` to parameterize external catalog/schema references. Define parameters in pipeline configuration, then reference them at the module level.
```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

# Get parameterized values at module level (evaluated once at pipeline start)
source_catalog = spark.conf.get("source_catalog")
source_schema = spark.conf.get("source_schema", "sales")  # with default

@dp.table(name="transaction_summary")
def transaction_summary():
    return (
        spark.read.table(f"{source_catalog}.{source_schema}.transactions")
        .groupBy("account_id")
        .agg(
            F.count("txn_id").alias("txn_count"),
            F.sum("txn_amount").alias("account_revenue")
        )
    )
```
**Configure parameters in pipeline settings:**
- **Asset Bundles**: Add to `pipeline.yml` under `configuration:`
- **Manual/MCP**: Pass via `extra_settings.configuration` dict
```yaml
# In resources/my_pipeline.pipeline.yml
configuration:
  source_catalog: "shared_catalog"
  source_schema: "sales"
```
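For the Manual/MCP path, the same two parameters travel as a plain configuration dict. A sketch (the exact surrounding call shape depends on your MCP tooling):

```python
# Hypothetical extra_settings payload mirroring the Asset Bundle configuration;
# passed when creating/updating the pipeline via MCP tooling.
extra_settings = {
    "configuration": {
        "source_catalog": "shared_catalog",
        "source_schema": "sales",
    }
}

print(extra_settings["configuration"]["source_catalog"])  # → shared_catalog
```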
Option 3: Fully-Qualified Names (For Fixed External References)
Use when referencing specific external tables that don't change across environments:
```python
from pyspark import pipelines as dp

@dp.table(name="enriched_orders")
def enriched_orders():
    # Pipeline-internal table (unqualified)
    orders = spark.read.table("bronze_orders")
    # External reference table (fully-qualified)
    products = spark.read.table("shared_catalog.reference.products")
    return orders.join(products, "product_id")
```
Choosing the Right Approach
| Scenario | Recommended Approach |
|---|---|
| Reading tables created in same pipeline | Unqualified names - portable, uses target catalog/schema |
| Reading from external source that varies by environment | Pipeline parameters - configurable per deployment |
| Reading from shared/reference tables with fixed location | Fully-qualified names - explicit and clear |
| Mixed pipeline (some internal, some external) | Combine approaches - unqualified for internal, parameters for external |
Common Issues
| Issue | Solution |
|---|---|
| Empty output tables | Verify the source query returns rows; run a full refresh if needed |
| Pipeline stuck INITIALIZING | Normal for serverless; wait a few minutes |
| "Column not found" | Check upstream table schemas and column names for typos or renames |
| Streaming reads fail | For file ingestion in a streaming table, you must use the Auto Loader (`cloudFiles`) source |
| Timeout during run | Increase the timeout in the run settings |
| MV doesn't refresh | Enable row tracking on source tables |
| SCD2: query column not found | Lakeflow uses `__START_AT`/`__END_AT` columns for SCD2 validity ranges; query those |
| AUTO CDC parse error at APPLY/SEQUENCE | Put the AUTO CDC clauses in the documented order |
| "Cannot create streaming table from batch query" | In a streaming table query, use `spark.readStream.table()` (Python) or `STREAM(...)` (SQL) |
For detailed errors, the `result["message"]` from `create_or_update_pipeline` includes suggested next steps. Use `get_pipeline_events(pipeline_id=...)` for full stack traces.
Advanced Pipeline Configuration
For advanced configuration options (development mode, continuous pipelines, custom clusters, notifications, Python dependencies, etc.), see 7-advanced-configuration.md.
Platform Constraints
Serverless Pipeline Requirements (Default)
| Requirement | Details |
|---|---|
| Unity Catalog | Required - serverless pipelines always use UC |
| Workspace Region | Must be in serverless-enabled region |
| Serverless Terms | Must accept serverless terms of use |
| CDC Features | Requires serverless (or Pro/Advanced with classic clusters) |
Serverless Limitations (When Classic Clusters Required)
| Limitation | Workaround |
|---|---|
| R language | Not supported - use classic clusters if required |
| Spark RDD APIs | Not supported - use classic clusters if required |
| JAR libraries | Not supported - use classic clusters if required |
| Maven coordinates | Not supported - use classic clusters if required |
| DBFS root access | Limited - must use Unity Catalog external locations |
| Global temp views | Not supported |
General Constraints
| Constraint | Details |
|---|---|
| Schema Evolution | Streaming tables require full refresh for incompatible changes |
| SQL Limitations | PIVOT clause unsupported |
| Sinks | Python only, streaming only, append flows only |
Default to serverless unless user explicitly requires R, RDD APIs, or JAR libraries.