creating-openlineage-extractors
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseCreating OpenLineage Extractors
创建OpenLineage提取器
This skill guides you through creating custom OpenLineage extractors to capture lineage from Airflow operators that don't have built-in support.
Reference: See the OpenLineage provider developer guide for the latest patterns and list of supported operators/hooks.
本技能将引导你创建自定义OpenLineage提取器,以从没有内置支持的Airflow算子中捕获血缘信息。
参考文档: 查看OpenLineage提供者开发者指南获取最新模式和支持的算子/钩子列表。
When to Use Each Approach
各方案适用场景
| Scenario | Approach |
|---|---|
| Operator you own/maintain | OpenLineage Methods (recommended, simplest) |
| Third-party operator you can't modify | Custom Extractor |
| Need column-level lineage | OpenLineage Methods or Custom Extractor |
| Complex extraction logic | OpenLineage Methods or Custom Extractor |
| Simple table-level lineage | Inlets/Outlets (simplest, but lowest priority) |
Important: Always prefer OpenLineage methods over custom extractors when possible. Extractors are harder to write, easier to diverge from operator behavior after changes, and harder to debug.
| 场景 | 方案 |
|---|---|
| 你拥有/维护的算子 | OpenLineage Methods(推荐,最简单) |
| 无法修改的第三方算子 | 自定义提取器 |
| 需要列级血缘 | OpenLineage Methods 或自定义提取器 |
| 需要复杂提取逻辑 | OpenLineage Methods 或自定义提取器 |
| 简单表级血缘 | Inlets/Outlets(最简单,但优先级最低) |
重要提示: 只要有可能,优先使用OpenLineage方法而非自定义提取器。提取器编写难度更高,算子变更后更容易与算子行为脱节,且更难调试。
Two Approaches
两种实现方案
1. OpenLineage Methods (Recommended)
1. OpenLineage Methods(推荐)
Use when you can add methods directly to your custom operator. This is the go-to solution for operators you own.
当你拥有算子的所有权时,可直接为自定义算子添加OpenLineage方法。这是针对你所拥有算子的首选方案。
2. Custom Extractors
2. 自定义提取器
Use when you need lineage from third-party or provider operators that you cannot modify.
当你需要从无法修改的第三方或提供者算子中获取血缘信息时,使用此方案。
Approach 1: OpenLineage Methods (Recommended)
方案1:OpenLineage Methods(推荐)
When you own the operator, add OpenLineage methods directly:
python
from airflow.models import BaseOperator
class MyCustomOperator(BaseOperator):
"""Custom operator with built-in OpenLineage support."""
def __init__(self, source_table: str, target_table: str, **kwargs):
super().__init__(**kwargs)
self.source_table = source_table
self.target_table = target_table
self._rows_processed = 0 # Set during execution
def execute(self, context):
# Do the actual work
self._rows_processed = self._process_data()
return self._rows_processed
def get_openlineage_facets_on_start(self):
"""Called when task starts. Return known inputs/outputs."""
# Import locally to avoid circular imports
from openlineage.client.event_v2 import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
)
def get_openlineage_facets_on_complete(self, task_instance):
"""Called after success. Add runtime metadata."""
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import output_statistics_output_dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
outputs=[
Dataset(
namespace="postgres://db",
name=self.target_table,
facets={
"outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
rowCount=self._rows_processed
)
},
)
],
)
def get_openlineage_facets_on_failure(self, task_instance):
"""Called after failure. Optional - for partial lineage."""
return None当你拥有算子所有权时,直接添加OpenLineage方法:
python
from airflow.models import BaseOperator
class MyCustomOperator(BaseOperator):
"""Custom operator with built-in OpenLineage support."""
def __init__(self, source_table: str, target_table: str, **kwargs):
super().__init__(**kwargs)
self.source_table = source_table
self.target_table = target_table
self._rows_processed = 0 # Set during execution
def execute(self, context):
# Do the actual work
self._rows_processed = self._process_data()
return self._rows_processed
def get_openlineage_facets_on_start(self):
"""Called when task starts. Return known inputs/outputs."""
# Import locally to avoid circular imports
from openlineage.client.event_v2 import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
)
def get_openlineage_facets_on_complete(self, task_instance):
"""Called after success. Add runtime metadata."""
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import output_statistics_output_dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
outputs=[
Dataset(
namespace="postgres://db",
name=self.target_table,
facets={
"outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
rowCount=self._rows_processed
)
},
)
],
)
def get_openlineage_facets_on_failure(self, task_instance):
"""Called after failure. Optional - for partial lineage."""
return NoneOpenLineage Methods Reference
OpenLineage方法参考
| Method | When Called | Required |
|---|---|---|
| Task enters RUNNING | No |
| Task succeeds | No |
| Task fails | No |
Implement only the methods you need. Unimplemented methods fall through to Hook-Level Lineage or inlets/outlets.
| 方法 | 调用时机 | 是否必填 |
|---|---|---|
| 任务进入RUNNING状态时 | 否 |
| 任务成功后 | 否 |
| 任务失败后 | 否 |
仅实现你需要的方法。未实现的方法会降级到Hook级血缘或inlets/outlets。
Approach 2: Custom Extractors
方案2:自定义提取器
Use this approach only when you cannot modify the operator (e.g., third-party or provider operators).
仅当你无法修改算子(例如第三方或提供者算子)时,使用此方案。
Basic Structure
基本结构
python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
class MyOperatorExtractor(BaseExtractor):
"""Extract lineage from MyCustomOperator."""
@classmethod
def get_operator_classnames(cls) -> list[str]:
"""Return operator class names this extractor handles."""
return ["MyCustomOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
"""Called BEFORE operator executes. Use for known inputs/outputs."""
# Access operator properties via self.operator
source_table = self.operator.source_table
target_table = self.operator.target_table
return OperatorLineage(
inputs=[
Dataset(
namespace="postgres://mydb:5432",
name=f"public.{source_table}",
)
],
outputs=[
Dataset(
namespace="postgres://mydb:5432",
name=f"public.{target_table}",
)
],
)
def extract_on_complete(self, task_instance) -> OperatorLineage | None:
"""Called AFTER operator executes. Use for runtime-determined lineage."""
# Access properties set during execution
# Useful for operators that determine outputs at runtime
return Nonepython
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
class MyOperatorExtractor(BaseExtractor):
"""Extract lineage from MyCustomOperator."""
@classmethod
def get_operator_classnames(cls) -> list[str]:
"""Return operator class names this extractor handles."""
return ["MyCustomOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
"""Called BEFORE operator executes. Use for known inputs/outputs."""
# Access operator properties via self.operator
source_table = self.operator.source_table
target_table = self.operator.target_table
return OperatorLineage(
inputs=[
Dataset(
namespace="postgres://mydb:5432",
name=f"public.{source_table}",
)
],
outputs=[
Dataset(
namespace="postgres://mydb:5432",
name=f"public.{target_table}",
)
],
)
def extract_on_complete(self, task_instance) -> OperatorLineage | None:
"""Called AFTER operator executes. Use for runtime-determined lineage."""
# Access properties set during execution
# Useful for operators that determine outputs at runtime
return NoneOperatorLineage Structure
OperatorLineage结构
python
from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job
lineage = OperatorLineage(
inputs=[Dataset(namespace="...", name="...")], # Input datasets
outputs=[Dataset(namespace="...", name="...")], # Output datasets
run_facets={"sql": sql_job.SQLJobFacet(query="SELECT...")}, # Run metadata
job_facets={}, # Job metadata
)python
from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job
lineage = OperatorLineage(
inputs=[Dataset(namespace="...", name="...")], # Input datasets
outputs=[Dataset(namespace="...", name="...")], # Output datasets
run_facets={"sql": sql_job.SQLJobFacet(query="SELECT...")}, # Run metadata
job_facets={}, # Job metadata
)Extraction Methods
提取方法
| Method | When Called | Use For |
|---|---|---|
| Before operator runs | Static/known lineage |
| After success | Runtime-determined lineage |
| After failure | Partial lineage on errors |
| 方法 | 调用时机 | 用途 |
|---|---|---|
| 算子运行前 | 静态/已知血缘 |
| 任务成功后 | 运行时确定的血缘 |
| 任务失败后 | 错误场景下的部分血缘 |
Registering Extractors
注册提取器
Option 1: Configuration file ()
airflow.cfgini
[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractorOption 2: Environment variable
bash
AIRFLOW__OPENLINEAGE__EXTRACTORS='mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor'Important: The path must be importable from the Airflow worker. Place extractors in your DAGs folder or installed package.
选项1:配置文件()
airflow.cfgini
[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor选项2:环境变量
bash
AIRFLOW__OPENLINEAGE__EXTRACTORS='mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor'重要提示: 路径必须能被Airflow worker导入。将提取器放在你的DAG目录或已安装的包中。
Common Patterns
常见模式
SQL Operator Extractor
SQL算子提取器
python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job
class MySqlOperatorExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["MySqlOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
sql = self.operator.sql
conn_id = self.operator.conn_id
# Parse SQL to find tables (simplified example)
# In practice, use a SQL parser like sqlglot
inputs, outputs = self._parse_sql(sql)
namespace = f"postgres://{conn_id}"
return OperatorLineage(
inputs=[Dataset(namespace=namespace, name=t) for t in inputs],
outputs=[Dataset(namespace=namespace, name=t) for t in outputs],
job_facets={
"sql": sql_job.SQLJobFacet(query=sql)
},
)
def _parse_sql(self, sql: str) -> tuple[list[str], list[str]]:
"""Parse SQL to extract table names. Use sqlglot for real parsing."""
# Simplified example - use proper SQL parser in production
inputs = []
outputs = []
# ... parsing logic ...
return inputs, outputspython
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job
class MySqlOperatorExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["MySqlOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
sql = self.operator.sql
conn_id = self.operator.conn_id
# Parse SQL to find tables (simplified example)
# In practice, use a SQL parser like sqlglot
inputs, outputs = self._parse_sql(sql)
namespace = f"postgres://{conn_id}"
return OperatorLineage(
inputs=[Dataset(namespace=namespace, name=t) for t in inputs],
outputs=[Dataset(namespace=namespace, name=t) for t in outputs],
job_facets={
"sql": sql_job.SQLJobFacet(query=sql)
},
)
def _parse_sql(self, sql: str) -> tuple[list[str], list[str]]:
"""Parse SQL to extract table names. Use sqlglot for real parsing."""
# Simplified example - use proper SQL parser in production
inputs = []
outputs = []
# ... parsing logic ...
return inputs, outputsFile Transfer Extractor
文件传输提取器
python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
class S3ToSnowflakeExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["S3ToSnowflakeOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
s3_bucket = self.operator.s3_bucket
s3_key = self.operator.s3_key
table = self.operator.table
schema = self.operator.schema
return OperatorLineage(
inputs=[
Dataset(
namespace=f"s3://{s3_bucket}",
name=s3_key,
)
],
outputs=[
Dataset(
namespace="snowflake://myaccount.snowflakecomputing.com",
name=f"{schema}.{table}",
)
],
)python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
class S3ToSnowflakeExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["S3ToSnowflakeOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
s3_bucket = self.operator.s3_bucket
s3_key = self.operator.s3_key
table = self.operator.table
schema = self.operator.schema
return OperatorLineage(
inputs=[
Dataset(
namespace=f"s3://{s3_bucket}",
name=s3_key,
)
],
outputs=[
Dataset(
namespace="snowflake://myaccount.snowflakecomputing.com",
name=f"{schema}.{table}",
)
],
)Dynamic Lineage from Execution
基于执行的动态血缘
python
from openlineage.client.event_v2 import Dataset
class DynamicOutputExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["DynamicOutputOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
# Only inputs known before execution
return OperatorLineage(
inputs=[Dataset(namespace="...", name=self.operator.source)],
)
def extract_on_complete(self, task_instance) -> OperatorLineage | None:
# Outputs determined during execution
# Access via operator properties set in execute()
outputs = self.operator.created_tables # Set during execute()
return OperatorLineage(
inputs=[Dataset(namespace="...", name=self.operator.source)],
outputs=[Dataset(namespace="...", name=t) for t in outputs],
)python
from openlineage.client.event_v2 import Dataset
class DynamicOutputExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["DynamicOutputOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
# Only inputs known before execution
return OperatorLineage(
inputs=[Dataset(namespace="...", name=self.operator.source)],
)
def extract_on_complete(self, task_instance) -> OperatorLineage | None:
# Outputs determined during execution
# Access via operator properties set in execute()
outputs = self.operator.created_tables # Set during execute()
return OperatorLineage(
inputs=[Dataset(namespace="...", name=self.operator.source)],
outputs=[Dataset(namespace="...", name=t) for t in outputs],
)Common Pitfalls
常见陷阱
1. Circular Imports
1. 循环导入
Problem: Importing Airflow modules at the top level causes circular imports.
python
undefined问题: 在顶层导入Airflow模块会导致循环导入问题。
python
undefined❌ BAD - can cause circular import issues
❌ 错误 - 会导致循环导入问题
from airflow.models import TaskInstance
from openlineage.client.event_v2 import Dataset
class MyExtractor(BaseExtractor):
...
```pythonfrom airflow.models import TaskInstance
from openlineage.client.event_v2 import Dataset
class MyExtractor(BaseExtractor):
...
```python✅ GOOD - import inside methods
✅ 正确 - 在方法内部导入
class MyExtractor(BaseExtractor):
def _execute_extraction(self):
from openlineage.client.event_v2 import Dataset
# ...
undefinedclass MyExtractor(BaseExtractor):
def _execute_extraction(self):
from openlineage.client.event_v2 import Dataset
# ...
undefined2. Wrong Import Path
2. 导入路径错误
Problem: Extractor path doesn't match actual module location.
bash
undefined问题: 提取器路径与实际模块位置不匹配。
bash
undefined❌ Wrong - path doesn't exist
❌ 错误 - 路径不存在
AIRFLOW__OPENLINEAGE__EXTRACTORS='extractors.MyExtractor'
AIRFLOW__OPENLINEAGE__EXTRACTORS='extractors.MyExtractor'
✅ Correct - full importable path
✅ 正确 - 完整可导入路径
AIRFLOW__OPENLINEAGE__EXTRACTORS='dags.extractors.my_extractor.MyExtractor'
undefinedAIRFLOW__OPENLINEAGE__EXTRACTORS='dags.extractors.my_extractor.MyExtractor'
undefined3. Not Handling None
3. 未处理None值
Problem: Extraction fails when operator properties are None.
python
undefined问题: 当算子属性为None时,提取失败。
python
undefined✅ Handle optional properties
✅ 处理可选属性
def _execute_extraction(self) -> OperatorLineage | None:
if not self.operator.source_table:
return None # Skip extraction
return OperatorLineage(...)
---def _execute_extraction(self) -> OperatorLineage | None:
if not self.operator.source_table:
return None # 跳过提取
return OperatorLineage(...)
---Testing Extractors
测试提取器
Unit Testing
单元测试
python
import pytest
from unittest.mock import MagicMock
from mypackage.extractors import MyOperatorExtractor
def test_extractor():
# Mock the operator
operator = MagicMock()
operator.source_table = "input_table"
operator.target_table = "output_table"
# Create extractor
extractor = MyOperatorExtractor(operator)
# Test extraction
lineage = extractor._execute_extraction()
assert len(lineage.inputs) == 1
assert lineage.inputs[0].name == "input_table"
assert len(lineage.outputs) == 1
assert lineage.outputs[0].name == "output_table"python
import pytest
from unittest.mock import MagicMock
from mypackage.extractors import MyOperatorExtractor
def test_extractor():
# Mock the operator
operator = MagicMock()
operator.source_table = "input_table"
operator.target_table = "output_table"
# Create extractor
extractor = MyOperatorExtractor(operator)
# Test extraction
lineage = extractor._execute_extraction()
assert len(lineage.inputs) == 1
assert lineage.inputs[0].name == "input_table"
assert len(lineage.outputs) == 1
assert lineage.outputs[0].name == "output_table"Precedence Rules
优先级规则
OpenLineage checks for lineage in this order:
- Custom Extractors (highest priority)
- OpenLineage Methods on operator
- Hook-Level Lineage (from )
HookLineageCollector - Inlets/Outlets (lowest priority)
If a custom extractor exists, it overrides built-in extraction and inlets/outlets.
OpenLineage按以下顺序检查血缘:
- 自定义提取器(最高优先级)
- 算子上的OpenLineage方法
- Hook级血缘(来自)
HookLineageCollector - Inlets/Outlets(最低优先级)
如果存在自定义提取器,它会覆盖内置提取和inlets/outlets。
Related Skills
相关技能
- annotating-task-lineage: For simple table-level lineage with inlets/outlets
- tracing-upstream-lineage: Investigate data origins
- tracing-downstream-lineage: Investigate data dependencies
- annotating-task-lineage: 使用inlets/outlets实现简单表级血缘
- tracing-upstream-lineage: 调查数据来源
- tracing-downstream-lineage: 调查数据依赖关系