Blueprint Implementation


You are helping a user work with Blueprint, a system for composing Airflow DAGs from YAML using reusable Python templates. Execute steps in order and prefer the simplest configuration that meets the user's needs.
Package: `airflow-blueprint` on PyPI. Repo: https://github.com/astronomer/blueprint. Requires: Python 3.10+, Airflow 2.5+, Blueprint 0.2.0+.

Before Starting


Confirm with the user:
  1. Airflow version ≥2.5
  2. Python version ≥3.10
  3. Use case: Blueprint is for standardized, validated templates. If user needs full Airflow flexibility, suggest writing DAGs directly or using DAG Factory instead.


Determine What the User Needs


| User Request | Action |
| --- | --- |
| "Create a blueprint" / "Define a template" | Go to Creating Blueprints |
| "Create a DAG from YAML" / "Compose steps" | Go to Composing DAGs in YAML |
| "Customize DAG args" / "Add tags to DAG" | Go to Customizing DAG-Level Configuration |
| "Override config at runtime" / "Trigger with params" | Go to Runtime Parameter Overrides |
| "Post-process DAGs" / "Add callback" | Go to Post-Build Callbacks |
| "Validate my YAML" / "Lint blueprint" | Go to Validation Commands |
| "Set up blueprint in my project" | Go to Project Setup |
| "Version my blueprint" | Go to Versioning |
| "Generate schema" / "Astro IDE setup" | Go to Schema Generation |
| Blueprint errors / troubleshooting | Go to Troubleshooting |


Project Setup


If the user is starting fresh, guide them through setup:

1. Install the Package


```bash
# Add to requirements.txt
airflow-blueprint>=0.2.0

# Or install directly
pip install airflow-blueprint
```

2. Create the Loader


Create `dags/loader.py`:

```python
from blueprint import build_all

build_all()
```

DAG-level configuration (schedule, description, tags, default_args, etc.) is handled via YAML fields and `BlueprintDagArgs` templates; see Customizing DAG-Level Configuration.

3. Verify Installation


```bash
uvx --from airflow-blueprint blueprint list
```

If no blueprints are found, the user needs to create blueprint classes first.


Creating Blueprints


When user wants to create a new blueprint template:

Blueprint Structure


```python
# dags/templates/my_blueprints.py
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from blueprint import Blueprint, BaseModel, Field


class MyConfig(BaseModel):
    # Required field with description (used in CLI output and JSON schema)
    source_table: str = Field(description="Source table name")
    # Optional field with default and validation
    batch_size: int = Field(default=1000, ge=1)


class MyBlueprint(Blueprint[MyConfig]):
    """Docstring becomes blueprint description."""

    def render(self, config: MyConfig) -> TaskGroup:
        with TaskGroup(group_id=self.step_id) as group:
            BashOperator(
                task_id="my_task",
                bash_command=f"echo '{config.source_table}'"
            )
        return group
```

Key Rules


| Element | Requirement |
| --- | --- |
| Config class | Must inherit from `BaseModel` |
| Blueprint class | Must inherit from `Blueprint[ConfigClass]` |
| `render()` method | Must return `TaskGroup` or `BaseOperator` |
| Task IDs | Use `self.step_id` for the group/task ID |

Recommend Strict Validation


Suggest adding `extra="forbid"` to catch YAML typos:

```python
from pydantic import ConfigDict

class MyConfig(BaseModel):
    model_config = ConfigDict(extra="forbid")
    # fields...
```

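To see what strict validation buys you, here is a minimal, Blueprint-independent sketch using plain Pydantic v2 (the misspelled `sourse_table` key is invented for illustration):

```python
from pydantic import BaseModel, ConfigDict, ValidationError

class MyConfig(BaseModel):
    model_config = ConfigDict(extra="forbid")
    source_table: str

# A misspelled key is rejected instead of being silently ignored
try:
    MyConfig(sourse_table="raw.customers")
    error_types = set()
except ValidationError as exc:
    # Pydantic reports both the unknown extra key and the missing field
    error_types = {e["type"] for e in exc.errors()}
```

Without `extra="forbid"`, the typo would pass silently and `source_table` would simply be missing at render time.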

Composing DAGs in YAML


When user wants to create a DAG from blueprints:

YAML Structure


```yaml
# dags/my_pipeline.dag.yaml
dag_id: my_pipeline
schedule: "@daily"
description: "My data pipeline"

steps:
  step_one:
    blueprint: my_blueprint
    source_table: raw.customers
    batch_size: 500

  step_two:
    blueprint: another_blueprint
    depends_on: [step_one]
    target: analytics.output
```

By default, only `schedule` and `description` are supported as DAG-level fields (via the built-in `DefaultDagArgs`). For other fields like `tags`, `default_args`, `catchup`, etc., see **Customizing DAG-Level Configuration**.

Reserved Keys in Steps


| Key | Purpose |
| --- | --- |
| `blueprint` | Template name (required) |
| `depends_on` | List of upstream step names |
| `version` | Pin to a specific blueprint version |

Everything else passes to the blueprint's config.
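The split between reserved keys and config can be pictured like this (a hypothetical sketch for illustration, not Blueprint's internal code):

```python
RESERVED_KEYS = {"blueprint", "depends_on", "version"}

def split_step(step: dict) -> tuple[dict, dict]:
    # Reserved keys control wiring; everything else becomes the blueprint's config
    meta = {k: v for k, v in step.items() if k in RESERVED_KEYS}
    config = {k: v for k, v in step.items() if k not in RESERVED_KEYS}
    return meta, config

meta, config = split_step({
    "blueprint": "extract",
    "depends_on": ["step_one"],
    "source_table": "raw.data",
})
```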

Jinja2 Support


YAML supports Jinja2 templating with access to environment variables, Airflow Variables/Connections, and runtime context:

```yaml
dag_id: "{{ env.get('ENV', 'dev') }}_pipeline"
schedule: "{{ var.value.schedule | default('@daily') }}"

steps:
  extract:
    blueprint: extract
    output_path: "/data/{{ context.ds_nodash }}/output.csv"
    run_id: "{{ context.dag_run.run_id }}"
```

Available template variables:
  • `env` — environment variables
  • `var` — Airflow Variables
  • `conn` — Airflow Connections
  • `context` — a proxy that generates Airflow template expressions for runtime macros (e.g. `context.ds_nodash`, `context.dag_run.conf`, `context.task_instance.xcom_pull(...)`)


Customizing DAG-Level Configuration


By default, Blueprint supports `schedule` and `description` as DAG-level YAML fields. To use other DAG constructor arguments (tags, default_args, catchup, etc.), define a `BlueprintDagArgs` subclass.

When to Use


  • User wants `tags`, `default_args`, `catchup`, `start_date`, or any other DAG kwargs in YAML
  • User wants to derive DAG properties from config (e.g. team name → owner, tier → retries)

Defining a BlueprintDagArgs Subclass


```python
# dags/templates/my_dag_args.py
from typing import Any

from pydantic import BaseModel
from blueprint import BlueprintDagArgs


class MyDagArgsConfig(BaseModel):
    schedule: str | None = None
    description: str | None = None
    tags: list[str] = []
    owner: str = "data-team"
    retries: int = 2


class MyDagArgs(BlueprintDagArgs[MyDagArgsConfig]):
    def render(self, config: MyDagArgsConfig) -> dict[str, Any]:
        return {
            "schedule": config.schedule,
            "description": config.description,
            "tags": config.tags,
            "default_args": {
                "owner": config.owner,
                "retries": config.retries,
            },
        }
```

Then in YAML, the extra fields are validated by the config model:

```yaml
dag_id: my_pipeline
schedule: "@daily"
tags: [etl, production]
owner: data-team
retries: 3

steps:
  extract:
    blueprint: extract
    source_table: raw.data
```

Rules


  • Only one `BlueprintDagArgs` subclass per project (raises `MultipleDagArgsError` if more than one exists)
  • The `render()` method returns a dict of kwargs passed to the Airflow `DAG()` constructor
  • If no custom subclass exists, the built-in `DefaultDagArgs` is used (supports only `schedule` and `description`)


Runtime Parameter Overrides


Blueprint config fields can be overridden at DAG trigger time using Airflow params. This enables users to customize behavior when manually triggering DAGs from the Airflow UI.

Using `self.param()` in Template Fields


Use `self.param("field")` in operator template fields to make a config field overridable at runtime:

```python
class ExtractConfig(BaseModel):
    query: str = Field(description="SQL query to run")
    batch_size: int = Field(default=1000, ge=1)

class Extract(Blueprint[ExtractConfig]):
    def render(self, config: ExtractConfig) -> TaskGroup:
        with TaskGroup(group_id=self.step_id) as group:
            BashOperator(
                task_id="run_query",
                bash_command=f"run-etl --query {self.param('query')} --batch {self.param('batch_size')}"
            )
        return group
```

Using `self.resolve_config()` in Python Callables


For `@task` or `PythonOperator` callables, use `self.resolve_config()` to merge runtime params into config:

```python
class Extract(Blueprint[ExtractConfig]):
    def render(self, config: ExtractConfig) -> TaskGroup:
        bp = self  # capture reference for closure

        @task(task_id="run_query")
        def run_query(**context):
            resolved = bp.resolve_config(config, context)
            # resolved.query has the runtime override if one was provided
            execute(resolved.query, resolved.batch_size)

        with TaskGroup(group_id=self.step_id) as group:
            run_query()
        return group
```

How It Works


  • Params are auto-generated from Pydantic config models and namespaced per step (e.g. `step_name__field`)
  • YAML values become param defaults; Pydantic metadata (description, constraints, enum values) flows through to the Airflow trigger form
  • Invalid overrides raise `ValidationError` at execution time

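Conceptually, the override resolution can be pictured like this (a simplified sketch with hypothetical names, not the library's actual implementation):

```python
def resolve(step_id: str, yaml_config: dict, params: dict) -> dict:
    # Runtime params are namespaced as "<step>__<field>"; a provided
    # override wins over the YAML default for that field.
    prefix = f"{step_id}__"
    overrides = {
        key[len(prefix):]: value
        for key, value in params.items()
        if key.startswith(prefix) and value is not None
    }
    return {**yaml_config, **overrides}

merged = resolve(
    "extract",
    {"query": "SELECT 1", "batch_size": 1000},  # YAML defaults
    {"extract__batch_size": 500},               # runtime override
)
```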

Post-Build Callbacks


Use `on_dag_built` to post-process DAGs after they are constructed. This is useful for adding tags, access controls, audit metadata, or any cross-cutting concern.

```python
from pathlib import Path
from blueprint import build_all

def add_audit_tags(dag, yaml_path: Path) -> None:
    dag.tags.append("managed-by-blueprint")
    dag.tags.append(f"source:{yaml_path.name}")

build_all(on_dag_built=add_audit_tags)
```

The callback receives:
  • `dag` — the constructed Airflow `DAG` object (mutable)
  • `yaml_path` — the `Path` to the YAML file that defined the DAG


Validation Commands


Run CLI commands with uvx:

```bash
uvx --from airflow-blueprint blueprint <command>
```

| Command | When to Use |
| --- | --- |
| `blueprint list` | Show available blueprints |
| `blueprint describe <name>` | Show config schema for a blueprint |
| `blueprint describe <name> -v N` | Show schema for a specific version |
| `blueprint lint` | Validate all `*.dag.yaml` files |
| `blueprint lint <path>` | Validate a specific file |
| `blueprint schema <name>` | Generate JSON schema |
| `blueprint new` | Interactive DAG YAML creation |

Validation Workflow


```bash
# Check all YAML files
blueprint lint
```

Expected output for valid files:

```
PASS customer_pipeline.dag.yaml (dag_id=customer_pipeline)
```


---


Versioning


When user needs to version blueprints for backwards compatibility:

Version Naming Convention


  • v1: `MyBlueprint` (no suffix)
  • v2: `MyBlueprintV2`
  • v3: `MyBlueprintV3`

```python
# v1 - original
class ExtractConfig(BaseModel):
    source_table: str

class Extract(Blueprint[ExtractConfig]):
    def render(self, config): ...

# v2 - breaking changes, new class
class ExtractV2Config(BaseModel):
    sources: list[dict]  # Different schema

class ExtractV2(Blueprint[ExtractV2Config]):
    def render(self, config): ...
```

Explicit Name and Version


As an alternative to the class name convention, blueprints can set `name` and `version` directly:

```python
class MyCustomExtractor(Blueprint[ExtractV3Config]):
    name = "extract"
    version = 3

    def render(self, config): ...
```

This is useful when the class name doesn't follow the `NameV{N}` convention or when you want clearer control.

Using Versions in YAML


```yaml
steps:
  # Pin to v1
  legacy_extract:
    blueprint: extract
    version: 1
    source_table: raw.data

  # Use latest (v2)
  new_extract:
    blueprint: extract
    sources: [{table: orders}]
```


Schema Generation


Generate JSON schemas for editor autocompletion or external tooling:

```bash
# Generate schema for a blueprint
blueprint schema extract > extract.schema.json
```

Astro Project Auto-Detection


After creating or modifying a blueprint, automatically check whether the project is an Astro project by looking for a `.astro/` directory (created by `astro dev init`).

If the project is an Astro project, automatically regenerate schemas without prompting:

```bash
mkdir -p blueprint/generated-schemas

# For each name from `blueprint list`:
blueprint schema NAME > blueprint/generated-schemas/NAME.schema.json
```

The Astro IDE reads `blueprint/generated-schemas/` to render configuration forms. Keeping schemas in sync ensures the visual builder always reflects the latest blueprint configs.

If you cannot determine whether the project is an Astro project, ask the user once and remember for the rest of the session.

---


Troubleshooting


"Blueprint not found"


Cause: Blueprint class not in the Python path.
Fix: Check the template directory or use `--template-dir`:

```bash
blueprint list --template-dir dags/templates/
```

"Extra inputs are not permitted"


Cause: YAML field name typo with `extra="forbid"` enabled.
Fix: Run `blueprint describe <name>` to see valid field names.

DAG not appearing in Airflow


Cause: Missing or broken loader.
Fix: Ensure `dags/loader.py` exists and calls `build_all()`:

```python
from blueprint import build_all

build_all()
```

Validation errors shown as Airflow import errors


As of v0.2.0, Pydantic validation errors are surfaced as Airflow import errors with actionable messages instead of being silently swallowed. The error message includes details on missing fields, unexpected fields, and type mismatches, along with guidance to run `blueprint lint` or `blueprint describe`.

"Cyclic dependency detected"


Cause: Circular `depends_on` references.
Fix: Review step dependencies and remove cycles.
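A quick way to locate such cycles by hand is a depth-first search over the `depends_on` graph (an illustrative sketch, not a Blueprint API):

```python
def has_cycle(steps: dict[str, list[str]]) -> bool:
    # steps maps each step name to its depends_on list
    visiting: set[str] = set()
    done: set[str] = set()

    def visit(node: str) -> bool:
        if node in done:
            return False
        if node in visiting:
            return True  # back edge found: the graph contains a cycle
        visiting.add(node)
        for dep in steps.get(node, []):
            if visit(dep):
                return True
        visiting.remove(node)
        done.add(node)
        return False

    return any(visit(step) for step in steps)

cyclic = has_cycle({"a": ["b"], "b": ["a"]})
acyclic = has_cycle({"a": [], "b": ["a"]})
```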

"MultipleDagArgsError"


Cause: More than one `BlueprintDagArgs` subclass discovered in the project.
Fix: Only one `BlueprintDagArgs` subclass is allowed; remove or merge duplicates.

Debugging in Airflow UI


Every Blueprint task has extra fields in the Rendered Template view:
  • `blueprint_step_config` — resolved YAML config
  • `blueprint_step_code` — Python source of the blueprint


Verification Checklist


Before finishing, verify with the user:
  • `blueprint list` shows their templates
  • `blueprint lint` passes for all YAML files
  • `dags/loader.py` exists with `build_all()`
  • the DAG appears in the Airflow UI without parse errors


Reference


Astro IDE
