dagster-development

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Dagster Development Expert

Dagster开发专家

Quick Reference

快速参考

If you're writing...Check this section/reference
@dg.asset
Assets or
references/assets.md
ConfigurableResource
Resources or
references/resources.md
@dg.schedule
or
ScheduleDefinition
Automation or
references/automation.md
@dg.sensor
Sensors or
references/automation.md
PartitionsDefinition
Partitions or
references/automation.md
Tests with
dg.materialize()
Testing or
references/testing.md
@asset_check
references/testing.md#asset-checks
@dlt_assets
or
@sling_assets
references/etl-patterns.md
@dbt_assets
dbt Integration or
dbt-development
skill
Definitions
or code locations
references/project-structure.md

如果你正在编写...查看此章节/参考文档
@dg.asset
资产
references/assets.md
ConfigurableResource
资源
references/resources.md
@dg.schedule
ScheduleDefinition
自动化
references/automation.md
@dg.sensor
传感器
references/automation.md
PartitionsDefinition
分区
references/automation.md
使用
dg.materialize()
编写测试
测试
references/testing.md
@asset_check
references/testing.md#asset-checks
@dlt_assets
@sling_assets
references/etl-patterns.md
@dbt_assets
dbt集成
dbt-development
技能
Definitions
或代码位置
references/project-structure.md

Core Concepts

核心概念

Asset: A persistent object (table, file, model) that your pipeline produces. Define with
@dg.asset
.
Resource: External services/tools (databases, APIs) shared across assets. Define with
ConfigurableResource
.
Job: A selection of assets to execute together. Create with
dg.define_asset_job()
.
Schedule: Time-based automation for jobs. Create with
dg.ScheduleDefinition
.
Sensor: Event-driven automation that watches for changes. Define with
@dg.sensor
.
Partition: Logical divisions of data (by date, category). Define with
PartitionsDefinition
.
Definitions: The container for all Dagster objects in a code location.

Asset:管道生成的持久化对象(表、文件、模型)。使用
@dg.asset
定义。
Resource:资产间共享的外部服务/工具(数据库、API)。使用
ConfigurableResource
定义。
Job:一组要一起执行的资产。使用
dg.define_asset_job()
创建。
Schedule:基于时间的作业自动化。使用
dg.ScheduleDefinition
创建。
Sensor:监听变更的事件驱动自动化。使用
@dg.sensor
定义。
Partition:数据的逻辑划分(按日期、类别)。使用
PartitionsDefinition
定义。
Definitions:代码位置中所有Dagster对象的容器。

Assets Quick Reference

资产快速参考

Basic Asset

基础资产

python
import dagster as dg

@dg.asset
def my_asset() -> None:
    """Asset description appears in the UI."""
    # Your computation logic here
    pass
python
import dagster as dg

@dg.asset
def my_asset() -> None:
    """资产描述会显示在UI中。"""
    # 此处编写你的计算逻辑
    pass

Asset with Dependencies

带依赖的资产

python
@dg.asset
def downstream_asset(upstream_asset) -> dict:
    """Depends on upstream_asset by naming it as a parameter."""
    return {"processed": upstream_asset}
python
@dg.asset
def downstream_asset(upstream_asset) -> dict:
    """通过将上游资产作为参数传入来建立依赖。"""
    return {"processed": upstream_asset}

Asset with Metadata

带元数据的资产

python
@dg.asset(
    group_name="analytics",
    key_prefix=["warehouse", "staging"],
    description="Cleaned customer data",
)
def customers() -> None:
    pass
Naming: Use nouns describing what is produced (
customers
,
daily_revenue
), not verbs (
load_customers
).

python
@dg.asset(
    group_name="analytics",
    key_prefix=["warehouse", "staging"],
    description="清洗后的客户数据",
)
def customers() -> None:
    pass
命名规范:使用描述产出物的名词(如
customers
daily_revenue
),而非动词(如
load_customers
)。

Resources Quick Reference

资源快速参考

Define a Resource

定义资源

python
from dagster import ConfigurableResource

class DatabaseResource(ConfigurableResource):
    connection_string: str
    
    def query(self, sql: str) -> list:
        # Implementation here
        pass
python
from dagster import ConfigurableResource

class DatabaseResource(ConfigurableResource):
    connection_string: str
    
    def query(self, sql: str) -> list:
        # 此处编写实现逻辑
        pass

Use in Assets

在资产中使用资源

python
@dg.asset
def my_asset(database: DatabaseResource) -> None:
    results = database.query("SELECT * FROM table")
python
@dg.asset
def my_asset(database: DatabaseResource) -> None:
    results = database.query("SELECT * FROM table")

Register in Definitions

在Definitions中注册资源

python
dg.Definitions(
    assets=[my_asset],
    resources={"database": DatabaseResource(connection_string="...")},
)

python
dg.Definitions(
    assets=[my_asset],
    resources={"database": DatabaseResource(connection_string="...")},
)

Automation Quick Reference

自动化快速参考

Schedule

调度

python
import dagster as dg
from my_project.defs.jobs import my_job

my_schedule = dg.ScheduleDefinition(
    job=my_job,
    cron_schedule="0 0 * * *",  # Daily at midnight
)
python
import dagster as dg
from my_project.defs.jobs import my_job

my_schedule = dg.ScheduleDefinition(
    job=my_job,
    cron_schedule="0 0 * * *",  # 每日午夜执行
)

Common Cron Patterns

常见Cron表达式

PatternMeaning
0 * * * *
Every hour
0 0 * * *
Daily at midnight
0 0 * * 1
Weekly on Monday
0 0 1 * *
Monthly on the 1st
0 0 5 * *
Monthly on the 5th

表达式含义
0 * * * *
每小时执行一次
0 0 * * *
每日午夜执行
0 0 * * 1
每周一执行
0 0 1 * *
每月1日执行
0 0 5 * *
每月5日执行

Sensors Quick Reference

传感器快速参考

Basic Sensor Pattern

基础传感器模式

python
@dg.sensor(job=my_job)
def my_sensor(context: dg.SensorEvaluationContext):
    # 1. Read cursor (previous state)
    previous_state = json.loads(context.cursor) if context.cursor else {}
    current_state = {}
    runs_to_request = []
    
    # 2. Check for changes
    for item in get_items_to_check():
        current_state[item.id] = item.modified_at
        if item.id not in previous_state or previous_state[item.id] != item.modified_at:
            runs_to_request.append(dg.RunRequest(
                run_key=f"run_{item.id}_{item.modified_at}",
                run_config={...}
            ))
    
    # 3. Return result with updated cursor
    return dg.SensorResult(
        run_requests=runs_to_request,
        cursor=json.dumps(current_state)
    )
Key: Use cursors to track state between sensor evaluations.

python
@dg.sensor(job=my_job)
def my_sensor(context: dg.SensorEvaluationContext):
    # 1. 读取游标(之前的状态)
    previous_state = json.loads(context.cursor) if context.cursor else {}
    current_state = {}
    runs_to_request = []
    
    # 2. 检查变更
    for item in get_items_to_check():
        current_state[item.id] = item.modified_at
        if item.id not in previous_state or previous_state[item.id] != item.modified_at:
            runs_to_request.append(dg.RunRequest(
                run_key=f"run_{item.id}_{item.modified_at}",
                run_config={...}
            ))
    
    # 3. 返回更新游标后的结果
    return dg.SensorResult(
        run_requests=runs_to_request,
        cursor=json.dumps(current_state)
    )
关键要点:使用游标跟踪传感器评估之间的状态。

Partitions Quick Reference

分区快速参考

Time-Based Partition

基于时间的分区

python
weekly_partition = dg.WeeklyPartitionsDefinition(start_date="2023-01-01")

@dg.asset(partitions_def=weekly_partition)
def weekly_data(context: dg.AssetExecutionContext) -> None:
    partition_key = context.partition_key  # e.g., "2023-01-01"
    # Process data for this partition
python
weekly_partition = dg.WeeklyPartitionsDefinition(start_date="2023-01-01")

@dg.asset(partitions_def=weekly_partition)
def weekly_data(context: dg.AssetExecutionContext) -> None:
    partition_key = context.partition_key  # 示例:"2023-01-01"
    # 处理该分区的数据

Static Partition

静态分区

python
region_partition = dg.StaticPartitionsDefinition(["us-east", "us-west", "eu"])

@dg.asset(partitions_def=region_partition)
def regional_data(context: dg.AssetExecutionContext) -> None:
    region = context.partition_key
python
region_partition = dg.StaticPartitionsDefinition(["us-east", "us-west", "eu"])

@dg.asset(partitions_def=region_partition)
def regional_data(context: dg.AssetExecutionContext) -> None:
    region = context.partition_key

Partition Types

分区类型

TypeUse Case
DailyPartitionsDefinition
One partition per day
WeeklyPartitionsDefinition
One partition per week
MonthlyPartitionsDefinition
One partition per month
StaticPartitionsDefinition
Fixed set of partitions
MultiPartitionsDefinition
Combine multiple partition dimensions

类型使用场景
DailyPartitionsDefinition
按天划分分区
WeeklyPartitionsDefinition
按周划分分区
MonthlyPartitionsDefinition
按月划分分区
StaticPartitionsDefinition
固定集合的分区
MultiPartitionsDefinition
组合多个分区维度

Testing Quick Reference

测试快速参考

Direct Function Testing

直接函数测试

python
def test_my_asset():
    result = my_asset()
    assert result == expected_value
python
def test_my_asset():
    result = my_asset()
    assert result == expected_value

Testing with Materialization

基于Materialization的测试

python
def test_asset_graph():
    result = dg.materialize(
        assets=[asset_a, asset_b],
        resources={"database": mock_database},
    )
    assert result.success
    assert result.output_for_node("asset_b") == expected
python
def test_asset_graph():
    result = dg.materialize(
        assets=[asset_a, asset_b],
        resources={"database": mock_database},
    )
    assert result.success
    assert result.output_for_node("asset_b") == expected

Mocking Resources

模拟资源

python
from unittest.mock import Mock

def test_with_mocked_resource():
    mocked_resource = Mock()
    mocked_resource.query.return_value = [{"id": 1}]
    
    result = dg.materialize(
        assets=[my_asset],
        resources={"database": mocked_resource},
    )
    assert result.success
python
from unittest.mock import Mock

def test_with_mocked_resource():
    mocked_resource = Mock()
    mocked_resource.query.return_value = [{"id": 1}]
    
    result = dg.materialize(
        assets=[my_asset],
        resources={"database": mocked_resource},
    )
    assert result.success

Asset Checks

资产检查

python
@dg.asset_check(asset=my_asset)
def validate_non_empty(my_asset):
    return dg.AssetCheckResult(
        passed=len(my_asset) > 0,
        metadata={"row_count": len(my_asset)},
    )

python
@dg.asset_check(asset=my_asset)
def validate_non_empty(my_asset):
    return dg.AssetCheckResult(
        passed=len(my_asset) > 0,
        metadata={"row_count": len(my_asset)},
    )

dbt Integration

dbt集成

For dbt integration, use the minimal pattern below. For comprehensive dbt patterns, see the
dbt-development
skill.
对于dbt集成,可使用以下极简模式。如需完整的dbt模式,请查看
dbt-development
技能。

Basic dbt Assets

基础dbt资产

python
from dagster_dbt import DbtCliResource, dbt_assets
from pathlib import Path

dbt_project_dir = Path(__file__).parent / "dbt_project"

@dbt_assets(manifest=dbt_project_dir / "target" / "manifest.json")
def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
python
from dagster_dbt import DbtCliResource, dbt_assets
from pathlib import Path

dbt_project_dir = Path(__file__).parent / "dbt_project"

@dbt_assets(manifest=dbt_project_dir / "target" / "manifest.json")
def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

dbt Resource

dbt资源

python
dg.Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=dbt_project_dir)},
)
Full patterns: See Dagster dbt docs

python
dg.Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=dbt_project_dir)},
)
完整模式:查看Dagster dbt文档

When to Load References

何时加载参考文档

Load
references/assets.md
when:

加载
references/assets.md
的场景:

  • Defining complex asset dependencies
  • Adding metadata, groups, or key prefixes
  • Working with asset factories
  • Understanding asset materialization patterns
  • 定义复杂的资产依赖
  • 添加元数据、分组或键前缀
  • 使用资产工厂
  • 理解资产物化模式

Load
references/resources.md
when:

加载
references/resources.md
的场景:

  • Creating custom
    ConfigurableResource
    classes
  • Integrating with databases, APIs, or cloud services
  • Understanding resource scoping and lifecycle
  • 创建自定义
    ConfigurableResource
  • 与数据库、API或云服务集成
  • 理解资源作用域和生命周期

Load
references/automation.md
when:

加载
references/automation.md
的场景:

  • Creating schedules with complex cron patterns
  • Building sensors with cursors and state management
  • Implementing partitions and backfills
  • Automating dbt or other integration runs
  • 创建带复杂Cron表达式的调度
  • 构建带游标和状态管理的传感器
  • 实现分区和回填
  • 自动化dbt或其他集成运行

Load
references/testing.md
when:

加载
references/testing.md
的场景:

  • Writing unit tests for assets
  • Mocking resources and dependencies
  • Using
    dg.materialize()
    for integration tests
  • Creating asset checks for data validation
  • 为资产编写单元测试
  • 模拟资源和依赖
  • 使用
    dg.materialize()
    进行集成测试
  • 创建用于数据验证的资产检查

Load
references/etl-patterns.md
when:

加载
references/etl-patterns.md
的场景:

  • Using dlt for embedded ETL
  • Using Sling for database replication
  • Loading data from files or APIs
  • Integrating external ETL tools
  • 使用dlt进行嵌入式ETL
  • 使用Sling进行数据库复制
  • 从文件或API加载数据
  • 集成外部ETL工具

Load
references/project-structure.md
when:

加载
references/project-structure.md
的场景:

  • Setting up a new Dagster project
  • Configuring
    Definitions
    and code locations
  • Using
    dg
    CLI for scaffolding
  • Organizing large projects with Components

  • 搭建新的Dagster项目
  • 配置
    Definitions
    和代码位置
  • 使用
    dg
    CLI生成脚手架
  • 使用组件组织大型项目

Project Structure

项目结构

Recommended Layout

推荐布局

my_project/
├── pyproject.toml
├── src/
│   └── my_project/
│       ├── definitions.py     # Main Definitions
│       └── defs/
│           ├── assets/
│           │   ├── __init__.py
│           │   └── my_assets.py
│           ├── jobs.py
│           ├── schedules.py
│           ├── sensors.py
│           └── resources.py
└── tests/
    └── test_assets.py
my_project/
├── pyproject.toml
├── src/
│   └── my_project/
│       ├── definitions.py     # 主Definitions文件
│       └── defs/
│           ├── assets/
│           │   ├── __init__.py
│           │   └── my_assets.py
│           ├── jobs.py
│           ├── schedules.py
│           ├── sensors.py
│           └── resources.py
└── tests/
    └── test_assets.py

Definitions Pattern (Modern)

现代Definitions模式

python
undefined
python
undefined

src/my_project/definitions.py

src/my_project/definitions.py

from pathlib import Path from dagster import definitions, load_from_defs_folder
@definitions def defs(): return load_from_defs_folder(project_root=Path(file).parent.parent.parent)
undefined
from pathlib import Path from dagster import definitions, load_from_defs_folder
@definitions def defs(): return load_from_defs_folder(project_root=Path(file).parent.parent.parent)
undefined

Scaffolding with dg CLI

使用dg CLI生成脚手架

bash
undefined
bash
undefined

Create new project

创建新项目

uvx create-dagster my_project
uvx create-dagster my_project

Scaffold new asset file

生成新资产文件

dg scaffold defs dagster.asset assets/new_asset.py
dg scaffold defs dagster.asset assets/new_asset.py

Scaffold schedule

生成调度

dg scaffold defs dagster.schedule schedules.py
dg scaffold defs dagster.schedule schedules.py

Scaffold sensor

生成传感器

dg scaffold defs dagster.sensor sensors.py
dg scaffold defs dagster.sensor sensors.py

Validate definitions

验证定义

dg check defs

---
dg check defs

---

Common Patterns

常见模式

Job Definition

作业定义

python
trip_update_job = dg.define_asset_job(
    name="trip_update_job",
    selection=["taxi_trips", "taxi_zones"],
)
python
trip_update_job = dg.define_asset_job(
    name="trip_update_job",
    selection=["taxi_trips", "taxi_zones"],
)

Run Configuration

运行配置

python
from dagster import Config

class MyAssetConfig(Config):
    filename: str
    limit: int = 100

@dg.asset
def configurable_asset(config: MyAssetConfig) -> None:
    print(f"Processing {config.filename} with limit {config.limit}")
python
from dagster import Config

class MyAssetConfig(Config):
    filename: str
    limit: int = 100

@dg.asset
def configurable_asset(config: MyAssetConfig) -> None:
    print(f"Processing {config.filename} with limit {config.limit}")

Asset Dependencies with External Sources

带外部源的资产依赖

python
@dg.asset(deps=["external_table"])
def derived_asset() -> None:
    """Depends on external_table which isn't managed by Dagster."""
    pass

python
@dg.asset(deps=["external_table"])
def derived_asset() -> None:
    """依赖于未被Dagster管理的external_table。"""
    pass

Anti-Patterns to Avoid

需避免的反模式

Anti-PatternBetter Approach
Hardcoding credentials in assetsUse
ConfigurableResource
with env vars
Giant assets that do everythingSplit into focused, composable assets
Ignoring asset return typesUse type annotations for clarity
Skipping tests for assetsTest assets like regular Python functions
Not using partitions for time-seriesUse
DailyPartitionsDefinition
etc.
Putting all assets in one fileOrganize by domain in separate modules

反模式更佳方案
在资产中硬编码凭证使用带环境变量的
ConfigurableResource
包揽所有功能的巨型资产拆分为专注、可组合的资产
忽略资产返回类型使用类型注解提升清晰度
不为资产编写测试像测试普通Python函数一样测试资产
不为时间序列数据使用分区使用
DailyPartitionsDefinition
等分区类型
将所有资产放在一个文件中按领域划分到不同模块

CLI Quick Reference

CLI快速参考

bash
undefined
bash
undefined

Development

开发

dg dev # Start Dagster UI dg check defs # Validate definitions
dg dev # 启动Dagster UI dg check defs # 验证定义

Scaffolding

脚手架生成

dg scaffold defs dagster.asset assets/file.py dg scaffold defs dagster.schedule schedules.py dg scaffold defs dagster.sensor sensors.py
dg scaffold defs dagster.asset assets/file.py dg scaffold defs dagster.schedule schedules.py dg scaffold defs dagster.sensor sensors.py

Production

生产环境

dagster job execute -j my_job # Execute a job dagster asset materialize -a my_asset # Materialize an asset

---
dagster job execute -j my_job # 执行作业 dagster asset materialize -a my_asset # 物化资产

---

References

参考文档

  • Assets:
    references/assets.md
    - Detailed asset patterns
  • Resources:
    references/resources.md
    - Resource configuration
  • Automation:
    references/automation.md
    - Schedules, sensors, partitions
  • Testing:
    references/testing.md
    - Testing patterns and asset checks
  • ETL Patterns:
    references/etl-patterns.md
    - dlt, Sling, file/API ingestion
  • Project Structure:
    references/project-structure.md
    - Definitions, Components
  • Official Docs: https://docs.dagster.io
  • API Reference: https://docs.dagster.io/api/dagster
  • 资产
    references/assets.md
    - 详细资产模式
  • 资源
    references/resources.md
    - 资源配置
  • 自动化
    references/automation.md
    - 调度、传感器、分区
  • 测试
    references/testing.md
    - 测试模式和资产检查
  • ETL模式
    references/etl-patterns.md
    - dlt、Sling、文件/API数据导入
  • 项目结构
    references/project-structure.md
    - Definitions、组件
  • 官方文档https://docs.dagster.io
  • API参考https://docs.dagster.io/api/dagster