Annotating Task Lineage with Inlets & Outlets
This skill guides you through adding manual lineage annotations to Airflow tasks using `inlets` and `outlets`.

Reference: See the OpenLineage provider developer guide for the latest supported operators and patterns.
When to Use This Approach
| Scenario | Use Inlets/Outlets? |
|---|---|
| Operator has OpenLineage methods (`get_openlineage_facets_on_*`) | ❌ Modify the OL method directly |
| Operator has no built-in OpenLineage extractor | ✅ Yes |
| Simple table-level lineage is sufficient | ✅ Yes |
| Quick lineage setup without custom code | ✅ Yes |
| Need column-level lineage | ❌ Use OpenLineage methods or custom extractor |
| Complex extraction logic needed | ❌ Use OpenLineage methods or custom extractor |

Note: Inlets/outlets are the lowest-priority fallback. If an OpenLineage extractor or method exists for the operator, it takes precedence. Use this approach for operators without extractors.
Supported Types for Inlets/Outlets

You can use OpenLineage Dataset objects or Airflow Assets for inlets and outlets:
OpenLineage Datasets (Recommended)

```python
from openlineage.client.event_v2 import Dataset

# Database tables
source_table = Dataset(
    namespace="postgres://mydb:5432",
    name="public.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)
```
Files

```python
input_file = Dataset(
    namespace="s3://my-bucket",
    name="raw/events/2024-01-01.json",
)
```
Airflow Assets (Airflow 3+)

```python
from airflow.sdk import Asset

# Using Airflow's native Asset type
orders_asset = Asset(uri="s3://my-bucket/data/orders")
```
Airflow Datasets (Airflow 2.4+)

```python
from airflow.datasets import Dataset

# Using Airflow's Dataset type (Airflow 2.4-2.x)
orders_dataset = Dataset(uri="s3://my-bucket/data/orders")
```

---

Basic Usage

Setting Inlets and Outlets on Operators
```python
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from openlineage.client.event_v2 import Dataset

# Define your lineage datasets
source_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="raw.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)
output_file = Dataset(
    namespace="s3://my-bucket",
    name="exports/orders.parquet",
)

with DAG(
    dag_id="etl_with_lineage",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
) as dag:
    transform = BashOperator(
        task_id="transform_orders",
        bash_command="echo 'transforming...'",
        inlets=[source_table],   # What this task reads
        outlets=[target_table],  # What this task writes
    )
    export = BashOperator(
        task_id="export_to_s3",
        bash_command="echo 'exporting...'",
        inlets=[target_table],  # Reads from previous output
        outlets=[output_file],  # Writes to S3
    )

    transform >> export
```

Multiple Inputs and Outputs
Tasks often read from multiple sources and write to multiple destinations:

```python
from airflow.operators.python import PythonOperator
from openlineage.client.event_v2 import Dataset

# Multiple source tables
customers = Dataset(namespace="postgres://crm:5432", name="public.customers")
orders = Dataset(namespace="postgres://sales:5432", name="public.orders")
products = Dataset(namespace="postgres://inventory:5432", name="public.products")

# Multiple output tables
daily_summary = Dataset(namespace="snowflake://account", name="analytics.daily_summary")
customer_metrics = Dataset(namespace="snowflake://account", name="analytics.customer_metrics")

aggregate_task = PythonOperator(
    task_id="build_daily_aggregates",
    python_callable=build_aggregates,
    inlets=[customers, orders, products],       # All inputs
    outlets=[daily_summary, customer_metrics],  # All outputs
)
```

---

Setting Lineage in Custom Operators
When building custom operators, you have two options:

Option 1: Implement OpenLineage Methods (Recommended)
This is the preferred approach as it gives you full control over lineage extraction:

```python
from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")

    def get_openlineage_facets_on_complete(self, task_instance):
        """Return lineage after successful execution."""
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="warehouse://db", name=self.source_table)],
            outputs=[Dataset(namespace="warehouse://db", name=self.target_table)],
        )
```

Option 2: Set Inlets/Outlets Dynamically
For simpler cases, set lineage within the `execute` method (non-deferrable operators only):

```python
from airflow.models import BaseOperator
from openlineage.client.event_v2 import Dataset


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # Set lineage dynamically based on operator parameters
        self.inlets = [
            Dataset(namespace="warehouse://db", name=self.source_table)
        ]
        self.outlets = [
            Dataset(namespace="warehouse://db", name=self.target_table)
        ]
        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")
```

Dataset Naming Helpers
Use the OpenLineage dataset naming helpers to ensure consistent naming across platforms. The examples below assume `Dataset` is imported:

```python
from openlineage.client.event_v2 import Dataset
```

Snowflake
```python
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming

naming = SnowflakeDatasetNaming(
    account_identifier="myorg-myaccount",
    database="mydb",
    schema="myschema",
    table="mytable",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "snowflake://myorg-myaccount", name: "mydb.myschema.mytable"
```
BigQuery

```python
from openlineage.client.naming.bigquery import BigQueryDatasetNaming

naming = BigQueryDatasetNaming(
    project="my-project",
    dataset="my_dataset",
    table="my_table",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "bigquery", name: "my-project.my_dataset.my_table"
```
S3

```python
from openlineage.client.naming.s3 import S3DatasetNaming

naming = S3DatasetNaming(bucket="my-bucket", key="path/to/file.parquet")
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "s3://my-bucket", name: "path/to/file.parquet"
```
PostgreSQL

```python
from openlineage.client.naming.postgres import PostgresDatasetNaming

naming = PostgresDatasetNaming(
    host="localhost",
    port=5432,
    database="mydb",
    schema="public",
    table="users",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "postgres://localhost:5432", name: "mydb.public.users"
```
> **Note:** Always use the naming helpers instead of constructing namespaces manually. If a helper is missing for your platform, check the [OpenLineage repo](https://github.com/OpenLineage/OpenLineage) or request it.
---
Precedence Rules
OpenLineage uses this precedence for lineage extraction:

- Custom Extractors (highest) - User-registered extractors
- OpenLineage Methods - `get_openlineage_facets_on_*` in the operator
- Hook-Level Lineage - Lineage collected from hooks via `HookLineageCollector`
- Inlets/Outlets (lowest) - Falls back to these if nothing else extracts lineage

Note: If an extractor or method exists but returns no datasets, OpenLineage will check hook-level lineage, then fall back to inlets/outlets.
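The fallback order above can be sketched as a simple resolution loop. This is an illustrative sketch of the precedence logic only, not the provider's actual implementation; the function name and arguments are hypothetical:

```python
def resolve_lineage(custom_extractor=None, ol_method=None,
                    hook_lineage=None, inlets_outlets=None):
    """Return datasets from the first source that yields any, mirroring
    the precedence: extractor > OL method > hook lineage > inlets/outlets."""
    for source in (custom_extractor, ol_method, hook_lineage, inlets_outlets):
        if source:  # a source that exists but returned no datasets is skipped
            return source
    return []


# An OL method that returned no datasets falls through to inlets/outlets:
resolve_lineage(ol_method=[], inlets_outlets=["raw.orders"])  # -> ["raw.orders"]
```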
Best Practices

Use the Naming Helpers
Always use OpenLineage naming helpers for consistent dataset creation:
```python
from openlineage.client.event_v2 import Dataset
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming


def snowflake_dataset(schema: str, table: str) -> Dataset:
    """Create a Snowflake Dataset using the naming helper."""
    naming = SnowflakeDatasetNaming(
        account_identifier="mycompany",
        database="analytics",
        schema=schema,
        table=table,
    )
    return Dataset(namespace=naming.get_namespace(), name=naming.get_name())


# Usage
source = snowflake_dataset("raw", "orders")
target = snowflake_dataset("staging", "orders_clean")
```

Document Your Lineage
Add comments explaining the data flow:

```python
transform = SqlOperator(
    task_id="transform_orders",
    sql="...",
    # Lineage: Reads raw orders, joins with customers, writes to staging
    inlets=[
        snowflake_dataset("raw", "orders"),
        snowflake_dataset("raw", "customers"),
    ],
    outlets=[
        snowflake_dataset("staging", "order_details"),
    ],
)
```

Keep Lineage Accurate
- Update inlets/outlets when SQL queries change
- Include all tables referenced in JOINs as inlets
- Include all tables written to (including temp tables if relevant)
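One way to catch drift between a query and its declared inlets is a quick cross-check of the tables the SQL actually reads. A minimal sketch using a naive regex (a real check would use a SQL parser; `tables_in_sql` is a hypothetical helper, not part of any library):

```python
import re


def tables_in_sql(sql: str) -> set:
    """Naively collect schema.table names following FROM/JOIN keywords."""
    return set(re.findall(r"(?:FROM|JOIN)\s+([\w.]+)", sql, re.IGNORECASE))


sql = """
INSERT INTO staging.order_details
SELECT * FROM raw.orders o JOIN raw.customers c ON o.customer_id = c.id
"""
declared_inlets = {"raw.orders", "raw.customers"}

# Fails loudly if the query reads a table that is not declared as an inlet
missing = tables_in_sql(sql) - declared_inlets
assert not missing, f"Undeclared inlets: {missing}"
```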
Limitations

| Limitation | Workaround |
|---|---|
| Table-level only (no column lineage) | Use OpenLineage methods or custom extractor |
| Overridden by extractors/methods | Only use for operators without extractors |
| Static at DAG parse time | Set dynamically in `execute` |
| Deferrable operators lose dynamic lineage | Use OL methods instead; attributes set in `execute` are not captured |
Related Skills
- creating-openlineage-extractors: For column-level lineage or complex extraction
- tracing-upstream-lineage: Investigate where data comes from
- tracing-downstream-lineage: Investigate what depends on data