dagster-development
Dagster Development Expert
Quick Reference
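| If you're writing... | Check this section/reference |
|---|---|
| Assets | Assets Quick Reference or `references/assets.md` |
| Resources | Resources Quick Reference or `references/resources.md` |
| Schedules or other automation | Automation Quick Reference or `references/automation.md` |
| Sensors | Sensors Quick Reference or `references/automation.md` |
| Partitions | Partitions Quick Reference or `references/automation.md` |
| Tests | Testing Quick Reference or `references/testing.md` |
| dbt assets | dbt Integration or the `dbt-development` skill |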
Core Concepts
- Asset: A persistent object (table, file, model) that your pipeline produces. Define with `@dg.asset`.
- Resource: External services/tools (databases, APIs) shared across assets. Define with `ConfigurableResource`.
- Job: A selection of assets to execute together. Create with `dg.define_asset_job()`.
- Schedule: Time-based automation for jobs. Create with `dg.ScheduleDefinition`.
- Sensor: Event-driven automation that watches for changes. Define with `@dg.sensor`.
- Partition: Logical divisions of data (by date, category). Define with `PartitionsDefinition`.
- Definitions: The container for all Dagster objects in a code location.
Assets Quick Reference
Basic Asset
```python
import dagster as dg


@dg.asset
def my_asset() -> None:
    """Asset description appears in the UI."""
    # Your computation logic here
    pass
```
Asset with Dependencies
```python
import dagster as dg


@dg.asset
def downstream_asset(upstream_asset) -> dict:
    """Depends on upstream_asset by naming it as a parameter."""
    return {"processed": upstream_asset}
```
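To make the parameter-naming rule concrete, here is a toy sketch in plain Python (no Dagster import) of how a framework could infer dependencies from parameter names. The `infer_deps` and `run_in_order` helpers and the registry are hypothetical illustrations, not Dagster's actual implementation; in Dagster itself, parameters such as `context`, `config`, and resources are not treated as upstream assets.

```python
import inspect


def upstream_asset() -> list:
    """Hypothetical upstream computation."""
    return [1, 2, 3]


def downstream_asset(upstream_asset) -> dict:
    """Declares its dependency by naming upstream_asset as a parameter."""
    return {"processed": upstream_asset}


def infer_deps(fn) -> list:
    """Toy helper: treat every parameter name as the name of an upstream asset."""
    return list(inspect.signature(fn).parameters)


def run_in_order(functions: dict, target: str):
    """Toy resolver: compute dependencies first, then call the target with them."""
    inputs = {dep: run_in_order(functions, dep) for dep in infer_deps(functions[target])}
    return functions[target](**inputs)


registry = {"upstream_asset": upstream_asset, "downstream_asset": downstream_asset}
result = run_in_order(registry, "downstream_asset")
```

The point of the sketch is only that the parameter name, not an explicit call, is what links the two functions.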
Asset with Metadata
```python
@dg.asset(
    group_name="analytics",
    key_prefix=["warehouse", "staging"],
    description="Cleaned customer data",
)
def customers() -> None:
    pass
```
Naming: Use nouns describing what is produced (`customers`, `daily_revenue`), not verbs (`load_customers`).
Resources Quick Reference
Define a Resource
```python
from dagster import ConfigurableResource


class DatabaseResource(ConfigurableResource):
    connection_string: str

    def query(self, sql: str) -> list:
        # Implementation here
        pass
```
Use in Assets
```python
@dg.asset
def my_asset(database: DatabaseResource) -> None:
    results = database.query("SELECT * FROM table")
```
Register in Definitions
```python
dg.Definitions(
    assets=[my_asset],
    resources={"database": DatabaseResource(connection_string="...")},
)
```
Automation Quick Reference
Schedule
```python
import dagster as dg
from my_project.defs.jobs import my_job

my_schedule = dg.ScheduleDefinition(
    job=my_job,
    cron_schedule="0 0 * * *",  # Daily at midnight
)
```
Common Cron Patterns
| Pattern | Meaning |
|---|---|
| `0 * * * *` | Every hour |
| `0 0 * * *` | Daily at midnight |
| `0 0 * * 1` | Weekly on Monday |
| `0 0 1 * *` | Monthly on the 1st |
| `0 0 5 * *` | Monthly on the 5th |
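A cron expression has five fields: minute, hour, day of month, month, and day of week. As a rough way to sanity-check the meanings listed above, the sketch below matches a timestamp against a simplified expression. `field_matches` and `cron_matches` are hypothetical helpers written for this illustration; they support only `*` and single integers, and they require every field to match, whereas real cron implementations handle ranges, steps, and a special rule when both day fields are restricted.

```python
from datetime import datetime


def field_matches(field: str, value: int) -> bool:
    """Match one cron field; this sketch supports only "*" and a single integer."""
    return field == "*" or int(field) == value


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True if `moment` falls on a tick of a simple five-field cron expression."""
    minute, hour, day_of_month, month, day_of_week = expression.split()
    cron_weekday = (moment.weekday() + 1) % 7  # cron: 0 = Sunday, 1 = Monday
    return (
        field_matches(minute, moment.minute)
        and field_matches(hour, moment.hour)
        and field_matches(day_of_month, moment.day)
        and field_matches(month, moment.month)
        and field_matches(day_of_week, cron_weekday)
    )


monday_midnight = datetime(2023, 1, 2, 0, 0)  # 2023-01-02 was a Monday
is_weekly_tick = cron_matches("0 0 * * 1", monday_midnight)
is_monthly_tick = cron_matches("0 0 1 * *", monday_midnight)
```

Here `is_weekly_tick` is true and `is_monthly_tick` is false, because the timestamp is a Monday but not the first of the month.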
Sensors Quick Reference
Basic Sensor Pattern
```python
import json

import dagster as dg


# my_job and get_items_to_check() are assumed to be defined elsewhere.
@dg.sensor(job=my_job)
def my_sensor(context: dg.SensorEvaluationContext):
    # 1. Read cursor (previous state)
    previous_state = json.loads(context.cursor) if context.cursor else {}
    current_state = {}
    runs_to_request = []

    # 2. Check for changes
    for item in get_items_to_check():
        current_state[item.id] = item.modified_at
        if item.id not in previous_state or previous_state[item.id] != item.modified_at:
            runs_to_request.append(dg.RunRequest(
                run_key=f"run_{item.id}_{item.modified_at}",
                run_config={...}
            ))

    # 3. Return result with updated cursor
    return dg.SensorResult(
        run_requests=runs_to_request,
        cursor=json.dumps(current_state)
    )
```
Key: Use cursors to track state between sensor evaluations.
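The cursor logic can be exercised without Dagster. The sketch below isolates the read-compare-write cycle in a plain function; `evaluate` and the sample item dictionaries are hypothetical, and the cursor is simply a JSON string passed from one call to the next, standing in for `context.cursor`.

```python
import json


def evaluate(cursor, items):
    """Compare current item timestamps with the state stored in the cursor.

    Returns the run keys to request and the new cursor string.
    """
    previous_state = json.loads(cursor) if cursor else {}
    current_state = {}
    run_keys = []
    for item_id, modified_at in items.items():
        current_state[item_id] = modified_at
        if previous_state.get(item_id) != modified_at:
            run_keys.append(f"run_{item_id}_{modified_at}")
    return run_keys, json.dumps(current_state)


# First evaluation: no cursor yet, so every item is new.
first_keys, cursor = evaluate(None, {"a": "2023-01-01", "b": "2023-01-01"})
# Second evaluation: only "b" changed.
second_keys, cursor = evaluate(cursor, {"a": "2023-01-01", "b": "2023-01-02"})
# Third evaluation: nothing changed, so nothing is requested.
third_keys, cursor = evaluate(cursor, {"a": "2023-01-01", "b": "2023-01-02"})
```

Because the run key embeds both the item id and its modification time, a repeated evaluation of unchanged items produces no new requests.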
Partitions Quick Reference
Time-Based Partition
```python
weekly_partition = dg.WeeklyPartitionsDefinition(start_date="2023-01-01")


@dg.asset(partitions_def=weekly_partition)
def weekly_data(context: dg.AssetExecutionContext) -> None:
    partition_key = context.partition_key  # e.g., "2023-01-01"
    # Process data for this partition
```
Static Partition
```python
region_partition = dg.StaticPartitionsDefinition(["us-east", "us-west", "eu"])


@dg.asset(partitions_def=region_partition)
def regional_data(context: dg.AssetExecutionContext) -> None:
    region = context.partition_key
```
Partition Types
| Type | Use Case |
|---|---|
| `DailyPartitionsDefinition` | One partition per day |
| `WeeklyPartitionsDefinition` | One partition per week |
| `MonthlyPartitionsDefinition` | One partition per month |
| `StaticPartitionsDefinition` | Fixed set of partitions |
| `MultiPartitionsDefinition` | Combine multiple partition dimensions |
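To give a feel for what time-based partition keys look like, the following plain-Python sketch lists week-start dates as ISO strings. It does not call Dagster: `weekly_keys` and its `end_date` argument are hypothetical, and Dagster's own key generation also depends on settings such as the week's start day and time zone, so treat this as an approximation of the idea only.

```python
from datetime import date, timedelta


def weekly_keys(start_date: str, end_date: str) -> list:
    """List ISO-formatted week-start dates from start_date up to (not including) end_date."""
    current = date.fromisoformat(start_date)
    end = date.fromisoformat(end_date)
    keys = []
    while current < end:
        keys.append(current.isoformat())
        current += timedelta(days=7)
    return keys


keys = weekly_keys("2023-01-01", "2023-01-22")
```

Each key identifies one slice of data, which is what an asset reads from `context.partition_key` when a single partition is materialized.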
Testing Quick Reference
Direct Function Testing
```python
def test_my_asset():
    result = my_asset()
    assert result == expected_value
```
Testing with Materialization
```python
def test_asset_graph():
    result = dg.materialize(
        assets=[asset_a, asset_b],
        resources={"database": mock_database},
    )
    assert result.success
    assert result.output_for_node("asset_b") == expected
```
Mocking Resources
```python
from unittest.mock import Mock


def test_with_mocked_resource():
    mocked_resource = Mock()
    mocked_resource.query.return_value = [{"id": 1}]
    result = dg.materialize(
        assets=[my_asset],
        resources={"database": mocked_resource},
    )
    assert result.success
```
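The same mocking idea also works when the asset's logic is called as an ordinary function. The sketch below uses only the standard library; `count_rows` is a hypothetical stand-in for an asset body that takes the resource as an argument, not a function from the original project.

```python
from unittest.mock import Mock


def count_rows(database) -> int:
    """Hypothetical asset body: return how many rows the query yields."""
    return len(database.query("SELECT * FROM table"))


def test_count_rows_with_mock():
    mocked_resource = Mock()
    mocked_resource.query.return_value = [{"id": 1}]
    assert count_rows(mocked_resource) == 1
    # The mock also records how it was called.
    mocked_resource.query.assert_called_once_with("SELECT * FROM table")


test_count_rows_with_mock()
```

Keeping the computation in a function that receives its resource makes it possible to test the logic without a database or a Dagster run.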
Asset Checks
```python
@dg.asset_check(asset=my_asset)
def validate_non_empty(my_asset):
    return dg.AssetCheckResult(
        passed=len(my_asset) > 0,
        metadata={"row_count": len(my_asset)},
    )
```
dbt Integration
For dbt integration, use the minimal pattern below. For comprehensive dbt patterns, see the `dbt-development` skill.
Basic dbt Assets
```python
from pathlib import Path

import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets

dbt_project_dir = Path(__file__).parent / "dbt_project"


@dbt_assets(manifest=dbt_project_dir / "target" / "manifest.json")
def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```
dbt Resource
```python
dg.Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=dbt_project_dir)},
)
```
Full patterns: See Dagster dbt docs
When to Load References
Load `references/assets.md` when:
- Defining complex asset dependencies
- Adding metadata, groups, or key prefixes
- Working with asset factories
- Understanding asset materialization patterns
Load `references/resources.md` when:
- Creating custom `ConfigurableResource` classes
- Integrating with databases, APIs, or cloud services
- Understanding resource scoping and lifecycle
Load `references/automation.md` when:
- Creating schedules with complex cron patterns
- Building sensors with cursors and state management
- Implementing partitions and backfills
- Automating dbt or other integration runs
Load `references/testing.md` when:
- Writing unit tests for assets
- Mocking resources and dependencies
- Using `dg.materialize()` for integration tests
- Creating asset checks for data validation
Load `references/etl-patterns.md` when:
- Using dlt for embedded ETL
- Using Sling for database replication
- Loading data from files or APIs
- Integrating external ETL tools
Load `references/project-structure.md` when:
- Setting up a new Dagster project
- Configuring `Definitions` and code locations
- Using the `dg` CLI for scaffolding
- Organizing large projects with Components
Project Structure
Recommended Layout
```
my_project/
├── pyproject.toml
├── src/
│   └── my_project/
│       ├── definitions.py     # Main Definitions
│       └── defs/
│           ├── assets/
│           │   ├── __init__.py
│           │   └── my_assets.py
│           ├── jobs.py
│           ├── schedules.py
│           ├── sensors.py
│           └── resources.py
└── tests/
    └── test_assets.py
```
Definitions Pattern (Modern)
```python
# src/my_project/definitions.py
from pathlib import Path

from dagster import definitions, load_from_defs_folder


@definitions
def defs():
    return load_from_defs_folder(project_root=Path(__file__).parent.parent.parent)
```
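The three `.parent` hops in `project_root` follow directly from the recommended layout. The sketch below walks them one at a time using `PurePosixPath`, so it runs without touching the filesystem; the `/work` prefix is a hypothetical checkout location chosen for illustration.

```python
from pathlib import PurePosixPath

# Hypothetical absolute location of definitions.py inside the recommended layout.
definitions_file = PurePosixPath("/work/my_project/src/my_project/definitions.py")

package_dir = definitions_file.parent  # /work/my_project/src/my_project
src_dir = package_dir.parent           # /work/my_project/src
project_root = src_dir.parent          # /work/my_project

# The project root is the directory that holds pyproject.toml.
pyproject = project_root / "pyproject.toml"
```

If `definitions.py` is moved to a different depth, the number of `.parent` hops has to change with it.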
Scaffolding with dg CLI
```bash
# Create new project
uvx create-dagster my_project

# Scaffold new asset file
dg scaffold defs dagster.asset assets/new_asset.py

# Scaffold schedule
dg scaffold defs dagster.schedule schedules.py

# Scaffold sensor
dg scaffold defs dagster.sensor sensors.py

# Validate definitions
dg check defs
```
---
Common Patterns
Job Definition
```python
trip_update_job = dg.define_asset_job(
    name="trip_update_job",
    selection=["taxi_trips", "taxi_zones"],
)
```
Run Configuration
```python
from dagster import Config


class MyAssetConfig(Config):
    filename: str
    limit: int = 100


@dg.asset
def configurable_asset(config: MyAssetConfig) -> None:
    print(f"Processing {config.filename} with limit {config.limit}")
```
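When a run is launched, values for `MyAssetConfig` have to be supplied as run configuration. The sketch below shows one plausible shape for that configuration as a plain dictionary, assuming Dagster's convention of nesting per-asset config under an `ops` key named after the asset. The file name `trips.csv` is a made-up value, and the exact structure should be checked against the Dagster version in use.

```python
import json

# Assumed shape: asset config is keyed by asset name under "ops".
run_config = {
    "ops": {
        "configurable_asset": {
            "config": {
                "filename": "trips.csv",  # hypothetical value for the required field
                # "limit" is omitted, so the class default of 100 would apply
            }
        }
    }
}

asset_config = run_config["ops"]["configurable_asset"]["config"]
print(json.dumps(run_config, indent=2))
```

Only `filename` needs a value here because `limit` has a default in the config class.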
Asset Dependencies with External Sources
```python
@dg.asset(deps=["external_table"])
def derived_asset() -> None:
    """Depends on external_table which isn't managed by Dagster."""
    pass
```
Anti-Patterns to Avoid
| Anti-Pattern | Better Approach |
|---|---|
| Hardcoding credentials in assets | Use `ConfigurableResource` with environment variables |
| Giant assets that do everything | Split into focused, composable assets |
| Ignoring asset return types | Use type annotations for clarity |
| Skipping tests for assets | Test assets like regular Python functions |
| Not using partitions for time-series | Use a time-based partitions definition such as `DailyPartitionsDefinition` |
| Putting all assets in one file | Organize by domain in separate modules |
CLI Quick Reference
```bash
# Development
dg dev                 # Start Dagster UI
dg check defs          # Validate definitions

# Scaffolding
dg scaffold defs dagster.asset assets/file.py
dg scaffold defs dagster.schedule schedules.py
dg scaffold defs dagster.sensor sensors.py

# Production
dagster job execute -j my_job              # Execute a job
dagster asset materialize -a my_asset      # Materialize an asset
```
---
References
- Assets: `references/assets.md` - Detailed asset patterns
- Resources: `references/resources.md` - Resource configuration
- Automation: `references/automation.md` - Schedules, sensors, partitions
- Testing: `references/testing.md` - Testing patterns and asset checks
- ETL Patterns: `references/etl-patterns.md` - dlt, Sling, file/API ingestion
- Project Structure: `references/project-structure.md` - Definitions, Components
- Official Docs: https://docs.dagster.io
- API Reference: https://docs.dagster.io/api/dagster