databricks-cost-tuning


Databricks Cost Tuning


Overview


Optimize Databricks costs through cluster policies, instance selection, and monitoring.

Prerequisites


  • Workspace admin access
  • Access to billing data
  • Understanding of workload patterns

Instructions


Step 1: Implement Cluster Policies


json
// policies/cost_controlled_policy.json
{
  "cluster_type": {
    "type": "fixed",
    "value": "job"
  },
  "autotermination_minutes": {
    "type": "range",
    "minValue": 10,
    "maxValue": 60,
    "defaultValue": 30
  },
  "num_workers": {
    "type": "range",
    "minValue": 1,
    "maxValue": 10,
    "defaultValue": 2
  },
  "node_type_id": {
    "type": "allowlist",
    "values": [
      "Standard_DS3_v2",
      "Standard_DS4_v2",
      "Standard_E4ds_v4"
    ],
    "defaultValue": "Standard_DS3_v2"
  },
  "spark_version": {
    "type": "regex",
    "pattern": "14\\.[0-9]+\\.x-scala2\\.12"
  },
  "azure_attributes.spot_bid_max_price": {
    "type": "fixed",
    "value": -1
  },
  "azure_attributes.first_on_demand": {
    "type": "fixed",
    "value": 1
  },
  "custom_tags.cost_center": {
    "type": "fixed",
    "value": "data-engineering"
  }
}
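To make the policy's effect concrete, here is a minimal client-side sketch (the helper name and rules are illustrative, not part of the Databricks API; real enforcement happens server-side at cluster creation) that mirrors the range and allowlist rules above:

```python
# Hypothetical pre-flight check mirroring the policy definition above.
ALLOWED_NODE_TYPES = {"Standard_DS3_v2", "Standard_DS4_v2", "Standard_E4ds_v4"}

def violates_policy(request: dict) -> list[str]:
    """Return human-readable policy violations for a cluster request (empty if compliant)."""
    violations = []
    minutes = request.get("autotermination_minutes", 30)
    if not 10 <= minutes <= 60:
        violations.append(f"autotermination_minutes {minutes} outside [10, 60]")
    workers = request.get("num_workers", 2)
    if not 1 <= workers <= 10:
        violations.append(f"num_workers {workers} outside [1, 10]")
    node = request.get("node_type_id", "Standard_DS3_v2")
    if node not in ALLOWED_NODE_TYPES:
        violations.append(f"node_type_id {node} not in allowlist")
    return violations

print(violates_policy({"num_workers": 32, "node_type_id": "Standard_DS5_v2"}))
```

Requests that omit an attribute fall back to the policy defaults, so an empty request is compliant.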

Create policy via SDK

python
from databricks.sdk import WorkspaceClient
import json

def create_cost_policy(w: WorkspaceClient, policy_name: str) -> str:
    """Create cost-controlled cluster policy."""
    policy_definition = {
        "autotermination_minutes": {
            "type": "range",
            "minValue": 10,
            "maxValue": 60,
            "defaultValue": 30,
        },
        "num_workers": {
            "type": "range",
            "minValue": 1,
            "maxValue": 10,
        },
        "node_type_id": {
            "type": "allowlist",
            "values": [
                "Standard_DS3_v2",
                "Standard_DS4_v2",
            ],
        },
        "azure_attributes.spot_bid_max_price": {
            "type": "fixed",
            "value": -1,  # Use spot instances
        },
        "azure_attributes.first_on_demand": {
            "type": "fixed",
            "value": 1,  # At least 1 on-demand for driver
        },
    }

    policy = w.cluster_policies.create(
        name=policy_name,
        definition=json.dumps(policy_definition),
    )
    return policy.policy_id

Step 2: Spot Instance Configuration



resources/cost_optimized_cluster.yml

yaml
job_clusters:
  - job_cluster_key: cost_optimized
    new_cluster:
      spark_version: "14.3.x-scala2.12"
      node_type_id: "Standard_DS3_v2"

      # Azure spot configuration
      azure_attributes:
        first_on_demand: 1        # Driver on-demand
        availability: SPOT_AZURE  # Workers on spot
        spot_bid_max_price: -1    # Pay up to on-demand price

      # AWS spot configuration (alternative)
      # aws_attributes:
      #   first_on_demand: 1
      #   availability: SPOT_WITH_FALLBACK
      #   spot_bid_price_percent: 100

      autoscale:
        min_workers: 1
        max_workers: 8
      custom_tags:
        cost_center: analytics
        environment: production
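The value of the spot mix above comes down to simple arithmetic; a sketch with made-up placeholder rates (not real Azure prices), assuming an on-demand driver plus spot workers:

```python
def blended_hourly_cost(
    workers: int,
    on_demand_rate: float,
    spot_discount: float = 0.6,  # assumed 60% discount vs on-demand
) -> float:
    """Hourly cost of 1 on-demand driver plus `workers` spot workers."""
    driver = on_demand_rate                                 # first_on_demand: 1
    spot_workers = workers * on_demand_rate * (1 - spot_discount)
    return driver + spot_workers

# 4 workers at a placeholder $0.50/hr on-demand rate:
print(blended_hourly_cost(4, 0.50))  # → 1.3
```

Compare with 5 on-demand nodes at $2.50/hr: the spot mix roughly halves the worker cost while keeping the driver stable.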

Step 3: Instance Pool for Faster Startup



Create instance pool for reduced startup time and costs

python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import InstancePoolAzureAttributes

def create_cost_optimized_pool(
    w: WorkspaceClient,
    pool_name: str,
    min_idle: int = 1,
    max_capacity: int = 10,
    idle_timeout: int = 15,
) -> str:
    """Create instance pool for cost optimization.

    Benefits:
    - Faster cluster startup (instances pre-warmed)
    - Lower spot instance preemption risk
    - Consistent performance
    """
    pool = w.instance_pools.create(
        instance_pool_name=pool_name,
        node_type_id="Standard_DS3_v2",
        min_idle_instances=min_idle,
        max_capacity=max_capacity,
        idle_instance_autotermination_minutes=idle_timeout,
        azure_attributes=InstancePoolAzureAttributes(
            availability="SPOT_AZURE",
            spot_bid_max_price=-1,
        ),
        preloaded_spark_versions=["14.3.x-scala2.12"],
        custom_tags={
            "pool_type": "cost_optimized",
            "managed_by": "terraform",
        },
    )
    return pool.instance_pool_id
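Note that `min_idle` instances are billed even when no cluster uses them, so the faster startup trades against a standing cost. A rough way to weigh that (placeholder hourly rate, 730 hours/month assumed):

```python
def monthly_idle_cost(min_idle: int, hourly_rate: float, hours: float = 730) -> float:
    """Approximate monthly cost of keeping `min_idle` instances warm 24/7."""
    return min_idle * hourly_rate * hours

# Two warm Standard_DS3_v2-class instances at a placeholder $0.25/hr:
print(monthly_idle_cost(2, 0.25))  # → 365.0
```

If that standing cost exceeds the DBU savings from faster job startup, set `min_idle` to 0 and rely on `idle_instance_autotermination_minutes` alone.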

Step 4: Cost Monitoring


sql
-- Analyze cluster costs from system tables
SELECT
    cluster_name,
    cluster_id,
    DATE(usage_date) as date,
    SUM(usage_quantity) as dbu_usage,
    SUM(usage_quantity * list_price) as estimated_cost
FROM system.billing.usage
WHERE usage_date > current_date() - INTERVAL 30 DAYS
  AND usage_type = 'COMPUTE'
GROUP BY cluster_name, cluster_id, DATE(usage_date)
ORDER BY estimated_cost DESC;

-- Top cost drivers by job
SELECT
    job_name,
    COUNT(DISTINCT run_id) as runs,
    SUM(duration) / 3600000 as total_hours,
    AVG(duration) / 60000 as avg_minutes,
    SUM(dbu_usage) as total_dbus
FROM system.lakeflow.job_run_timeline r
JOIN system.billing.usage u ON r.cluster_id = u.cluster_id
WHERE r.start_time > current_timestamp() - INTERVAL 30 DAYS
GROUP BY job_name
ORDER BY total_dbus DESC
LIMIT 20;

-- Identify idle clusters
SELECT
    cluster_name,
    cluster_id,
    state,
    last_activity_time,
    TIMESTAMPDIFF(HOUR, last_activity_time, current_timestamp()) as idle_hours
FROM system.compute.clusters
WHERE state = 'RUNNING'
  AND TIMESTAMPDIFF(MINUTE, last_activity_time, current_timestamp()) > 30
ORDER BY idle_hours DESC;
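Query results can also feed a programmatic check; a small sketch (the row shape is assumed to match the first query's `cluster_name`/`estimated_cost` columns) that flags the biggest spenders:

```python
def flag_expensive_clusters(rows: list[dict], threshold: float) -> list[str]:
    """Return names of clusters whose estimated cost exceeds `threshold`."""
    return [
        r["cluster_name"]
        for r in rows
        if r["estimated_cost"] > threshold
    ]

rows = [
    {"cluster_name": "etl-nightly", "estimated_cost": 1200.0},
    {"cluster_name": "dev-sandbox", "estimated_cost": 85.0},
]
print(flag_expensive_clusters(rows, 500))  # → ['etl-nightly']
```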

Step 5: Cost Allocation Tags



Implement cost allocation with tags

python
from databricks.sdk import WorkspaceClient

def enforce_cost_tags(
    w: WorkspaceClient,
    required_tags: list[str] = ["cost_center", "team", "environment"],
) -> list[dict]:
    """Audit and report clusters missing required cost tags.

    Returns list of non-compliant clusters.
    """
    non_compliant = []
    for cluster in w.clusters.list():
        tags = cluster.custom_tags or {}
        missing_tags = [t for t in required_tags if t not in tags]
        if missing_tags:
            non_compliant.append({
                "cluster_id": cluster.cluster_id,
                "cluster_name": cluster.cluster_name,
                "missing_tags": missing_tags,
                "creator": cluster.creator_user_name,
            })
    return non_compliant

Cost allocation report

python
def generate_cost_report(w: WorkspaceClient, spark) -> list[dict]:
    """Generate cost allocation report by tags."""
    query = """
        SELECT
            custom_tags:cost_center as cost_center,
            custom_tags:team as team,
            SUM(usage_quantity * list_price) as total_cost
        FROM system.billing.usage
        WHERE usage_date > current_date() - INTERVAL 30 DAYS
        GROUP BY custom_tags:cost_center, custom_tags:team
        ORDER BY total_cost DESC
    """
    return spark.sql(query).toPandas().to_dict('records')

Step 6: Auto-Termination and Scheduling



Job scheduling for cost optimization

python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import CronSchedule, JobSettings

def configure_off_hours_schedule(
    w: WorkspaceClient,
    job_id: int,
    cron_expression: str = "0 0 2 * * ?",  # 2 AM daily
    timezone: str = "America/New_York",
) -> None:
    """Configure job to run during off-peak hours."""
    w.jobs.update(
        job_id=job_id,
        new_settings=JobSettings(
            schedule=CronSchedule(
                quartz_cron_expression=cron_expression,
                timezone_id=timezone,
            )
        ),
    )

Cluster auto-stop policy

python
def set_aggressive_auto_termination(
    w: WorkspaceClient,
    cluster_id: str,
    minutes: int = 15,
) -> None:
    """Set aggressive auto-termination for development clusters."""
    cluster = w.clusters.get(cluster_id)
    w.clusters.edit(
        cluster_id=cluster_id,
        spark_version=cluster.spark_version,
        node_type_id=cluster.node_type_id,
        autotermination_minutes=minutes,
    )
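A quartz cron expression's fields are seconds, minutes, hours, day-of-month, month, day-of-week. A small sketch (handling only plain numeric hour fields, with an assumed midnight-to-6-AM off-peak window) that sanity-checks a schedule before applying it:

```python
def is_off_peak(quartz_cron: str, start_hour: int = 0, end_hour: int = 6) -> bool:
    """True if the cron's hour field is a plain number inside [start_hour, end_hour)."""
    fields = quartz_cron.split()
    if len(fields) < 6 or not fields[2].isdigit():
        return False  # ranges/wildcards/lists not handled in this sketch
    return start_hour <= int(fields[2]) < end_hour

print(is_off_peak("0 0 2 * * ?"))   # → True  (2 AM)
print(is_off_peak("0 0 14 * * ?"))  # → False (2 PM)
```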

Output


  • Cost-controlled cluster policies
  • Spot instance optimization
  • Instance pools configured
  • Cost monitoring dashboards
  • Auto-termination policies

Error Handling


| Issue | Cause | Solution |
| --- | --- | --- |
| Spot preemption | Instance reclaimed | Use `first_on_demand` for stability |
| Policy violation | Cluster over limits | Adjust policy or request exception |
| Missing tags | Manual cluster creation | Enforce policy with required tags |
| High idle costs | Long auto-termination timeout | Reduce to 15-30 minutes |

Examples


Cost Savings Calculator


python
def estimate_spot_savings(
    on_demand_cost: float,
    spot_ratio: float = 0.7,  # 70% workers on spot
    spot_discount: float = 0.6,  # 60% discount
) -> dict:
    """Estimate savings from spot instances."""
    on_demand_portion = on_demand_cost * (1 - spot_ratio)
    spot_portion = on_demand_cost * spot_ratio * (1 - spot_discount)
    new_cost = on_demand_portion + spot_portion
    savings = on_demand_cost - new_cost

    return {
        "original_cost": on_demand_cost,
        "optimized_cost": new_cost,
        "savings": savings,
        "savings_percent": (savings / on_demand_cost) * 100,
    }
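For example, with a $10,000 on-demand bill, 70% of compute on spot, and the assumed 60% spot discount, the calculator's arithmetic works out as:

```python
# Re-deriving the calculator's arithmetic for a concrete case.
on_demand_cost = 10_000.0
spot_ratio, spot_discount = 0.7, 0.6

optimized = (on_demand_cost * (1 - spot_ratio)                     # on-demand share
             + on_demand_cost * spot_ratio * (1 - spot_discount))  # discounted spot share
savings_percent = (on_demand_cost - optimized) / on_demand_cost * 100

print(round(optimized, 2))        # → 5800.0
print(round(savings_percent, 2))  # → 42.0
```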

Weekly Cost Alert


sql
-- Alert when costs exceed threshold
CREATE ALERT weekly_cost_alert
AS SELECT
    SUM(usage_quantity * list_price) as weekly_cost
FROM system.billing.usage
WHERE usage_date > current_date() - INTERVAL 7 DAYS
HAVING weekly_cost > 10000  -- $10k threshold
SCHEDULE CRON '0 9 * * 1'  -- Monday 9 AM
NOTIFICATIONS (email_addresses = ['finops@company.com']);

Resources


Next Steps


For reference architecture, see databricks-reference-architecture.