databricks-sdk-patterns
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseDatabricks SDK Patterns
Databricks SDK Patterns
Overview
概述
Production-ready patterns for Databricks SDK usage in Python.
适用于Python中Databricks SDK使用的生产级模式。
Prerequisites
前提条件
- Completed setup in databricks-install-auth
- Familiarity with async/await patterns
- Understanding of error handling best practices
- 已完成 databricks-install-auth 的配置
- 熟悉async/await模式
- 了解错误处理最佳实践
Instructions
操作步骤
Step 1: Implement Singleton Pattern
步骤1:实现单例模式
python
undefinedpython
undefinedsrc/databricks/client.py
src/databricks/client.py
from databricks.sdk import WorkspaceClient
from functools import lru_cache
from typing import Optional
import os
@lru_cache(maxsize=1)
def get_workspace_client(profile: Optional[str] = None) -> WorkspaceClient:
    """Return a process-wide singleton WorkspaceClient.

    Args:
        profile: Optional Databricks CLI profile name forwarded to
            WorkspaceClient; None falls back to default auth resolution.

    Returns:
        The cached WorkspaceClient instance.

    Note:
        lru_cache(maxsize=1) caches only the most recent argument value, so
        alternating calls with two different profiles rebuild the client each
        time — presumably callers use a single profile; confirm before reuse.
    """
    return WorkspaceClient(profile=profile)
from databricks.sdk import WorkspaceClient
from functools import lru_cache
from typing import Optional
import os
@lru_cache(maxsize=1)
def get_workspace_client(profile: Optional[str] = None) -> WorkspaceClient:
"""Get singleton WorkspaceClient instance."""
return WorkspaceClient(profile=profile)
Multi-workspace support
Multi-workspace support
# Per-workspace client cache, keyed by workspace name.
_clients: dict[str, WorkspaceClient] = {}


def get_client_for_workspace(workspace_name: str) -> WorkspaceClient:
    """Return the cached WorkspaceClient for *workspace_name*, creating it on first use."""
    try:
        return _clients[workspace_name]
    except KeyError:
        settings = get_workspace_config(workspace_name)
        client = WorkspaceClient(
            host=settings["host"],
            token=settings["token"],
        )
        _clients[workspace_name] = client
        return client
def get_workspace_config(name: str) -> dict:
    """Load workspace connection settings from environment variables.

    Reads DATABRICKS_<NAME>_HOST and DATABRICKS_<NAME>_TOKEN, where <NAME>
    is the upper-cased workspace name.

    Args:
        name: Workspace name, e.g. "dev" or "prod".

    Returns:
        Dict with "host" and "token" keys.

    Raises:
        KeyError: If either environment variable is unset.
    """
    # Bug fix: the original f-strings dropped the separating underscores
    # inconsistently (DATABRICKS_{NAME}HOST vs DATABRICKS{NAME}_TOKEN);
    # standardized on the conventional DATABRICKS_<NAME>_HOST/_TOKEN form.
    key = name.upper()
    return {
        "host": os.environ[f"DATABRICKS_{key}_HOST"],
        "token": os.environ[f"DATABRICKS_{key}_TOKEN"],
    }
undefined_clients: dict[str, WorkspaceClient] = {}
def get_client_for_workspace(workspace_name: str) -> WorkspaceClient:
"""Get or create client for specific workspace."""
if workspace_name not in _clients:
config = get_workspace_config(workspace_name)
_clients[workspace_name] = WorkspaceClient(
host=config["host"],
token=config["token"]
)
return _clients[workspace_name]
def get_workspace_config(name: str) -> dict:
"""Load workspace config from environment."""
return {
"host": os.environ[f"DATABRICKS_{name.upper()}HOST"],
"token": os.environ[f"DATABRICKS{name.upper()}_TOKEN"],
}
undefinedStep 2: Add Error Handling Wrapper
步骤2:添加错误处理包装器
python
undefinedpython
undefinedsrc/databricks/errors.py
src/databricks/errors.py
from databricks.sdk.errors import (
DatabricksError,
NotFound,
ResourceAlreadyExists,
PermissionDenied,
ResourceConflict,
BadRequest,
)
from dataclasses import dataclass
from typing import TypeVar, Generic, Optional
import logging
# Module-level logger; __name__ ties log records to this module.
# Bug fix: the original passed the undefined identifier `name`,
# which raises NameError at import time.
logger = logging.getLogger(__name__)
T = TypeVar("T")


@dataclass
class Result(Generic[T]):
    """Result wrapper for API operations.

    Exactly one of *data* / *error* is expected to be set; a Result whose
    error is None is treated as success.
    """

    data: Optional[T] = None
    error: Optional[Exception] = None

    @property
    def is_success(self) -> bool:
        """True when no error was captured."""
        return self.error is None

    def unwrap(self) -> T:
        """Return the wrapped value, re-raising the captured error if any."""
        if self.error:
            raise self.error
        return self.data


def safe_databricks_call(operation):
    """Decorator that converts Databricks API exceptions into Result values.

    Known exception types are logged at an appropriate level and returned as
    Result(error=...); exceptions that are not a DatabricksError propagate.
    (Bug fix: the original source fused `return self.data` and this `def`
    onto one line, which is a syntax error.)
    """
    from functools import wraps  # local import keeps the snippet self-contained

    @wraps(operation)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs) -> Result:
        try:
            result = operation(*args, **kwargs)
            return Result(data=result)
        except NotFound as e:
            logger.warning(f"Resource not found: {e}")
            return Result(error=e)
        except ResourceAlreadyExists as e:
            logger.warning(f"Resource already exists: {e}")
            return Result(error=e)
        except PermissionDenied as e:
            logger.error(f"Permission denied: {e}")
            return Result(error=e)
        except DatabricksError as e:
            logger.error(f"Databricks error: {e.error_code} - {e.message}")
            return Result(error=e)

    return wrapper
undefinedfrom databricks.sdk.errors import (
DatabricksError,
NotFound,
ResourceAlreadyExists,
PermissionDenied,
ResourceConflict,
BadRequest,
)
from dataclasses import dataclass
from typing import TypeVar, Generic, Optional
import logging
# Module-level logger; __name__ ties log records to this module.
# Bug fix: the original passed the undefined identifier `name`,
# which raises NameError at import time.
logger = logging.getLogger(__name__)
T = TypeVar("T")
@dataclass
class Result(Generic[T]):
"""Result wrapper for API operations."""
data: Optional[T] = None
error: Optional[Exception] = None
@property
def is_success(self) -> bool:
return self.error is None
def unwrap(self) -> T:
if self.error:
raise self.error
return self.data
def safe_databricks_call(operation):
"""Decorator for safe Databricks API calls."""
def wrapper(*args, **kwargs) -> Result:
try:
result = operation(*args, **kwargs)
return Result(data=result)
except NotFound as e:
logger.warning(f"Resource not found: {e}")
return Result(error=e)
except ResourceAlreadyExists as e:
logger.warning(f"Resource already exists: {e}")
return Result(error=e)
except PermissionDenied as e:
logger.error(f"Permission denied: {e}")
return Result(error=e)
except DatabricksError as e:
logger.error(f"Databricks error: {e.error_code} - {e.message}")
return Result(error=e)
return wrapper
undefinedStep 3: Implement Retry Logic with Backoff
步骤3:实现带退避的重试逻辑
python
undefinedpython
undefinedsrc/databricks/retry.py
src/databricks/retry.py
import time
from typing import Callable, TypeVar
from databricks.sdk.errors import (
DatabricksError,
TemporarilyUnavailable,
TooManyRequests,
)
T = TypeVar("T")


def with_retry(
    operation: Callable[[], T],
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: float = 0.5,
) -> T:
    """Execute *operation* with exponential backoff retry.

    Retries on TemporarilyUnavailable / TooManyRequests, and on other
    DatabricksError values whose error_code does not start with "4".

    Args:
        operation: Zero-argument callable to run.
        max_retries: Retries after the first attempt (max_retries + 1 calls total).
        base_delay: Initial backoff in seconds; doubled on each attempt.
        max_delay: Upper bound on the computed backoff.
        jitter: Maximum extra random delay added to each backoff.

    Returns:
        Whatever *operation* returns on its first successful call.

    Raises:
        The last exception raised by *operation* once retries are exhausted,
        or immediately for non-retryable client errors.
    """
    import random

    for attempt in range(max_retries + 1):
        try:
            return operation()
        except (TemporarilyUnavailable, TooManyRequests) as e:
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            delay += random.uniform(0, jitter)
            print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}")
            time.sleep(delay)
        except DatabricksError as e:
            # Don't retry client errors.
            # NOTE(review): this assumes numeric "4xx"-style code strings —
            # Databricks also uses symbolic error codes; confirm upstream.
            if e.error_code and e.error_code.startswith("4"):
                raise
            if attempt == max_retries:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            # Consistency fix: apply the same jitter as the transient branch
            # so concurrent retries don't synchronize.
            delay += random.uniform(0, jitter)
            time.sleep(delay)
    raise RuntimeError("Unreachable")

import time
from typing import Callable, TypeVar
from databricks.sdk.errors import (
DatabricksError,
TemporarilyUnavailable,
TooManyRequests,
)
T = TypeVar("T")
def with_retry(
operation: Callable[[], T],
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter: float = 0.5,
) -> T:
"""Execute operation with exponential backoff retry."""
import random
for attempt in range(max_retries + 1):
try:
return operation()
except (TemporarilyUnavailable, TooManyRequests) as e:
if attempt == max_retries:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
delay += random.uniform(0, jitter)
print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}")
time.sleep(delay)
except DatabricksError as e:
# Don't retry client errors
if e.error_code and e.error_code.startswith("4"):
raise
if attempt == max_retries:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
time.sleep(delay)
raise RuntimeError("Unreachable")
Step 4: Context Manager for Clusters
步骤4:用于集群的上下文管理器
python
undefinedpython
undefinedsrc/databricks/cluster.py
src/databricks/cluster.py
from contextlib import contextmanager
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import State
import time
@contextmanager
def managed_cluster(w: WorkspaceClient, cluster_id: str):
    """Context manager that ensures an existing cluster is running.

    Starts the cluster if it is not already RUNNING and waits for it, then
    yields the cluster_id. The cluster is intentionally left running on
    exit; swap the `pass` for the delete call to terminate it instead.
    (Bug fix: the original source fused the trailing `pass` comment with the
    next function's `@contextmanager` decorator on one line.)
    """
    try:
        # Start cluster if not running
        cluster = w.clusters.get(cluster_id)
        if cluster.state != State.RUNNING:
            print(f"Starting cluster {cluster_id}...")
            w.clusters.start(cluster_id)
            w.clusters.wait_get_cluster_running(cluster_id)
        yield cluster_id
    finally:
        # Optionally terminate after use
        pass  # Or: w.clusters.delete(cluster_id)


@contextmanager
def ephemeral_cluster(w: WorkspaceClient, spec: dict):
    """Create a temporary cluster and permanently delete it on exit.

    Yields the new cluster_id; deletion runs even if the body raises.
    """
    cluster = w.clusters.create_and_wait(**spec)
    try:
        yield cluster.cluster_id
    finally:
        w.clusters.permanent_delete(cluster.cluster_id)
undefinedfrom contextlib import contextmanager
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import State
import time
@contextmanager
def managed_cluster(w: WorkspaceClient, cluster_id: str):
"""Context manager for cluster lifecycle."""
try:
# Start cluster if not running
cluster = w.clusters.get(cluster_id)
if cluster.state != State.RUNNING:
print(f"Starting cluster {cluster_id}...")
w.clusters.start(cluster_id)
w.clusters.wait_get_cluster_running(cluster_id)
yield cluster_id
finally:
# Optionally terminate after use
pass # Or: w.clusters.delete(cluster_id)
@contextmanager
def ephemeral_cluster(w: WorkspaceClient, spec: dict):
"""Create temporary cluster that auto-deletes."""
cluster = w.clusters.create_and_wait(**spec)
try:
yield cluster.cluster_id
finally:
w.clusters.permanent_delete(cluster.cluster_id)
undefinedStep 5: Type-Safe Job Builders
步骤5:类型安全的Job构建器
python
undefinedpython
undefinedsrc/databricks/jobs.py
src/databricks/jobs.py
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
Task, NotebookTask, PythonWheelTask,
JobCluster, JobSettings, CreateJob,
)
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class JobBuilder:
    """Fluent builder for Databricks multi-task jobs.

    Chain add_* calls to accumulate tasks, then call create() to submit.
    Tasks reference job clusters by key; populate `job_clusters` before
    create() if the job defines its own clusters.
    (Bug fix: the original source fused `return job.job_id` with the next
    snippet's import on one line.)
    """

    name: str
    tasks: list[Task] = field(default_factory=list)
    job_clusters: list[JobCluster] = field(default_factory=list)
    tags: dict[str, str] = field(default_factory=dict)

    def add_notebook_task(
        self,
        task_key: str,
        notebook_path: str,
        cluster_key: str,
        parameters: Optional[dict] = None,
    ) -> "JobBuilder":
        """Append a notebook task; returns self for chaining."""
        self.tasks.append(Task(
            task_key=task_key,
            job_cluster_key=cluster_key,
            notebook_task=NotebookTask(
                notebook_path=notebook_path,
                base_parameters=parameters or {},
            )
        ))
        return self

    def add_wheel_task(
        self,
        task_key: str,
        package_name: str,
        entry_point: str,
        cluster_key: str,
        parameters: Optional[list[str]] = None,
    ) -> "JobBuilder":
        """Append a Python wheel task; returns self for chaining."""
        self.tasks.append(Task(
            task_key=task_key,
            job_cluster_key=cluster_key,
            python_wheel_task=PythonWheelTask(
                package_name=package_name,
                entry_point=entry_point,
                parameters=parameters or [],
            )
        ))
        return self

    def create(self, w: WorkspaceClient) -> int:
        """Create the job in the workspace and return its job_id."""
        job = w.jobs.create(
            name=self.name,
            tasks=self.tasks,
            job_clusters=self.job_clusters,
            tags=self.tags,
        )
        return job.job_id

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
Task, NotebookTask, PythonWheelTask,
JobCluster, JobSettings, CreateJob,
)
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class JobBuilder:
"""Fluent builder for Databricks jobs."""
name: str
tasks: list[Task] = field(default_factory=list)
job_clusters: list[JobCluster] = field(default_factory=list)
tags: dict[str, str] = field(default_factory=dict)
def add_notebook_task(
self,
task_key: str,
notebook_path: str,
cluster_key: str,
parameters: Optional[dict] = None,
) -> "JobBuilder":
"""Add a notebook task."""
self.tasks.append(Task(
task_key=task_key,
job_cluster_key=cluster_key,
notebook_task=NotebookTask(
notebook_path=notebook_path,
base_parameters=parameters or {},
)
))
return self
def add_wheel_task(
self,
task_key: str,
package_name: str,
entry_point: str,
cluster_key: str,
parameters: Optional[list[str]] = None,
) -> "JobBuilder":
"""Add a Python wheel task."""
self.tasks.append(Task(
task_key=task_key,
job_cluster_key=cluster_key,
python_wheel_task=PythonWheelTask(
package_name=package_name,
entry_point=entry_point,
parameters=parameters or [],
)
))
return self
def create(self, w: WorkspaceClient) -> int:
"""Create the job and return job_id."""
job = w.jobs.create(
name=self.name,
tasks=self.tasks,
job_clusters=self.job_clusters,
tags=self.tags,
)
return job.job_id
Usage
Usage
job_id = (
JobBuilder("etl-pipeline")
.add_notebook_task("bronze", "/pipelines/bronze", "etl-cluster")
.add_notebook_task("silver", "/pipelines/silver", "etl-cluster")
.create(w)
)
undefinedjob_id = (
JobBuilder("etl-pipeline")
.add_notebook_task("bronze", "/pipelines/bronze", "etl-cluster")
.add_notebook_task("silver", "/pipelines/silver", "etl-cluster")
.create(w)
)
undefinedOutput
输出结果
- Type-safe client singleton
- Robust error handling with structured logging
- Automatic retry with exponential backoff
- Fluent job builder pattern
- 类型安全的客户端单例
- 带有结构化日志的健壮错误处理
- 带指数退避的自动重试
- 流畅的Job构建器模式
Error Handling
错误处理
| Pattern | Use Case | Benefit |
|---|---|---|
| Result wrapper | All API calls | Type-safe error handling |
| Retry logic | Transient failures | Improves reliability |
| Context managers | Cluster lifecycle | Resource cleanup |
| Builders | Job creation | Type safety and fluency |
| 模式 | 使用场景 | 优势 |
|---|---|---|
| 结果包装器 | 所有API调用 | 类型安全的错误处理 |
| 重试逻辑 | 临时故障 | 提升可靠性 |
| 上下文管理器 | 集群生命周期 | 资源清理 |
| 构建器 | Job创建 | 类型安全与流畅性 |
Examples
示例
Factory Pattern (Multi-Tenant)
工厂模式(多租户)
python
from functools import lru_cache
@lru_cache(maxsize=100)
def get_tenant_client(tenant_id: str) -> WorkspaceClient:
"""Get workspace client for tenant."""
config = get_tenant_config(tenant_id)
return WorkspaceClient(
host=config.workspace_url,
token=config.token,
)
python
from functools import lru_cache
@lru_cache(maxsize=100)
def get_tenant_client(tenant_id: str) -> WorkspaceClient:
"""Get workspace client for tenant."""
config = get_tenant_config(tenant_id)
return WorkspaceClient(
host=config.workspace_url,
token=config.token,
)
Async Operations
异步操作
python
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def async_list_clusters(w: WorkspaceClient):
"""Async wrapper for cluster listing."""
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
clusters = await loop.run_in_executor(
executor,
lambda: list(w.clusters.list())
)
return clusters
python
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def async_list_clusters(w: WorkspaceClient):
"""Async wrapper for cluster listing."""
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
clusters = await loop.run_in_executor(
executor,
lambda: list(w.clusters.list())
)
return clusters
Resources
参考资源
Next Steps
后续步骤
Apply patterns in databricks-core-workflow-a for Delta Lake ETL.
在 databricks-core-workflow-a 中应用这些模式以实现Delta Lake ETL。