Loading...
Loading...
Expert guidance for Dagster data orchestration including assets, resources, schedules, sensors, partitions, testing, and ETL patterns. Use when building or extending Dagster projects, writing assets, configuring automation, or integrating with dbt/dlt/Sling.
npx skill4agent add c00ldudenoonan/economic-data-project dagster-development

| If you're writing... | Check this section/reference |
|---|---|
| Assets or |
| Resources or |
| Automation or |
| Sensors or |
| Partitions or |
| Tests with | Testing or |
| |
| |
| dbt Integration or |
| |
`@dg.asset`, `ConfigurableResource`, `dg.define_asset_job()`, `dg.ScheduleDefinition`, `@dg.sensor`, `PartitionsDefinition`

import dagster as dg

@dg.asset
def my_asset() -> None:
    """Asset description appears in the UI."""
    # Your computation logic here
    pass

@dg.asset
def downstream_asset(upstream_asset) -> dict:
    """Depends on upstream_asset by naming it as a parameter."""
    return {"processed": upstream_asset}

@dg.asset(
    group_name="analytics",
    key_prefix=["warehouse", "staging"],
    description="Cleaned customer data",
)
def customers() -> None:
    pass

`customers`, `daily_revenue`, `load_customers`

from dagster import ConfigurableResource
class DatabaseResource(ConfigurableResource):
    connection_string: str

    def query(self, sql: str) -> list:
        # Implementation here
        pass

@dg.asset
def my_asset(database: DatabaseResource) -> None:
    results = database.query("SELECT * FROM table")

dg.Definitions(
    assets=[my_asset],
    resources={"database": DatabaseResource(connection_string="...")},
)

import dagster as dg
from my_project.defs.jobs import my_job

my_schedule = dg.ScheduleDefinition(
    job=my_job,
    cron_schedule="0 0 * * *",  # Daily at midnight
)

| Pattern | Meaning |
|---|---|
| `0 * * * *` | Every hour |
| `0 0 * * *` | Daily at midnight |
| `0 0 * * 1` | Weekly on Monday |
| `0 0 1 * *` | Monthly on the 1st |
| `0 0 5 * *` | Monthly on the 5th |
@dg.sensor(job=my_job)
def my_sensor(context: dg.SensorEvaluationContext):
    # 1. Read cursor (previous state)
    previous_state = json.loads(context.cursor) if context.cursor else {}
    current_state = {}
    runs_to_request = []

    # 2. Check for changes
    for item in get_items_to_check():
        current_state[item.id] = item.modified_at
        if item.id not in previous_state or previous_state[item.id] != item.modified_at:
            runs_to_request.append(dg.RunRequest(
                run_key=f"run_{item.id}_{item.modified_at}",
                run_config={...}
            ))

    # 3. Return result with updated cursor
    return dg.SensorResult(
        run_requests=runs_to_request,
        cursor=json.dumps(current_state)
    )

weekly_partition = dg.WeeklyPartitionsDefinition(start_date="2023-01-01")
@dg.asset(partitions_def=weekly_partition)
def weekly_data(context: dg.AssetExecutionContext) -> None:
    partition_key = context.partition_key  # e.g., "2023-01-01"
    # Process data for this partition

region_partition = dg.StaticPartitionsDefinition(["us-east", "us-west", "eu"])

@dg.asset(partitions_def=region_partition)
def regional_data(context: dg.AssetExecutionContext) -> None:
    region = context.partition_key

| Type | Use Case |
|---|---|
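| `DailyPartitionsDefinition` | One partition per day |
| `WeeklyPartitionsDefinition` | One partition per week |
| `MonthlyPartitionsDefinition` | One partition per month |
| `StaticPartitionsDefinition` | Fixed set of partitions |
| `MultiPartitionsDefinition` | Combine multiple partition dimensions |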
def test_my_asset():
    result = my_asset()
    assert result == expected_value

def test_asset_graph():
    result = dg.materialize(
        assets=[asset_a, asset_b],
        resources={"database": mock_database},
    )
    assert result.success
    assert result.output_for_node("asset_b") == expected

from unittest.mock import Mock

def test_with_mocked_resource():
    mocked_resource = Mock()
    mocked_resource.query.return_value = [{"id": 1}]
    result = dg.materialize(
        assets=[my_asset],
        resources={"database": mocked_resource},
    )
    assert result.success

@dg.asset_check(asset=my_asset)
def validate_non_empty(my_asset):
    return dg.AssetCheckResult(
        passed=len(my_asset) > 0,
        metadata={"row_count": len(my_asset)},
    )

`dbt-development`

from dagster_dbt import DbtCliResource, dbt_assets
from pathlib import Path

dbt_project_dir = Path(__file__).parent / "dbt_project"

@dbt_assets(manifest=dbt_project_dir / "target" / "manifest.json")
def my_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

dg.Definitions(
    assets=[my_dbt_assets],
    resources={"dbt": DbtCliResource(project_dir=dbt_project_dir)},
)

`references/assets.md`, `references/resources.md`, `ConfigurableResource`, `references/automation.md`, `references/testing.md`, `dg.materialize()`, `references/etl-patterns.md`, `references/project-structure.md`, `Definitions`, `dg`

my_project/
├── pyproject.toml
├── src/
│   └── my_project/
│       ├── definitions.py  # Main Definitions
│       └── defs/
│           ├── assets/
│           │   ├── __init__.py
│           │   └── my_assets.py
│           ├── jobs.py
│           ├── schedules.py
│           ├── sensors.py
│           └── resources.py
└── tests/
    └── test_assets.py

# src/my_project/definitions.py
from pathlib import Path
from dagster import definitions, load_from_defs_folder

@definitions
def defs():
    return load_from_defs_folder(project_root=Path(__file__).parent.parent.parent)

# Create new project
uvx create-dagster my_project
# Scaffold new asset file
dg scaffold defs dagster.asset assets/new_asset.py
# Scaffold schedule
dg scaffold defs dagster.schedule schedules.py
# Scaffold sensor
dg scaffold defs dagster.sensor sensors.py
# Validate definitions
dg check defs

trip_update_job = dg.define_asset_job(
    name="trip_update_job",
    selection=["taxi_trips", "taxi_zones"],
)

from dagster import Config

class MyAssetConfig(Config):
    filename: str
    limit: int = 100

@dg.asset
def configurable_asset(config: MyAssetConfig) -> None:
    print(f"Processing {config.filename} with limit {config.limit}")

@dg.asset(deps=["external_table"])
def derived_asset() -> None:
    """Depends on external_table which isn't managed by Dagster."""
    pass

| Anti-Pattern | Better Approach |
|---|---|
| Hardcoding credentials in assets | Use a `ConfigurableResource` |
| Giant assets that do everything | Split into focused, composable assets |
| Ignoring asset return types | Use type annotations for clarity |
| Skipping tests for assets | Test assets like regular Python functions |
| Not using partitions for time-series | Use a `PartitionsDefinition` (e.g. `DailyPartitionsDefinition`) |
| Putting all assets in one file | Organize by domain in separate modules |
# Development
dg dev # Start Dagster UI
dg check defs # Validate definitions
# Scaffolding
dg scaffold defs dagster.asset assets/file.py
dg scaffold defs dagster.schedule schedules.py
dg scaffold defs dagster.sensor sensors.py
# Production
dagster job execute -j my_job # Execute a job
dagster asset materialize -a my_asset # Materialize an asset

`references/assets.md`, `references/resources.md`, `references/automation.md`, `references/testing.md`, `references/etl-patterns.md`, `references/project-structure.md`