```yaml
- name: marketing
  bounded_context: "Customer acquisition and campaigns"
  data_products:
    - campaign_performance
    - lead_attribution
    - customer_segments
  team:
    product_owner: "Marketing Analytics Lead"
    data_engineers: 2
    analytics_engineers: 2
- name: product
  bounded_context: "Product usage and features"
  data_products:
    - feature_usage
    - product_events
    - user_engagement
  team:
    product_owner: "Product Analytics Lead"
    data_engineers: 3
    analytics_engineers: 1
- name: finance
  bounded_context: "Financial reporting and compliance"
  data_products:
    - general_ledger
    - accounts_receivable
    - financial_metrics
  team:
    product_owner: "Finance Analytics Lead"
    data_engineers: 2
    analytics_engineers: 2
```

**Domain Data Product Architecture:**
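
A registry like the one above could be used, for example, to check that every data product has exactly one owning domain. The sketch below mirrors two of the domain entries as plain Python dictionaries so that it needs no YAML parser. `find_duplicate_products` and `team_size` are hypothetical helper names introduced for illustration, not part of any platform API.

```python
from collections import defaultdict
from typing import Dict, List

# Two entries copied from the domain registry, written as Python dicts.
DOMAINS: List[Dict] = [
    {
        "name": "marketing",
        "data_products": ["campaign_performance", "lead_attribution", "customer_segments"],
        "team": {"data_engineers": 2, "analytics_engineers": 2},
    },
    {
        "name": "product",
        "data_products": ["feature_usage", "product_events", "user_engagement"],
        "team": {"data_engineers": 3, "analytics_engineers": 1},
    },
]


def find_duplicate_products(domains: List[Dict]) -> Dict[str, List[str]]:
    """Return data products claimed by more than one domain (hypothetical helper)."""
    owners: Dict[str, List[str]] = defaultdict(list)
    for domain in domains:
        for product in domain["data_products"]:
            owners[product].append(domain["name"])
    return {p: names for p, names in owners.items() if len(names) > 1}


def team_size(domain: Dict) -> int:
    """Total engineers on a domain team, excluding the product owner."""
    team = domain["team"]
    return team["data_engineers"] + team["analytics_engineers"]


duplicates = find_duplicate_products(DOMAINS)
sizes = {d["name"]: team_size(d) for d in DOMAINS}
```

An empty `duplicates` mapping means ownership is unambiguous; a non-empty one lists the domains that would need to agree on a single owner.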
**Data Product Implementation (Python):**
```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List, Tuple

import pandas as pd


# The two dataclasses and the class name below are assumptions reconstructed
# from how they are used; the original definitions are not shown.
@dataclass
class DataProductMetadata:
    name: str
    version: str
    domain: str
    owner_team: str
    description: str
    sla_freshness_hours: int
    sla_availability_pct: float


@dataclass
class DataProductQualityCheck:
    name: str
    query: str
    threshold: int
    severity: str


class SalesOrdersDataProduct:
    def __init__(self, config: Dict):
        self.config = config
        self.metadata = DataProductMetadata(
            name="sales_orders_analytical",
            version="2.1.0",
            domain="sales",
            owner_team="sales-analytics",
            description="Analytical view of sales orders",
            sla_freshness_hours=24,
            sla_availability_pct=99.9
        )
        self.quality_checks = self._load_quality_checks()

    def _load_quality_checks(self) -> List[DataProductQualityCheck]:
        """Load quality checks from config"""
        return [
            DataProductQualityCheck(
                name="no_negative_amounts",
                query="SELECT COUNT(*) FROM orders WHERE total_amount < 0",
                threshold=0,
                severity="critical"
            ),
            DataProductQualityCheck(
                name="valid_status",
                query="SELECT COUNT(*) FROM orders WHERE status NOT IN ('pending', 'completed', 'cancelled', 'refunded')",
                threshold=0,
                severity="critical"
            ),
            DataProductQualityCheck(
                name="referential_integrity",
                query="SELECT COUNT(*) FROM orders o LEFT JOIN customers c ON o.customer_id = c.id WHERE c.id IS NULL",
                threshold=0,
                severity="critical"
            )
        ]

    def extract(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """Extract source data"""
        # Extract from operational database.
        # The _extract_* helpers are source-specific and not shown.
        orders_df = self._extract_orders()
        customers_df = self._extract_customers()
        products_df = self._extract_products()
        return orders_df, customers_df, products_df

    def transform(self, orders_df: pd.DataFrame,
                  customers_df: pd.DataFrame,
                  products_df: pd.DataFrame) -> pd.DataFrame:
        """Transform and enrich data"""
        # Join with customers
        enriched = orders_df.merge(
            customers_df[['customer_id', 'customer_segment']],
            on='customer_id',
            how='left'
        )
        # Calculate product count per order
        product_counts = products_df.groupby('order_id').size().reset_index(name='product_count')
        enriched = enriched.merge(product_counts, on='order_id', how='left')
        # Orders with no matching products get a count of 0
        enriched['product_count'] = enriched['product_count'].fillna(0).astype(int)
        return enriched

    def validate(self, df: pd.DataFrame) -> Dict:
        """Validate data quality"""
        results = {
            'passed': True,
            'checks': []
        }
        # Schema validation
        expected_columns = [
            'order_id', 'customer_id', 'order_date', 'total_amount',
            'status', 'customer_segment', 'product_count'
        ]
        missing_columns = set(expected_columns) - set(df.columns)
        if missing_columns:
            results['passed'] = False
            results['checks'].append({
                'name': 'schema_validation',
                'passed': False,
                'message': f"Missing columns: {missing_columns}"
            })
            # Skip row-level checks: they reference columns that may be missing
            return results
        # Quality checks
        for check in self.quality_checks:
            result = self._run_quality_check(df, check)
            results['checks'].append(result)
            if not result['passed']:
                results['passed'] = False
        return results

    def _run_quality_check(self, df: pd.DataFrame,
                           check: DataProductQualityCheck) -> Dict:
        """Run individual quality check"""
        # Simplified: in practice, run check.query on a SQL engine.
        if check.name == "no_negative_amounts":
            count = len(df[df['total_amount'] < 0])
        elif check.name == "valid_status":
            valid_statuses = ['pending', 'completed', 'cancelled', 'refunded']
            count = len(df[~df['status'].isin(valid_statuses)])
        else:
            # Checks without a pandas equivalent here (e.g. referential_integrity)
            # are reported as passing; they need the SQL engine to be meaningful.
            count = 0
        passed = count <= check.threshold
        return {
            'name': check.name,
            'passed': passed,
            'count': count,
            'threshold': check.threshold,
            'severity': check.severity
        }

    def publish(self, df: pd.DataFrame) -> None:
        """Publish data product"""
        # Write to storage
        output_path = "s3://data-products/sales/orders/"
        df.to_parquet(
            output_path,
            partition_cols=['order_date'],
            engine='pyarrow'
        )
        # Register in data catalog
        self._register_in_catalog(output_path)
        # Update metrics
        self._publish_metrics(df)

    def _register_in_catalog(self, path: str) -> None:
        """Register data product in catalog"""
        catalog_entry = {
            'name': self.metadata.name,
            'version': self.metadata.version,
            'domain': self.metadata.domain,
            'location': path,
            'last_updated': datetime.now(timezone.utc).isoformat(),
            'owner': self.metadata.owner_team
        }
        # Register catalog_entry with the data catalog service (not shown)

    def _publish_metrics(self, df: pd.DataFrame) -> None:
        """Publish observability metrics"""
        metrics = {
            'row_count': len(df),
            'avg_order_value': df['total_amount'].mean(),
            # Share of null values per column, in percent
            'null_percentage': df.isnull().mean().mul(100).to_dict(),
            'timestamp': datetime.now(timezone.utc).isoformat()
        }
        # Send metrics to the monitoring system (not shown)

    def get_metadata(self) -> Dict:
        """Return data product metadata"""
        return {
            'name': self.metadata.name,
            'version': self.metadata.version,
            'domain': self.metadata.domain,
            'owner': self.metadata.owner_team,
            'description': self.metadata.description,
            'sla': {
                'freshness_hours': self.metadata.sla_freshness_hours,
                'availability_pct': self.metadata.sla_availability_pct
            }
        }
```
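
The metadata above declares `sla_freshness_hours=24`, but nothing in the class checks it. As a minimal sketch of how a freshness SLA might be evaluated, assuming the catalog's `last_updated` timestamp is available as a timezone-aware `datetime`, the hypothetical `is_fresh` helper below compares the age of the last refresh with the SLA window.

```python
from datetime import datetime, timedelta, timezone


def is_fresh(last_updated: datetime, sla_freshness_hours: int, now: datetime) -> bool:
    """True if the product was refreshed within its freshness SLA window."""
    return now - last_updated <= timedelta(hours=sla_freshness_hours)


# Fixed timestamps keep the example deterministic.
now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
recent = datetime(2024, 1, 2, 0, 0, tzinfo=timezone.utc)  # 12 hours old
stale = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)   # 36 hours old

fresh_ok = is_fresh(recent, 24, now)
fresh_stale = is_fresh(stale, 24, now)
```

Passing `now` explicitly rather than reading the clock inside the function keeps the check easy to test.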
```yaml
- name: dbt_runner
  type: kubernetes
  purpose: SQL transformations
  resources:
    cpu: 4
    memory: 16Gi
- name: data_warehouse
  type: snowflake
  purpose: Analytical queries
  auto_suspend: 10_minutes
- name: ranger
  type: authorization
  purpose: Fine-grained access control
```

**Platform APIs:**
```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


# DataProductSpec and the class name are assumptions reconstructed from usage;
# the original definitions are not shown. Helpers such as _deploy_dbt_project,
# _deploy_dag, _register_in_catalog, _setup_monitoring, _generate_dbt_tests and
# _generate_dbt_docs are platform-specific and also not shown.
@dataclass
class DataProductSpec:
    name: str
    domain: str
    transformation_sql: str
    quality_checks: List[Dict] = field(default_factory=list)


class DataPlatformAPI:
    def __init__(self, catalog):
        # Assumed: a catalog client exposing get() and search()
        self._catalog = catalog

    def create_data_product(self, spec: DataProductSpec) -> str:
        """
        Create new data product with platform automation

        Steps:
        1. Provision compute resources
        2. Create storage location
        3. Deploy transformation pipeline
        4. Configure quality checks
        5. Register in catalog
        6. Set up monitoring
        """
        # Generate unique ID
        product_id = f"{spec.domain}_{spec.name}"
        # Create storage location
        storage_path = self._provision_storage(product_id)
        # Deploy dbt project
        dbt_project = self._create_dbt_project(spec)
        self._deploy_dbt_project(dbt_project)
        # Create Airflow DAG
        dag = self._create_airflow_dag(spec, storage_path)
        self._deploy_dag(dag)
        # Register in catalog
        self._register_in_catalog(product_id, spec, storage_path)
        # Set up monitoring
        self._setup_monitoring(product_id, spec)
        return product_id

    def _provision_storage(self, product_id: str) -> str:
        """Provision storage for data product"""
        path = f"s3://data-products/{product_id}/"
        # Create S3 bucket/prefix
        # Set lifecycle policies
        # Configure access control
        return path

    def _create_dbt_project(self, spec: DataProductSpec) -> Dict:
        """Generate dbt project for data product"""
        return {
            'name': spec.name,
            'models': {
                f"{spec.name}.sql": spec.transformation_sql
            },
            'tests': self._generate_dbt_tests(spec.quality_checks),
            'docs': self._generate_dbt_docs(spec)
        }

    def _create_airflow_dag(self, spec: DataProductSpec, storage_path: str) -> str:
        """Generate Airflow DAG for data product"""
        # DAG template body omitted here
        dag_template = f"""..."""
        return dag_template

    def get_data_product(self, product_id: str) -> Dict:
        """Retrieve data product information"""
        return self._catalog.get(product_id)

    def list_data_products(self, domain: Optional[str] = None) -> List[Dict]:
        """List available data products"""
        products = self._catalog.search(domain=domain)
        return products

    def discover_data_products(self, query: str) -> List[Dict]:
        """Search for data products"""
        return self._catalog.search(query=query)

    def request_access(self, product_id: str, requester: str) -> str:
        """Request access to data product"""
        # Create access request ticket
        # Notify data product owner
        # Track approval workflow
        raise NotImplementedError("Access request workflow is platform-specific")

    def grant_access(self, product_id: str, user: str, access_level: str) -> None:
        """Grant access to data product"""
        # Update IAM policies
        # Configure row-level security
        # Log access grant
        raise NotImplementedError("Access provisioning is platform-specific")
```
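
The read-side methods above delegate to a catalog object through `get(product_id)`, `search(domain=...)` and `search(query=...)`. The catalog itself is not shown, so the following is only a sketch of what a stand-in might look like for local testing. `InMemoryCatalog`, its `register` method and the entry fields are assumptions made for illustration.

```python
from typing import Dict, List, Optional


class InMemoryCatalog:
    """Hypothetical stand-in for the catalog service used by the platform API."""

    def __init__(self) -> None:
        self._entries: Dict[str, Dict] = {}

    def register(self, product_id: str, entry: Dict) -> None:
        """Store an entry under its product ID."""
        self._entries[product_id] = {"product_id": product_id, **entry}

    def get(self, product_id: str) -> Optional[Dict]:
        """Return the entry, or None if the product is unknown."""
        return self._entries.get(product_id)

    def search(self, domain: Optional[str] = None,
               query: Optional[str] = None) -> List[Dict]:
        """Filter by exact domain and/or case-insensitive substring match."""
        results = []
        for entry in self._entries.values():
            if domain is not None and entry.get("domain") != domain:
                continue
            if query is not None:
                text = f"{entry.get('name', '')} {entry.get('description', '')}".lower()
                if query.lower() not in text:
                    continue
            results.append(entry)
        return results


catalog = InMemoryCatalog()
catalog.register("sales_orders", {
    "name": "orders",
    "domain": "sales",
    "description": "Analytical view of sales orders",
})
catalog.register("marketing_campaign_performance", {
    "name": "campaign_performance",
    "domain": "marketing",
    "description": "Campaign metrics",
})
```

Calling `search()` with no arguments returns every entry, which matches how `list_data_products` behaves when `domain` is left as `None`.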
```yaml
- name: pii_handling
  mandatory: true
  policy: |
    Data products containing PII must:
    - Mark PII fields in schema
    - Implement column-level encryption
    - Enable audit logging
    - Comply with GDPR/CCPA requirements
- name: data_retention
  mandatory: true
  policy: |
    Data retention periods:
    - Operational data: 7 years
    - Analytical data: 3 years
    - Logs: 1 year
    - Deleted data: 30 days in trash
---
finance:
  data_quality:
    - completeness: ">= 99.9%"
    - accuracy: ">= 99.99%"
    - freshness: "<= 1 hour"
  access_control:
    - default_access: confidential
    - sox_compliance: true
    - audit_all_access: true
---
- name: quality_gates
  enforcement: pre-publish
  check: |
    All quality checks must pass:
    - No critical failures
    - Warning threshold: <= 5%
- name: breaking_changes
  enforcement: pre-publish
  check: |
    Breaking changes require:
    - Major version increment
    - 30-day deprecation notice
    - Migration guide
```

**Governance Implementation:**
```python
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Optional


# PolicyViolationSeverity, PolicyViolation and the class name are assumptions
# reconstructed from usage; the original definitions are not shown.
class PolicyViolationSeverity(Enum):
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"


@dataclass
class PolicyViolation:
    policy_name: str
    severity: PolicyViolationSeverity
    message: str
    field: Optional[str] = None


class GovernancePolicyEngine:
    def __init__(self, policies: Dict):
        self.policies = policies

    def validate_data_product(self, product_spec: Dict) -> List[PolicyViolation]:
        """Validate data product against governance policies"""
        violations = []
        # Check data classification
        violations.extend(self._check_data_classification(product_spec))
        # Check PII handling
        violations.extend(self._check_pii_compliance(product_spec))
        # Check schema requirements
        violations.extend(self._check_schema_requirements(product_spec))
        # Check quality checks
        violations.extend(self._check_quality_requirements(product_spec))
        # Check retention policy
        violations.extend(self._check_retention_policy(product_spec))
        return violations

    def _check_data_classification(self, product_spec: Dict) -> List[PolicyViolation]:
        """Verify data classification is set and valid"""
        violations = []
        valid_classifications = ['public', 'internal', 'confidential', 'restricted']
        if 'classification' not in product_spec:
            violations.append(PolicyViolation(
                policy_name="data_classification",
                severity=PolicyViolationSeverity.ERROR,
                message="Data classification not specified"
            ))
        # elif: a missing classification should not also be reported as invalid
        elif product_spec['classification'] not in valid_classifications:
            violations.append(PolicyViolation(
                policy_name="data_classification",
                severity=PolicyViolationSeverity.ERROR,
                message=f"Invalid classification. Must be one of: {valid_classifications}"
            ))
        return violations

    def _check_pii_compliance(self, product_spec: Dict) -> List[PolicyViolation]:
        """Check PII handling compliance"""
        violations = []
        schema = product_spec.get('schema', {})
        pii_fields = [f for f in schema.get('fields', []) if f.get('is_pii')]
        if pii_fields:
            # Check encryption
            if not product_spec.get('encryption_enabled'):
                violations.append(PolicyViolation(
                    policy_name="pii_handling",
                    severity=PolicyViolationSeverity.CRITICAL,
                    message="PII fields present but encryption not enabled"
                ))
            # Check audit logging
            if not product_spec.get('audit_logging_enabled'):
                violations.append(PolicyViolation(
                    policy_name="pii_handling",
                    severity=PolicyViolationSeverity.CRITICAL,
                    message="PII fields present but audit logging not enabled"
                ))
            # Check field marking
            for field in pii_fields:
                if not field.get('pii_category'):
                    violations.append(PolicyViolation(
                        policy_name="pii_handling",
                        severity=PolicyViolationSeverity.ERROR,
                        message=f"PII field {field['name']} missing pii_category",
                        field=field['name']
                    ))
        return violations

    def _check_schema_requirements(self, product_spec: Dict) -> List[PolicyViolation]:
        """Validate schema completeness"""
        violations = []
        schema = product_spec.get('schema', {})
        if not schema:
            violations.append(PolicyViolation(
                policy_name="schema_validation",
                severity=PolicyViolationSeverity.ERROR,
                message="Schema not defined"
            ))
            return violations
        # Check for primary key
        fields = schema.get('fields', [])
        has_primary_key = any(f.get('is_primary_key') for f in fields)
        if not has_primary_key:
            violations.append(PolicyViolation(
                policy_name="schema_validation",
                severity=PolicyViolationSeverity.WARNING,
                message="No primary key defined"
            ))
        # Check field documentation
        for field in fields:
            if not field.get('description'):
                violations.append(PolicyViolation(
                    policy_name="schema_validation",
                    severity=PolicyViolationSeverity.WARNING,
                    message=f"Field {field['name']} missing description",
                    field=field['name']
                ))
        return violations

    def _check_quality_requirements(self, product_spec: Dict) -> List[PolicyViolation]:
        """Validate quality check configuration"""
        violations = []
        quality_checks = product_spec.get('sla', {}).get('quality_checks', [])
        if not quality_checks:
            violations.append(PolicyViolation(
                policy_name="quality_gates",
                severity=PolicyViolationSeverity.WARNING,
                message="No quality checks defined"
            ))
        # Check for minimum required checks
        check_names = [check.get('name') for check in quality_checks]
        required_checks = ['completeness', 'freshness']
        missing_checks = set(required_checks) - set(check_names)
        if missing_checks:
            violations.append(PolicyViolation(
                policy_name="quality_gates",
                severity=PolicyViolationSeverity.WARNING,
                message=f"Missing required quality checks: {missing_checks}"
            ))
        return violations

    def _check_retention_policy(self, product_spec: Dict) -> List[PolicyViolation]:
        """Validate retention policy"""
        violations = []
        if 'retention_days' not in product_spec:
            violations.append(PolicyViolation(
                policy_name="data_retention",
                severity=PolicyViolationSeverity.ERROR,
                message="Retention policy not specified"
            ))
        return violations

    def enforce_policies(self, violations: List[PolicyViolation]) -> bool:
        """Determine if data product can be published based on violations"""
        # Block on ERROR or CRITICAL violations; warnings do not block
        blocking_violations = [
            v for v in violations
            if v.severity in (PolicyViolationSeverity.ERROR, PolicyViolationSeverity.CRITICAL)
        ]
        return not blocking_violations

    def generate_compliance_report(self, product_id: str) -> Dict:
        """Generate compliance report for data product"""
        # Placeholder values: status and violations are not computed here
        return {
            'product_id': product_id,
            'compliance_status': 'compliant',
            'last_checked': datetime.now(timezone.utc).isoformat(),
            'policies_evaluated': len(self.policies),
            'violations': []
        }
```
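
The `breaking_changes` gate in the governance policies requires a major version increment for breaking changes, but the validator above does not check versions. A minimal sketch of such a check follows, assuming versions use the three-part `major.minor.patch` form seen in `version="2.1.0"`. `parse_version` and `is_valid_bump` are hypothetical names.

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, int, int]:
    """Split 'major.minor.patch' into a tuple of integers."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def is_valid_bump(current: str, proposed: str, breaking: bool) -> bool:
    """Breaking changes need a major increment; every release must move forward."""
    cur, new = parse_version(current), parse_version(proposed)
    if new <= cur:
        return False  # same or older version is never a valid release
    if breaking:
        return new[0] > cur[0]
    return True


breaking_with_major = is_valid_bump("2.1.0", "3.0.0", breaking=True)
breaking_with_minor = is_valid_bump("2.1.0", "2.2.0", breaking=True)
```

The 30-day deprecation notice and migration guide from the same policy are process requirements and are not covered by this check.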
```
// Bad: Central data team owns all data
Central Team -> All domains (bottleneck)
// Good: Domain teams own their data
Sales Domain -> Sales data products
Marketing Domain -> Marketing data products
Product Domain -> Product data products
```

```
// Bad: Single giant data lake
s3://data-lake/everything/
// Good: Domain-oriented storage
s3://data-products/sales/
s3://data-products/marketing/
s3://data-products/product/
```

```
// Bad: Undocumented schema changes
Breaking change deployed without notice
// Good: Versioned contracts with deprecation
v1: Deprecated (30 days notice)
v2: Current
v3: Beta
```

```
// Bad: Manual approval processes
Email -> Ticket -> Manual review -> Access granted (weeks)
// Good: Automated governance
Request -> Policy check -> Auto-approval (minutes)
```
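
The "Request -> Policy check -> Auto-approval" flow can be sketched as a small decision function. This is an illustration under stated assumptions, not a prescribed policy: the `AccessRequest` fields, the ordering in `LEVELS`, and the rule that restricted data always goes to manual review are all hypothetical. Only the four classification names come from the governance code.

```python
from dataclasses import dataclass

# Assumed ordering of the classification levels, least to most sensitive.
LEVELS = ["public", "internal", "confidential", "restricted"]


@dataclass
class AccessRequest:
    product_id: str
    requester: str
    requester_clearance: str      # highest classification the requester may read
    product_classification: str


def decide(request: AccessRequest) -> str:
    """Return 'approved', or 'manual_review' when policy cannot auto-approve."""
    if request.product_classification == "restricted":
        return "manual_review"    # assumed rule: restricted data needs a human
    allowed = LEVELS.index(request.requester_clearance)
    needed = LEVELS.index(request.product_classification)
    return "approved" if allowed >= needed else "manual_review"


internal_request = AccessRequest("sales_orders", "analyst", "confidential", "internal")
restricted_request = AccessRequest("finance_general_ledger", "analyst", "restricted", "restricted")
```

Requests that fall through to `manual_review` would still follow the ticket-based path, so automation shortens the common case without removing human oversight for sensitive data.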