databricks-observability


Databricks Observability


Overview


Set up comprehensive observability for Databricks workloads.

Prerequisites


  • Access to system tables
  • SQL Warehouse for dashboards
  • Notification destinations configured
  • Alert recipients defined

Metrics Collection


Key Metrics


| Metric | Source | Description |
| --- | --- | --- |
| Job success rate | system.lakeflow.job_run_timeline | % of successful job runs |
| Job duration | system.lakeflow.job_run_timeline | Run time in minutes |
| Cluster utilization | system.compute.cluster_events | CPU/memory usage |
| Data freshness | table history | Hours since last update |
| DBU consumption | system.billing.usage | Cost tracking |

Instructions


Step 1: System Tables Access


```sql
-- Enable system tables (workspace admin)
-- Check available system tables
SELECT * FROM system.information_schema.tables
WHERE table_schema = 'billing' OR table_schema = 'lakeflow';

-- Job run history
SELECT
    job_id,
    job_name,
    run_id,
    start_time,
    end_time,
    result_state,
    error_message,
    (end_time - start_time) / 60000 as duration_minutes
FROM system.lakeflow.job_run_timeline
WHERE start_time > current_timestamp() - INTERVAL 24 HOURS
ORDER BY start_time DESC;

-- Cluster events
SELECT
    cluster_id,
    timestamp,
    type,
    details
FROM system.compute.cluster_events
WHERE timestamp > current_timestamp() - INTERVAL 24 HOURS
ORDER BY timestamp DESC;
```
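The job-run query above derives duration by subtracting epoch-millisecond timestamps and dividing by 60000. A minimal Python sketch of the same arithmetic, useful when post-processing query results in a notebook (it assumes, as the SQL does, that the timestamps are epoch milliseconds):

```python
def duration_minutes(start_ms: int, end_ms: int) -> float:
    """Mirror (end_time - start_time) / 60000 from the job run history query."""
    return (end_ms - start_ms) / 60_000


# A run that lasted exactly 90 minutes
start = 1_700_000_000_000
end = start + 90 * 60_000
```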

Step 2: Create Monitoring Views


```sql
-- Job health summary view
CREATE OR REPLACE VIEW monitoring.job_health_summary AS
SELECT
    job_name,
    COUNT(*) as total_runs,
    SUM(CASE WHEN result_state = 'SUCCESS' THEN 1 ELSE 0 END) as successes,
    SUM(CASE WHEN result_state = 'FAILED' THEN 1 ELSE 0 END) as failures,
    ROUND(SUM(CASE WHEN result_state = 'SUCCESS' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as success_rate,
    AVG((end_time - start_time) / 60000) as avg_duration_minutes,
    PERCENTILE((end_time - start_time) / 60000, 0.95) as p95_duration_minutes,
    MAX(start_time) as last_run_time,
    MAX(CASE WHEN result_state = 'FAILED' THEN start_time END) as last_failure_time
FROM system.lakeflow.job_run_timeline
WHERE start_time > current_timestamp() - INTERVAL 7 DAYS
GROUP BY job_name;

-- Data freshness view
CREATE OR REPLACE VIEW monitoring.data_freshness AS
SELECT
    table_catalog,
    table_schema,
    table_name,
    MAX(commit_timestamp) as last_update,
    TIMESTAMPDIFF(HOUR, MAX(commit_timestamp), current_timestamp()) as hours_since_update,
    CASE
        WHEN TIMESTAMPDIFF(HOUR, MAX(commit_timestamp), current_timestamp()) < 1 THEN 'FRESH'
        WHEN TIMESTAMPDIFF(HOUR, MAX(commit_timestamp), current_timestamp()) < 6 THEN 'RECENT'
        WHEN TIMESTAMPDIFF(HOUR, MAX(commit_timestamp), current_timestamp()) < 24 THEN 'STALE'
        ELSE 'VERY_STALE'
    END as freshness_status
FROM system.information_schema.table_history
GROUP BY table_catalog, table_schema, table_name;

-- Cost tracking view
CREATE OR REPLACE VIEW monitoring.daily_costs AS
SELECT
    DATE(usage_date) as date,
    workspace_id,
    sku_name,
    usage_type,
    SUM(usage_quantity) as total_dbus,
    SUM(usage_quantity * list_price) as estimated_cost
FROM system.billing.usage
WHERE usage_date > current_date() - INTERVAL 30 DAYS
GROUP BY DATE(usage_date), workspace_id, sku_name, usage_type
ORDER BY date DESC, estimated_cost DESC;
```
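The freshness view's CASE expression maps hours-since-update onto four statuses. The same thresholds as a small Python function, handy for unit-testing the classification logic outside the warehouse (a sketch mirroring the view, not something the view depends on):

```python
def freshness_status(hours_since_update: float) -> str:
    """Mirror the CASE expression in monitoring.data_freshness."""
    if hours_since_update < 1:
        return "FRESH"
    if hours_since_update < 6:
        return "RECENT"
    if hours_since_update < 24:
        return "STALE"
    return "VERY_STALE"
```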

Step 3: Configure Alerts


```sql
-- Alert: Job failure
CREATE ALERT job_failure_alert
AS SELECT
    job_name,
    run_id,
    error_message,
    start_time
FROM system.lakeflow.job_run_timeline
WHERE result_state = 'FAILED'
  AND start_time > current_timestamp() - INTERVAL 15 MINUTES
SCHEDULE CRON '*/15 * * * *'
NOTIFICATIONS (
    email_addresses = ['oncall@company.com'],
    webhook_destinations = ['slack-alerts']
);

-- Alert: Long-running jobs
CREATE ALERT long_running_job_alert
AS SELECT
    job_name,
    run_id,
    start_time,
    TIMESTAMPDIFF(MINUTE, start_time, current_timestamp()) as running_minutes
FROM system.lakeflow.job_run_timeline
WHERE end_time IS NULL
  AND TIMESTAMPDIFF(MINUTE, start_time, current_timestamp()) > 120
SCHEDULE CRON '*/30 * * * *'
NOTIFICATIONS (
    email_addresses = ['oncall@company.com']
);

-- Alert: Data freshness SLA breach
CREATE ALERT data_freshness_sla
AS SELECT
    table_name,
    hours_since_update
FROM monitoring.data_freshness
WHERE table_schema = 'gold'
  AND hours_since_update > 6
SCHEDULE CRON '0 * * * *'
NOTIFICATIONS (
    email_addresses = ['data-team@company.com']
);

-- Alert: Cost spike (> 50% day-over-day increase)
-- Aggregate to one row per day first so LAG compares daily totals,
-- then filter on the window result in the outer query.
CREATE ALERT daily_cost_spike
AS SELECT date, daily_cost, prev_day_cost, percent_change
FROM (
    SELECT
        date,
        SUM(estimated_cost) as daily_cost,
        LAG(SUM(estimated_cost)) OVER (ORDER BY date) as prev_day_cost,
        (SUM(estimated_cost) - LAG(SUM(estimated_cost)) OVER (ORDER BY date)) /
            NULLIF(LAG(SUM(estimated_cost)) OVER (ORDER BY date), 0) * 100 as percent_change
    FROM monitoring.daily_costs
    GROUP BY date
)
WHERE date = current_date() - 1
  AND percent_change > 50
SCHEDULE CRON '0 8 * * *'
NOTIFICATIONS (
    email_addresses = ['finops@company.com']
);
```

Step 4: Structured Logging


src/utils/logging.py

```python
import json
import logging
from datetime import datetime
from typing import Any


class JsonFormatter(logging.Formatter):
    """JSON log formatter: the message is already serialized JSON."""

    def format(self, record):
        return record.getMessage()


class StructuredLogger:
    """Structured logging for Databricks notebooks."""

    def __init__(self, job_name: str, run_id: str = None):
        self.job_name = job_name
        self.run_id = run_id or str(datetime.now().timestamp())
        self.logger = logging.getLogger(job_name)
        self.logger.setLevel(logging.INFO)

        # JSON formatter; guard against stacking handlers on repeated init
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(JsonFormatter())
            self.logger.addHandler(handler)

    def _log(self, level: str, message: str, **context):
        """Log with structured context."""
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "job_name": self.job_name,
            "run_id": self.run_id,
            "level": level,
            "message": message,
            **context,
        }
        # Fall back to info() for custom levels like METRIC, which have
        # no corresponding method on logging.Logger
        getattr(self.logger, level.lower(), self.logger.info)(json.dumps(log_entry))

    def info(self, message: str, **context):
        self._log("INFO", message, **context)

    def error(self, message: str, **context):
        self._log("ERROR", message, **context)

    def metric(self, name: str, value: Any, **tags):
        """Log a metric for monitoring."""
        self._log("METRIC", f"{name}={value}", metric_name=name, metric_value=value, **tags)
```

Usage in notebooks

```python
logger = StructuredLogger(
    "etl-pipeline",
    dbutils.notebook.entry_point.getDbutils().notebook().getContext().runId(),
)
logger.info("Starting bronze ingestion", source="s3://bucket/raw")
logger.metric("rows_processed", 10000, table="orders")
```
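Because StructuredLogger emits one JSON object per line, a downstream collector can recover metrics by parsing the log stream. A minimal sketch of that consumer side (the field names match the `_log` payload; the function name is illustrative):

```python
import json


def extract_metrics(log_lines: list[str]) -> list[dict]:
    """Pull METRIC entries out of a stream of structured log lines,
    skipping anything that is not valid JSON."""
    metrics = []
    for line in log_lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue
        if entry.get("level") == "METRIC":
            metrics.append({
                "name": entry["metric_name"],
                "value": entry["metric_value"],
                "job_name": entry.get("job_name"),
            })
    return metrics


lines = [
    '{"level": "INFO", "message": "start", "job_name": "etl-pipeline"}',
    '{"level": "METRIC", "metric_name": "rows_processed", "metric_value": 10000, "job_name": "etl-pipeline"}',
    "not json at all",
]
```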

Step 5: Custom Metrics Dashboard


src/monitoring/dashboard.py

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import Dashboard, Widget, Query


def create_monitoring_dashboard(w: WorkspaceClient) -> str:
    """Create an operational monitoring dashboard."""
    # Create dashboard
    dashboard = w.dashboards.create(
        name="Data Platform Monitoring",
        tags=["monitoring", "operations"],
    )

    # Job Success Rate widget
    job_success_query = """
    SELECT
        DATE(start_time) as date,
        job_name,
        ROUND(SUM(CASE WHEN result_state = 'SUCCESS' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as success_rate
    FROM system.lakeflow.job_run_timeline
    WHERE start_time > current_timestamp() - INTERVAL 7 DAYS
    GROUP BY DATE(start_time), job_name
    ORDER BY date, job_name
    """

    # Add widgets...
    # (Dashboard API implementation)

    return dashboard.id
```

Generate Grafana dashboard JSON

```python
def generate_grafana_dashboard() -> dict:
    """Generate Grafana dashboard configuration."""
    return {
        "dashboard": {
            "title": "Databricks Monitoring",
            "panels": [
                {
                    "title": "Job Success Rate",
                    "type": "timeseries",
                    "targets": [{
                        "rawSql": """
                            SELECT start_time as time, success_rate
                            FROM monitoring.job_health_summary
                        """
                    }],
                },
                {
                    "title": "Daily DBU Usage",
                    "type": "bargauge",
                    "targets": [{
                        "rawSql": """
                            SELECT date, total_dbus
                            FROM monitoring.daily_costs
                            WHERE date > current_date() - 7
                        """
                    }],
                },
            ],
        }
    }
```

Step 6: Integration with External Monitoring


src/monitoring/external.py

```python
import time
from dataclasses import dataclass
from typing import Optional

import requests


@dataclass
class MetricPoint:
    name: str
    value: float
    tags: dict
    timestamp: Optional[int] = None


class DatadogExporter:
    """Export metrics to Datadog."""

    def __init__(self, api_key: str, app_key: str):
        self.api_key = api_key
        self.app_key = app_key
        self.base_url = "https://api.datadoghq.com/api/v2"

    def send_metrics(self, metrics: list[MetricPoint]) -> bool:
        """Send metrics to Datadog."""
        series = []
        for m in metrics:
            series.append({
                "metric": f"databricks.{m.name}",
                "points": [[m.timestamp or int(time.time()), m.value]],
                "tags": [f"{k}:{v}" for k, v in m.tags.items()],
            })

        response = requests.post(
            f"{self.base_url}/series",
            headers={
                "DD-API-KEY": self.api_key,
                "DD-APPLICATION-KEY": self.app_key,
            },
            json={"series": series},
        )
        return response.status_code == 202
```
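Metric APIs generally cap payload size, so large metric sets should be sent in batches rather than one request. A simple chunking helper that could wrap `send_metrics` (the batch size of 100 is an illustrative assumption, not a documented Datadog limit):

```python
from typing import Iterator, TypeVar

T = TypeVar("T")


def chunked(items: list[T], size: int = 100) -> Iterator[list[T]]:
    """Yield fixed-size batches so one oversized request never fails the whole export."""
    for i in range(0, len(items), size):
        yield items[i : i + size]
```

Usage: `for batch in chunked(metrics): exporter.send_metrics(batch)`.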

Usage

```python
exporter = DatadogExporter(api_key, app_key)
exporter.send_metrics([
    MetricPoint("job.duration_minutes", 45.2, {"job": "etl-pipeline", "env": "prod"}),
    MetricPoint("job.rows_processed", 1000000, {"job": "etl-pipeline", "env": "prod"}),
])
```

Output


  • System table queries configured
  • Monitoring views created
  • SQL alerts active
  • Structured logging implemented
  • External integrations ready

Error Handling


| Issue | Cause | Solution |
| --- | --- | --- |
| System tables unavailable | Feature not enabled | Contact an admin to enable them |
| Alert not triggering | Wrong schedule | Check the cron expression |
| Missing metrics | Query timeout | Optimize the query or scale up the warehouse |
| High cardinality | Too many tags | Reduce label dimensions |

Examples


Quick Health Check Query


```sql
SELECT
    job_name,
    success_rate,
    avg_duration_minutes,
    last_run_time
FROM monitoring.job_health_summary
WHERE success_rate < 95
ORDER BY success_rate ASC;
```

Resources


Next Steps


For incident response, see databricks-incident-runbook.