data-provenance

Data Provenance & Lineage

Track where data comes from, how it transforms, and where it goes—essential for trust, compliance, and debugging.

When to Use

Use this skill when:
  • Auditing data for compliance (GDPR, HIPAA, SOX, CCPA)
  • Debugging data quality issues ("Where did this bad data come from?")
  • Understanding impact of schema changes ("What breaks if I change this field?")
  • Building data catalogs or governance systems
  • Tracking sensitive data (PII, PHI) through systems
  • Responding to data deletion requests (GDPR "right to be forgotten")


What is Data Provenance?

Provenance: The complete history and lineage of a data element
Question: "Where does the revenue number in this dashboard come from?"

Answer (with provenance):
Dashboard.revenue (computed 2026-01-21 08:00)
  ← warehouse.daily_sales.total (aggregated 2026-01-21 02:00)
    ← etl_pipeline.transform_sales (ran 2026-01-21 01:30)
      ← production_db.orders.amount (order #12345, created 2026-01-20 15:23)
        ← stripe_api.charge (charge_id: ch_abc123, processed 2026-01-20 15:23)
          ← user input (customer: cust_xyz, card ending 4242)
Key questions provenance answers:
  1. Where did this data come from? (source)
  2. When was it created/updated? (timestamp)
  3. How was it transformed? (logic, code version)
  4. Who created/modified it? (user, system, process)
  5. Why does it have this value? (business logic)
  6. What depends on it? (downstream consumers)
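The chain above can be modeled as a linked list of provenance records. A minimal sketch — the `ProvenanceRecord` class and its field names are illustrative, not from any library:

```python
# A minimal sketch of the chain above as linked provenance records.
# ProvenanceRecord and its field names are illustrative, not from any library.
from dataclasses import dataclass
from typing import Optional

@dataclass
class ProvenanceRecord:
    source: str                                   # where the value lives
    timestamp: str                                # when it was created/updated
    transformation: str                           # how it was produced
    actor: str                                    # who or what produced it
    parent: Optional["ProvenanceRecord"] = None   # upstream record, if any

def trace(record):
    """Walk upstream and return each hop as 'source (transformation)'."""
    hops = []
    while record is not None:
        hops.append(f"{record.source} ({record.transformation})")
        record = record.parent
    return hops

user_input = ProvenanceRecord("user input", "2026-01-20 15:23", "manual entry", "cust_xyz")
charge = ProvenanceRecord("stripe_api.charge", "2026-01-20 15:23", "payment capture", "stripe", user_input)
order = ProvenanceRecord("production_db.orders.amount", "2026-01-20 15:23", "order creation", "api", charge)
daily = ProvenanceRecord("warehouse.daily_sales.total", "2026-01-21 02:00", "aggregate", "etl_pipeline", order)
dashboard = ProvenanceRecord("Dashboard.revenue", "2026-01-21 08:00", "computed", "bi_tool", daily)

for hop in trace(dashboard):
    print("←", hop)
```

Walking the `parent` pointers answers question 1 (source) directly; the other questions map onto the remaining fields.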


Levels of Provenance Tracking

Level 1: Table-Level Lineage

What: Track which tables feed into other tables
┌────────────┐
│ orders     │──┐
└────────────┘  │
                ├──► ┌──────────────┐
┌────────────┐  │    │ daily_sales  │
│ customers  │──┘    └──────────────┘
└────────────┘
Implementation: Metadata table
sql
CREATE TABLE table_lineage (
  downstream_table VARCHAR(255),
  upstream_table VARCHAR(255),
  relationship_type VARCHAR(50),  -- 'direct_copy', 'join', 'aggregate'
  created_at TIMESTAMPTZ DEFAULT NOW(),

  PRIMARY KEY (downstream_table, upstream_table)
);

INSERT INTO table_lineage VALUES
  ('daily_sales', 'orders', 'aggregate'),
  ('daily_sales', 'customers', 'join');
Query: "What tables does daily_sales depend on?"
sql
SELECT upstream_table
FROM table_lineage
WHERE downstream_table = 'daily_sales';
-- Result: orders, customers
Query: "What tables depend on orders?"
sql
SELECT downstream_table
FROM table_lineage
WHERE upstream_table = 'orders';
-- Result: daily_sales, weekly_report, customer_lifetime_value
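The same two queries can be answered without a database by walking the lineage edges in memory — a sketch, with illustrative edge data:

```python
# Illustrative sketch: the two lineage queries above as a transitive walk
# over in-memory (downstream, upstream) edge pairs, without a database.
from collections import defaultdict

edges = [
    ('daily_sales', 'orders'),        # daily_sales reads from orders
    ('daily_sales', 'customers'),
    ('weekly_report', 'daily_sales'),
]

upstream = defaultdict(set)    # table -> tables it depends on
downstream = defaultdict(set)  # table -> tables that depend on it
for down, up in edges:
    upstream[down].add(up)
    downstream[up].add(down)

def transitive(graph, start):
    """All tables reachable from start, following one direction."""
    seen, stack = set(), [start]
    while stack:
        for nxt in graph[stack.pop()]:
            if nxt not in seen:
                seen.add(nxt)
                stack.append(nxt)
    return seen

# "What does weekly_report depend on?" -> {'daily_sales', 'orders', 'customers'}
print(transitive(upstream, 'weekly_report'))
# "What depends on orders?" -> {'daily_sales', 'weekly_report'}
print(transitive(downstream, 'orders'))
```

The two directions are the same traversal over opposite edge orientations, which is exactly what the recursive SQL in the impact-analysis section does at scale.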

Level 2: Column-Level Lineage

What: Track which columns feed into which columns
orders.amount ──┐
orders.tax    ──┼──► daily_sales.total_revenue
orders.shipping─┘
Implementation:
sql
CREATE TABLE column_lineage (
  downstream_table VARCHAR(255),
  downstream_column VARCHAR(255),
  upstream_table VARCHAR(255),
  upstream_column VARCHAR(255),
  transformation TEXT,  -- SQL or description
  created_at TIMESTAMPTZ DEFAULT NOW(),

  PRIMARY KEY (downstream_table, downstream_column, upstream_table, upstream_column)
);

INSERT INTO column_lineage VALUES
  ('daily_sales', 'total_revenue', 'orders', 'amount', 'SUM(amount + tax + shipping)'),
  ('daily_sales', 'order_count', 'orders', 'id', 'COUNT(id)'),
  ('daily_sales', 'customer_name', 'customers', 'name', 'LEFT JOIN on customer_id');
Query: "Where does daily_sales.total_revenue come from?"
sql
SELECT
  upstream_table,
  upstream_column,
  transformation
FROM column_lineage
WHERE downstream_table = 'daily_sales'
  AND downstream_column = 'total_revenue';

Level 3: Row-Level Lineage

What: Track individual record transformations
orders.id=12345 (amount=$100) ──┬──► daily_sales.id=67 (date=2026-01-20, total=$150)
orders.id=12346 (amount=$50)  ──┘
Implementation: Lineage table
sql
CREATE TABLE row_lineage (
  id BIGSERIAL PRIMARY KEY,
  downstream_table VARCHAR(255),
  downstream_pk BIGINT,
  upstream_table VARCHAR(255),
  upstream_pk BIGINT,
  created_at TIMESTAMPTZ DEFAULT NOW()
);

-- After ETL run
INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk)
SELECT
  'daily_sales',
  ds.id,
  'orders',
  o.id
FROM daily_sales ds
JOIN orders o ON DATE(o.created_at) = ds.sale_date;
Query: "What orders contributed to daily_sales row 67?"
sql
SELECT o.*
FROM row_lineage rl
JOIN orders o ON rl.upstream_pk = o.id
WHERE rl.downstream_table = 'daily_sales'
  AND rl.downstream_pk = 67;

Level 4: Value-Level Lineage (Finest)

What: Track transformations at the field value level
order.amount = $100
order.tax = $10
order.shipping = $5
  ↓ (SUM transformation)
daily_sales.total_revenue = $115
Implementation: Event log
sql
CREATE TABLE value_lineage (
  id BIGSERIAL PRIMARY KEY,
  entity_type VARCHAR(50),
  entity_id BIGINT,
  field_name VARCHAR(100),
  old_value TEXT,
  new_value TEXT,
  transformation TEXT,
  source_values JSONB,  -- Array of source values
  created_at TIMESTAMPTZ DEFAULT NOW(),
  created_by VARCHAR(255)  -- User or process
);

-- Example: Revenue calculation
INSERT INTO value_lineage VALUES (
  DEFAULT,
  'daily_sales',
  67,
  'total_revenue',
  NULL,
  '115.00',
  'SUM(orders.amount + orders.tax + orders.shipping) WHERE date = 2026-01-20',
  '[{"table": "orders", "id": 12345, "amount": 100, "tax": 10, "shipping": 5}]',
  NOW(),
  'etl_pipeline_v1.2.3'
);


Provenance Capture Methods

Method 1: Code Instrumentation

Manual tracking in ETL code:
python
def etl_orders_to_daily_sales():
    # Extract
    orders = db.query("SELECT * FROM orders WHERE date = ?", yesterday)

    # Transform
    daily_sales = {}
    for order in orders:
        date = order['created_at'].date()
        if date not in daily_sales:
            daily_sales[date] = {'total': 0, 'count': 0, 'order_ids': []}

        daily_sales[date]['total'] += order['amount']
        daily_sales[date]['count'] += 1
        daily_sales[date]['order_ids'].append(order['id'])

    # Load with lineage tracking
    for date, metrics in daily_sales.items():
        ds_id = db.insert(
            "INSERT INTO daily_sales (date, total, count) VALUES (?, ?, ?)",
            date, metrics['total'], metrics['count']
        )

        # Track lineage
        for order_id in metrics['order_ids']:
            db.insert(
                "INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk) VALUES (?, ?, ?, ?)",
                'daily_sales', ds_id, 'orders', order_id
            )
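The transform above is easier to unit-test if the aggregation is a pure function that also returns lineage (which order ids fed each row), with database writes kept separate — a sketch using simplified order dicts:

```python
# Pure-function sketch of the same transform: aggregation (and lineage
# capture) separated from database writes, so it can be unit-tested.
# The order dicts here are simplified stand-ins.
from collections import defaultdict

def aggregate_orders(orders):
    """Group orders by date, tracking contributing order ids for lineage."""
    daily = defaultdict(lambda: {'total': 0, 'count': 0, 'order_ids': []})
    for order in orders:
        day = daily[order['date']]
        day['total'] += order['amount']
        day['count'] += 1
        day['order_ids'].append(order['id'])   # lineage: which rows fed this aggregate
    return dict(daily)

orders = [
    {'id': 12345, 'date': '2026-01-20', 'amount': 100},
    {'id': 12346, 'date': '2026-01-20', 'amount': 50},
]
result = aggregate_orders(orders)
print(result['2026-01-20']['total'])      # 150
print(result['2026-01-20']['order_ids'])  # [12345, 12346]
```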

Method 2: SQL Parsing

Automatically extract lineage from SQL queries:
python
import sqlparse
from sqllineage.runner import LineageRunner

sql = """
INSERT INTO daily_sales (date, total_revenue, order_count)
SELECT
  DATE(created_at) as date,
  SUM(amount + tax + shipping) as total_revenue,
  COUNT(*) as order_count
FROM orders
LEFT JOIN customers ON orders.customer_id = customers.id
WHERE created_at >= '2026-01-20'
GROUP BY DATE(created_at)
"""

Parse lineage and store it:
python
runner = LineageRunner(sql)
print("Source tables:", runner.source_tables)   # {'orders', 'customers'}
print("Target tables:", runner.target_tables)   # {'daily_sales'}

# Store in lineage table
for source in runner.source_tables:
    db.insert(
        "INSERT INTO table_lineage (downstream_table, upstream_table) VALUES (?, ?)",
        'daily_sales', source
    )
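If a full parser is unavailable, a crude regex pass illustrates the idea — this toy function is not a real SQL parser, misses CTEs, subqueries, quoting, and schema-qualified names, and is only a sketch; prefer sqllineage or a similar parser in practice:

```python
# A toy fallback, NOT a real SQL parser: pull table names out of
# INSERT INTO / FROM / JOIN clauses with regexes. Misses CTEs, subqueries,
# quoting, and schema-qualified names; use sqllineage/sqlglot in practice.
import re

def naive_lineage(sql):
    targets = set(re.findall(r'INSERT\s+INTO\s+(\w+)', sql, re.IGNORECASE))
    sources = set(re.findall(r'(?:FROM|JOIN)\s+(\w+)', sql, re.IGNORECASE))
    return sources - targets, targets

sql = """
INSERT INTO daily_sales (date, total_revenue, order_count)
SELECT DATE(created_at), SUM(amount + tax + shipping), COUNT(*)
FROM orders
LEFT JOIN customers ON orders.customer_id = customers.id
GROUP BY DATE(created_at)
"""
sources, targets = naive_lineage(sql)
print(sources)  # {'orders', 'customers'} (set order may vary)
print(targets)  # {'daily_sales'}
```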

Method 3: Database Triggers

Capture changes automatically:
sql
-- Audit trail for all changes
CREATE TABLE audit_log (
  id BIGSERIAL PRIMARY KEY,
  table_name VARCHAR(255),
  record_id BIGINT,
  operation VARCHAR(10),  -- INSERT, UPDATE, DELETE
  old_values JSONB,
  new_values JSONB,
  changed_by VARCHAR(255),
  changed_at TIMESTAMPTZ DEFAULT NOW()
);

-- Trigger on orders table
CREATE OR REPLACE FUNCTION audit_orders()
RETURNS TRIGGER AS $$
BEGIN
  INSERT INTO audit_log (table_name, record_id, operation, old_values, new_values, changed_by)
  VALUES (
    'orders',
    COALESCE(NEW.id, OLD.id),
    TG_OP,
    row_to_json(OLD),
    row_to_json(NEW),
    current_user
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER orders_audit
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_orders();

Method 4: CDC (Change Data Capture)

Stream database changes:
python
# Using Debezium or a similar CDC tool
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer('postgres.public.orders')

for message in consumer:
    change_event = json.loads(message.value)

    # Store in lineage system
    db.insert(
        "INSERT INTO change_log (table_name, operation, before, after, timestamp) VALUES (?, ?, ?, ?, ?)",
        change_event['source']['table'],
        change_event['op'],  # 'c' (create), 'u' (update), 'd' (delete)
        change_event.get('before'),
        change_event.get('after'),
        change_event['ts_ms']
    )
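A change event can be normalized before storage. The sketch below operates on a plain dict shaped like Debezium's envelope (`op`, `before`, `after`, `source`, `ts_ms`); the `to_change_row` helper is illustrative:

```python
# Normalizing a CDC change event before storage. The dict mimics Debezium's
# envelope fields ('op', 'before', 'after', 'source', 'ts_ms'); the
# to_change_row helper is an illustrative name, not a library function.
def to_change_row(event):
    return {
        'table': event['source']['table'],
        'operation': {'c': 'INSERT', 'u': 'UPDATE', 'd': 'DELETE'}[event['op']],
        'before': event.get('before'),
        'after': event.get('after'),
        'timestamp': event['ts_ms'],
    }

event = {
    'op': 'u',
    'before': {'id': 12345, 'amount': 90},
    'after': {'id': 12345, 'amount': 100},
    'source': {'table': 'orders'},
    'ts_ms': 1737360000000,
}
row = to_change_row(event)
print(row['table'], row['operation'])  # orders UPDATE
```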

---

Impact Analysis

Downstream Impact

Question: "If I change orders.amount, what breaks?"
sql
-- Find all downstream dependencies
WITH RECURSIVE dependencies AS (
  -- Base: Direct dependencies
  SELECT
    downstream_table,
    downstream_column,
    1 as depth
  FROM column_lineage
  WHERE upstream_table = 'orders'
    AND upstream_column = 'amount'

  UNION ALL

  -- Recursive: Dependencies of dependencies
  SELECT
    cl.downstream_table,
    cl.downstream_column,
    d.depth + 1
  FROM column_lineage cl
  JOIN dependencies d
    ON cl.upstream_table = d.downstream_table
   AND cl.upstream_column = d.downstream_column
  WHERE d.depth < 10  -- Prevent infinite loops
)
SELECT DISTINCT
  downstream_table,
  downstream_column,
  depth
FROM dependencies
ORDER BY depth, downstream_table, downstream_column;
Result:
| downstream_table       | downstream_column  | depth |
|------------------------|--------------------|-------|
| daily_sales            | total_revenue      | 1     |
| monthly_revenue        | total              | 2     |
| executive_dashboard    | ytd_revenue        | 3     |
| investor_report        | arr                | 4     |
Interpretation: Changing orders.amount affects 4 layers of downstream tables!

Upstream Impact

Question: "What source data feeds into this dashboard metric?"
sql
-- Trace backwards to original sources
WITH RECURSIVE sources AS (
  -- Base: Direct sources
  SELECT
    upstream_table,
    upstream_column,
    1 as depth
  FROM column_lineage
  WHERE downstream_table = 'executive_dashboard'
    AND downstream_column = 'ytd_revenue'

  UNION ALL

  -- Recursive: Sources of sources
  SELECT
    cl.upstream_table,
    cl.upstream_column,
    s.depth + 1
  FROM column_lineage cl
  JOIN sources s
    ON cl.downstream_table = s.upstream_table
   AND cl.downstream_column = s.upstream_column
  WHERE s.depth < 10
)
SELECT DISTINCT
  upstream_table,
  upstream_column,
  depth
FROM sources
WHERE upstream_table NOT IN (
  SELECT DISTINCT downstream_table FROM column_lineage
)  -- Only leaf nodes (true sources)
ORDER BY upstream_table, upstream_column;
Result: Original sources for dashboard metric
| upstream_table | upstream_column | depth |
|----------------|-----------------|-------|
| orders         | amount          | 4     |
| orders         | tax             | 4     |
| orders         | shipping        | 4     |
| stripe_events  | charge_amount   | 5     |


Data Catalog

Schema Registry

Track all datasets and their metadata:
sql
CREATE TABLE data_catalog (
  id BIGSERIAL PRIMARY KEY,
  dataset_name VARCHAR(255) UNIQUE NOT NULL,
  dataset_type VARCHAR(50),  -- 'table', 'view', 'api', 'file'
  description TEXT,
  owner VARCHAR(255),
  steward VARCHAR(255),  -- Data steward (responsible for quality)
  sensitivity VARCHAR(50),  -- 'public', 'internal', 'confidential', 'restricted'
  contains_pii BOOLEAN DEFAULT FALSE,
  retention_days INT,  -- How long to keep data
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE data_catalog_columns (
  dataset_id BIGINT REFERENCES data_catalog(id),
  column_name VARCHAR(255),
  data_type VARCHAR(50),
  description TEXT,
  is_nullable BOOLEAN,
  is_pii BOOLEAN DEFAULT FALSE,
  pii_type VARCHAR(50),  -- 'email', 'ssn', 'phone', 'name', etc.
  sample_values TEXT[],

  PRIMARY KEY (dataset_id, column_name)
);

-- Example: Register orders table
INSERT INTO data_catalog VALUES (
  DEFAULT,
  'orders',
  'table',
  'Customer orders from e-commerce platform',
  'engineering@company.com',
  'data-team@company.com',
  'internal',
  TRUE,  -- Contains PII
  2555,  -- 7 years retention
  NOW(),
  NOW()
);

INSERT INTO data_catalog_columns VALUES
  (1, 'id', 'BIGINT', 'Unique order ID', FALSE, FALSE, NULL, NULL),
  (1, 'customer_email', 'VARCHAR(255)', 'Customer email address', FALSE, TRUE, 'email', NULL),
  (1, 'amount', 'DECIMAL(10,2)', 'Order total in USD', FALSE, FALSE, NULL, '{10.99, 25.50, 100.00}');

Searchable Catalog

Find datasets by keyword:
sql
-- Full-text search
CREATE INDEX idx_catalog_search ON data_catalog
USING GIN(to_tsvector('english', dataset_name || ' ' || description));

-- Search for "revenue"
SELECT
  dataset_name,
  dataset_type,
  description,
  owner
FROM data_catalog
WHERE to_tsvector('english', dataset_name || ' ' || description)
   @@ to_tsquery('english', 'revenue')
ORDER BY dataset_name;


Compliance & Data Privacy

GDPR: Right to be Forgotten

Track all PII to enable deletion:
sql
-- Find all PII for a user
SELECT
  dc.dataset_name,
  dcc.column_name,
  dcc.pii_type
FROM data_catalog dc
JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
WHERE dcc.is_pii = TRUE;

-- Result: Tables/columns containing PII
| dataset_name    | column_name    | pii_type |
|-----------------|----------------|----------|
| orders          | customer_email | email    |
| users           | email          | email    |
| users           | name           | name     |
| support_tickets | email          | email    |
| analytics_events| user_id        | user_id  |

-- Generate deletion script (:user_email is the requester's email, supplied as a bind parameter)
SELECT
  'DELETE FROM ' || dataset_name || ' WHERE ' || column_name || ' = ''' || :user_email || ''';'
FROM (
  SELECT DISTINCT
    dc.dataset_name,
    dcc.column_name
  FROM data_catalog dc
  JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
  WHERE dcc.pii_type = 'email'
) subq;

-- Output:
-- DELETE FROM orders WHERE customer_email = 'user@example.com';
-- DELETE FROM users WHERE email = 'user@example.com';
-- DELETE FROM support_tickets WHERE email = 'user@example.com';
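Concatenating the email into the generated SQL is injection-prone. A safer sketch builds parameterized statements from the catalog metadata — table and column names come from the trusted catalog, while the value is bound at execution time:

```python
# Safer sketch: parameterized deletion statements built from catalog
# metadata. Table/column names come from the trusted catalog (never from
# user input); the email value is passed as a bound parameter.
pii_columns = [
    ('orders', 'customer_email'),
    ('users', 'email'),
    ('support_tickets', 'email'),
]

def deletion_plan(columns, value):
    """Return (sql, params) pairs for a GDPR deletion request."""
    return [
        (f"DELETE FROM {table} WHERE {column} = ?", (value,))
        for table, column in columns
    ]

plan = deletion_plan(pii_columns, 'user@example.com')
for stmt, params in plan:
    print(stmt, params)
# DELETE FROM orders WHERE customer_email = ? ('user@example.com',)
# ... one statement per PII column
```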

PII Tracking in Data Flow

Tag PII as it flows through the pipeline:
python
def track_pii_flow(source_table, dest_table, pii_fields):
    """Track movement of PII between tables"""
    for field in pii_fields:
        db.insert(
            """
            INSERT INTO pii_lineage (source_table, source_column, dest_table, dest_column, tracked_at)
            VALUES (?, ?, ?, ?, NOW())
            """,
            source_table, field, dest_table, field
        )

Usage

python
track_pii_flow('users', 'orders', ['email'])
track_pii_flow('orders', 'daily_sales_with_emails', ['email'])

Query: "Where has this user's email propagated?"

python
db.query("""
  WITH RECURSIVE pii_flow AS (
    SELECT dest_table, dest_column, 1 as depth
    FROM pii_lineage
    WHERE source_table = 'users' AND source_column = 'email'

    UNION ALL

    SELECT pl.dest_table, pl.dest_column, pf.depth + 1
    FROM pii_lineage pl
    JOIN pii_flow pf ON pl.source_table = pf.dest_table AND pl.source_column = pf.dest_column
    WHERE pf.depth < 10
  )
  SELECT DISTINCT dest_table, dest_column FROM pii_flow;
""")

---

Visualization & Tools

Lineage Graph

Generate visual lineage:
python
import graphviz

def visualize_lineage(table_name):
    # Fetch lineage
    lineage = db.query("""
        SELECT upstream_table, downstream_table
        FROM table_lineage
        WHERE upstream_table = ? OR downstream_table = ?
    """, table_name, table_name)

    # Create graph
    dot = graphviz.Digraph()

    for row in lineage:
        dot.edge(row['upstream_table'], row['downstream_table'])

    dot.render('lineage_graph', format='png', view=True)

visualize_lineage('orders')
Output:
stripe_api ──► orders ──┬──► daily_sales ──► monthly_revenue
customers ──────────────┘
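If the graphviz Python package is unavailable, the DOT source can be emitted by hand and rendered with the `dot` CLI (`dot -Tpng lineage.dot`) — a minimal sketch:

```python
# Minimal sketch: emit Graphviz DOT source by hand, so no Python graphviz
# package is needed. Render the output with the `dot` CLI: dot -Tpng.
def to_dot(edges):
    lines = ['digraph lineage {']
    lines += [f'  "{up}" -> "{down}";' for up, down in edges]
    lines.append('}')
    return '\n'.join(lines)

edges = [
    ('stripe_api', 'orders'),
    ('orders', 'daily_sales'),
    ('customers', 'daily_sales'),
    ('daily_sales', 'monthly_revenue'),
]
dot = to_dot(edges)
print(dot)
```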

Commercial Tools

| Tool               | Use Case                      | Features                              |
|--------------------|-------------------------------|---------------------------------------|
| Apache Atlas       | Open-source data governance   | Metadata management, lineage, search  |
| Collibra           | Enterprise data governance    | Catalog, lineage, policies, workflows |
| Alation            | Data catalog                  | Metadata search, collaboration, lineage |
| Amundsen (Lyft)    | Open-source data discovery    | Search, lineage, usage analytics      |
| DataHub (LinkedIn) | Open-source metadata platform | Lineage, discovery, governance        |
| dbt                | Analytics engineering         | SQL lineage, documentation, tests     |


Implementation Checklist

Minimal (Start Here)

[ ] Table-level lineage tracking
[ ] Audit logs for critical tables
[ ] Data catalog for major datasets
[ ] Documentation of ETL processes

Standard

[ ] Column-level lineage
[ ] Automated lineage extraction from SQL
[ ] PII tagging and tracking
[ ] Impact analysis queries
[ ] Change notifications for downstream consumers

Advanced

[ ] Row-level lineage
[ ] Real-time lineage from CDC
[ ] Searchable data catalog
[ ] Automated GDPR compliance tools
[ ] Data quality metrics tied to lineage
[ ] Machine learning for anomaly detection


Output Format

When helping with data provenance:

Provenance Strategy

Lineage Level

  • Table-level
  • Column-level
  • Row-level
  • Value-level
Capture Method

  • Code instrumentation
  • SQL parsing
  • Database triggers
  • CDC (Change Data Capture)
Data Catalog Schema

[SQL DDL for catalog tables]

Impact Analysis Queries

[SQL queries for upstream/downstream impact]
PII Tracking

Tables with PII:
Deletion strategy: [Step-by-step process]

Visualization

[Lineage graph representation]

Compliance Requirements

  • GDPR
  • CCPA
  • HIPAA
  • SOX
  • Other: [specify]

Tooling

  • Lineage tracking: [Tool/Custom]
  • Data catalog: [Tool/Custom]
  • Visualization: [Tool/Custom]

---

Integration

Works with:
  • scalable-data-schema - Track schema evolution over time
  • data-infrastructure-at-scale - Lineage for pipelines and ETL
  • multi-source-data-conflation - Track source of merged data
  • systems-decompose - Plan lineage as part of feature design