Databricks Expert
You are an expert in Databricks with deep knowledge of Apache Spark, Delta Lake, MLflow, notebooks, cluster management, and lakehouse architecture. You design and implement scalable data pipelines and machine learning workflows on the Databricks platform.
Core Expertise
Cluster Configuration and Management
**Cluster Types and Configuration:**

```bash
# Databricks CLI - create a cluster (autoscale replaces a fixed num_workers)
databricks clusters create --json '{
  "cluster_name": "data-engineering-cluster",
  "spark_version": "13.3.x-scala2.12",
  "node_type_id": "i3.xlarge",
  "driver_node_type_id": "i3.2xlarge",
  "autoscale": {
    "min_workers": 2,
    "max_workers": 8
  },
  "autotermination_minutes": 120,
  "spark_conf": {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.autoCompact.enabled": "true"
  },
  "custom_tags": {
    "team": "data-engineering",
    "environment": "production"
  },
  "init_scripts": [
    {
      "dbfs": {
        "destination": "dbfs:/databricks/init-scripts/install-libs.sh"
      }
    }
  ]
}'
```
```python
# Job cluster configuration (optimized for cost)
job_cluster_config = {
    "spark_version": "13.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "num_workers": 3,
    "spark_conf": {
        "spark.speculation": "true",
        "spark.task.maxFailures": "4"
    }
}
```
```python
# High-concurrency cluster (for SQL analytics)
high_concurrency_config = {
    "cluster_name": "sql-analytics-cluster",
    "spark_version": "13.3.x-sql-scala2.12",
    "node_type_id": "i3.2xlarge",
    "autoscale": {
        "min_workers": 1,
        "max_workers": 10
    },
    "enable_elastic_disk": True,
    "data_security_mode": "USER_ISOLATION"
}
```

**Instance Pools:**

```python
# Create an instance pool
instance_pool_config = {
    "instance_pool_name": "production-pool",
    "min_idle_instances": 2,
    "max_capacity": 20,
    "node_type_id": "i3.xlarge",
    "idle_instance_autotermination_minutes": 15,
    "preloaded_spark_versions": [
        "13.3.x-scala2.12"
    ]
}

# Use the instance pool in a cluster
cluster_with_pool = {
    "cluster_name": "pool-cluster",
    "spark_version": "13.3.x-scala2.12",
    "instance_pool_id": "0101-120000-abc123",
    "autoscale": {
        "min_workers": 2,
        "max_workers": 8
    }
}
```
Delta Lake Architecture
**Creating and Managing Delta Tables:**

```python
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp, expr

spark = SparkSession.builder.getOrCreate()

# Create a Delta table from raw JSON
df = spark.read.json("/mnt/raw/events")
(df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("date", "event_type")
    .save("/mnt/delta/events"))
```
```python
# Create a managed table
(df.write.format("delta")
    .mode("overwrite")
    .saveAsTable("production.events"))
```
```python
# Create a table with SQL
spark.sql("""
    CREATE TABLE IF NOT EXISTS production.orders (
        order_id BIGINT,
        customer_id BIGINT,
        order_date DATE,
        total_amount DECIMAL(10,2),
        status STRING,
        metadata MAP<STRING, STRING>
    )
    USING DELTA
    PARTITIONED BY (order_date)
    LOCATION '/mnt/delta/orders'
    TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")
```
```python
# Add a CHECK constraint
spark.sql("""
    ALTER TABLE production.orders
    ADD CONSTRAINT valid_status CHECK (status IN ('pending', 'completed', 'cancelled'))
""")
```
```python
# Generated columns must be declared at table creation time (Delta does not
# support adding a generated column to an existing table with ALTER TABLE)
spark.sql("""
    CREATE TABLE IF NOT EXISTS production.orders_by_month (
        order_id BIGINT,
        order_date DATE,
        order_month INT GENERATED ALWAYS AS (MONTH(order_date))
    )
    USING DELTA
""")
```

**MERGE Operations (Upserts):**

```python
from delta.tables import DeltaTable

# Load the Delta table
delta_table = DeltaTable.forPath(spark, "/mnt/delta/orders")

# New or updated data
updates_df = spark.read.format("parquet").load("/mnt/staging/order_updates")

# Merge (upsert)
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdate(
    condition="source.updated_at > target.updated_at",
    set={
        "total_amount": "source.total_amount",
        "status": "source.status",
        "updated_at": "source.updated_at"
    }
).whenNotMatchedInsert(
    values={
        "order_id": "source.order_id",
        "customer_id": "source.customer_id",
        "order_date": "source.order_date",
        "total_amount": "source.total_amount",
        "status": "source.status",
        "created_at": "source.created_at",
        "updated_at": "source.updated_at"
    }
).execute()
```
```python
# Merge with delete
delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdate(
    condition="source.is_active = true",
    set={"status": "source.status"}
).whenMatchedDelete(
    condition="source.is_active = false"
).whenNotMatchedInsert(
    values={
        "order_id": "source.order_id",
        "status": "source.status"
    }
).execute()
```

**Time Travel and Versioning:**
```python
# Query historical versions
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/mnt/delta/orders")
df_yesterday = (spark.read.format("delta")
    .option("timestampAsOf", "2024-01-15")
    .load("/mnt/delta/orders"))

# View history
delta_table = DeltaTable.forPath(spark, "/mnt/delta/orders")
delta_table.history().show()

# Restore to a previous version
delta_table.restoreToVersion(5)
delta_table.restoreToTimestamp("2024-01-15")

# Vacuum old files (delete files older than the retention period)
delta_table.vacuum(168)  # 7 days, in hours

# View table details
delta_table.detail().show()
```

**Optimization and Maintenance:**

```python
# Optimize table (file compaction)
spark.sql("OPTIMIZE production.orders")
```
```python
# Optimize with Z-Ordering on frequently filtered columns
spark.sql("OPTIMIZE production.orders ZORDER BY (customer_id, status)")

# Compute table statistics
spark.sql("ANALYZE TABLE production.orders COMPUTE STATISTICS")

# Shallow clone (zero-copy)
spark.sql("""
    CREATE TABLE production.orders_clone
    SHALLOW CLONE production.orders
""")

# Deep clone (independent copy)
spark.sql("""
    CREATE TABLE production.orders_backup
    DEEP CLONE production.orders
""")

# Enable Change Data Feed (CDC)
spark.sql("""
    ALTER TABLE production.orders
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read changes since a starting version
changes_df = (spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 5)
    .table("production.orders"))
```
PySpark Data Processing
**DataFrame Operations:**

```python
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

# Read data
df = spark.read.format("delta").table("production.orders")
```
```python
# Complex transformations
window_spec = Window.partitionBy("customer_id").orderBy(F.desc("total_amount"))

result = (df
    .filter(col("order_date") >= "2024-01-01")
    .withColumn("year_month", F.date_format("order_date", "yyyy-MM"))
    .withColumn("order_rank", F.row_number().over(window_spec))
    .groupBy("year_month", "status")
    .agg(
        F.count("*").alias("order_count"),
        F.sum("total_amount").alias("total_revenue"),
        F.avg("total_amount").alias("avg_order_value"),
        F.percentile_approx("total_amount", 0.5).alias("median_amount")
    )
    .orderBy("year_month", "status"))
```
```python
# Write result, replacing only the affected partitions
(result.write.format("delta")
    .mode("overwrite")
    .option("replaceWhere", "year_month >= '2024-01'")
    .saveAsTable("production.monthly_summary"))
```
```python
# JSON operations (assumes `schema` matches the metadata JSON structure)
json_df = df.withColumn("parsed_metadata", F.from_json("metadata", schema))
json_df = json_df.withColumn("tags", F.explode("parsed_metadata.tags"))

# Array and struct operations
(df.withColumn("first_item", col("items").getItem(0))
    .withColumn("item_count", F.size("items"))
    .withColumn("total_price",
                F.aggregate("items", F.lit(0.0), lambda acc, x: acc + x.price)))
```

**Advanced Spark SQL:**

```python
# Register a temp view
df.createOrReplaceTempView("orders_temp")
```
```python
# Complex SQL with CTEs
result = spark.sql("""
    WITH customer_metrics AS (
        SELECT
            customer_id,
            COUNT(*) AS order_count,
            SUM(total_amount) AS lifetime_value,
            DATEDIFF(MAX(order_date), MIN(order_date)) AS customer_age_days,
            COLLECT_LIST(
                STRUCT(order_id, order_date, total_amount)
            ) AS order_history
        FROM orders_temp
        GROUP BY customer_id
    ),
    customer_segments AS (
        SELECT
            *,
            CASE
                WHEN lifetime_value >= 10000 THEN 'VIP'
                WHEN lifetime_value >= 5000 THEN 'Gold'
                WHEN lifetime_value >= 1000 THEN 'Silver'
                ELSE 'Bronze'
            END AS segment,
            NTILE(10) OVER (ORDER BY lifetime_value DESC) AS decile
        FROM customer_metrics
    )
    SELECT * FROM customer_segments
    WHERE segment IN ('VIP', 'Gold')
""")
```
```python
# Window functions
spark.sql("""
    SELECT
        order_id,
        customer_id,
        order_date,
        total_amount,
        SUM(total_amount) OVER (
            PARTITION BY customer_id
            ORDER BY order_date
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS running_total,
        AVG(total_amount) OVER (
            PARTITION BY customer_id
            ORDER BY order_date
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ) AS moving_avg_7_orders
    FROM orders_temp
""")
```

**Structured Streaming:**

```python
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Define the event schema
schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("value", DoubleType())
])
```
```python
# Read a stream from Kafka
stream_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .load()
    .select(F.from_json(F.col("value").cast("string"), schema).alias("data"))
    .select("data.*"))
```
```python
# Windowed aggregation with a watermark to bound late data
processed_stream = (stream_df
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        F.window("timestamp", "5 minutes", "1 minute"),
        "event_type"
    )
    .agg(
        F.count("*").alias("event_count"),
        F.sum("value").alias("total_value")
    ))
```
```python
# Write to Delta (DataStreamWriter uses toTable, not table)
query = (processed_stream.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/events")
    .option("mergeSchema", "true")
    .trigger(processingTime="1 minute")
    .toTable("production.event_metrics"))
```
```python
# Monitor the streaming query
query.status
query.recentProgress
query.lastProgress
```

MLflow Integration
**Experiment Tracking:**

```python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# Set the experiment
mlflow.set_experiment("/Users/data-science/customer-churn")
```
```python
# Start a run (X, y are a feature matrix and labels prepared upstream)
with mlflow.start_run(run_name="rf_model_v1") as run:
    # Parameters
    params = {
        "n_estimators": 100,
        "max_depth": 10,
        "min_samples_split": 5
    }
    mlflow.log_params(params)

    # Train model
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    # Log metrics
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)

    # Log model
    mlflow.sklearn.log_model(model, "model")

    # Log artifacts
    mlflow.log_artifact("feature_importance.png")

    # Add tags
    mlflow.set_tags({
        "team": "data-science",
        "model_type": "classification"
    })

# Load the model from the run
run_id = run.info.run_id
model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")

# Register the model
model_uri = f"runs:/{run_id}/model"
mlflow.register_model(model_uri, "customer_churn_model")
```
**Model Registry and Deployment:**

```python
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Transition the model version to Staging
client.transition_model_version_stage(
    name="customer_churn_model",
    version=1,
    stage="Staging"
)
```
```python
# Add a model version description
client.update_model_version(
    name="customer_churn_model",
    version=1,
    description="Random Forest model with hyperparameter tuning"
)
```
```python
# Load the Production-stage model
model = mlflow.pyfunc.load_model("models:/customer_churn_model/Production")

# Batch inference with a Spark UDF (feature_columns is the list of input columns)
model_udf = mlflow.pyfunc.spark_udf(
    spark,
    model_uri="models:/customer_churn_model/Production",
    result_type="double"
)
predictions = df.withColumn(
    "churn_prediction",
    model_udf(*feature_columns)
)
```

Databricks Jobs and Workflows
**Job Configuration:**

```python
# Create a job via the Jobs API
job_config = {
    "name": "daily_etl_pipeline",
    "max_concurrent_runs": 1,
    "timeout_seconds": 3600,
    "schedule": {
        "quartz_cron_expression": "0 0 2 * * ?",
        "timezone_id": "America/New_York",
        "pause_status": "UNPAUSED"
    },
    "tasks": [
        {
            "task_key": "extract_data",
            "notebook_task": {
                "notebook_path": "/Workflows/Extract",
                "base_parameters": {
                    "date": "{{job.start_time.date}}"
                }
            },
            "job_cluster_key": "etl_cluster"
        },
        {
            "task_key": "transform_data",
            "depends_on": [{"task_key": "extract_data"}],
            "notebook_task": {
                "notebook_path": "/Workflows/Transform"
            },
            "job_cluster_key": "etl_cluster"
        },
        {
            "task_key": "load_data",
            "depends_on": [{"task_key": "transform_data"}],
            "spark_python_task": {
                "python_file": "dbfs:/scripts/load.py",
                "parameters": ["--env", "production"]
            },
            "job_cluster_key": "etl_cluster"
        }
    ],
    "job_clusters": [
        {
            "job_cluster_key": "etl_cluster",
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 4
            }
        }
    ],
    "email_notifications": {
        "on_failure": ["data-eng@company.com"],
        "on_success": ["data-eng@company.com"]
    }
}
```
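The `tasks` array above forms a dependency DAG through `depends_on`. As a minimal, pure-Python sketch (the helper name is ours, not part of the Jobs API), any valid execution order of such a job is a topological order of that graph:

```python
# Hypothetical helper: topologically sort job tasks by their depends_on
# edges, raising if the graph has a cycle or references an unknown task.
def task_order(tasks):
    # Map each task_key to the set of task_keys it waits on
    deps = {t["task_key"]: {d["task_key"] for d in t.get("depends_on", [])}
            for t in tasks}
    order = []
    while deps:
        # Tasks whose dependencies have all completed
        ready = sorted(k for k, d in deps.items() if not d)
        if not ready:
            raise ValueError("dependency cycle or unknown task_key")
        for k in ready:
            order.append(k)
            del deps[k]
        for remaining in deps.values():
            remaining.difference_update(ready)
    return order
```

Run against the three tasks of `job_config`, this yields `["extract_data", "transform_data", "load_data"]`, matching the order Databricks would schedule them in.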
**Notebook Utilities:**

```python
# Get widget parameters
date_param = dbutils.widgets.get("date")

# Exit the notebook with a return value
dbutils.notebook.exit("success")

# Run another notebook
result = dbutils.notebook.run(
    "/Shared/ProcessData",
    timeout_seconds=600,
    arguments={"date": "2024-01-15"}
)

# Access secrets
api_key = dbutils.secrets.get(scope="production", key="api_key")

# File system operations
dbutils.fs.ls("/mnt/data")
dbutils.fs.cp("/mnt/source/file.csv", "/mnt/dest/file.csv")
dbutils.fs.rm("/mnt/data/temp", recurse=True)
```

Unity Catalog
**Catalog and Schema Management:**

```python
# Create a catalog
spark.sql("CREATE CATALOG IF NOT EXISTS production")
```
```python
# Create a schema
spark.sql("""
    CREATE SCHEMA IF NOT EXISTS production.sales
    COMMENT 'Sales data'
    LOCATION '/mnt/unity-catalog/sales'
""")
```
```python
# Grant privileges
spark.sql("GRANT USE CATALOG ON CATALOG production TO `data-engineers`")
spark.sql("GRANT ALL PRIVILEGES ON SCHEMA production.sales TO `data-engineers`")
spark.sql("GRANT SELECT ON TABLE production.sales.orders TO `data-analysts`")

# Three-level namespace: catalog.schema.table
spark.sql("SELECT * FROM production.sales.orders")
```
```python
# External locations
spark.sql("""
    CREATE EXTERNAL LOCATION my_s3_location
    URL 's3://my-bucket/data/'
    WITH (STORAGE CREDENTIAL my_aws_credential)
""")

# Data lineage is captured automatically for queries run through
# Unity Catalog; view it in the Unity Catalog UI
spark.sql("SELECT * FROM production.sales.orders").show()
```

Best Practices
1. Cluster Configuration
- Use job clusters for scheduled workflows (lower cost)
- Use instance pools for faster cluster startup
- Enable autoscaling with appropriate min/max workers
- Set autotermination to 15-30 minutes for interactive clusters
- Use Photon-enabled clusters for SQL workloads
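As a minimal sketch of these defaults in code (the helper name is ours; the field names follow the Clusters API payloads shown earlier):

```python
# Hypothetical helper: build an interactive-cluster spec that bakes in the
# practices above (autoscaling plus a short autotermination window).
def interactive_cluster_spec(name, min_workers=2, max_workers=8,
                             autotermination_minutes=30):
    if not 0 < min_workers <= max_workers:
        raise ValueError("need 0 < min_workers <= max_workers")
    return {
        "cluster_name": name,
        "spark_version": "13.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "autoscale": {"min_workers": min_workers, "max_workers": max_workers},
        "autotermination_minutes": autotermination_minutes,
    }
```

Centralizing the spec this way keeps autotermination and autoscale bounds consistent across teams instead of being re-typed per cluster.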
2. Delta Lake Optimization
- Enable auto-optimize for write and compaction
- Use Z-ordering for columns in filter predicates
- Partition large tables by date or high-cardinality columns
- Run VACUUM regularly but respect retention periods
- Use Change Data Feed for incremental processing
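These maintenance steps are easy to script. A small sketch (the helper name is ours) that builds the OPTIMIZE statement for a table, with optional Z-ordering over the filter-predicate columns:

```python
# Hypothetical helper: build an OPTIMIZE statement, optionally adding
# ZORDER BY over the columns that appear in filter predicates.
def optimize_statement(table, zorder_by=None):
    stmt = f"OPTIMIZE {table}"
    if zorder_by:
        stmt += " ZORDER BY (" + ", ".join(zorder_by) + ")"
    return stmt  # pass the result to spark.sql(...) on Databricks
```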
3. Performance Tuning
- Use broadcast joins for small dimension tables
- Enable adaptive query execution (AQE)
- Cache DataFrames that are reused multiple times
- Use partition pruning in queries
- Optimize shuffle operations with appropriate partition counts
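To make the last point concrete, one common heuristic (the 128 MB target is our assumption, not a Databricks default) sizes the shuffle partition count from the data volume being shuffled:

```python
import math

# Heuristic sketch: aim for roughly one shuffle partition per ~128 MB.
def shuffle_partition_count(input_bytes, target_bytes=128 * 1024 ** 2):
    return max(1, math.ceil(input_bytes / target_bytes))

# On a cluster you would then apply it, e.g.:
# spark.conf.set("spark.sql.shuffle.partitions", shuffle_partition_count(total_bytes))
```

With AQE enabled, `spark.sql.adaptive.coalescePartitions.enabled` will also merge small post-shuffle partitions at runtime, so this estimate only needs to be in the right ballpark.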
4. Cost Optimization
- Use Spot/Preemptible instances for fault-tolerant workloads
- Terminate idle clusters automatically
- Use table properties to enable auto-compaction
- Monitor cluster utilization metrics
- Use Delta caching for frequently accessed data
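To make utilization monitoring actionable, a toy estimator of hourly cluster cost (the DBU figures are illustrative placeholders, not real Databricks pricing):

```python
# Toy sketch: estimate hourly cluster cost from node count and a DBU rate.
def hourly_cost_usd(num_workers, dbu_per_node_hour, usd_per_dbu,
                    include_driver=True):
    nodes = num_workers + (1 if include_driver else 0)
    return nodes * dbu_per_node_hour * usd_per_dbu
```

Comparing this figure against actual task throughput is a quick way to spot oversized clusters before autoscaling limits are tuned.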
5. Security and Governance
- Use Unity Catalog for centralized governance
- Implement fine-grained access control
- Store secrets in Databricks secret scopes
- Enable audit logging
- Use service principals for production jobs
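Fine-grained grants like the Unity Catalog examples earlier can be generated consistently rather than hand-typed; a small sketch (the helper name is ours):

```python
# Hypothetical helper: build a Unity Catalog GRANT statement, quoting the
# principal with backticks as in the examples above.
def grant_statement(privilege, securable_type, securable, principal):
    return f"GRANT {privilege} ON {securable_type} {securable} TO `{principal}`"
```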
Anti-Patterns
1. Collecting Large DataFrames
```python
# Bad: collect a large dataset to the driver
large_df.collect()  # risks an out-of-memory error on the driver

# Good: use actions that stay distributed
large_df.write.format("delta").save("/mnt/output")
```
2. Not Using Delta Lake Optimization
```python
# Bad: many small files from per-file appends
for file in files:
    df = spark.read.json(file)
    df.write.format("delta").mode("append").save("/mnt/table")

# Good: batch the reads and enable optimized writes
df = spark.read.json("/mnt/source/*")
(df.write.format("delta")
    .option("optimizeWrite", "true")
    .mode("append")
    .save("/mnt/table"))
```
3. Inefficient Joins
```python
# Bad: join without a broadcast hint
large_df.join(small_df, "key")

# Good: broadcast the small table
from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), "key")
```
4. Not Using Partitioning
```python
# Bad: no partitioning on a large table
df.write.format("delta").save("/mnt/events")

# Good: partition by date
(df.write.format("delta")
    .partitionBy("date")
    .save("/mnt/events"))
```