annotating-task-lineage


Annotating Task Lineage with Inlets & Outlets


This skill guides you through adding manual lineage annotations to Airflow tasks using inlets and outlets.

Reference: See the OpenLineage provider developer guide for the latest supported operators and patterns.

When to Use This Approach


| Scenario | Use Inlets/Outlets? |
| --- | --- |
| Operator has OpenLineage methods (get_openlineage_facets_on_*) | ❌ Modify the OL method directly |
| Operator has no built-in OpenLineage extractor | ✅ Yes |
| Simple table-level lineage is sufficient | ✅ Yes |
| Quick lineage setup without custom code | ✅ Yes |
| Need column-level lineage | ❌ Use OpenLineage methods or a custom extractor |
| Complex extraction logic needed | ❌ Use OpenLineage methods or a custom extractor |

Note: Inlets/outlets are the lowest-priority fallback. If an OpenLineage extractor or method exists for the operator, it takes precedence. Use this approach for operators without extractors.
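As a quick illustration, the decision table above can be read as a single rule. The helper below is purely hypothetical (it is not part of any Airflow or OpenLineage API), just a restatement of the table:

```python
def lineage_approach(has_ol_method: bool,
                     needs_column_lineage: bool,
                     complex_extraction: bool) -> str:
    """Mirror the decision table: pick a lineage annotation strategy."""
    if has_ol_method:
        # An existing OL method takes precedence over inlets/outlets
        return "modify the OpenLineage method directly"
    if needs_column_lineage or complex_extraction:
        return "OpenLineage methods or custom extractor"
    # Simple table-level lineage on an operator without an extractor
    return "inlets/outlets"
```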

Supported Types for Inlets/Outlets


You can use OpenLineage Dataset objects or Airflow Assets for inlets and outlets:

OpenLineage Datasets (Recommended)


```python
from openlineage.client.event_v2 import Dataset

# Database tables
source_table = Dataset(
    namespace="postgres://mydb:5432",
    name="public.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)

# Files
input_file = Dataset(
    namespace="s3://my-bucket",
    name="raw/events/2024-01-01.json",
)
```

Airflow Assets (Airflow 3+)

```python
from airflow.sdk import Asset

# Using Airflow's native Asset type
orders_asset = Asset(uri="s3://my-bucket/data/orders")
```

Airflow Datasets (Airflow 2.4+)

```python
from airflow.datasets import Dataset

# Using Airflow's Dataset type (Airflow 2.4-2.x)
orders_dataset = Dataset(uri="s3://my-bucket/data/orders")
```

---

Basic Usage


Setting Inlets and Outlets on Operators


```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from openlineage.client.event_v2 import Dataset
import pendulum

# Define your lineage datasets
source_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="raw.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)
output_file = Dataset(
    namespace="s3://my-bucket",
    name="exports/orders.parquet",
)

with DAG(
    dag_id="etl_with_lineage",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
) as dag:
    transform = BashOperator(
        task_id="transform_orders",
        bash_command="echo 'transforming...'",
        inlets=[source_table],    # What this task reads
        outlets=[target_table],   # What this task writes
    )

    export = BashOperator(
        task_id="export_to_s3",
        bash_command="echo 'exporting...'",
        inlets=[target_table],    # Reads from the previous task's output
        outlets=[output_file],    # Writes to S3
    )

    transform >> export
```

Multiple Inputs and Outputs


Tasks often read from multiple sources and write to multiple destinations:

```python
from airflow.operators.python import PythonOperator
from openlineage.client.event_v2 import Dataset

# Multiple source tables
customers = Dataset(namespace="postgres://crm:5432", name="public.customers")
orders = Dataset(namespace="postgres://sales:5432", name="public.orders")
products = Dataset(namespace="postgres://inventory:5432", name="public.products")

# Multiple output tables
daily_summary = Dataset(namespace="snowflake://account", name="analytics.daily_summary")
customer_metrics = Dataset(namespace="snowflake://account", name="analytics.customer_metrics")

aggregate_task = PythonOperator(
    task_id="build_daily_aggregates",
    python_callable=build_aggregates,  # build_aggregates defined elsewhere
    inlets=[customers, orders, products],       # All inputs
    outlets=[daily_summary, customer_metrics],  # All outputs
)
```

---

Setting Lineage in Custom Operators


When building custom operators, you have two options:

Option 1: Implement OpenLineage Methods (Recommended)


This is the preferred approach, as it gives you full control over lineage extraction:

```python
from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")

    def get_openlineage_facets_on_complete(self, task_instance):
        """Return lineage after successful execution."""
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="warehouse://db", name=self.source_table)],
            outputs=[Dataset(namespace="warehouse://db", name=self.target_table)],
        )
```

Option 2: Set Inlets/Outlets Dynamically


For simpler cases, set lineage within the execute method (non-deferrable operators only):

```python
from airflow.models import BaseOperator
from openlineage.client.event_v2 import Dataset


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # Set lineage dynamically based on operator parameters
        self.inlets = [
            Dataset(namespace="warehouse://db", name=self.source_table)
        ]
        self.outlets = [
            Dataset(namespace="warehouse://db", name=self.target_table)
        ]

        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")
```

Dataset Naming Helpers


Use the OpenLineage dataset naming helpers to ensure consistent naming across platforms:

Snowflake

```python
from openlineage.client.event_v2 import Dataset
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming

naming = SnowflakeDatasetNaming(
    account_identifier="myorg-myaccount",
    database="mydb",
    schema="myschema",
    table="mytable",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "snowflake://myorg-myaccount", name: "mydb.myschema.mytable"
```

BigQuery

```python
from openlineage.client.event_v2 import Dataset
from openlineage.client.naming.bigquery import BigQueryDatasetNaming

naming = BigQueryDatasetNaming(
    project="my-project",
    dataset="my_dataset",
    table="my_table",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "bigquery", name: "my-project.my_dataset.my_table"
```

S3

```python
from openlineage.client.event_v2 import Dataset
from openlineage.client.naming.s3 import S3DatasetNaming

naming = S3DatasetNaming(bucket="my-bucket", key="path/to/file.parquet")
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "s3://my-bucket", name: "path/to/file.parquet"
```

PostgreSQL

```python
from openlineage.client.event_v2 import Dataset
from openlineage.client.naming.postgres import PostgresDatasetNaming

naming = PostgresDatasetNaming(
    host="localhost",
    port=5432,
    database="mydb",
    schema="public",
    table="users",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "postgres://localhost:5432", name: "mydb.public.users"
```


> **Note:** Always use the naming helpers instead of constructing namespaces manually. If a helper is missing for your platform, check the [OpenLineage repo](https://github.com/OpenLineage/OpenLineage) or request it.

---


Precedence Rules


OpenLineage uses this precedence for lineage extraction:

1. Custom extractors (highest) - user-registered extractors
2. OpenLineage methods - get_openlineage_facets_on_* defined on the operator
3. Hook-level lineage - lineage collected from hooks via HookLineageCollector
4. Inlets/outlets (lowest) - used only if nothing else extracts lineage

Note: If an extractor or method exists but returns no datasets, OpenLineage will check hook-level lineage, then fall back to inlets/outlets.

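To see which branch of this precedence is likely to apply to a given operator, you can check for the OpenLineage facet methods with plain introspection. This is a hedged sketch, not an official API; it only looks for the documented get_openlineage_facets_on_* method names:

```python
def operator_supports_openlineage(operator_cls: type) -> bool:
    """Return True if the operator class defines any OpenLineage facet method."""
    ol_methods = (
        "get_openlineage_facets_on_start",
        "get_openlineage_facets_on_complete",
        "get_openlineage_facets_on_failure",
    )
    # getattr with a default avoids AttributeError on operators without OL support
    return any(callable(getattr(operator_cls, m, None)) for m in ol_methods)
```

If this returns True for your operator, prefer modifying the OL method; inlets/outlets on such an operator are only a lower-priority fallback.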

Best Practices


Use the Naming Helpers


Always use OpenLineage naming helpers for consistent dataset creation:

```python
from openlineage.client.event_v2 import Dataset
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming


def snowflake_dataset(schema: str, table: str) -> Dataset:
    """Create a Snowflake Dataset using the naming helper."""
    naming = SnowflakeDatasetNaming(
        account_identifier="mycompany",
        database="analytics",
        schema=schema,
        table=table,
    )
    return Dataset(namespace=naming.get_namespace(), name=naming.get_name())
```

Usage:

```python
source = snowflake_dataset("raw", "orders")
target = snowflake_dataset("staging", "orders_clean")
```

Document Your Lineage


Add comments explaining the data flow:

```python
transform = SqlOperator(
    task_id="transform_orders",
    sql="...",
    # Lineage: reads raw orders, joins with customers, writes to staging
    inlets=[
        snowflake_dataset("raw", "orders"),
        snowflake_dataset("raw", "customers"),
    ],
    outlets=[
        snowflake_dataset("staging", "order_details"),
    ],
)
```

Keep Lineage Accurate


  • Update inlets/outlets when SQL queries change
  • Include all tables referenced in JOINs as inlets
  • Include all tables written to (including temp tables if relevant)

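One way to keep annotations honest is a lightweight DAG test that compares declared inlets against the tables a SQL string references. The sketch below is a naive regex heuristic (it will miss CTEs, quoted identifiers, and subqueries), not a real SQL parser; the function names are illustrative:

```python
import re


def referenced_tables(sql: str) -> set[str]:
    """Naively collect schema.table names that follow FROM/JOIN keywords."""
    matches = re.findall(r"\b(?:FROM|JOIN)\s+([A-Za-z_][\w.]*)", sql, re.IGNORECASE)
    return {name.lower() for name in matches}


def inlets_cover_sql(inlet_names: list[str], sql: str) -> bool:
    """True if every table referenced in the SQL appears among the inlet names."""
    declared = {name.lower() for name in inlet_names}
    return referenced_tables(sql).issubset(declared)
```

A check like `assert inlets_cover_sql([d.name for d in task.inlets], task.sql)` in your DAG tests can flag inlets that drifted out of sync with the query.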

Limitations


| Limitation | Workaround |
| --- | --- |
| Table-level only (no column lineage) | Use OpenLineage methods or a custom extractor |
| Overridden by extractors/methods | Only use for operators without extractors |
| Static at DAG parse time | Set dynamically in execute() or use OL methods |
| Deferrable operators lose dynamic lineage | Use OL methods instead; attributes set in execute() are lost when deferring |

Related Skills


  • creating-openlineage-extractors: For column-level lineage or complex extraction
  • tracing-upstream-lineage: Investigate where data comes from
  • tracing-downstream-lineage: Investigate what depends on data