data-provenance
Data Provenance & Lineage
Track where data comes from, how it transforms, and where it goes—essential for trust, compliance, and debugging.
When to Use
Use this skill when:
- Auditing data for compliance (GDPR, HIPAA, SOX, CCPA)
- Debugging data quality issues ("Where did this bad data come from?")
- Understanding impact of schema changes ("What breaks if I change this field?")
- Building data catalogs or governance systems
- Tracking sensitive data (PII, PHI) through systems
- Responding to data deletion requests (GDPR "right to be forgotten")
What is Data Provenance?
Provenance: The complete history and lineage of a data element.
Question: "Where does the revenue number in this dashboard come from?"
Answer (with provenance):
```
Dashboard.revenue (computed 2026-01-21 08:00)
  ← warehouse.daily_sales.total (aggregated 2026-01-21 02:00)
  ← etl_pipeline.transform_sales (ran 2026-01-21 01:30)
  ← production_db.orders.amount (order #12345, created 2026-01-20 15:23)
  ← stripe_api.charge (charge_id: ch_abc123, processed 2026-01-20 15:23)
  ← user input (customer: cust_xyz, card ending 4242)
```
Key questions provenance answers:
- Where did this data come from? (source)
- When was it created/updated? (timestamp)
- How was it transformed? (logic, code version)
- Who created/modified it? (user, system, process)
- Why does it have this value? (business logic)
- What depends on it? (downstream consumers)
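These six questions map naturally onto a small record type. A minimal sketch (the `ProvenanceRecord` name and fields are illustrative, not from any particular library):

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class ProvenanceRecord:
    """One hop in a data element's lineage, answering the six key questions."""
    value: str               # the data element itself
    source: str              # where it came from
    created_at: datetime     # when it was produced
    transformation: str      # how it was derived (logic, code version)
    actor: str               # who/what produced it (user, system, process)
    inputs: list = field(default_factory=list)  # upstream ProvenanceRecords

hop = ProvenanceRecord(
    value="daily_sales.total=115.00",
    source="orders",
    created_at=datetime(2026, 1, 21, 2, 0, tzinfo=timezone.utc),
    transformation="SUM(amount + tax + shipping), etl_pipeline v1.2.3",
    actor="etl_pipeline",
    inputs=[ProvenanceRecord("orders.amount=100.00", "stripe_api",
                             datetime(2026, 1, 20, 15, 23, tzinfo=timezone.utc),
                             "direct capture", "checkout_service")],
)
# Walking .inputs recursively reproduces the arrow chain shown above.
```

Nesting records through `inputs` is what turns isolated metadata into a traversable lineage chain.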
Levels of Provenance Tracking
Level 1: Table-Level Lineage
What: Track which tables feed into other tables
```
┌────────────┐
│ orders     │──┐
└────────────┘  │
                ├──► ┌──────────────┐
┌────────────┐  │    │ daily_sales  │
│ customers  │──┘    └──────────────┘
└────────────┘
```
Implementation: Metadata table
```sql
CREATE TABLE table_lineage (
    downstream_table VARCHAR(255),
    upstream_table VARCHAR(255),
    relationship_type VARCHAR(50), -- 'direct_copy', 'join', 'aggregate'
    created_at TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (downstream_table, upstream_table)
);

INSERT INTO table_lineage VALUES
    ('daily_sales', 'orders', 'aggregate'),
    ('daily_sales', 'customers', 'join');
```
Query: "What tables does daily_sales depend on?"
```sql
SELECT upstream_table
FROM table_lineage
WHERE downstream_table = 'daily_sales';
-- Result: orders, customers
```
Query: "What tables depend on orders?"
```sql
SELECT downstream_table
FROM table_lineage
WHERE upstream_table = 'orders';
-- Result: daily_sales, weekly_report, customer_lifetime_value
```
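Lineage metadata is a directed graph, and the recursive queries later in this document guard against cycles only with a depth cap; it is cheaper to reject cycles when edges are registered. A small sketch over (downstream, upstream) pairs as stored in `table_lineage` (the helper name is illustrative):

```python
def has_cycle(edges):
    """Detect a cycle in lineage edges given as (downstream, upstream) pairs."""
    graph = {}
    for downstream, upstream in edges:
        # Data flows upstream -> downstream, so point edges that way.
        graph.setdefault(upstream, set()).add(downstream)

    WHITE, GRAY, BLACK = 0, 1, 2   # unvisited / on current path / done
    color = {}

    def visit(node):
        color[node] = GRAY
        for nxt in graph.get(node, ()):
            state = color.get(nxt, WHITE)
            if state == GRAY:        # back edge to the current path: cycle
                return True
            if state == WHITE and visit(nxt):
                return True
        color[node] = BLACK
        return False

    return any(color.get(n, WHITE) == WHITE and visit(n) for n in graph)

print(has_cycle([("daily_sales", "orders"), ("daily_sales", "customers")]))  # → False
print(has_cycle([("b", "a"), ("a", "b")]))  # → True
```

Running this check before each INSERT into `table_lineage` keeps the depth cap in the recursive CTEs a safety net rather than a correctness requirement.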
Level 2: Column-Level Lineage
What: Track which columns feed into which columns
```
orders.amount  ──┐
orders.tax     ──┼──► daily_sales.total_revenue
orders.shipping──┘
```
Implementation:
```sql
CREATE TABLE column_lineage (
    downstream_table VARCHAR(255),
    downstream_column VARCHAR(255),
    upstream_table VARCHAR(255),
    upstream_column VARCHAR(255),
    transformation TEXT, -- SQL or description
    created_at TIMESTAMPTZ DEFAULT NOW(),
    PRIMARY KEY (downstream_table, downstream_column, upstream_table, upstream_column)
);

INSERT INTO column_lineage VALUES
    ('daily_sales', 'total_revenue', 'orders', 'amount', 'SUM(amount + tax + shipping)'),
    ('daily_sales', 'order_count', 'orders', 'id', 'COUNT(id)'),
    ('daily_sales', 'customer_name', 'customers', 'name', 'LEFT JOIN on customer_id');
```
Query: "Where does daily_sales.total_revenue come from?"
```sql
SELECT
    upstream_table,
    upstream_column,
    transformation
FROM column_lineage
WHERE downstream_table = 'daily_sales'
  AND downstream_column = 'total_revenue';
```
Level 3: Row-Level Lineage
What: Track individual record transformations
```
orders.id=12345 (amount=$100) ──► daily_sales.id=67 (date=2026-01-20, total=$150)
orders.id=12346 (amount=$50)  ──┘
```
Implementation: Lineage table
```sql
CREATE TABLE row_lineage (
    id BIGSERIAL PRIMARY KEY,
    downstream_table VARCHAR(255),
    downstream_pk BIGINT,
    upstream_table VARCHAR(255),
    upstream_pk BIGINT,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- After ETL run
INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk)
SELECT
    'daily_sales',
    ds.id,
    'orders',
    o.id
FROM daily_sales ds
JOIN orders o ON DATE(o.created_at) = ds.sale_date;
```
Query: "What orders contributed to daily_sales row 67?"
```sql
SELECT o.*
FROM row_lineage rl
JOIN orders o ON rl.upstream_pk = o.id
WHERE rl.downstream_table = 'daily_sales'
  AND rl.upstream_table = 'orders' -- pk values are only unique per table
  AND rl.downstream_pk = 67;
```
Level 4: Value-Level Lineage (Finest)
What: Track transformations at the field value level
```
order.amount   = $100
order.tax      = $10
order.shipping = $5
        ↓ (SUM transformation)
daily_sales.total_revenue = $115
```
Implementation: Event log
```sql
CREATE TABLE value_lineage (
    id BIGSERIAL PRIMARY KEY,
    entity_type VARCHAR(50),
    entity_id BIGINT,
    field_name VARCHAR(100),
    old_value TEXT,
    new_value TEXT,
    transformation TEXT,
    source_values JSONB, -- Array of source values
    created_at TIMESTAMPTZ DEFAULT NOW(),
    created_by VARCHAR(255) -- User or process
);

-- Example: Revenue calculation
INSERT INTO value_lineage VALUES (
    DEFAULT,
    'daily_sales',
    67,
    'total_revenue',
    NULL,
    '115.00',
    'SUM(orders.amount + orders.tax + orders.shipping) WHERE date = 2026-01-20',
    '[{"table": "orders", "id": 12345, "amount": 100, "tax": 10, "shipping": 5}]',
    NOW(),
    'etl_pipeline_v1.2.3'
);
```
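Value-level records make results re-checkable: given the stored `source_values`, you can recompute the reported value and flag drift. A minimal sketch, assuming the row shape from the `value_lineage` example above and the SUM transformation it records:

```python
import json

def verify_value(new_value, source_values_json):
    """Recompute SUM(amount + tax + shipping) from stored sources and compare."""
    sources = json.loads(source_values_json)
    recomputed = sum(s["amount"] + s["tax"] + s["shipping"] for s in sources)
    # Tolerance absorbs text-to-float rounding in the stored new_value.
    return abs(recomputed - float(new_value)) < 0.005

row = {
    "new_value": "115.00",
    "source_values": '[{"table": "orders", "id": 12345, "amount": 100, "tax": 10, "shipping": 5}]',
}
print(verify_value(row["new_value"], row["source_values"]))  # → True
```

A periodic job running this kind of check over recent `value_lineage` rows turns the event log into an audit mechanism, not just a record.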
Provenance Capture Methods
Method 1: Code Instrumentation
Manual tracking in ETL code:
```python
def etl_orders_to_daily_sales():
    # Extract
    orders = db.query("SELECT * FROM orders WHERE date = ?", yesterday)

    # Transform
    daily_sales = {}
    for order in orders:
        date = order['created_at'].date()
        if date not in daily_sales:
            daily_sales[date] = {'total': 0, 'count': 0, 'order_ids': []}
        daily_sales[date]['total'] += order['amount']
        daily_sales[date]['count'] += 1
        daily_sales[date]['order_ids'].append(order['id'])

    # Load with lineage tracking
    for date, metrics in daily_sales.items():
        ds_id = db.insert(
            "INSERT INTO daily_sales (date, total, count) VALUES (?, ?, ?)",
            date, metrics['total'], metrics['count']
        )
        # Track lineage
        for order_id in metrics['order_ids']:
            db.insert(
                "INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk) VALUES (?, ?, ?, ?)",
                'daily_sales', ds_id, 'orders', order_id
            )
```
Method 2: SQL Parsing
Automatically extract lineage from SQL queries:
```python
from sqllineage.runner import LineageRunner

sql = """
INSERT INTO daily_sales (date, total_revenue, order_count)
SELECT
    DATE(created_at) as date,
    SUM(amount + tax + shipping) as total_revenue,
    COUNT(*) as order_count
FROM orders
LEFT JOIN customers ON orders.customer_id = customers.id
WHERE created_at >= '2026-01-20'
GROUP BY DATE(created_at)
"""

# Parse lineage
runner = LineageRunner(sql)
print("Source tables:", runner.source_tables())  # {'orders', 'customers'}
print("Target tables:", runner.target_tables())  # {'daily_sales'}

# Store in lineage table
for source in runner.source_tables():
    db.insert(
        "INSERT INTO table_lineage (downstream_table, upstream_table) VALUES (?, ?)",
        'daily_sales', str(source)
    )
```
Method 3: Database Triggers
Capture changes automatically:
```sql
-- Audit trail for all changes
CREATE TABLE audit_log (
    id BIGSERIAL PRIMARY KEY,
    table_name VARCHAR(255),
    record_id BIGINT,
    operation VARCHAR(10), -- INSERT, UPDATE, DELETE
    old_values JSONB,
    new_values JSONB,
    changed_by VARCHAR(255),
    changed_at TIMESTAMPTZ DEFAULT NOW()
);

-- Trigger on orders table
CREATE OR REPLACE FUNCTION audit_orders()
RETURNS TRIGGER AS $$
BEGIN
    INSERT INTO audit_log (table_name, record_id, operation, old_values, new_values, changed_by)
    VALUES (
        'orders',
        CASE WHEN TG_OP = 'DELETE' THEN OLD.id ELSE NEW.id END,
        TG_OP,
        CASE WHEN TG_OP IN ('UPDATE', 'DELETE') THEN to_jsonb(OLD) END, -- OLD is unassigned on INSERT
        CASE WHEN TG_OP IN ('INSERT', 'UPDATE') THEN to_jsonb(NEW) END, -- NEW is unassigned on DELETE
        current_user
    );
    RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER orders_audit
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_orders();
```
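Once `audit_log` is populated, reviewing a change usually starts with "what actually changed?". A small helper that diffs the `old_values`/`new_values` JSON of one audit row (a sketch; database access is omitted):

```python
import json

def changed_fields(old_json, new_json):
    """Diff an audit_log row's old/new values; returns {field: (old, new)}."""
    old = json.loads(old_json) if old_json else {}
    new = json.loads(new_json) if new_json else {}
    return {k: (old.get(k), new.get(k))
            for k in set(old) | set(new)
            if old.get(k) != new.get(k)}

print(changed_fields('{"amount": 100, "status": "pending"}',
                     '{"amount": 100, "status": "paid"}'))
# → {'status': ('pending', 'paid')}
```

Passing `None` for the missing side handles INSERT (no old values) and DELETE (no new values) rows uniformly.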
Method 4: CDC (Change Data Capture)
Stream database changes:
```python
# Using Debezium or a similar CDC tool
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer('postgres.public.orders')
for message in consumer:
    change_event = json.loads(message.value)
    # Store in lineage system
    db.insert(
        "INSERT INTO change_log (table_name, operation, before, after, timestamp) VALUES (?, ?, ?, ?, ?)",
        change_event['source']['table'],
        change_event['op'],  # 'c' (create), 'u' (update), 'd' (delete)
        change_event.get('before'),
        change_event.get('after'),
        change_event['ts_ms']
    )
```
Impact Analysis
Downstream Impact
Question: "If I change orders.amount, what breaks?"
```sql
-- Find all downstream dependencies
WITH RECURSIVE dependencies AS (
    -- Base: Direct dependencies
    SELECT
        downstream_table,
        downstream_column,
        1 as depth
    FROM column_lineage
    WHERE upstream_table = 'orders'
      AND upstream_column = 'amount'
    UNION ALL
    -- Recursive: Dependencies of dependencies
    SELECT
        cl.downstream_table,
        cl.downstream_column,
        d.depth + 1
    FROM column_lineage cl
    JOIN dependencies d
      ON cl.upstream_table = d.downstream_table
     AND cl.upstream_column = d.downstream_column
    WHERE d.depth < 10 -- Prevent infinite loops
)
SELECT DISTINCT
    downstream_table,
    downstream_column,
    depth
FROM dependencies
ORDER BY depth, downstream_table, downstream_column;
```
Result:
| downstream_table | downstream_column | depth |
|------------------|-------------------|-------|
| daily_sales | total_revenue | 1 |
| monthly_revenue | total | 2 |
| executive_dashboard | ytd_revenue | 3 |
| investor_report | arr | 4 |

Interpretation: Changing orders.amount affects 4 layers of downstream tables!
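The same traversal can be done in application code once the lineage edges are loaded, which is convenient for tooling that already has the graph in memory. A small breadth-first sketch (the edge list below is illustrative, mirroring the result table above):

```python
from collections import deque

def downstream_impact(edges, start):
    """BFS over (upstream, downstream) column edges; returns {node: depth}."""
    graph = {}
    for up, down in edges:
        graph.setdefault(up, []).append(down)
    impact, queue = {}, deque([(start, 0)])
    while queue:
        node, depth = queue.popleft()
        for nxt in graph.get(node, []):
            if nxt not in impact:   # visit once; also guards against cycles
                impact[nxt] = depth + 1
                queue.append((nxt, depth + 1))
    return impact

edges = [
    ("orders.amount", "daily_sales.total_revenue"),
    ("daily_sales.total_revenue", "monthly_revenue.total"),
    ("monthly_revenue.total", "executive_dashboard.ytd_revenue"),
    ("executive_dashboard.ytd_revenue", "investor_report.arr"),
]
print(downstream_impact(edges, "orders.amount"))
# → {'daily_sales.total_revenue': 1, 'monthly_revenue.total': 2,
#    'executive_dashboard.ytd_revenue': 3, 'investor_report.arr': 4}
```

BFS gives the shortest-path depth per node, matching the `depth` column the recursive CTE reports.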
Upstream Impact
Question: "What source data feeds into this dashboard metric?"
```sql
-- Trace backwards to original sources
WITH RECURSIVE sources AS (
    -- Base: Direct sources
    SELECT
        upstream_table,
        upstream_column,
        1 as depth
    FROM column_lineage
    WHERE downstream_table = 'executive_dashboard'
      AND downstream_column = 'ytd_revenue'
    UNION ALL
    -- Recursive: Sources of sources
    SELECT
        cl.upstream_table,
        cl.upstream_column,
        s.depth + 1
    FROM column_lineage cl
    JOIN sources s
      ON cl.downstream_table = s.upstream_table
     AND cl.downstream_column = s.upstream_column
    WHERE s.depth < 10
)
SELECT DISTINCT
    upstream_table,
    upstream_column,
    depth
FROM sources
WHERE upstream_table NOT IN (
    SELECT DISTINCT downstream_table FROM column_lineage
) -- Only leaf nodes (true sources)
ORDER BY upstream_table, upstream_column;
```
Result: Original sources for the dashboard metric
| upstream_table | upstream_column | depth |
|----------------|-----------------|-------|
| orders | amount | 4 |
| orders | tax | 4 |
| orders | shipping | 4 |
| stripe_events | charge_amount | 5 |
Data Catalog
Schema Registry
Track all datasets and their metadata:
```sql
CREATE TABLE data_catalog (
    id BIGSERIAL PRIMARY KEY,
    dataset_name VARCHAR(255) UNIQUE NOT NULL,
    dataset_type VARCHAR(50), -- 'table', 'view', 'api', 'file'
    description TEXT,
    owner VARCHAR(255),
    steward VARCHAR(255), -- Data steward (responsible for quality)
    sensitivity VARCHAR(50), -- 'public', 'internal', 'confidential', 'restricted'
    contains_pii BOOLEAN DEFAULT FALSE,
    retention_days INT, -- How long to keep data
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE data_catalog_columns (
    dataset_id BIGINT REFERENCES data_catalog(id),
    column_name VARCHAR(255),
    data_type VARCHAR(50),
    description TEXT,
    is_nullable BOOLEAN,
    is_pii BOOLEAN DEFAULT FALSE,
    pii_type VARCHAR(50), -- 'email', 'ssn', 'phone', 'name', etc.
    sample_values TEXT[],
    PRIMARY KEY (dataset_id, column_name)
);

-- Example: Register orders table
INSERT INTO data_catalog VALUES (
    DEFAULT,
    'orders',
    'table',
    'Customer orders from e-commerce platform',
    'engineering@company.com',
    'data-team@company.com',
    'internal',
    TRUE, -- Contains PII
    2555, -- 7 years retention
    NOW(),
    NOW()
);

INSERT INTO data_catalog_columns VALUES
    (1, 'id', 'BIGINT', 'Unique order ID', FALSE, FALSE, NULL, NULL),
    (1, 'customer_email', 'VARCHAR(255)', 'Customer email address', FALSE, TRUE, 'email', NULL),
    (1, 'amount', 'DECIMAL(10,2)', 'Order total in USD', FALSE, FALSE, NULL, '{10.99, 25.50, 100.00}');
```
Searchable Catalog
Find datasets by keyword:
```sql
-- Full-text search
CREATE INDEX idx_catalog_search ON data_catalog
USING GIN(to_tsvector('english', dataset_name || ' ' || description));

-- Search for "revenue"
SELECT
    dataset_name,
    dataset_type,
    description,
    owner
FROM data_catalog
WHERE to_tsvector('english', dataset_name || ' ' || description)
      @@ to_tsquery('english', 'revenue')
ORDER BY dataset_name;
```
Compliance & Data Privacy
GDPR: Right to be Forgotten
Track all PII to enable deletion:
```sql
-- Find all PII columns
SELECT
    dc.dataset_name,
    dcc.column_name,
    dcc.pii_type
FROM data_catalog dc
JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
WHERE dcc.is_pii = TRUE;
```
Result: Tables/columns containing PII
| dataset_name | column_name | pii_type |
|-----------------|----------------|----------|
| orders | customer_email | email |
| users | email | email |
| users | name | name |
| support_tickets | email | email |
| analytics_events| user_id | user_id |

```sql
-- Generate deletion statements (:user_email is a bound parameter)
SELECT
    'DELETE FROM ' || dataset_name || ' WHERE ' || column_name || ' = ''' || :user_email || ''';'
FROM (
    SELECT DISTINCT
        dc.dataset_name,
        dcc.column_name
    FROM data_catalog dc
    JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
    WHERE dcc.pii_type = 'email'
) subq;
-- Output:
-- DELETE FROM orders WHERE customer_email = 'user@example.com';
-- DELETE FROM users WHERE email = 'user@example.com';
-- DELETE FROM support_tickets WHERE email = 'user@example.com';
```
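String-built DELETE statements are fragile around quoting and injection, so in application code it is safer to keep the email value as a bound parameter. A sketch (the helper name is illustrative; `targets` is the (dataset_name, column_name) result of the catalog query above):

```python
def pii_delete_statements(targets, user_email):
    """Build parameterized DELETEs for every cataloged email column (GDPR erasure).

    Identifiers come from the trusted catalog, not from user input;
    only the email value is interpolated at execution time, as a parameter.
    """
    return [
        (f"DELETE FROM {table} WHERE {column} = ?", (user_email,))
        for table, column in targets
    ]

stmts = pii_delete_statements(
    [("orders", "customer_email"), ("users", "email"), ("support_tickets", "email")],
    "user@example.com",
)
for sql, params in stmts:
    print(sql, params)
# → DELETE FROM orders WHERE customer_email = ? ('user@example.com',)
#   ... one statement per cataloged email column
```

Each (sql, params) pair can then be passed to the database driver's execute call, ideally inside one transaction so the erasure is all-or-nothing.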
PII Tracking in Data Flow
Tag PII as it flows through the pipeline:
```python
def track_pii_flow(source_table, dest_table, pii_fields):
    """Track movement of PII between tables."""
    for field in pii_fields:
        db.insert(
            """
            INSERT INTO pii_lineage (source_table, source_column, dest_table, dest_column, tracked_at)
            VALUES (?, ?, ?, ?, NOW())
            """,
            source_table, field, dest_table, field
        )

# Usage
track_pii_flow('users', 'orders', ['email'])
track_pii_flow('orders', 'daily_sales_with_emails', ['email'])
```
Query: "Where has this user's email propagated?"
```python
db.query("""
    WITH RECURSIVE pii_flow AS (
        SELECT dest_table, dest_column, 1 as depth
        FROM pii_lineage
        WHERE source_table = 'users' AND source_column = 'email'
        UNION ALL
        SELECT pl.dest_table, pl.dest_column, pf.depth + 1
        FROM pii_lineage pl
        JOIN pii_flow pf ON pl.source_table = pf.dest_table AND pl.source_column = pf.dest_column
        WHERE pf.depth < 10
    )
    SELECT DISTINCT dest_table, dest_column FROM pii_flow;
""")
```
Visualization & Tools
Lineage Graph
Generate visual lineage:
```python
import graphviz

def visualize_lineage(table_name):
    # Fetch lineage
    lineage = db.query("""
        SELECT upstream_table, downstream_table
        FROM table_lineage
        WHERE upstream_table = ? OR downstream_table = ?
    """, table_name, table_name)

    # Create graph
    dot = graphviz.Digraph()
    for row in lineage:
        dot.edge(row['upstream_table'], row['downstream_table'])
    dot.render('lineage_graph', format='png', view=True)

visualize_lineage('orders')
```
Output:
```
stripe_api ──► orders ──┬──► daily_sales ──► monthly_revenue
                        │
customers ──────────────┘
```
Commercial Tools
| Tool | Use Case | Features |
|---|---|---|
| Apache Atlas | Open-source data governance | Metadata management, lineage, search |
| Collibra | Enterprise data governance | Catalog, lineage, policies, workflows |
| Alation | Data catalog | Metadata search, collaboration, lineage |
| Amundsen (Lyft) | Open-source data discovery | Search, lineage, usage analytics |
| DataHub (LinkedIn) | Open-source metadata platform | Lineage, discovery, governance |
| dbt | Analytics engineering | SQL lineage, documentation, tests |
Implementation Checklist
Minimal (Start Here)
[ ] Table-level lineage tracking
[ ] Audit logs for critical tables
[ ] Data catalog for major datasets
[ ] Documentation of ETL processes
Standard
[ ] Column-level lineage
[ ] Automated lineage extraction from SQL
[ ] PII tagging and tracking
[ ] Impact analysis queries
[ ] Change notifications for downstream consumers
Advanced
[ ] Row-level lineage
[ ] Real-time lineage from CDC
[ ] Searchable data catalog
[ ] Automated GDPR compliance tools
[ ] Data quality metrics tied to lineage
[ ] Machine learning for anomaly detection
Output Format
When helping with data provenance, follow this format:
```
Provenance Strategy

Lineage Level
- Table-level
- Column-level
- Row-level
- Value-level

Capture Method
- Code instrumentation
- SQL parsing
- Database triggers
- CDC (Change Data Capture)

Data Catalog Schema
[SQL DDL for catalog tables]

Impact Analysis Queries
[SQL queries for upstream/downstream impact]

PII Tracking
Tables with PII:
Deletion strategy:
[Step-by-step process]

Visualization
[Lineage graph representation]

Compliance Requirements
- GDPR
- CCPA
- HIPAA
- SOX
- Other: [specify]

Tooling
- Lineage tracking: [Tool/Custom]
- Data catalog: [Tool/Custom]
- Visualization: [Tool/Custom]
```
Integration
Works with:
- scalable-data-schema - Track schema evolution over time
- data-infrastructure-at-scale - Lineage for pipelines and ETL
- multi-source-data-conflation - Track source of merged data
- systems-decompose - Plan lineage as part of feature design