data-quality-frameworks
Data Quality Frameworks

Production patterns for implementing data quality with Great Expectations, dbt tests, and data contracts to ensure reliable data pipelines.

When to Use This Skill
- Implementing data quality checks in pipelines
- Setting up Great Expectations validation
- Building comprehensive dbt test suites
- Establishing data contracts between teams
- Monitoring data quality metrics
- Automating data validation in CI/CD
Core Concepts
1. Data Quality Dimensions
| Dimension | Description | Example Check |
|---|---|---|
| Completeness | No missing values | `not_null` check on required columns |
| Uniqueness | No duplicates | `unique` / duplicate-count check on keys |
| Validity | Values in expected range | Range or accepted-values check |
| Accuracy | Data matches reality | Cross-reference validation |
| Consistency | No contradictions | Cross-table reconciliation |
| Timeliness | Data is recent | Freshness check on a timestamp column |
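Each dimension maps to an automatable check. As a rough sketch using the same Great Expectations classes as the Quick Start below (column names and thresholds are hypothetical):

```python
# Illustrative dimension-to-check mapping; column names are hypothetical.
from datetime import datetime, timedelta

import great_expectations as gx

dimension_checks = {
    # Completeness: required columns have no missing values
    "completeness": gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id"),
    # Uniqueness: no duplicate keys
    "uniqueness": gx.expectations.ExpectColumnValuesToBeUnique(column="order_id"),
    # Validity: values fall in the expected domain
    "validity": gx.expectations.ExpectColumnValuesToBeBetween(
        column="amount", min_value=0, max_value=100000
    ),
    # Timeliness: the newest record is recent enough
    "timeliness": gx.expectations.ExpectColumnMaxToBeBetween(
        column="created_at", min_value=datetime.now() - timedelta(days=1)
    ),
}
# Accuracy and consistency usually need cross-source / cross-table SQL
# (see the dbt relationship and singular tests in Patterns 3 and 4).
```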
2. Testing Pyramid for Data
```
          /\
         /  \      Integration Tests (cross-table)
        /────\
       /      \    Unit Tests (single column)
      /────────\
     /          \  Schema Tests (structure)
    /────────────\
```
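Reading bottom-up: cheap structural checks form the broad base, single-column checks the middle, and a few cross-table checks the top. One illustrative check per layer (names are hypothetical; the top layer usually lives in SQL, as in Pattern 4):

```python
# One illustrative check per pyramid layer; table/column names are hypothetical.
import great_expectations as gx

# Schema tests (base): the table has the structure consumers expect
schema_test = gx.expectations.ExpectTableColumnsToMatchSet(
    column_set=["order_id", "customer_id", "amount"], exact_match=False
)

# Unit tests (middle): single-column invariants
unit_test = gx.expectations.ExpectColumnValuesToBeInSet(
    column="status", value_set=["pending", "shipped", "delivered"]
)

# Integration tests (top): cross-table rules, usually expressed in SQL --
# e.g. "every order's customer_id exists in dim_customers" (see Pattern 4).
```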
Quick Start
Great Expectations Setup
```bash
# Install
pip install great_expectations

# Initialize project
great_expectations init

# Create datasource
great_expectations datasource new
```

```python
import great_expectations as gx

# Create context
context = gx.get_context()

# Create expectation suite
suite = context.add_expectation_suite("orders_suite")

# Add expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)

# Validate via a checkpoint (e.g. great_expectations/checkpoints/daily_validation.yml)
results = context.run_checkpoint(checkpoint_name="daily_orders")
```
Patterns
Pattern 1: Great Expectations Suite
```python
# expectations/orders_suite.py
from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_configuration import ExpectationConfiguration


def build_orders_suite() -> ExpectationSuite:
    """Build a comprehensive orders expectation suite."""
    suite = ExpectationSuite(expectation_suite_name="orders_suite")

    # Schema expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_columns_to_match_set",
        kwargs={
            "column_set": ["order_id", "customer_id", "amount", "status", "created_at"],
            "exact_match": False,  # Allow additional columns
        },
    ))

    # Primary key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "order_id"},
    ))
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"},
    ))

    # Foreign key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "customer_id"},
    ))

    # Categorical values
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column": "status",
            "value_set": ["pending", "processing", "shipped", "delivered", "cancelled"],
        },
    ))

    # Numeric ranges
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 0,
            "max_value": 100000,
            "strict_min": True,  # amount > 0
        },
    ))

    # Date validity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_dateutil_parseable",
        kwargs={"column": "created_at"},
    ))

    # Freshness - data should be recent
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_max_to_be_between",
        kwargs={
            "column": "created_at",
            "min_value": {"$PARAMETER": "now() - timedelta(days=1)"},
            "max_value": {"$PARAMETER": "now()"},
        },
    ))

    # Row count sanity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 1000,  # Expect at least 1000 rows
            "max_value": 10000000,
        },
    ))

    # Statistical expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_mean_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 50,
            "max_value": 500,
        },
    ))

    return suite
```
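A minimal wiring sketch for the suite above — the datasource and asset names are assumptions about your project, using the same block-style API the pattern uses:

```python
# Hypothetical wiring: persist the suite, then validate the "orders" asset.
import great_expectations as gx
from great_expectations.core.batch import BatchRequest

from expectations.orders_suite import build_orders_suite

context = gx.get_context()
context.save_expectation_suite(build_orders_suite())

validator = context.get_validator(
    batch_request=BatchRequest(
        datasource_name="warehouse",  # assumed datasource name
        data_connector_name="default_inferred_data_connector_name",
        data_asset_name="orders",
    ),
    expectation_suite_name="orders_suite",
)
print(validator.validate().success)
```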
Pattern 2: Great Expectations Checkpoint
```yaml
# great_expectations/checkpoints/orders_checkpoint.yml
name: orders_checkpoint
config_version: 1.0
class_name: Checkpoint
run_name_template: "%Y%m%d-%H%M%S-orders-validation"

validations:
  - batch_request:
      datasource_name: warehouse
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: orders
      data_connector_query:
        index: -1  # Latest batch
    expectation_suite_name: orders_suite

action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: store_evaluation_parameters
    action:
      class_name: StoreEvaluationParametersAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
  # Slack notification on failure
  - name: send_slack_notification
    action:
      class_name: SlackNotificationAction
      slack_webhook: ${SLACK_WEBHOOK}
      notify_on: failure
      renderer:
        module_name: great_expectations.render.renderer.slack_renderer
        class_name: SlackRenderer
```
```python
# Run checkpoint
import great_expectations as gx

context = gx.get_context()
result = context.run_checkpoint(checkpoint_name="orders_checkpoint")

if not result.success:
    failed_expectations = [
        r for r in result.run_results.values()
        if not r.success
    ]
    raise ValueError(f"Data quality check failed: {failed_expectations}")
```
Pattern 3: dbt Data Tests
```yaml
# models/marts/core/_core__models.yml
version: 2

models:
  - name: fct_orders
    description: Order fact table
    tests:
      # Table-level tests
      - dbt_utils.recency:
          datepart: day
          field: created_at
          interval: 1
      - dbt_utils.at_least_one
      - dbt_utils.expression_is_true:
          expression: "total_amount >= 0"
    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: customer_id
        description: Foreign key to dim_customers
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_status
        tests:
          - accepted_values:
              values: ["pending", "processing", "shipped", "delivered", "cancelled"]
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "<= current_timestamp"

  - name: dim_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - unique
          - not_null
          # Custom regex test
          - dbt_utils.expression_is_true:
              expression: "email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"
```
-
undefinedversion: 2
models:
-
name: fct_orders description: Order fact table tests:
Table-level tests
- dbt_utils.recency: datepart: day field: created_at interval: 1
- dbt_utils.at_least_one
- dbt_utils.expression_is_true: expression: "total_amount >= 0"
columns:-
name: order_id description: Primary key tests:
- unique
- not_null
-
name: customer_id description: Foreign key to dim_customers tests:
- not_null
- relationships: to: ref('dim_customers') field: customer_id
-
name: order_status tests:
- accepted_values: values: ["pending", "processing", "shipped", "delivered", "cancelled"]
-
name: total_amount tests:
- not_null
- dbt_utils.expression_is_true: expression: ">= 0"
-
name: created_at tests:
- not_null
- dbt_utils.expression_is_true: expression: "<= current_timestamp"
-
name: dim_customers columns:
-
name: customer_id tests:
- unique
- not_null
-
name: email tests:
- unique
- not_null
Custom regex test
- dbt_utils.expression_is_true: expression: "email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'"
-
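To run these tests from a pipeline, dbt-core (1.5+) exposes a programmatic runner; a minimal sketch, with model selection and failure handling as illustrative choices:

```python
# Hypothetical CI gate: run the dbt tests above and fail the job on any error.
from dbt.cli.main import dbtRunner

result = dbtRunner().invoke(["test", "--select", "fct_orders dim_customers"])
if not result.success:
    raise SystemExit("dbt tests failed")
```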
Pattern 4: Custom dbt Tests
```sql
-- tests/generic/test_row_count_in_range.sql
{% test row_count_in_range(model, min_count, max_count) %}

with row_count as (
    select count(*) as cnt from {{ model }}
)

select cnt
from row_count
where cnt < {{ min_count }} or cnt > {{ max_count }}

{% endtest %}

-- Usage in schema.yml:
-- tests:
--   - row_count_in_range:
--       min_count: 1000
--       max_count: 10000000
```

```sql
-- tests/generic/test_sequential_values.sql
{% test sequential_values(model, column_name, interval=1) %}

with lagged as (
    select
        {{ column_name }},
        lag({{ column_name }}) over (order by {{ column_name }}) as prev_value
    from {{ model }}
)

select *
from lagged
where {{ column_name }} - prev_value != {{ interval }}
  and prev_value is not null

{% endtest %}
```

```sql
-- tests/singular/assert_orders_customers_match.sql
-- Singular test: specific business rule
with orders_customers as (
    select distinct customer_id from {{ ref('fct_orders') }}
),

dim_customers as (
    select customer_id from {{ ref('dim_customers') }}
),

orphaned_orders as (
    select o.customer_id
    from orders_customers o
    left join dim_customers c using (customer_id)
    where c.customer_id is null
)

select * from orphaned_orders
-- Test passes if this returns 0 rows
```
Pattern 5: Data Contracts
```yaml
# contracts/orders_contract.yaml
apiVersion: datacontract.com/v1.0.0
kind: DataContract

metadata:
  name: orders
  version: 1.0.0
  owner: data-platform-team
  contact: data-team@company.com

info:
  title: Orders Data Contract
  description: Contract for order event data from the ecommerce platform
  purpose: Analytics, reporting, and ML features

servers:
  production:
    type: snowflake
    account: company.us-east-1
    database: ANALYTICS
    schema: CORE

terms:
  usage: Internal analytics only
  limitations: PII must not be exposed in downstream marts
  billing: Charged per query TB scanned

schema:
  type: object
  properties:
    order_id:
      type: string
      format: uuid
      description: Unique order identifier
      required: true
      unique: true
      pii: false
    customer_id:
      type: string
      format: uuid
      description: Customer identifier
      required: true
      pii: true
      piiClassification: indirect
    total_amount:
      type: number
      minimum: 0
      maximum: 100000
      description: Order total in USD
    created_at:
      type: string
      format: date-time
      description: Order creation timestamp
      required: true
    status:
      type: string
      enum: [pending, processing, shipped, delivered, cancelled]
      description: Current order status

quality:
  type: SodaCL
  specification:
    checks for orders:
      - row_count > 0
      - missing_count(order_id) = 0
      - duplicate_count(order_id) = 0
      - invalid_count(status) = 0:
          valid values: [pending, processing, shipped, delivered, cancelled]
      - freshness(created_at) < 24h

sla:
  availability: 99.9%
  freshness: 1 hour
  latency: 5 minutes
```
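Contracts in this format can be enforced with the open-source datacontract-cli; a sketch assuming that tool (it is not part of the original setup):

```python
# Hypothetical enforcement using datacontract-cli's Python API
# (pip install datacontract-cli); the file path matches the contract above.
from datacontract.data_contract import DataContract

contract = DataContract(data_contract_file="contracts/orders_contract.yaml")
run = contract.test()  # executes the embedded SodaCL quality checks

if not run.has_passed():
    raise ValueError("Data contract violated")
```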
Pattern 6: Automated Quality Pipeline
```python
# quality_pipeline.py
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List

import great_expectations as gx


@dataclass
class QualityResult:
    table: str
    passed: bool
    total_expectations: int
    failed_expectations: int
    details: List[Dict[str, Any]]
    timestamp: datetime


class DataQualityPipeline:
    """Orchestrate data quality checks across tables"""

    def __init__(self, context: gx.DataContext):
        self.context = context
        self.results: List[QualityResult] = []

    def validate_table(self, table: str, suite: str) -> QualityResult:
        """Validate a single table against expectation suite"""
        checkpoint_config = {
            "name": f"{table}_validation",
            "config_version": 1.0,
            "class_name": "Checkpoint",
            "validations": [{
                "batch_request": {
                    "datasource_name": "warehouse",
                    "data_asset_name": table,
                },
                "expectation_suite_name": suite,
            }],
        }
        result = self.context.run_checkpoint(**checkpoint_config)

        # Parse results
        validation_result = list(result.run_results.values())[0]
        results = validation_result.results
        failed = [r for r in results if not r.success]

        return QualityResult(
            table=table,
            passed=result.success,
            total_expectations=len(results),
            failed_expectations=len(failed),
            details=[{
                "expectation": r.expectation_config.expectation_type,
                "success": r.success,
                "observed_value": r.result.get("observed_value"),
            } for r in results],
            timestamp=datetime.now(),
        )

    def run_all(self, tables: Dict[str, str]) -> Dict[str, QualityResult]:
        """Run validation for all tables"""
        results = {}
        for table, suite in tables.items():
            print(f"Validating {table}...")
            results[table] = self.validate_table(table, suite)
        return results

    def generate_report(self, results: Dict[str, QualityResult]) -> str:
        """Generate quality report"""
        report = ["# Data Quality Report", f"Generated: {datetime.now()}", ""]

        total_passed = sum(1 for r in results.values() if r.passed)
        total_tables = len(results)
        report.append(f"## Summary: {total_passed}/{total_tables} tables passed")
        report.append("")

        for table, result in results.items():
            status = "✅" if result.passed else "❌"
            report.append(f"### {status} {table}")
            report.append(f"- Expectations: {result.total_expectations}")
            report.append(f"- Failed: {result.failed_expectations}")
            if not result.passed:
                report.append("- Failed checks:")
                for detail in result.details:
                    if not detail["success"]:
                        report.append(f"  - {detail['expectation']}: {detail['observed_value']}")
            report.append("")

        return "\n".join(report)
```
Usage
```python
context = gx.get_context()
pipeline = DataQualityPipeline(context)

tables_to_validate = {
    "orders": "orders_suite",
    "customers": "customers_suite",
    "products": "products_suite",
}

results = pipeline.run_all(tables_to_validate)
report = pipeline.generate_report(results)

# Fail pipeline if any table failed
if not all(r.passed for r in results.values()):
    print(report)
    raise ValueError("Data quality checks failed!")
```
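In CI, a common variation is to persist the report as a build artifact and signal failure through the exit code; a sketch reusing pipeline and tables_to_validate from the usage above (paths are assumptions):

```python
# Hypothetical CI wrapper: write the report as an artifact and use the exit
# code, rather than an exception, to fail the job.
import sys
from pathlib import Path

results = pipeline.run_all(tables_to_validate)
report = pipeline.generate_report(results)
Path("artifacts").mkdir(exist_ok=True)
Path("artifacts/quality_report.md").write_text(report)

if not all(r.passed for r in results.values()):
    sys.exit(1)  # fail the CI job; the report survives as an artifact
```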
Best Practices
Do's
- Test early - Validate source data before transformations
- Test incrementally - Add tests as you find issues
- Document expectations - Clear descriptions for each test
- Alert on failures - Integrate with monitoring
- Version contracts - Track schema changes
Don'ts
- Don't test everything - Focus on critical columns
- Don't ignore warnings - They often precede failures
- Don't skip freshness - Stale data is bad data
- Don't hardcode thresholds - Use dynamic baselines (see the sketch after this list)
- Don't test in isolation - Test relationships too
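For the dynamic-baselines point above, a minimal sketch that derives row-count bounds from recent history instead of constants (the history source and tolerance are assumptions):

```python
# Hypothetical dynamic baseline: derive today's acceptable row-count range
# from the trailing 7-day average instead of a hardcoded constant.
from statistics import mean


def row_count_bounds(daily_counts: list[int], tolerance: float = 0.3) -> tuple[int, int]:
    """Return (min, max) bounds as +/- tolerance around the trailing average."""
    baseline = mean(daily_counts)
    return int(baseline * (1 - tolerance)), int(baseline * (1 + tolerance))


# Counts for the last 7 loads, e.g. fetched from pipeline metadata (assumed)
history = [10120, 9890, 10240, 10010, 9950, 10180, 10070]
min_rows, max_rows = row_count_bounds(history)
# Feed these into expect_table_row_count_to_be_between instead of fixed 1000/10000000.
```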