Loading...
Loading...
Track data lineage and provenance from source to consumption. Use when auditing data flows, debugging data quality issues, ensuring compliance (GDPR, SOX), or understanding data dependencies. Covers lineage tracking, impact analysis, data catalogs, and metadata management.
npx skill4agent add sunnypatneedi/claude-starter-kit data-provenanceQuestion: "Where does the revenue number in this dashboard come from?"
Answer (with provenance):
Dashboard.revenue (computed 2026-01-21 08:00)
← warehouse.daily_sales.total (aggregated 2026-01-21 02:00)
← etl_pipeline.transform_sales (ran 2026-01-21 01:30)
← production_db.orders.amount (order #12345, created 2026-01-20 15:23)
← stripe_api.charge (charge_id: ch_abc123, processed 2026-01-20 15:23)
← user input (customer: cust_xyz, card ending 4242)┌────────────┐
│ orders │──┐
└────────────┘ │
├──► ┌──────────────┐
┌────────────┐ │ │ daily_sales │
│ customers │──┘ └──────────────┘
└────────────┘CREATE TABLE table_lineage (
downstream_table VARCHAR(255),
upstream_table VARCHAR(255),
relationship_type VARCHAR(50), -- 'direct_copy', 'join', 'aggregate'
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (downstream_table, upstream_table)
);
INSERT INTO table_lineage VALUES
('daily_sales', 'orders', 'aggregate'),
('daily_sales', 'customers', 'join');SELECT upstream_table
FROM table_lineage
WHERE downstream_table = 'daily_sales';
-- Result: orders, customersSELECT downstream_table
FROM table_lineage
WHERE upstream_table = 'orders';
-- Result: daily_sales, weekly_report, customer_lifetime_valueorders.amount ──┐
orders.tax ──┼──► daily_sales.total_revenue
orders.shipping─┘CREATE TABLE column_lineage (
downstream_table VARCHAR(255),
downstream_column VARCHAR(255),
upstream_table VARCHAR(255),
upstream_column VARCHAR(255),
transformation TEXT, -- SQL or description
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (downstream_table, downstream_column, upstream_table, upstream_column)
);
INSERT INTO column_lineage VALUES
('daily_sales', 'total_revenue', 'orders', 'amount', 'SUM(amount + tax + shipping)'),
('daily_sales', 'order_count', 'orders', 'id', 'COUNT(id)'),
('daily_sales', 'customer_name', 'customers', 'name', 'LEFT JOIN on customer_id');SELECT
upstream_table,
upstream_column,
transformation
FROM column_lineage
WHERE downstream_table = 'daily_sales'
AND downstream_column = 'total_revenue';orders.id=12345 (amount=$100) ──► daily_sales.id=67 (date=2026-01-20, total=$100)
orders.id=12346 (amount=$50) ──┘CREATE TABLE row_lineage (
id BIGSERIAL PRIMARY KEY,
downstream_table VARCHAR(255),
downstream_pk BIGINT,
upstream_table VARCHAR(255),
upstream_pk BIGINT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- After ETL run
INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk)
SELECT
'daily_sales',
ds.id,
'orders',
o.id
FROM daily_sales ds
JOIN orders o ON DATE(o.created_at) = ds.sale_date;SELECT o.*
FROM row_lineage rl
JOIN orders o ON rl.upstream_pk = o.id
WHERE rl.downstream_table = 'daily_sales'
AND rl.downstream_pk = 67;order.amount = $100
order.tax = $10
order.shipping = $5
↓ (SUM transformation)
daily_sales.total_revenue = $115CREATE TABLE value_lineage (
id BIGSERIAL PRIMARY KEY,
entity_type VARCHAR(50),
entity_id BIGINT,
field_name VARCHAR(100),
old_value TEXT,
new_value TEXT,
transformation TEXT,
source_values JSONB, -- Array of source values
created_at TIMESTAMPTZ DEFAULT NOW(),
created_by VARCHAR(255) -- User or process
);
-- Example: Revenue calculation
INSERT INTO value_lineage VALUES (
DEFAULT,
'daily_sales',
67,
'total_revenue',
NULL,
'115.00',
'SUM(orders.amount + orders.tax + orders.shipping) WHERE date = 2026-01-20',
'[{"table": "orders", "id": 12345, "amount": 100, "tax": 10, "shipping": 5}]',
NOW(),
'etl_pipeline_v1.2.3'
);def etl_orders_to_daily_sales():
# Extract
orders = db.query("SELECT * FROM orders WHERE date = ?", yesterday)
# Transform
daily_sales = {}
for order in orders:
date = order['created_at'].date()
if date not in daily_sales:
daily_sales[date] = {'total': 0, 'count': 0, 'order_ids': []}
daily_sales[date]['total'] += order['amount']
daily_sales[date]['count'] += 1
daily_sales[date]['order_ids'].append(order['id'])
# Load with lineage tracking
for date, metrics in daily_sales.items():
ds_id = db.insert(
"INSERT INTO daily_sales (date, total, count) VALUES (?, ?, ?)",
date, metrics['total'], metrics['count']
)
# Track lineage
for order_id in metrics['order_ids']:
db.insert(
"INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk) VALUES (?, ?, ?, ?)",
'daily_sales', ds_id, 'orders', order_id
)import sqlparse
from sqllineage.runner import LineageRunner
sql = """
INSERT INTO daily_sales (date, total_revenue, order_count)
SELECT
DATE(created_at) as date,
SUM(amount + tax + shipping) as total_revenue,
COUNT(*) as order_count
FROM orders
LEFT JOIN customers ON orders.customer_id = customers.id
WHERE created_at >= '2026-01-20'
GROUP BY DATE(created_at)
"""
# Parse lineage
runner = LineageRunner(sql)
print("Source tables:", runner.source_tables)
# {'orders', 'customers'}
print("Target tables:", runner.target_tables)
# {'daily_sales'}
# Store in lineage table
for source in runner.source_tables:
db.insert(
"INSERT INTO table_lineage (downstream_table, upstream_table) VALUES (?, ?)",
'daily_sales', source
)-- Audit trail for all changes
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
table_name VARCHAR(255),
record_id BIGINT,
operation VARCHAR(10), -- INSERT, UPDATE, DELETE
old_values JSONB,
new_values JSONB,
changed_by VARCHAR(255),
changed_at TIMESTAMPTZ DEFAULT NOW()
);
-- Trigger on orders table
CREATE OR REPLACE FUNCTION audit_orders()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO audit_log (table_name, record_id, operation, old_values, new_values, changed_by)
VALUES (
'orders',
COALESCE(NEW.id, OLD.id),
TG_OP,
row_to_json(OLD),
row_to_json(NEW),
current_user
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER orders_audit
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_orders();# Using Debezium or similar CDC tool
from kafka import KafkaConsumer
consumer = KafkaConsumer('postgres.public.orders')
for message in consumer:
change_event = json.loads(message.value)
# Store in lineage system
db.insert(
"INSERT INTO change_log (table_name, operation, before, after, timestamp) VALUES (?, ?, ?, ?, ?)",
change_event['source']['table'],
change_event['op'], # 'c' (create), 'u' (update), 'd' (delete)
change_event.get('before'),
change_event.get('after'),
change_event['ts_ms']
)-- Find all downstream dependencies
WITH RECURSIVE dependencies AS (
-- Base: Direct dependencies
SELECT
downstream_table,
downstream_column,
1 as depth
FROM column_lineage
WHERE upstream_table = 'orders'
AND upstream_column = 'amount'
UNION ALL
-- Recursive: Dependencies of dependencies
SELECT
cl.downstream_table,
cl.downstream_column,
d.depth + 1
FROM column_lineage cl
JOIN dependencies d
ON cl.upstream_table = d.downstream_table
AND cl.upstream_column = d.downstream_column
WHERE d.depth < 10 -- Prevent infinite loops
)
SELECT DISTINCT
downstream_table,
downstream_column,
depth
FROM dependencies
ORDER BY depth, downstream_table, downstream_column;| downstream_table | downstream_column | depth |
|------------------------|--------------------|-------|
| daily_sales | total_revenue | 1 |
| monthly_revenue | total | 2 |
| executive_dashboard | ytd_revenue | 3 |
| investor_report | arr | 4 |orders.amount-- Trace backwards to original sources
WITH RECURSIVE sources AS (
-- Base: Direct sources
SELECT
upstream_table,
upstream_column,
1 as depth
FROM column_lineage
WHERE downstream_table = 'executive_dashboard'
AND downstream_column = 'ytd_revenue'
UNION ALL
-- Recursive: Sources of sources
SELECT
cl.upstream_table,
cl.upstream_column,
s.depth + 1
FROM column_lineage cl
JOIN sources s
ON cl.downstream_table = s.upstream_table
AND cl.downstream_column = s.upstream_column
WHERE s.depth < 10
)
SELECT DISTINCT
upstream_table,
upstream_column,
depth
FROM sources
WHERE upstream_table NOT IN (
SELECT DISTINCT downstream_table FROM column_lineage
) -- Only leaf nodes (true sources)
ORDER BY upstream_table, upstream_column;| upstream_table | upstream_column | depth |
|----------------|-----------------|-------|
| orders | amount | 4 |
| orders | tax | 4 |
| orders | shipping | 4 |
| stripe_events | charge_amount | 5 |CREATE TABLE data_catalog (
id BIGSERIAL PRIMARY KEY,
dataset_name VARCHAR(255) UNIQUE NOT NULL,
dataset_type VARCHAR(50), -- 'table', 'view', 'api', 'file'
description TEXT,
owner VARCHAR(255),
steward VARCHAR(255), -- Data steward (responsible for quality)
sensitivity VARCHAR(50), -- 'public', 'internal', 'confidential', 'restricted'
contains_pii BOOLEAN DEFAULT FALSE,
retention_days INT, -- How long to keep data
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE data_catalog_columns (
dataset_id BIGINT REFERENCES data_catalog(id),
column_name VARCHAR(255),
data_type VARCHAR(50),
description TEXT,
is_nullable BOOLEAN,
is_pii BOOLEAN DEFAULT FALSE,
pii_type VARCHAR(50), -- 'email', 'ssn', 'phone', 'name', etc.
sample_values TEXT[],
PRIMARY KEY (dataset_id, column_name)
);
-- Example: Register orders table
INSERT INTO data_catalog VALUES (
DEFAULT,
'orders',
'table',
'Customer orders from e-commerce platform',
'engineering@company.com',
'data-team@company.com',
'internal',
TRUE, -- Contains PII
2555, -- 7 years retention
NOW(),
NOW()
);
INSERT INTO data_catalog_columns VALUES
(1, 'id', 'BIGINT', 'Unique order ID', FALSE, FALSE, NULL, NULL),
(1, 'customer_email', 'VARCHAR(255)', 'Customer email address', FALSE, TRUE, 'email', NULL),
(1, 'amount', 'DECIMAL(10,2)', 'Order total in USD', FALSE, FALSE, NULL, '{10.99, 25.50, 100.00}');-- Full-text search
CREATE INDEX idx_catalog_search ON data_catalog
USING GIN(to_tsvector('english', dataset_name || ' ' || description));
-- Search for "revenue"
SELECT
dataset_name,
dataset_type,
description,
owner
FROM data_catalog
WHERE to_tsvector('english', dataset_name || ' ' || description)
@@ to_tsquery('english', 'revenue')
ORDER BY dataset_name;-- Find all PII for a user
SELECT
dc.dataset_name,
dcc.column_name,
dcc.pii_type
FROM data_catalog dc
JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
WHERE dcc.is_pii = TRUE;
-- Result: Tables/columns containing PII
| dataset_name | column_name | pii_type |
|-----------------|----------------|----------|
| orders | customer_email | email |
| users | email | email |
| users | name | name |
| support_tickets | email | email |
| analytics_events| user_id | user_id |
-- Generate deletion script
SELECT
'DELETE FROM ' || dataset_name || ' WHERE ' || column_name || ' = ''' || user_email || ''';'
FROM (
SELECT DISTINCT
dc.dataset_name,
dcc.column_name
FROM data_catalog dc
JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
WHERE dcc.pii_type = 'email'
) subq;
-- Output:
-- DELETE FROM orders WHERE customer_email = 'user@example.com';
-- DELETE FROM users WHERE email = 'user@example.com';
-- DELETE FROM support_tickets WHERE email = 'user@example.com';def track_pii_flow(source_table, dest_table, pii_fields):
"""Track movement of PII between tables"""
for field in pii_fields:
db.insert(
"""
INSERT INTO pii_lineage (source_table, source_column, dest_table, dest_column, tracked_at)
VALUES (?, ?, ?, ?, NOW())
""",
source_table, field, dest_table, field
)
# Usage
track_pii_flow('users', 'orders', ['email'])
track_pii_flow('orders', 'daily_sales_with_emails', ['email'])
# Query: "Where has this user's email propagated?"
db.query("""
WITH RECURSIVE pii_flow AS (
SELECT dest_table, dest_column, 1 as depth
FROM pii_lineage
WHERE source_table = 'users' AND source_column = 'email'
UNION ALL
SELECT pl.dest_table, pl.dest_column, pf.depth + 1
FROM pii_lineage pl
JOIN pii_flow pf ON pl.source_table = pf.dest_table AND pl.source_column = pf.dest_column
WHERE pf.depth < 10
)
SELECT DISTINCT dest_table, dest_column FROM pii_flow;
""")import graphviz
def visualize_lineage(table_name):
# Fetch lineage
lineage = db.query("""
SELECT upstream_table, downstream_table
FROM table_lineage
WHERE upstream_table = ? OR downstream_table = ?
""", table_name, table_name)
# Create graph
dot = graphviz.Digraph()
for row in lineage:
dot.edge(row['upstream_table'], row['downstream_table'])
dot.render('lineage_graph', format='png', view=True)
visualize_lineage('orders')stripe_api ──► orders ──┬──► daily_sales ──► monthly_revenue
│
customers ──────────────┘| Tool | Use Case | Features |
|---|---|---|
| Apache Atlas | Open-source data governance | Metadata management, lineage, search |
| Collibra | Enterprise data governance | Catalog, lineage, policies, workflows |
| Alation | Data catalog | Metadata search, collaboration, lineage |
| Amundsen (Lyft) | Open-source data discovery | Search, lineage, usage analytics |
| DataHub (LinkedIn) | Open-source metadata platform | Lineage, discovery, governance |
| dbt | Analytics engineering | SQL lineage, documentation, tests |
[ ] Table-level lineage tracking
[ ] Audit logs for critical tables
[ ] Data catalog for major datasets
[ ] Documentation of ETL processes[ ] Column-level lineage
[ ] Automated lineage extraction from SQL
[ ] PII tagging and tracking
[ ] Impact analysis queries
[ ] Change notifications for downstream consumers[ ] Row-level lineage
[ ] Real-time lineage from CDC
[ ] Searchable data catalog
[ ] Automated GDPR compliance tools
[ ] Data quality metrics tied to lineage
[ ] Machine learning for anomaly detection## Provenance Strategy
### Lineage Level
- [ ] Table-level
- [ ] Column-level
- [ ] Row-level
- [ ] Value-level
### Capture Method
- [ ] Code instrumentation
- [ ] SQL parsing
- [ ] Database triggers
- [ ] CDC (Change Data Capture)
### Data Catalog Schema
[SQL DDL for catalog tables]
### Impact Analysis Queries
[SQL queries for upstream/downstream impact]
### PII Tracking
Tables with PII:
- [Table 1]: [Columns]
- [Table 2]: [Columns]
Deletion strategy:
[Step-by-step process]
### Visualization
[Lineage graph representation]
### Compliance Requirements
- [ ] GDPR
- [ ] CCPA
- [ ] HIPAA
- [ ] SOX
- [ ] Other: [specify]
### Tooling
- Lineage tracking: [Tool/Custom]
- Data catalog: [Tool/Custom]
- Visualization: [Tool/Custom]