tracing-upstream-lineage
Upstream Lineage: Sources
Trace the origins of data - answer "Where does this data come from?"
Lineage Investigation
Step 1: Identify the Target Type
Determine what we're tracing:
- Table: Trace what populates this table
- Column: Trace where this specific column comes from
- DAG: Trace what data sources this DAG reads from
Step 2: Find the Producing DAG
Tables are typically populated by Airflow DAGs. Find the connection:
- **Search DAGs by name**: Use `list_dags` and look for DAG names matching the table name
  - `load_customers` -> table `customers`
  - `etl_daily_orders` -> table `orders`
- **Explore DAG source code**: Use `get_dag_source` to read the DAG definition
  - Look for INSERT, MERGE, CREATE TABLE statements
  - Find the target table in the code
- **Check DAG tasks**: Use `list_tasks` to see what operations the DAG performs
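As a sketch of the name-matching heuristic, the snippet below ranks candidate DAGs for a table. The list of DAG ids stands in for the output of a `list_dags` call; the matching logic is an illustration, not part of any real Airflow API.

```python
import re

def candidate_dags(dag_ids, table_name):
    """Return DAG ids whose name mentions the target table name."""
    # Case-insensitive substring match so "customers" also hits
    # ids like "Load_Customers_Daily".
    pattern = re.compile(re.escape(table_name), re.IGNORECASE)
    return [d for d in dag_ids if pattern.search(d)]

dags = ["load_customers", "etl_daily_orders", "cleanup_logs"]
candidate_dags(dags, "customers")  # -> ["load_customers"]
candidate_dags(dags, "orders")     # -> ["etl_daily_orders"]
```

A substring match is deliberately loose: false positives are cheap to discard after reading the DAG source, while a missed DAG means a broken lineage chain.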
Step 3: Trace Data Sources
From the DAG code, identify source tables and systems:
**SQL Sources** (look for FROM clauses):
```python
# In DAG code:
SELECT * FROM source_schema.source_table  # <- This is an upstream source
```
**External Sources** (look for connection references):
- `S3Operator` -> S3 bucket source
- `PostgresOperator` -> Postgres database source
- `SalesforceOperator` -> Salesforce API source
- `HttpOperator` -> REST API source
**File Sources**:
- CSV/Parquet files in object storage
- SFTP drops
- Local file paths
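Scanning DAG source for FROM/JOIN clauses can be sketched with a small regex helper. This is a rough heuristic over the text returned by `get_dag_source`, not a real SQL parser (CTEs, subqueries, and quoted identifiers need proper parsing):

```python
import re

SQL = """
INSERT INTO analytics.orders_daily
SELECT o.id, c.region, SUM(o.amount)
FROM raw.orders o
JOIN dim.customers c ON c.id = o.customer_id
GROUP BY o.id, c.region
"""

def upstream_tables(sql):
    # Capture the schema.table token that follows FROM or JOIN.
    return re.findall(r"\b(?:FROM|JOIN)\s+([\w.]+)", sql, re.IGNORECASE)

upstream_tables(SQL)  # -> ['raw.orders', 'dim.customers']
```

Each hit is a candidate upstream table to feed back into Step 2.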
Step 4: Build the Lineage Chain
Recursively trace each source:
TARGET: analytics.orders_daily
  ^
  +-- DAG: etl_daily_orders
        ^
        +-- SOURCE: raw.orders (table)
        |     ^
        |     +-- DAG: ingest_orders
        |           ^
        |           +-- SOURCE: Salesforce API (external)
        |
        +-- SOURCE: dim.customers (table)
              ^
              +-- DAG: load_customers
                    ^
                    +-- SOURCE: PostgreSQL (external DB)
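The recursion itself is simple once Steps 2-3 have produced a table-to-DAG-to-sources map. Below is a minimal sketch; the `upstream` dict is hypothetical example data standing in for those discovery results:

```python
# Maps each table to (producing DAG, list of sources).
# External systems have no entry, which terminates the recursion.
upstream = {
    "analytics.orders_daily": ("etl_daily_orders", ["raw.orders", "dim.customers"]),
    "raw.orders": ("ingest_orders", ["Salesforce API"]),
    "dim.customers": ("load_customers", ["PostgreSQL"]),
}

def trace(node, depth=0):
    """Render the upstream lineage chain for `node` as indented lines."""
    lines = ["  " * depth + node]
    if node in upstream:
        dag, sources = upstream[node]
        lines.append("  " * (depth + 1) + f"via DAG: {dag}")
        for src in sources:
            lines.extend(trace(src, depth + 2))
    return lines

print("\n".join(trace("analytics.orders_daily")))
```

In a real system you would also track visited nodes to guard against cycles in the lineage graph.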
Step 5: Check Source Health
For each upstream source:
- Tables: Check freshness with the checking-freshness skill
- DAGs: Check recent run status with `get_dag_stats`
- External systems: Note connection info from DAG code
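One concrete health check is flagging upstream DAGs whose last successful run is too old. The sketch below assumes timestamps pulled from a `get_dag_stats`-style call; the six-hour threshold is an arbitrary example, not a recommendation:

```python
from datetime import datetime, timedelta, timezone

def stale_sources(last_success, max_age=timedelta(hours=6), now=None):
    """Return DAG ids whose last successful run is older than max_age."""
    now = now or datetime.now(timezone.utc)
    return [dag for dag, ts in last_success.items() if now - ts > max_age]

now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
runs = {
    "ingest_orders": datetime(2024, 1, 2, 10, 0, tzinfo=timezone.utc),   # 2h old
    "load_customers": datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc),   # 36h old
}
stale_sources(runs, now=now)  # -> ['load_customers']
```

Stale upstream DAGs are the usual explanation when a downstream table looks fresh-by-schedule but carries old data.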
Lineage for Columns
When tracing a specific column:
- Find the column in the target table schema
- Search DAG source code for references to that column name
- Trace through transformations:
  - Direct mappings: `source.col AS target_col`
  - Transformations: `COALESCE(a.col, b.col) AS target_col`
  - Aggregations: `SUM(detail.amount) AS total_amount`
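The column search can be sketched as a regex over the DAG's SQL. This is a heuristic that assumes one `expr AS name` per line; the sample query is invented for illustration, and a real SQL parser is needed for anything more complex:

```python
import re

SQL = """
SELECT
    src.id AS customer_id,
    COALESCE(src.email, legacy.email) AS email,
    SUM(orders.amount) AS total_amount
FROM src
JOIN legacy ON legacy.id = src.id
JOIN orders ON orders.customer_id = src.id
GROUP BY src.id, src.email, legacy.email
"""

def column_sources(sql, target_col):
    """Return the expression aliased to target_col, or None if not found."""
    for line in sql.splitlines():
        m = re.search(rf"(.+?)\s+AS\s+{target_col}\b", line.strip(), re.IGNORECASE)
        if m:
            return m.group(1).strip()
    return None

column_sources(SQL, "email")         # -> 'COALESCE(src.email, legacy.email)'
column_sources(SQL, "total_amount")  # -> 'SUM(orders.amount)'
```

The returned expression tells you which upstream columns to recurse into next.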
Output: Lineage Report
Summary
One-line answer: "This table is populated by DAG X from sources Y and Z"
Lineage Diagram
[Salesforce] --> [raw.opportunities] --> [stg.opportunities] --> [fct.sales]
                         |                        |
                 DAG: ingest_sfdc        DAG: transform_sales
Source Details
| Source | Type | Connection | Freshness | Owner |
|---|---|---|---|---|
| raw.orders | Table | Internal | 2h ago | data-team |
| Salesforce | API | salesforce_conn | Real-time | sales-ops |
Transformation Chain
Describe how data flows and transforms:
- Raw data lands in `raw.orders` via Salesforce API sync
- The `transform_orders` DAG cleans and dedupes into `stg.orders`
- The `build_order_facts` DAG joins with dimensions into `fct.orders`
Data Quality Implications
- Single points of failure?
- Stale upstream sources?
- Complex transformation chains that could break?
Related Skills
- Check source freshness: checking-freshness skill
- Debug source DAG: debugging-dags skill
- Trace downstream impacts: tracing-downstream-lineage skill
- Add manual lineage annotations: annotating-task-lineage skill
- Build custom lineage extractors: creating-openlineage-extractors skill