creating-openlineage-extractors


Creating OpenLineage Extractors


This skill guides you through creating custom OpenLineage extractors to capture lineage from Airflow operators that don't have built-in support.
Reference: See the OpenLineage provider developer guide for the latest patterns and list of supported operators/hooks.

When to Use Each Approach


| Scenario | Approach |
| --- | --- |
| Operator you own/maintain | OpenLineage Methods (recommended, simplest) |
| Third-party operator you can't modify | Custom Extractor |
| Need column-level lineage | OpenLineage Methods or Custom Extractor |
| Complex extraction logic | OpenLineage Methods or Custom Extractor |
| Simple table-level lineage | Inlets/Outlets (simplest, but lowest priority) |

Important: Always prefer OpenLineage methods over custom extractors when possible. Extractors are harder to write, more likely to diverge from operator behavior as the operator changes, and harder to debug.


Two Approaches


1. OpenLineage Methods (Recommended)


Use when you can add methods directly to your custom operator. This is the go-to solution for operators you own.

2. Custom Extractors


Use when you need lineage from third-party or provider operators that you cannot modify.


Approach 1: OpenLineage Methods (Recommended)


When you own the operator, add OpenLineage methods directly:
```python
from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    """Custom operator with built-in OpenLineage support."""

    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self._rows_processed = 0  # Set during execution

    def execute(self, context):
        # Do the actual work
        self._rows_processed = self._process_data()
        return self._rows_processed

    def get_openlineage_facets_on_start(self):
        """Called when task starts. Return known inputs/outputs."""
        # Import locally to avoid circular imports
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
        )

    def get_openlineage_facets_on_complete(self, task_instance):
        """Called after success. Add runtime metadata."""
        from openlineage.client.event_v2 import Dataset
        from openlineage.client.facet_v2 import output_statistics_output_dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[
                Dataset(
                    namespace="postgres://db",
                    name=self.target_table,
                    facets={
                        "outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
                            rowCount=self._rows_processed
                        )
                    },
                )
            ],
        )

    def get_openlineage_facets_on_failure(self, task_instance):
        """Called after failure. Optional - for partial lineage."""
        return None
```

OpenLineage Methods Reference


| Method | When Called | Required |
| --- | --- | --- |
| `get_openlineage_facets_on_start()` | Task enters RUNNING | No |
| `get_openlineage_facets_on_complete(ti)` | Task succeeds | No |
| `get_openlineage_facets_on_failure(ti)` | Task fails | No |

Implement only the methods you need. Unimplemented methods fall through to hook-level lineage or inlets/outlets.


Approach 2: Custom Extractors


Use this approach only when you cannot modify the operator (e.g., third-party or provider operators).

Basic Structure


```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class MyOperatorExtractor(BaseExtractor):
    """Extract lineage from MyCustomOperator."""

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Return operator class names this extractor handles."""
        return ["MyCustomOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        """Called BEFORE operator executes. Use for known inputs/outputs."""
        # Access operator properties via self.operator
        source_table = self.operator.source_table
        target_table = self.operator.target_table

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{source_table}",
                )
            ],
            outputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{target_table}",
                )
            ],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """Called AFTER operator executes. Use for runtime-determined lineage."""
        # Access properties set during execution
        # Useful for operators that determine outputs at runtime
        return None
```

OperatorLineage Structure


```python
from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job

lineage = OperatorLineage(
    inputs=[Dataset(namespace="...", name="...")],      # Input datasets
    outputs=[Dataset(namespace="...", name="...")],     # Output datasets
    run_facets={"sql": sql_job.SQLJobFacet(query="SELECT...")},  # Run metadata
    job_facets={},                                      # Job metadata
)
```

Extraction Methods


| Method | When Called | Use For |
| --- | --- | --- |
| `_execute_extraction()` | Before operator runs | Static/known lineage |
| `extract_on_complete(task_instance)` | After success | Runtime-determined lineage |
| `extract_on_failure(task_instance)` | After failure | Partial lineage on errors |

Registering Extractors


Option 1: Configuration file (`airflow.cfg`)

```ini
[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor
```

Option 2: Environment variable

```bash
AIRFLOW__OPENLINEAGE__EXTRACTORS='mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor'
```

Important: The path must be importable from the Airflow worker. Place extractors in your DAGs folder or in an installed package.
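A quick way to sanity-check a configured entry is to resolve it with Python's import machinery, using the same dotted `module.ClassName` form the setting expects. This is an illustrative sketch (the resolution helper is hypothetical, not the provider's own code); it is demonstrated with a stdlib class since `mypackage` is only an example path:

```python
import importlib


def resolve_extractor(path: str):
    """Resolve a dotted 'module.ClassName' path, mirroring how entries in
    the extractors setting are interpreted. Raises if not importable."""
    module_path, class_name = path.rsplit(".", 1)
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


# Demonstrated with a stdlib class; substitute your extractor's real path:
print(resolve_extractor("collections.OrderedDict").__name__)  # → OrderedDict
```

Running this from the worker environment (not just your laptop) catches the "path not importable" pitfall described below before any DAG runs.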


Common Patterns


SQL Operator Extractor


```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job


class MySqlOperatorExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["MySqlOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        sql = self.operator.sql
        conn_id = self.operator.conn_id

        # Parse SQL to find tables (simplified example)
        # In practice, use a SQL parser like sqlglot
        inputs, outputs = self._parse_sql(sql)

        namespace = f"postgres://{conn_id}"

        return OperatorLineage(
            inputs=[Dataset(namespace=namespace, name=t) for t in inputs],
            outputs=[Dataset(namespace=namespace, name=t) for t in outputs],
            job_facets={
                "sql": sql_job.SQLJobFacet(query=sql)
            },
        )

    def _parse_sql(self, sql: str) -> tuple[list[str], list[str]]:
        """Parse SQL to extract table names. Use sqlglot for real parsing."""
        # Simplified example - use proper SQL parser in production
        inputs = []
        outputs = []
        # ... parsing logic ...
        return inputs, outputs
```
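For illustration, a very naive regex-based version of the `_parse_sql` stub might look like the sketch below. It only handles plain `FROM`/`JOIN`/`INSERT INTO` clauses, ignores subqueries, CTEs, quoting, and aliases, and is no substitute for a real parser such as sqlglot:

```python
import re


def parse_sql_tables(sql: str) -> tuple[list[str], list[str]]:
    """Naive table extraction for illustration only - use sqlglot or
    another real SQL parser in production."""
    inputs = re.findall(r"(?:FROM|JOIN)\s+([\w.]+)", sql, re.IGNORECASE)
    outputs = re.findall(r"INSERT\s+INTO\s+([\w.]+)", sql, re.IGNORECASE)
    return sorted(set(inputs)), sorted(set(outputs))


sql = "INSERT INTO public.target SELECT * FROM public.src JOIN public.dim d ON 1=1"
print(parse_sql_tables(sql))
# → (['public.dim', 'public.src'], ['public.target'])
```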

File Transfer Extractor


```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class S3ToSnowflakeExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["S3ToSnowflakeOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        s3_bucket = self.operator.s3_bucket
        s3_key = self.operator.s3_key
        table = self.operator.table
        schema = self.operator.schema

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace=f"s3://{s3_bucket}",
                    name=s3_key,
                )
            ],
            outputs=[
                Dataset(
                    namespace="snowflake://myaccount.snowflakecomputing.com",
                    name=f"{schema}.{table}",
                )
            ],
        )
```

Dynamic Lineage from Execution


```python
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class DynamicOutputExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["DynamicOutputOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        # Only inputs known before execution
        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        # Outputs determined during execution
        # Access via operator properties set in execute()
        outputs = self.operator.created_tables  # Set during execute()

        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
            outputs=[Dataset(namespace="...", name=t) for t in outputs],
        )
```


Common Pitfalls


1. Circular Imports


Problem: Importing Airflow modules at the top level can cause circular imports.

```python
# ❌ BAD - can cause circular import issues
from airflow.models import TaskInstance
from openlineage.client.event_v2 import Dataset


class MyExtractor(BaseExtractor): ...
```

```python
# ✅ GOOD - import inside methods
class MyExtractor(BaseExtractor):
    def _execute_extraction(self):
        from openlineage.client.event_v2 import Dataset
        # ...
```

2. Wrong Import Path


Problem: The extractor path doesn't match the actual module location.

```bash
# ❌ Wrong - path doesn't exist
AIRFLOW__OPENLINEAGE__EXTRACTORS='extractors.MyExtractor'

# ✅ Correct - full importable path
AIRFLOW__OPENLINEAGE__EXTRACTORS='dags.extractors.my_extractor.MyExtractor'
```

3. Not Handling None


Problem: Extraction fails when operator properties are None.

```python
# ✅ Handle optional properties
def _execute_extraction(self) -> OperatorLineage | None:
    if not self.operator.source_table:
        return None  # Skip extraction
    return OperatorLineage(...)
```

Testing Extractors


Unit Testing


```python
from unittest.mock import MagicMock
from mypackage.extractors import MyOperatorExtractor


def test_extractor():
    # Mock the operator
    operator = MagicMock()
    operator.source_table = "input_table"
    operator.target_table = "output_table"

    # Create extractor
    extractor = MyOperatorExtractor(operator)

    # Test extraction
    lineage = extractor._execute_extraction()

    assert len(lineage.inputs) == 1
    assert lineage.inputs[0].name == "input_table"
    assert len(lineage.outputs) == 1
    assert lineage.outputs[0].name == "output_table"
```

Precedence Rules


OpenLineage checks for lineage in this order:

1. Custom Extractors (highest priority)
2. OpenLineage Methods on the operator
3. Hook-Level Lineage (from `HookLineageCollector`)
4. Inlets/Outlets (lowest priority)

If a custom extractor exists, it overrides built-in extraction and inlets/outlets.
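Conceptually, the precedence above is a first-match lookup. The small sketch below is purely illustrative (the source names and helper are hypothetical, not the provider's actual code), showing how a custom extractor shadows every lower-priority source:

```python
# Priority order, highest first - mirrors the list above
PRIORITY = ["custom_extractor", "operator_methods", "hook_lineage", "inlets_outlets"]


def resolve_lineage(available: dict):
    """Return (source_name, lineage) from the highest-priority source that
    produced lineage; 'available' maps source name to lineage or None."""
    for name in PRIORITY:
        lineage = available.get(name)
        if lineage is not None:
            return name, lineage
    return None, None


# Operator methods win only because no custom extractor is registered:
print(resolve_lineage({"operator_methods": {"inputs": ["a"]}}))
# → ('operator_methods', {'inputs': ['a']})
```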


Related Skills


  • annotating-task-lineage: For simple table-level lineage with inlets/outlets
  • tracing-upstream-lineage: Investigate data origins
  • tracing-downstream-lineage: Investigate data dependencies