spark-optimization

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Apache Spark Optimization

Apache Spark 作业优化

Production patterns for optimizing Apache Spark jobs including partitioning strategies, memory management, shuffle optimization, and performance tuning.
本文介绍优化Apache Spark作业的生产级模式,包括分区策略、内存管理、Shuffle优化和性能调优。

When to Use This Skill

适用场景

  • Optimizing slow Spark jobs
  • Tuning memory and executor configuration
  • Implementing efficient partitioning strategies
  • Debugging Spark performance issues
  • Scaling Spark pipelines for large datasets
  • Reducing shuffle and data skew
  • 优化运行缓慢的Spark作业
  • 调优内存与执行器配置
  • 实现高效的分区策略
  • 调试Spark性能问题
  • 为大规模数据集扩展Spark处理管道
  • 减少Shuffle操作和数据倾斜

Core Concepts

核心概念

1. Spark Execution Model

1. Spark 执行模型

Driver Program
Job (triggered by action)
Stages (separated by shuffles)
Tasks (one per partition)
Driver Program
Job (triggered by action)
Stages (separated by shuffles)
Tasks (one per partition)

2. Key Performance Factors

2. 关键性能影响因素

FactorImpactSolution
ShuffleNetwork I/O, disk I/OMinimize wide transformations
Data SkewUneven task durationSalting, broadcast joins
SerializationCPU overheadUse Kryo, columnar formats
MemoryGC pressure, spillsTune executor memory
PartitionsParallelismRight-size partitions
因素影响解决方案
Shuffle网络I/O、磁盘I/O减少宽转换操作
数据倾斜任务执行时间不均加盐(Salting)、广播连接(Broadcast Join)
序列化CPU开销大使用Kryo序列化、列存格式
内存GC压力大、数据溢写磁盘调优执行器内存
分区并行度不足或过高合理设置分区大小

Quick Start

快速开始

python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

Create optimized Spark session

创建优化后的Spark会话

spark = (SparkSession.builder .appName("OptimizedJob") .config("spark.sql.adaptive.enabled", "true") .config("spark.sql.adaptive.coalescePartitions.enabled", "true") .config("spark.sql.adaptive.skewJoin.enabled", "true") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.shuffle.partitions", "200") .getOrCreate())
spark = (SparkSession.builder .appName("OptimizedJob") .config("spark.sql.adaptive.enabled", "true") .config("spark.sql.adaptive.coalescePartitions.enabled", "true") .config("spark.sql.adaptive.skewJoin.enabled", "true") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.shuffle.partitions", "200") .getOrCreate())

Read with optimized settings

使用优化配置读取数据

df = (spark.read .format("parquet") .option("mergeSchema", "false") .load("s3://bucket/data/"))
df = (spark.read .format("parquet") .option("mergeSchema", "false") .load("s3://bucket/data/"))

Efficient transformations

高效的数据转换

result = (df .filter(F.col("date") >= "2024-01-01") .select("id", "amount", "category") .groupBy("category") .agg(F.sum("amount").alias("total")))
result.write.mode("overwrite").parquet("s3://bucket/output/")
undefined
result = (df .filter(F.col("date") >= "2024-01-01") .select("id", "amount", "category") .groupBy("category") .agg(F.sum("amount").alias("total")))
result.write.mode("overwrite").parquet("s3://bucket/output/")
undefined

Patterns

优化模式

Pattern 1: Optimal Partitioning

模式1:最优分区策略

python
undefined
python
undefined

Calculate optimal partition count

计算最优分区数量

def calculate_partitions(data_size_gb: float, partition_size_mb: int = 128) -> int: """ Optimal partition size: 128MB - 256MB Too few: Under-utilization, memory pressure Too many: Task scheduling overhead """ return max(int(data_size_gb * 1024 / partition_size_mb), 1)
def calculate_partitions(data_size_gb: float, partition_size_mb: int = 128) -> int: """ 最佳分区大小:128MB - 256MB 分区过少:资源利用不足,内存压力大 分区过多:任务调度开销增加 """ return max(int(data_size_gb * 1024 / partition_size_mb), 1)

Repartition for even distribution

重分区以实现数据均匀分布

df_repartitioned = df.repartition(200, "partition_key")
df_repartitioned = df.repartition(200, "partition_key")

Coalesce to reduce partitions (no shuffle)

合并分区以减少数量(无Shuffle操作)

df_coalesced = df.coalesce(100)
df_coalesced = df.coalesce(100)

Partition pruning with predicate pushdown

谓词下推实现分区裁剪

df = (spark.read.parquet("s3://bucket/data/") .filter(F.col("date") == "2024-01-01")) # Spark pushes this down
df = (spark.read.parquet("s3://bucket/data/") .filter(F.col("date") == "2024-01-01")) # Spark会将过滤条件下推到存储层

Write with partitioning for future queries

按分区写入以优化后续查询

(df.write .partitionBy("year", "month", "day") .mode("overwrite") .parquet("s3://bucket/partitioned_output/"))
undefined
(df.write .partitionBy("year", "month", "day") .mode("overwrite") .parquet("s3://bucket/partitioned_output/"))
undefined

Pattern 2: Join Optimization

模式2:连接操作优化

python
from pyspark.sql import functions as F
from pyspark.sql.types import *
python
from pyspark.sql import functions as F
from pyspark.sql.types import *

1. Broadcast Join - Small table joins

1. 广播连接 - 小表连接场景

Best when: One side < 10MB (configurable)

最佳适用场景:其中一张表小于10MB(可配置)

small_df = spark.read.parquet("s3://bucket/small_table/") # < 10MB large_df = spark.read.parquet("s3://bucket/large_table/") # TBs
small_df = spark.read.parquet("s3://bucket/small_table/") # < 10MB large_df = spark.read.parquet("s3://bucket/large_table/") # TB级数据

Explicit broadcast hint

显式指定广播连接提示

result = large_df.join( F.broadcast(small_df), on="key", how="left" )
result = large_df.join( F.broadcast(small_df), on="key", how="left" )

2. Sort-Merge Join - Default for large tables

2. 排序合并连接 - 大表连接默认方式

Requires shuffle, but handles any size

需要Shuffle操作,但支持任意数据量

result = large_df1.join(large_df2, on="key", how="inner")
result = large_df1.join(large_df2, on="key", how="inner")

3. Bucket Join - Pre-sorted, no shuffle at join time

3. 分桶连接 - 预排序,连接时无Shuffle

Write bucketed tables

写入分桶表

(df.write .bucketBy(200, "customer_id") .sortBy("customer_id") .mode("overwrite") .saveAsTable("bucketed_orders"))
(df.write .bucketBy(200, "customer_id") .sortBy("customer_id") .mode("overwrite") .saveAsTable("bucketed_orders"))

Join bucketed tables (no shuffle!)

连接分桶表(无Shuffle操作!)

orders = spark.table("bucketed_orders") customers = spark.table("bucketed_customers") # Same bucket count result = orders.join(customers, on="customer_id")
orders = spark.table("bucketed_orders") customers = spark.table("bucketed_customers") # 分桶数相同 result = orders.join(customers, on="customer_id")

4. Skew Join Handling

4. 倾斜连接处理

Enable AQE skew join optimization

启用AQE倾斜连接优化

spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true") spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5") spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true") spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5") spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

Manual salting for severe skew

手动加盐处理严重数据倾斜

def salt_join(df_skewed, df_other, key_col, num_salts=10): """Add salt to distribute skewed keys""" # Add salt to skewed side df_salted = df_skewed.withColumn( "salt", (F.rand() * num_salts).cast("int") ).withColumn( "salted_key", F.concat(F.col(key_col), F.lit("_"), F.col("salt")) )
# Explode other side with all salts
df_exploded = df_other.crossJoin(
    spark.range(num_salts).withColumnRenamed("id", "salt")
).withColumn(
    "salted_key",
    F.concat(F.col(key_col), F.lit("_"), F.col("salt"))
)

# Join on salted key
return df_salted.join(df_exploded, on="salted_key", how="inner")
undefined
def salt_join(df_skewed, df_other, key_col, num_salts=10): """添加盐值以分散倾斜的键""" # 为倾斜表添加盐值 df_salted = df_skewed.withColumn( "salt", (F.rand() * num_salts).cast("int") ).withColumn( "salted_key", F.concat(F.col(key_col), F.lit("_"), F.col("salt")) )
# 为另一张表展开所有盐值
df_exploded = df_other.crossJoin(
    spark.range(num_salts).withColumnRenamed("id", "salt")
).withColumn(
    "salted_key",
    F.concat(F.col(key_col), F.lit("_"), F.col("salt"))
)

# 按加盐后的键进行连接
return df_salted.join(df_exploded, on="salted_key", how="inner")
undefined

Pattern 3: Caching and Persistence

模式3:缓存与持久化

python
from pyspark import StorageLevel
python
from pyspark import StorageLevel

Cache when reusing DataFrame multiple times

当DataFrame被多次复用进行缓存

df = spark.read.parquet("s3://bucket/data/") df_filtered = df.filter(F.col("status") == "active")
df = spark.read.parquet("s3://bucket/data/") df_filtered = df.filter(F.col("status") == "active")

Cache in memory (MEMORY_AND_DISK is default)

缓存到内存(默认存储级别为MEMORY_AND_DISK)

df_filtered.cache()
df_filtered.cache()

Or with specific storage level

或者指定存储级别

df_filtered.persist(StorageLevel.MEMORY_AND_DISK_SER)
df_filtered.persist(StorageLevel.MEMORY_AND_DISK_SER)

Force materialization

强制物化缓存

df_filtered.count()
df_filtered.count()

Use in multiple actions

在多个动作中使用缓存后的DataFrame

agg1 = df_filtered.groupBy("category").count() agg2 = df_filtered.groupBy("region").sum("amount")
agg1 = df_filtered.groupBy("category").count() agg2 = df_filtered.groupBy("region").sum("amount")

Unpersist when done

使用完成后取消缓存

df_filtered.unpersist()
df_filtered.unpersist()

Storage levels explained:

存储级别说明:

MEMORY_ONLY - Fast, but may not fit

MEMORY_ONLY - 速度快,但可能无法容纳全部数据

MEMORY_AND_DISK - Spills to disk if needed (recommended)

MEMORY_AND_DISK - 内存不足时溢写到磁盘(推荐使用)

MEMORY_ONLY_SER - Serialized, less memory, more CPU

MEMORY_ONLY_SER - 序列化存储,占用内存少,CPU开销大

DISK_ONLY - When memory is tight

DISK_ONLY - 内存紧张时使用

OFF_HEAP - Tungsten off-heap memory

OFF_HEAP - 使用Tungsten堆外内存

Checkpoint for complex lineage

为复杂血缘关系设置检查点

spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/") df_complex = (df .join(other_df, "key") .groupBy("category") .agg(F.sum("amount"))) df_complex.checkpoint() # Breaks lineage, materializes
undefined
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/") df_complex = (df .join(other_df, "key") .groupBy("category") .agg(F.sum("amount"))) df_complex.checkpoint() # 切断血缘关系,物化数据
undefined

Pattern 4: Memory Tuning

模式4:内存调优

python
undefined
python
undefined

Executor memory configuration

执行器内存配置

spark-submit --executor-memory 8g --executor-cores 4

spark-submit --executor-memory 8g --executor-cores 4

Memory breakdown (8GB executor):

内存分配 breakdown(8GB执行器):

- spark.memory.fraction = 0.6 (60% = 4.8GB for execution + storage)

- spark.memory.fraction = 0.6(60% = 4.8GB 用于执行 + 存储)

- spark.memory.storageFraction = 0.5 (50% of 4.8GB = 2.4GB for cache)

- spark.memory.storageFraction = 0.5(4.8GB的50% = 2.4GB 用于缓存)

- Remaining 2.4GB for execution (shuffles, joins, sorts)

- 剩余2.4GB用于执行操作(Shuffle、连接、排序)

- 40% = 3.2GB for user data structures and internal metadata

- 40% = 3.2GB 用于用户数据结构和内部元数据

spark = (SparkSession.builder .config("spark.executor.memory", "8g") .config("spark.executor.memoryOverhead", "2g") # For non-JVM memory .config("spark.memory.fraction", "0.6") .config("spark.memory.storageFraction", "0.5") .config("spark.sql.shuffle.partitions", "200") # For memory-intensive operations .config("spark.sql.autoBroadcastJoinThreshold", "50MB") # Prevent OOM on large shuffles .config("spark.sql.files.maxPartitionBytes", "128MB") .getOrCreate())
spark = (SparkSession.builder .config("spark.executor.memory", "8g") .config("spark.executor.memoryOverhead", "2g") # 用于非JVM内存 .config("spark.memory.fraction", "0.6") .config("spark.memory.storageFraction", "0.5") .config("spark.sql.shuffle.partitions", "200") # 针对内存密集型操作 .config("spark.sql.autoBroadcastJoinThreshold", "50MB") # 避免大规模Shuffle时出现OOM .config("spark.sql.files.maxPartitionBytes", "128MB") .getOrCreate())

Monitor memory usage

监控内存使用情况

def print_memory_usage(spark): """Print current memory usage""" sc = spark.sparkContext for executor in sc._jsc.sc().getExecutorMemoryStatus().keySet().toArray(): mem_status = sc._jsc.sc().getExecutorMemoryStatus().get(executor) total = mem_status._1() / (10243) free = mem_status._2() / (10243) print(f"{executor}: {total:.2f}GB total, {free:.2f}GB free")
undefined
def print_memory_usage(spark): """打印当前内存使用情况""" sc = spark.sparkContext for executor in sc._jsc.sc().getExecutorMemoryStatus().keySet().toArray(): mem_status = sc._jsc.sc().getExecutorMemoryStatus().get(executor) total = mem_status._1() / (10243) free = mem_status._2() / (10243) print(f"{executor}: 总内存 {total:.2f}GB, 剩余内存 {free:.2f}GB")
undefined

Pattern 5: Shuffle Optimization

模式5:Shuffle优化

python
undefined
python
undefined

Reduce shuffle data size

减少Shuffle数据量

spark.conf.set("spark.sql.shuffle.partitions", "auto") # With AQE spark.conf.set("spark.shuffle.compress", "true") spark.conf.set("spark.shuffle.spill.compress", "true")
spark.conf.set("spark.sql.shuffle.partitions", "auto") # 配合AQE使用 spark.conf.set("spark.shuffle.compress", "true") spark.conf.set("spark.shuffle.spill.compress", "true")

Pre-aggregate before shuffle

Shuffle前先做预聚合

df_optimized = (df # Local aggregation first (combiner) .groupBy("key", "partition_col") .agg(F.sum("value").alias("partial_sum")) # Then global aggregation .groupBy("key") .agg(F.sum("partial_sum").alias("total")))
df_optimized = (df # 先做本地聚合(Combiner) .groupBy("key", "partition_col") .agg(F.sum("value").alias("partial_sum")) # 再做全局聚合 .groupBy("key") .agg(F.sum("partial_sum").alias("total")))

Avoid shuffle with map-side operations

使用Map端操作避免Shuffle

BAD: Shuffle for each distinct

不推荐:每个distinct操作都会触发Shuffle

distinct_count = df.select("category").distinct().count()
distinct_count = df.select("category").distinct().count()

GOOD: Approximate distinct (no shuffle)

推荐:近似去重(无Shuffle操作)

approx_count = df.select(F.approx_count_distinct("category")).collect()[0][0]
approx_count = df.select(F.approx_count_distinct("category")).collect()[0][0]

Use coalesce instead of repartition when reducing partitions

减少分区时使用coalesce而非repartition

df_reduced = df.coalesce(10) # No shuffle
df_reduced = df.coalesce(10) # 无Shuffle操作

Optimize shuffle with compression

使用压缩优化Shuffle

spark.conf.set("spark.io.compression.codec", "lz4") # Fast compression
undefined
spark.conf.set("spark.io.compression.codec", "lz4") # 快速压缩算法
undefined

Pattern 6: Data Format Optimization

模式6:数据格式优化

python
undefined
python
undefined

Parquet optimizations

Parquet格式优化

(df.write .option("compression", "snappy") # Fast compression .option("parquet.block.size", 128 * 1024 * 1024) # 128MB row groups .parquet("s3://bucket/output/"))
(df.write .option("compression", "snappy") # 快速压缩算法 .option("parquet.block.size", 128 * 1024 * 1024) # 128MB行组 .parquet("s3://bucket/output/"))

Column pruning - only read needed columns

列裁剪 - 仅读取需要的列

df = (spark.read.parquet("s3://bucket/data/") .select("id", "amount", "date")) # Spark only reads these columns
df = (spark.read.parquet("s3://bucket/data/") .select("id", "amount", "date")) # Spark仅读取这些列

Predicate pushdown - filter at storage level

谓词下推 - 在存储层进行过滤

df = (spark.read.parquet("s3://bucket/partitioned/year=2024/") .filter(F.col("status") == "active")) # Pushed to Parquet reader
df = (spark.read.parquet("s3://bucket/partitioned/year=2024/") .filter(F.col("status") == "active")) # 过滤条件被下推到Parquet读取器

Delta Lake optimizations

Delta Lake 优化

(df.write .format("delta") .option("optimizeWrite", "true") # Bin-packing .option("autoCompact", "true") # Compact small files .mode("overwrite") .save("s3://bucket/delta_table/"))
(df.write .format("delta") .option("optimizeWrite", "true") # 装箱优化 .option("autoCompact", "true") # 自动合并小文件 .mode("overwrite") .save("s3://bucket/delta_table/"))

Z-ordering for multi-dimensional queries

Z-ordering 优化多维查询

spark.sql(""" OPTIMIZE delta.
s3://bucket/delta_table/
ZORDER BY (customer_id, date) """)
undefined
spark.sql(""" OPTIMIZE delta.
s3://bucket/delta_table/
ZORDER BY (customer_id, date) """)
undefined

Pattern 7: Monitoring and Debugging

模式7:监控与调试

python
undefined
python
undefined

Enable detailed metrics

启用详细指标

spark.conf.set("spark.sql.codegen.wholeStage", "true") spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.codegen.wholeStage", "true") spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

Explain query plan

查看查询计划

df.explain(mode="extended")
df.explain(mode="extended")

Modes: simple, extended, codegen, cost, formatted

模式:simple、extended、codegen、cost、formatted

Get physical plan statistics

获取物理计划统计信息

df.explain(mode="cost")
df.explain(mode="cost")

Monitor task metrics

监控任务指标

def analyze_stage_metrics(spark): """Analyze recent stage metrics""" status_tracker = spark.sparkContext.statusTracker()
for stage_id in status_tracker.getActiveStageIds():
    stage_info = status_tracker.getStageInfo(stage_id)
    print(f"Stage {stage_id}:")
    print(f"  Tasks: {stage_info.numTasks}")
    print(f"  Completed: {stage_info.numCompletedTasks}")
    print(f"  Failed: {stage_info.numFailedTasks}")
def analyze_stage_metrics(spark): """分析最近的Stage指标""" status_tracker = spark.sparkContext.statusTracker()
for stage_id in status_tracker.getActiveStageIds():
    stage_info = status_tracker.getStageInfo(stage_id)
    print(f"Stage {stage_id}:")
    print(f"  总任务数: {stage_info.numTasks}")
    print(f"  已完成任务数: {stage_info.numCompletedTasks}")
    print(f"  失败任务数: {stage_info.numFailedTasks}")

Identify data skew

识别数据倾斜

def check_partition_skew(df): """Check for partition skew""" partition_counts = (df .withColumn("partition_id", F.spark_partition_id()) .groupBy("partition_id") .count() .orderBy(F.desc("count")))
partition_counts.show(20)

stats = partition_counts.select(
    F.min("count").alias("min"),
    F.max("count").alias("max"),
    F.avg("count").alias("avg"),
    F.stddev("count").alias("stddev")
).collect()[0]

skew_ratio = stats["max"] / stats["avg"]
print(f"Skew ratio: {skew_ratio:.2f}x (>2x indicates skew)")
undefined
def check_partition_skew(df): """检查分区倾斜""" partition_counts = (df .withColumn("partition_id", F.spark_partition_id()) .groupBy("partition_id") .count() .orderBy(F.desc("count")))
partition_counts.show(20)

stats = partition_counts.select(
    F.min("count").alias("min"),
    F.max("count").alias("max"),
    F.avg("count").alias("avg"),
    F.stddev("count").alias("stddev")
).collect()[0]

skew_ratio = stats["max"] / stats["avg"]
print(f"倾斜比率: {skew_ratio:.2f}倍(大于2倍表示存在倾斜)")
undefined

Configuration Cheat Sheet

配置速查表

python
undefined
python
undefined

Production configuration template

生产环境配置模板

spark_configs = { # Adaptive Query Execution (AQE) "spark.sql.adaptive.enabled": "true", "spark.sql.adaptive.coalescePartitions.enabled": "true", "spark.sql.adaptive.skewJoin.enabled": "true",
# Memory
"spark.executor.memory": "8g",
"spark.executor.memoryOverhead": "2g",
"spark.memory.fraction": "0.6",
"spark.memory.storageFraction": "0.5",

# Parallelism
"spark.sql.shuffle.partitions": "200",
"spark.default.parallelism": "200",

# Serialization
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.execution.arrow.pyspark.enabled": "true",

# Compression
"spark.io.compression.codec": "lz4",
"spark.shuffle.compress": "true",

# Broadcast
"spark.sql.autoBroadcastJoinThreshold": "50MB",

# File handling
"spark.sql.files.maxPartitionBytes": "128MB",
"spark.sql.files.openCostInBytes": "4MB",
}
undefined
spark_configs = { # 自适应查询执行(AQE) "spark.sql.adaptive.enabled": "true", "spark.sql.adaptive.coalescePartitions.enabled": "true", "spark.sql.adaptive.skewJoin.enabled": "true",
# 内存配置
"spark.executor.memory": "8g",
"spark.executor.memoryOverhead": "2g",
"spark.memory.fraction": "0.6",
"spark.memory.storageFraction": "0.5",

# 并行度配置
"spark.sql.shuffle.partitions": "200",
"spark.default.parallelism": "200",

# 序列化配置
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.execution.arrow.pyspark.enabled": "true",

# 压缩配置
"spark.io.compression.codec": "lz4",
"spark.shuffle.compress": "true",

# 广播配置
"spark.sql.autoBroadcastJoinThreshold": "50MB",

# 文件处理配置
"spark.sql.files.maxPartitionBytes": "128MB",
"spark.sql.files.openCostInBytes": "4MB",
}
undefined

Best Practices

最佳实践

Do's

推荐做法

  • Enable AQE - Adaptive query execution handles many issues
  • Use Parquet/Delta - Columnar formats with compression
  • Broadcast small tables - Avoid shuffle for small joins
  • Monitor Spark UI - Check for skew, spills, GC
  • Right-size partitions - 128MB - 256MB per partition
  • 启用AQE - 自适应查询执行可自动处理诸多问题
  • 使用Parquet/Delta格式 - 带压缩的列存格式
  • 广播小表 - 避免小表连接时的Shuffle操作
  • 监控Spark UI - 检查数据倾斜、数据溢写和GC情况
  • 合理设置分区大小 - 每个分区128MB - 256MB

Don'ts

不推荐做法

  • Don't collect large data - Keep data distributed
  • Don't use UDFs unnecessarily - Use built-in functions
  • Don't over-cache - Memory is limited
  • Don't ignore data skew - It dominates job time
  • Don't use
    .count()
    for existence
    - Use
    .take(1)
    or
    .isEmpty()
  • 不要收集大规模数据 - 保持数据分布式存储
  • 不要不必要地使用UDF - 使用内置函数
  • 不要过度缓存 - 内存资源有限
  • 不要忽略数据倾斜 - 它会主导作业执行时间
  • 不要用
    .count()
    检查存在性
    - 使用
    .take(1)
    .isEmpty()

Resources

参考资源