# data-pipelines
ChineseYou are building data pipelines using a DuckDB-centric stack. The tools, in typical execution order: dlt (extract + load) → sqlmesh (transform) → DuckDB/MotherDuck (query engine) → polars (DataFrame work) → marimo (notebooks/apps). uv manages Python projects and dependencies.
## Language Preference
SQL first (DuckDB dialect), then Python, then bash. Use the simplest language that gets the job done.
## uv — Project Management
Never use pip directly. All Python work goes through uv.
```bash
uv init my-project                    # New project
uv add "dlt[duckdb]" sqlmesh polars   # Add dependencies
uv sync                               # Install into .venv
uv run python pipeline.py             # Run in project venv
uv run --with requests script.py      # Ad-hoc dependency
```

Inline script dependencies (PEP 723) for standalone scripts:
```python
# /// script
# dependencies = ["dlt[duckdb]", "polars"]
# requires-python = ">=3.12"
# ///
```
Run with `uv run script.py` — deps are resolved automatically.
Always commit `uv.lock`. Use `pyproject.toml` for dependency declarations, never `requirements.txt`.
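For reference, a minimal `pyproject.toml` for this stack might look like the sketch below (the project name and version are placeholders):

```toml
[project]
name = "my-pipeline"          # placeholder name
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "dlt[duckdb]",
    "sqlmesh",
    "polars",
    "marimo",
]
```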
## dlt — Extract + Load
dlt handles ingestion: API calls, pagination, schema inference, incremental loading, and state management.
### Scaffold and Run
```bash
dlt init rest_api duckdb     # Scaffold pipeline
uv run python pipeline.py    # Run extraction
dlt pipeline <name> info     # Inspect state
dlt pipeline <name> schema   # View inferred schema
```

### Pipeline Patterns
**Minimal pipeline:**

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
    dataset_name="raw",
)
info = pipeline.run(data, table_name="events")
```

**Incremental loading:**
```python
@dlt.resource(write_disposition="merge", primary_key="id")
def users(updated_at=dlt.sources.incremental("updated_at")):
    yield from fetch_users(since=updated_at.last_value)
```
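dlt stores the incremental cursor in pipeline state, so repeated runs resume from the last loaded value. A minimal sketch of the effect, reusing the `users` resource above:

```python
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline", destination="duckdb", dataset_name="raw"
)
pipeline.run(users())  # first run: loads all rows from the initial cursor
pipeline.run(users())  # later runs: only rows with newer updated_at, merged by id
```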
**REST API source (declarative):**

```python
from dlt.sources.rest_api import rest_api_source

source = rest_api_source({
    "client": {"base_url": "https://api.example.com/v1"},
    "resource_defaults": {"primary_key": "id", "write_disposition": "merge"},
    "resources": [
        "users",
        {
            "name": "events",
            "write_disposition": "append",
            "endpoint": {
                "path": "events",
                "incremental": {"cursor_path": "created_at", "initial_value": "2024-01-01"},
            },
        },
    ],
})
```

**Write dispositions:**
| Disposition | Behavior | Use For |
|---|---|---|
| `append` | Insert rows (default) | Immutable events, logs |
| `replace` | Drop and recreate | Small lookup tables |
| `merge` | Upsert by `primary_key` | Mutable records |
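A disposition declared on a resource can also be set per load. For example, a sketch that rebuilds a small lookup table on every run, reusing the `pipeline` object from the minimal example above (the `status_codes` data is hypothetical):

```python
# "replace" drops and recreates the table each time.
codes = [{"code": "A", "label": "Active"}, {"code": "I", "label": "Inactive"}]
pipeline.run(codes, table_name="status_codes", write_disposition="replace")
```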
Destinations: `duckdb` (local file), `motherduck` (cloud). Set the `motherduck_token` env var or configure in `.dlt/secrets.toml`.
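Switching a pipeline to the cloud is then a one-line change, a sketch assuming `motherduck_token` is exported:

```python
pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="motherduck",  # was "duckdb"; token is read from the environment
    dataset_name="raw",
)
```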
### Project Structure
```
.dlt/
  config.toml    # Pipeline config
  secrets.toml   # Credentials (gitignored)
<source>_pipeline.py
```

## sqlmesh — Transform
SQL-first transformation framework. Models are SQL files with a header block. Plan/apply workflow — no accidental production changes.
### Scaffold and Run
```bash
sqlmesh init duckdb                               # New project
sqlmesh init -t dlt --dlt-pipeline <name>         # From dlt schema
sqlmesh plan                                      # Preview + apply (dev)
sqlmesh plan prod                                 # Promote to production
sqlmesh fetchdf "SELECT * FROM analytics.users"   # Ad-hoc query
sqlmesh test                                      # Run unit tests
sqlmesh ui                                        # Web interface
```

### Model Kinds
| Kind | Behavior | Use For |
|---|---|---|
| `FULL` | Rewrite entire table | Small dimension tables |
| `INCREMENTAL_BY_TIME_RANGE` | Process new time intervals | Facts, events, logs |
| `INCREMENTAL_BY_UNIQUE_KEY` | Upsert by key | Mutable dimensions |
| `SEED` | Static CSV data | Reference/lookup data |
| `VIEW` | SQL view | Simple pass-throughs |
| `SCD_TYPE_2` | Slowly changing dimensions | Historical tracking |
### Model Example
```sql
MODEL (
  name analytics.stg_events,
  kind INCREMENTAL_BY_TIME_RANGE (time_column event_date),
  cron '@daily',
  grain (event_id),
  audits (NOT_NULL(columns=[event_id]))
);

SELECT
  event_id,
  user_id,
  event_type,
  event_date
FROM raw.events
WHERE event_date BETWEEN @start_date AND @end_date
```
### Config (`config.yaml`)

```yaml
gateways:
  local:
    connection:
      type: duckdb
      database: db.duckdb
default_gateway: local
model_defaults:
  dialect: duckdb
```
### dlt Integration

Bootstrap transform models from a dlt pipeline's schema with `sqlmesh init -t dlt --dlt-pipeline <name>`, then review and apply them with `sqlmesh plan`.

## DuckDB — Query Engine
DuckDB is the shared SQL engine across the entire stack. Use DuckDB-specific syntax freely.
### CLI
```bash
duckdb                  # In-memory
duckdb my_data.db       # Persistent local
duckdb md:my_db         # MotherDuck
duckdb -c "SELECT 42"   # One-shot
```

### DuckDB SQL Syntax
**Friendly SQL:**

```sql
FROM my_table;                                          -- Implicit SELECT *
FROM my_table SELECT col1, col2 WHERE col3 > 5;         -- FROM-first
SELECT * EXCLUDE (internal_id) FROM events;             -- Drop columns
SELECT * REPLACE (amount / 100.0 AS amount) FROM txns;  -- Transform in-place
SELECT category, SUM(amount) FROM sales GROUP BY ALL;   -- Infer GROUP BY
```

**Read files directly (no import step):**
```sql
SELECT * FROM 'data.parquet';
SELECT * FROM read_csv('data.csv', header=true);
SELECT * FROM 's3://bucket/path/*.parquet';
COPY (SELECT * FROM events) TO 'output.parquet' (FORMAT PARQUET);
```

**Nested types:**
```sql
SELECT {'name': 'Alice', 'age': 30} AS person;
SELECT [1, 2, 3] AS nums;
SELECT list_filter([1, 2, 3, 4], x -> x > 2);
```

**Useful commands:**
```sql
DESCRIBE SELECT * FROM events;
SUMMARIZE events;
```

### MotherDuck
```sql
ATTACH 'md:';       -- All databases
ATTACH 'md:my_db';  -- Specific database
```

Auth via the `motherduck_token` env var. Cross-database queries work: `SELECT * FROM local_db.main.t1 JOIN md:cloud_db.main.t2 USING (id)`.
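The same `md:` URLs work from Python via `duckdb.connect`. A quick sketch, assuming `motherduck_token` is set and a `my_db` database exists:

```python
import duckdb

con = duckdb.connect("md:my_db")  # authenticates via motherduck_token
con.sql("SHOW TABLES").show()
```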
## polars — DataFrames
Use polars when Python logic is needed — complex string transforms, ML features, row-level conditionals. For joins, aggregations, and window functions, prefer SQL.
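For instance, row-level conditionals read naturally as `pl.when` chains. An illustrative sketch (the data and tier thresholds are made up):

```python
import polars as pl

df = pl.DataFrame({"user_id": [1, 2, 3], "spend": [50.0, 250.0, 999.0]})
df = df.with_columns(
    pl.when(pl.col("spend") > 500).then(pl.lit("high"))
    .when(pl.col("spend") > 100).then(pl.lit("mid"))
    .otherwise(pl.lit("low"))
    .alias("tier")
)
```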
### Key Patterns
```python
import polars as pl
```
import polars as plLazy evaluation (always prefer for production)
Lazy evaluation (always prefer for production)
lf = pl.scan_parquet("events/*.parquet")
result = (
    lf.filter(pl.col("event_date") >= "2024-01-01")
    .group_by("user_id")
    .agg(pl.col("amount").sum().alias("total_spend"))
    .sort("total_spend", descending=True)
    .collect()
)
```
**Three contexts:**

```python
df.select(...) # Pick/transform columns (output has ONLY these)
df.with_columns(...) # Add/overwrite columns (keeps all originals)
df.filter(...) # Keep rows matching condition
```

**DuckDB interop (zero-copy via Arrow):**
```python
import duckdb
result = duckdb.sql("SELECT * FROM df WHERE amount > 100").pl()
```

## marimo — Notebooks
Reactive Python notebooks stored as plain `.py` files. Cells auto-re-execute when dependencies change.

```bash
marimo edit notebook.py                   # Create/edit
marimo run notebook.py                    # Serve as app
marimo convert notebook.ipynb -o out.py   # From Jupyter
```

SQL cells use DuckDB by default and return polars DataFrames:

```python
result = mo.sql(f"""
    SELECT * FROM events
    WHERE event_date >= '{start_date}'
""")
```

Python variables and polars DataFrames are queryable from SQL cells and vice versa.
## Typical Pipeline Flow
1. `uv init` + `uv add "dlt[duckdb]" "sqlmesh[duckdb]" polars marimo`
2. `dlt init rest_api duckdb` — scaffold extraction
3. `uv run python pipeline.py` — dlt loads raw data into DuckDB
4. `sqlmesh init -t dlt --dlt-pipeline <name>` — generate transform models
5. Write SQL models → `sqlmesh plan` — transform raw into analytics
6. `marimo edit analysis.py` — explore with SQL cells and polars
7. For production: swap destination to `motherduck`, run `sqlmesh plan prod`