Data Quality Frameworks


Production patterns for implementing data quality with Great Expectations, dbt tests, and data contracts to ensure reliable data pipelines.

When to Use This Skill


  • Implementing data quality checks in pipelines
  • Setting up Great Expectations validation
  • Building comprehensive dbt test suites
  • Establishing data contracts between teams
  • Monitoring data quality metrics
  • Automating data validation in CI/CD

Core Concepts


1. Data Quality Dimensions


| Dimension | Description | Example Check |
|---|---|---|
| Completeness | No missing values | `expect_column_values_to_not_be_null` |
| Uniqueness | No duplicates | `expect_column_values_to_be_unique` |
| Validity | Values in expected range | `expect_column_values_to_be_in_set` |
| Accuracy | Data matches reality | Cross-reference validation |
| Consistency | No contradictions | `expect_column_pair_values_A_to_be_greater_than_B` |
| Timeliness | Data is recent | `expect_column_max_to_be_between` |
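To make the dimensions concrete, here is a minimal, library-free sketch of what each check asserts against a batch of records. The sample `orders` data is illustrative, and the accuracy check is stubbed because it normally requires a trusted external source to cross-reference:

```python
from datetime import datetime, timedelta

# Hypothetical in-memory batch; real checks run against a warehouse table.
now = datetime.now()
orders = [
    {"order_id": "a1", "status": "shipped", "amount": 120.0,
     "created_at": now - timedelta(hours=5), "updated_at": now - timedelta(hours=2)},
    {"order_id": "a2", "status": "pending", "amount": 35.5,
     "created_at": now - timedelta(hours=3), "updated_at": now - timedelta(hours=3)},
]

VALID_STATUSES = {"pending", "processing", "shipped", "delivered", "cancelled"}
ids = [o["order_id"] for o in orders]

results = {
    # Completeness: required column has no missing values
    "completeness": all(o["order_id"] is not None for o in orders),
    # Uniqueness: no duplicate primary keys
    "uniqueness": len(ids) == len(set(ids)),
    # Validity: categorical values drawn from the expected set
    "validity": all(o["status"] in VALID_STATUSES for o in orders),
    # Accuracy: normally a cross-reference against a trusted source; stubbed here
    "accuracy": True,
    # Consistency: column-pair relationship (updated_at never precedes created_at)
    "consistency": all(o["updated_at"] >= o["created_at"] for o in orders),
    # Timeliness: the newest record landed within the last day
    "timeliness": max(o["created_at"] for o in orders) > now - timedelta(days=1),
}

assert all(results.values()), results
```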

2. Testing Pyramid for Data


```
          /\
         /  \     Integration Tests (cross-table)
        /────\
       /      \   Unit Tests (single column)
      /────────\
     /          \ Schema Tests (structure)
    /────────────\
```

Quick Start


Great Expectations Setup


```bash
# Install
pip install great_expectations

# Initialize project
great_expectations init

# Create datasource
great_expectations datasource new
```

```python
import great_expectations as gx

# Create context
context = gx.get_context()

# Create expectation suite
suite = context.add_expectation_suite("orders_suite")

# Add expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)

# Validate via a checkpoint (configured in
# great_expectations/checkpoints/daily_validation.yml)
results = context.run_checkpoint(checkpoint_name="daily_orders")
```

Patterns


Pattern 1: Great Expectations Suite


```python
# expectations/orders_suite.py
from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_configuration import ExpectationConfiguration


def build_orders_suite() -> ExpectationSuite:
    """Build comprehensive orders expectation suite"""
    suite = ExpectationSuite(expectation_suite_name="orders_suite")

    # Schema expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_columns_to_match_set",
        kwargs={
            "column_set": ["order_id", "customer_id", "amount", "status", "created_at"],
            "exact_match": False  # Allow additional columns
        }
    ))

    # Primary key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "order_id"}
    ))
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"}
    ))

    # Foreign key
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "customer_id"}
    ))

    # Categorical values
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column": "status",
            "value_set": ["pending", "processing", "shipped", "delivered", "cancelled"]
        }
    ))

    # Numeric ranges
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 0,
            "max_value": 100000,
            "strict_min": True  # amount > 0
        }
    ))

    # Date validity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_dateutil_parseable",
        kwargs={"column": "created_at"}
    ))

    # Freshness - data should be recent
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_max_to_be_between",
        kwargs={
            "column": "created_at",
            "min_value": {"$PARAMETER": "now - timedelta(days=1)"},
            "max_value": {"$PARAMETER": "now"}
        }
    ))

    # Row count sanity
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 1000,  # Expect at least 1000 rows
            "max_value": 10000000
        }
    ))

    # Statistical expectations
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_mean_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 50,
            "max_value": 500
        }
    ))

    return suite
```

Pattern 2: Great Expectations Checkpoint


```yaml
# great_expectations/checkpoints/orders_checkpoint.yml
name: orders_checkpoint
config_version: 1.0
class_name: Checkpoint
run_name_template: "%Y%m%d-%H%M%S-orders-validation"

validations:
  - batch_request:
      datasource_name: warehouse
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: orders
      data_connector_query:
        index: -1  # Latest batch
    expectation_suite_name: orders_suite

action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: store_evaluation_parameters
    action:
      class_name: StoreEvaluationParametersAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
  # Slack notification on failure
  - name: send_slack_notification
    action:
      class_name: SlackNotificationAction
      slack_webhook: ${SLACK_WEBHOOK}
      notify_on: failure
      renderer:
        module_name: great_expectations.render.renderer.slack_renderer
        class_name: SlackRenderer
```

```python
# Run checkpoint
import great_expectations as gx

context = gx.get_context()
result = context.run_checkpoint(checkpoint_name="orders_checkpoint")

if not result.success:
    failed_expectations = [
        r for r in result.run_results.values() if not r.success
    ]
    raise ValueError(f"Data quality check failed: {failed_expectations}")
```

Pattern 3: dbt Data Tests


```yaml
# models/marts/core/_core__models.yml
version: 2

models:
  - name: fct_orders
    description: Order fact table
    tests:
      # Table-level tests
      - dbt_utils.recency:
          datepart: day
          field: created_at
          interval: 1
      - dbt_utils.at_least_one
      - dbt_utils.expression_is_true:
          expression: "total_amount >= 0"
    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: customer_id
        description: Foreign key to dim_customers
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_status
        tests:
          - accepted_values:
              values: ["pending", "processing", "shipped", "delivered", "cancelled"]
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "<= current_timestamp"

  - name: dim_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - unique
          - not_null
          # Custom regex test
          - dbt_utils.expression_is_true:
              expression: "email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"
```

Pattern 4: Custom dbt Tests


```sql
-- tests/generic/test_row_count_in_range.sql
{% test row_count_in_range(model, min_count, max_count) %}

with row_count as (
    select count(*) as cnt from {{ model }}
)

select cnt
from row_count
where cnt < {{ min_count }} or cnt > {{ max_count }}

{% endtest %}

-- Usage in schema.yml:
-- tests:
--   - row_count_in_range:
--       min_count: 1000
--       max_count: 10000000
```

```sql
-- tests/generic/test_sequential_values.sql
{% test sequential_values(model, column_name, interval=1) %}

with lagged as (
    select
        {{ column_name }},
        lag({{ column_name }}) over (order by {{ column_name }}) as prev_value
    from {{ model }}
)

select *
from lagged
where {{ column_name }} - prev_value != {{ interval }}
  and prev_value is not null

{% endtest %}
```

```sql
-- tests/singular/assert_orders_customers_match.sql
-- Singular test: specific business rule

with orders_customers as (
    select distinct customer_id from {{ ref('fct_orders') }}
),

dim_customers as (
    select customer_id from {{ ref('dim_customers') }}
),

orphaned_orders as (
    select o.customer_id
    from orders_customers o
    left join dim_customers c using (customer_id)
    where c.customer_id is null
)

select * from orphaned_orders
-- Test passes if this returns 0 rows
```
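The `sequential_values` test's window logic can be sketched in plain Python to show exactly what it flags; the function name and sample data here are illustrative:

```python
def find_sequence_gaps(values, interval=1):
    """Return (prev, current) pairs whose step differs from the expected
    interval, mirroring the lag() comparison in the generic dbt test."""
    ordered = sorted(values)
    return [
        (prev, cur)
        for prev, cur in zip(ordered, ordered[1:])
        if cur - prev != interval
    ]

# An id sequence missing 4: the test fails and surfaces the offending pair
print(find_sequence_gaps([1, 2, 3, 5, 6]))  # [(3, 5)]
```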

Pattern 5: Data Contracts


```yaml
# contracts/orders_contract.yaml
apiVersion: datacontract.com/v1.0.0
kind: DataContract
metadata:
  name: orders
  version: 1.0.0
  owner: data-platform-team
  contact: data-team@company.com

info:
  title: Orders Data Contract
  description: Contract for order event data from the ecommerce platform
  purpose: Analytics, reporting, and ML features

servers:
  production:
    type: snowflake
    account: company.us-east-1
    database: ANALYTICS
    schema: CORE

terms:
  usage: Internal analytics only
  limitations: PII must not be exposed in downstream marts
  billing: Charged per query TB scanned

schema:
  type: object
  properties:
    order_id:
      type: string
      format: uuid
      description: Unique order identifier
      required: true
      unique: true
      pii: false

    customer_id:
      type: string
      format: uuid
      description: Customer identifier
      required: true
      pii: true
      piiClassification: indirect

    total_amount:
      type: number
      minimum: 0
      maximum: 100000
      description: Order total in USD

    created_at:
      type: string
      format: date-time
      description: Order creation timestamp
      required: true

    status:
      type: string
      enum: [pending, processing, shipped, delivered, cancelled]
      description: Current order status

quality:
  type: SodaCL
  specification:
    checks for orders:
      - row_count > 0
      - missing_count(order_id) = 0
      - duplicate_count(order_id) = 0
      - invalid_count(status) = 0:
          valid values: [pending, processing, shipped, delivered, cancelled]
      - freshness(created_at) < 24h

sla:
  availability: 99.9%
  freshness: 1 hour
  latency: 5 minutes
```
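A contract only helps if it is enforced. Real enforcement would use tooling such as Soda or a contract CLI, but a minimal, library-free sketch shows the shape of the check; the `CONTRACT_RULES` mapping and `violations` helper are hypothetical stand-ins that mirror the schema rules in the contract above:

```python
CONTRACT_RULES = {
    "order_id":     {"required": True, "type": str},
    "customer_id":  {"required": True, "type": str},
    "total_amount": {"required": False, "type": (int, float),
                     "minimum": 0, "maximum": 100000},
    "status":       {"required": False, "type": str,
                     "enum": ["pending", "processing", "shipped", "delivered", "cancelled"]},
}

def violations(record: dict) -> list:
    """Collect contract violations for a single record."""
    problems = []
    for field, rule in CONTRACT_RULES.items():
        if field not in record or record[field] is None:
            if rule.get("required"):
                problems.append(f"{field}: missing required field")
            continue
        value = record[field]
        if not isinstance(value, rule["type"]):
            problems.append(f"{field}: wrong type {type(value).__name__}")
            continue
        if "minimum" in rule and value < rule["minimum"]:
            problems.append(f"{field}: below minimum {rule['minimum']}")
        if "maximum" in rule and value > rule["maximum"]:
            problems.append(f"{field}: above maximum {rule['maximum']}")
        if "enum" in rule and value not in rule["enum"]:
            problems.append(f"{field}: not in allowed values")
    return problems

print(violations({"order_id": "a1", "customer_id": "c1",
                  "total_amount": -5, "status": "shipped"}))
# ['total_amount: below minimum 0']
```

Running this against producer output in CI catches contract drift before it reaches consumers.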

Pattern 6: Automated Quality Pipeline


```python
# quality_pipeline.py
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List

import great_expectations as gx


@dataclass
class QualityResult:
    table: str
    passed: bool
    total_expectations: int
    failed_expectations: int
    details: List[Dict[str, Any]]
    timestamp: datetime


class DataQualityPipeline:
    """Orchestrate data quality checks across tables"""

    def __init__(self, context: gx.DataContext):
        self.context = context
        self.results: List[QualityResult] = []

    def validate_table(self, table: str, suite: str) -> QualityResult:
        """Validate a single table against an expectation suite"""
        checkpoint_config = {
            "name": f"{table}_validation",
            "config_version": 1.0,
            "class_name": "Checkpoint",
            "validations": [{
                "batch_request": {
                    "datasource_name": "warehouse",
                    "data_asset_name": table,
                },
                "expectation_suite_name": suite,
            }],
        }

        result = self.context.run_checkpoint(**checkpoint_config)

        # Parse results
        validation_result = list(result.run_results.values())[0]
        results = validation_result.results
        failed = [r for r in results if not r.success]

        return QualityResult(
            table=table,
            passed=result.success,
            total_expectations=len(results),
            failed_expectations=len(failed),
            details=[{
                "expectation": r.expectation_config.expectation_type,
                "success": r.success,
                "observed_value": r.result.get("observed_value"),
            } for r in results],
            timestamp=datetime.now()
        )

    def run_all(self, tables: Dict[str, str]) -> Dict[str, QualityResult]:
        """Run validation for all tables"""
        results = {}
        for table, suite in tables.items():
            print(f"Validating {table}...")
            results[table] = self.validate_table(table, suite)
        return results

    def generate_report(self, results: Dict[str, QualityResult]) -> str:
        """Generate quality report"""
        report = ["# Data Quality Report", f"Generated: {datetime.now()}", ""]

        total_passed = sum(1 for r in results.values() if r.passed)
        total_tables = len(results)

        report.append(f"## Summary: {total_passed}/{total_tables} tables passed")
        report.append("")

        for table, result in results.items():
            status = "✅" if result.passed else "❌"
            report.append(f"### {status} {table}")
            report.append(f"- Expectations: {result.total_expectations}")
            report.append(f"- Failed: {result.failed_expectations}")

            if not result.passed:
                report.append("- Failed checks:")
                for detail in result.details:
                    if not detail["success"]:
                        report.append(f"  - {detail['expectation']}: {detail['observed_value']}")
            report.append("")

        return "\n".join(report)
```

```python
# Usage
context = gx.get_context()
pipeline = DataQualityPipeline(context)

tables_to_validate = {
    "orders": "orders_suite",
    "customers": "customers_suite",
    "products": "products_suite",
}

results = pipeline.run_all(tables_to_validate)
report = pipeline.generate_report(results)

# Fail pipeline if any table failed
if not all(r.passed for r in results.values()):
    print(report)
    raise ValueError("Data quality checks failed!")
```
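Because the usage script raises on failure, it drops straight into CI/CD as a quality gate. A hypothetical GitHub Actions sketch (workflow path, schedule, and secret name are placeholders, not part of the original setup):

```yaml
# .github/workflows/data-quality.yml (illustrative)
name: data-quality
on:
  schedule:
    - cron: "0 6 * * *"   # daily, after the overnight loads
  workflow_dispatch:

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install great_expectations
      - name: Run quality pipeline
        env:
          SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
        run: python quality_pipeline.py   # non-zero exit fails the job
```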

Best Practices


Do's


  • Test early - Validate source data before transformations
  • Test incrementally - Add tests as you find issues
  • Document expectations - Clear descriptions for each test
  • Alert on failures - Integrate with monitoring
  • Version contracts - Track schema changes

Don'ts


  • Don't test everything - Focus on critical columns
  • Don't ignore warnings - They often precede failures
  • Don't skip freshness - Stale data is bad data
  • Don't hardcode thresholds - Use dynamic baselines
  • Don't test in isolation - Test relationships too
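"Don't hardcode thresholds" can be made concrete with a rolling baseline: derive bounds from recent history instead of fixed constants. A minimal stdlib sketch, where the seven-day window and 3-sigma band are illustrative choices, not prescribed values:

```python
import statistics

def dynamic_bounds(history, sigmas=3.0):
    """Derive (lower, upper) bounds from historical daily row counts
    instead of hardcoding min/max thresholds."""
    mean = statistics.mean(history)
    stdev = statistics.stdev(history)
    return mean - sigmas * stdev, mean + sigmas * stdev

# Last 7 days of row counts for the orders table (illustrative)
history = [10120, 9980, 10240, 10060, 9890, 10310, 10150]
lower, upper = dynamic_bounds(history)

todays_count = 4200   # e.g. an upstream job loaded only half the data
assert not (lower <= todays_count <= upper), "would alert: count outside baseline"
```

The same idea feeds parameters like `expect_table_row_count_to_be_between`, so thresholds track seasonality instead of going stale.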

Resources
