apache-spark-data-processing

Apache Spark Data Processing


A comprehensive skill for mastering Apache Spark data processing, from basic RDD operations to advanced streaming, SQL, and machine learning workflows. Learn to build scalable, distributed data pipelines and analytics systems.

When to Use This Skill


Use Apache Spark when you need to:
  • Process Large-Scale Data: Handle datasets too large for single-machine processing (TB to PB scale)
  • Perform Distributed Computing: Execute parallel computations across cluster nodes
  • Real-Time Stream Processing: Process continuous data streams with low latency
  • Complex Data Analytics: Run sophisticated analytics, aggregations, and transformations
  • Machine Learning at Scale: Train ML models on massive datasets
  • ETL/ELT Pipelines: Build robust data transformation and loading workflows
  • Interactive Data Analysis: Perform exploratory analysis on large datasets
  • Unified Data Processing: Combine batch and streaming workloads in one framework
Not Ideal For:
  • Small datasets (<100 GB) that fit in memory on a single machine
  • Simple CRUD operations (use traditional databases)
  • Ultra-low latency requirements (<10ms) where specialized stream processors excel
  • Workflows requiring strong ACID transactions across distributed data

Core Concepts


Resilient Distributed Datasets (RDDs)


RDDs are Spark's fundamental data abstraction - immutable, distributed collections of objects that can be processed in parallel.
Key Characteristics:
  • Resilient: Fault-tolerant through lineage tracking
  • Distributed: Partitioned across cluster nodes
  • Immutable: Transformations create new RDDs, not modify existing ones
  • Lazy Evaluation: Transformations build computation graph; actions trigger execution
  • In-Memory Computing: Cache intermediate results for iterative algorithms
RDD Operations:
  • Transformations: Lazy operations that return new RDDs (map, filter, flatMap, reduceByKey)
  • Actions: Operations that trigger computation and return values (collect, count, reduce, saveAsTextFile)
When to Use RDDs:
  • Low-level control over data distribution and partitioning
  • Custom partitioning schemes required
  • Working with unstructured data (text files, binary data)
  • Migrating legacy code from early Spark versions
Prefer DataFrames/Datasets when possible - they provide automatic optimization via Catalyst optimizer.
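The transformation/action pairing above can be illustrated without a cluster. Below is a plain-Python, single-machine sketch (not Spark code) of the map/flatMap/reduceByKey semantics that Spark applies per partition and then merges across the cluster:

```python
from functools import reduce
from collections import defaultdict

data = ["hello world", "hello spark"]

# flatMap: map each element to zero or more outputs, then flatten
words = [w for line in data for w in line.split(" ")]

# map: pair each word with an initial count
pairs = [(w, 1) for w in words]

# reduceByKey: merge the values of each key with an associative function
def reduce_by_key(pairs, f):
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(f, values) for key, values in grouped.items()}

counts = reduce_by_key(pairs, lambda a, b: a + b)
print(counts)  # {'hello': 2, 'world': 1, 'spark': 1}
```

Because the merge function is associative, Spark can apply it inside each partition first and combine partial results, which is why reduceByKey shuffles far less data than groupByKey.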

DataFrames and Datasets


DataFrames are distributed collections of data organized into named columns - similar to a database table or pandas DataFrame, but with powerful optimizations.
DataFrames:
  • Structured data with schema
  • Automatic query optimization (Catalyst)
  • Cross-language support (Python, Scala, Java, R)
  • Rich API for SQL-like operations
Datasets (Scala/Java only):
  • Typed DataFrames with compile-time type safety
  • Best performance in Scala due to JVM optimization
  • Combine RDD type safety with DataFrame optimizations
Key Advantages Over RDDs:
  • Query Optimization: Catalyst optimizer rewrites queries for efficiency
  • Tungsten Execution: Optimized CPU and memory usage
  • Columnar Storage: Efficient data representation
  • Code Generation: Compile-time bytecode generation for faster execution

Lazy Evaluation


Spark uses lazy evaluation to optimize execution:
  1. Transformations build a Directed Acyclic Graph (DAG) of operations
  2. Actions trigger execution of the DAG
  3. Spark's optimizer analyzes the entire DAG and creates an optimized execution plan
  4. Work is distributed across cluster nodes
Benefits:
  • Minimize data movement across network
  • Combine multiple operations into single stage
  • Eliminate unnecessary computations
  • Optimize memory usage
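The build-a-DAG-then-execute model can be mimicked with Python generators, which are also lazy. A small sketch (plain Python, not Spark) showing that no work happens until a terminal operation consumes the pipeline:

```python
log = []

def source():
    # Records when an element is actually produced
    for i in range(5):
        log.append(i)
        yield i

# "Transformations": building the pipeline does no work yet
pipeline = (x * 2 for x in source() if x % 2 == 0)
assert log == []  # nothing has executed

# "Action": consuming the pipeline pulls data through every stage in one pass
result = list(pipeline)
assert result == [0, 4, 8]
assert log == [0, 1, 2, 3, 4]
```

As in Spark, deferring execution lets the whole chain run in a single pass over the data instead of materializing an intermediate collection per step.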

Partitioning


Data is divided into partitions for parallel processing:
  • Default Partitioning: Typically based on HDFS block size or input source
  • Hash Partitioning: Distribute data by key hash (used by groupByKey, reduceByKey)
  • Range Partitioning: Distribute data by key ranges (useful for sorted data)
  • Custom Partitioning: Define your own partitioning logic
Partition Count Considerations:
  • Too few partitions: Underutilized cluster, large task execution time
  • Too many partitions: Scheduling overhead, small task execution time
  • General rule: 2-4 partitions per CPU core in cluster
  • Use repartition() or coalesce() to adjust partition count
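The hash-partitioning rule is simple enough to sketch in plain Python (this mirrors the idea, not Spark's internal implementation): a key's partition is its hash modulo the partition count, so equal keys always land together and can be aggregated locally.

```python
def assign_partition(key, num_partitions):
    # Python's % always yields a value in [0, num_partitions)
    return hash(key) % num_partitions

# Small integers hash to themselves in CPython, so placement is predictable here
pairs = [(10, 1), (11, 1), (10, 1), (12, 1), (11, 1)]
partitions = {}
for key, value in pairs:
    partitions.setdefault(assign_partition(key, 4), []).append((key, value))

# All occurrences of key 10 share one partition (10 % 4 == 2)
assert partitions[2] == [(10, 1), (10, 1)]
```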

Caching and Persistence


Cache frequently accessed data in memory for performance:
```python
# Cache DataFrame in memory
df.cache()  # Shorthand for persist(StorageLevel.MEMORY_AND_DISK)

# Different storage levels
df.persist(StorageLevel.MEMORY_ONLY)      # Fast but may lose data if evicted
df.persist(StorageLevel.MEMORY_AND_DISK)  # Spill to disk if memory full
df.persist(StorageLevel.DISK_ONLY)        # Store only on disk
df.persist(StorageLevel.MEMORY_ONLY_SER)  # Serialized in memory (more compact)

# Unpersist when done
df.unpersist()
```

**When to Cache:**
- Data used multiple times in workflow
- Iterative algorithms (ML training)
- Interactive analysis sessions
- Expensive transformations reused downstream

**When Not to Cache:**
- Data used only once
- Very large datasets that exceed cluster memory
- Streaming applications with continuous new data

Spark SQL


Spark SQL allows you to query structured data using SQL or DataFrame API:
  • Execute SQL queries on DataFrames and tables
  • Register DataFrames as temporary views
  • Join structured and semi-structured data
  • Connect to Hive metastore for table metadata
  • Support for various data sources (Parquet, ORC, JSON, CSV, JDBC)
Performance Features:
  • Catalyst Optimizer: Rule-based and cost-based query optimization
  • Tungsten Execution Engine: Whole-stage code generation, vectorized processing
  • Adaptive Query Execution (AQE): Runtime optimization based on statistics
  • Dynamic Partition Pruning: Skip irrelevant partitions during execution
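These optimizer features are toggled through session configuration. A minimal sketch, assuming a live SparkSession named `spark` and Spark 3.x configuration keys (verify the defaults against your version's documentation):

```python
# Enable Adaptive Query Execution (on by default in recent Spark 3.x releases)
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Let AQE coalesce small shuffle partitions at runtime
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Enable dynamic partition pruning
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
```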

Broadcast Variables and Accumulators


Shared variables for efficient distributed computing:
Broadcast Variables:
  • Read-only variables cached on each node
  • Efficient for sharing large read-only data (lookup tables, ML models)
  • Avoid sending large data with every task
```python
# Broadcast a lookup table
lookup_table = {"key1": "value1", "key2": "value2"}
broadcast_lookup = sc.broadcast(lookup_table)

# Use in transformations
rdd.map(lambda x: broadcast_lookup.value.get(x, "default"))
```

**Accumulators:**
- Write-only variables for aggregating values across tasks
- Used for counters and sums in distributed operations
- Only driver can read final accumulated value

```python
# Create accumulator
error_count = sc.accumulator(0)

# Increment in tasks
rdd.foreach(lambda x: error_count.add(1) if is_error(x) else None)

# Read final value in driver
print(f"Total errors: {error_count.value}")
```

Spark SQL Deep Dive


DataFrame Creation


Create DataFrames from various sources:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()

# From structured data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
columns = ["name", "id"]
df = spark.createDataFrame(data, columns)

# From files
df_json = spark.read.json("path/to/file.json")
df_parquet = spark.read.parquet("path/to/file.parquet")
df_csv = spark.read.option("header", "true").csv("path/to/file.csv")

# From JDBC sources
df_jdbc = (spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://host:port/database")
    .option("dbtable", "table_name")
    .option("user", "username")
    .option("password", "password")
    .load())
```

DataFrame Operations


Common DataFrame transformations:
```python
# Select columns
df.select("name", "age").show()

# Filter rows
df.filter(df.age > 21).show()
df.where(df["age"] > 21).show()  # Alternative syntax

# Add/modify columns
from pyspark.sql.functions import col, lit
df.withColumn("age_plus_10", col("age") + 10).show()
df.withColumn("country", lit("USA")).show()

# Aggregations
df.groupBy("department").count().show()
df.groupBy("department").agg({"salary": "avg", "age": "max"}).show()

# Sorting
df.orderBy("age").show()
df.orderBy(col("age").desc()).show()

# Joins
df1.join(df2, df1.id == df2.user_id, "inner").show()
df1.join(df2, "id", "left_outer").show()

# Unions
df1.union(df2).show()
```

SQL Queries


Execute SQL on DataFrames:
```python
# Register DataFrame as temporary view
df.createOrReplaceTempView("people")

# Run SQL queries
sql_result = spark.sql("SELECT name FROM people WHERE age > 21")
sql_result.show()

# Complex queries
result = spark.sql("""
    SELECT department,
           COUNT(*) AS employee_count,
           AVG(salary) AS avg_salary,
           MAX(age) AS max_age
    FROM people
    WHERE age > 25
    GROUP BY department
    HAVING COUNT(*) > 5
    ORDER BY avg_salary DESC
""")
result.show()
```

Data Sources


Spark SQL supports multiple data formats:
Parquet (Recommended for Analytics):
  • Columnar storage format
  • Excellent compression and query performance
  • Schema embedded in file
  • Supports predicate pushdown and column pruning
```python
# Write
df.write.parquet("output/path", mode="overwrite", compression="snappy")

# Read with partition pruning
df = spark.read.parquet("output/path").filter(col("date") == "2025-01-01")
```

**ORC** (Optimized Row Columnar):
- Similar to Parquet with slightly better compression
- Preferred for Hive integration
- Built-in indexes for faster queries

```python
df.write.orc("output/path", mode="overwrite")
df = spark.read.orc("output/path")
```
JSON (Semi-Structured Data):
  • Human-readable but less efficient
  • Schema inference on read
  • Good for nested/complex data
```python
# Read with explicit schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.read.schema(schema).json("data.json")
```

**CSV** (Legacy/Simple Data):
- Widely compatible but slow
- Requires header inference or explicit schema
- Minimal compression benefits

```python
df.write.csv("output.csv", header=True, mode="overwrite")
df = spark.read.option("header", "true").option("inferSchema", "true").csv("data.csv")
```

Window Functions


Advanced analytics with window functions:
```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, sum, avg

# Define window specification
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())

# Ranking functions
df.withColumn("rank", rank().over(window_spec)).show()
df.withColumn("row_num", row_number().over(window_spec)).show()
df.withColumn("dense_rank", dense_rank().over(window_spec)).show()

# Aggregate functions over window
df.withColumn("dept_avg_salary", avg("salary").over(window_spec)).show()
df.withColumn(
    "running_total",
    sum("salary").over(window_spec.rowsBetween(Window.unboundedPreceding,
                                               Window.currentRow))
).show()

# Offset functions
df.withColumn("prev_salary", lag("salary", 1).over(window_spec)).show()
df.withColumn("next_salary", lead("salary", 1).over(window_spec)).show()
```

User-Defined Functions (UDFs)


Create custom transformations:
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

# Python UDF (slower due to serialization overhead)
def categorize_age(age):
    if age < 18:
        return "Minor"
    elif age < 65:
        return "Adult"
    else:
        return "Senior"

categorize_udf = udf(categorize_age, StringType())
df.withColumn("age_category", categorize_udf(col("age"))).show()

# Pandas UDF (vectorized, faster for large datasets)
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(IntegerType())
def square(series: pd.Series) -> pd.Series:
    return series ** 2

df.withColumn("age_squared", square(col("age"))).show()
```

**UDF Performance Tips:**
- Use built-in Spark functions when possible (always faster)
- Prefer Pandas UDFs over Python UDFs for better performance
- Use Scala UDFs for maximum performance (no serialization overhead)
- Cache DataFrames before applying UDFs if used multiple times

Transformations and Actions


Common Transformations


**map**: Apply function to each element

```python
# RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
doubled = rdd.map(lambda x: x * 2)  # [2, 4, 6, 8, 10]

# DataFrame (use select with functions)
from pyspark.sql.functions import col
df.select(col("value") * 2).show()
```

**filter**: Select elements matching predicate

```python
# RDD
rdd.filter(lambda x: x > 2).collect()  # [3, 4, 5]

# DataFrame
df.filter(col("age") > 25).show()
```

**flatMap**: Map and flatten results

```python
# RDD - Split text into words
lines = sc.parallelize(["hello world", "apache spark"])
words = lines.flatMap(lambda line: line.split(" "))
# ["hello", "world", "apache", "spark"]
```

**reduceByKey**: Aggregate values by key

```python
# Word count example
words = sc.parallelize(["apple", "banana", "apple", "cherry", "banana", "apple"])
word_pairs = words.map(lambda word: (word, 1))
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)
# Result: [("apple", 3), ("banana", 2), ("cherry", 1)]
```

**groupByKey**: Group values by key (avoid when possible - use reduceByKey instead)

```python
# Less efficient than reduceByKey
word_pairs.groupByKey().mapValues(list).collect()
# Result: [("apple", [1, 1, 1]), ("banana", [1, 1]), ("cherry", [1])]
```

**join**: Combine datasets by key

```python
# RDD join
users = sc.parallelize([("user1", "Alice"), ("user2", "Bob")])
orders = sc.parallelize([("user1", 100), ("user2", 200), ("user1", 150)])
users.join(orders).collect()
# Result: [("user1", ("Alice", 100)), ("user1", ("Alice", 150)), ("user2", ("Bob", 200))]

# DataFrame join (more efficient)
df_users.join(df_orders, "user_id", "inner").show()
```

**distinct**: Remove duplicates

```python
# RDD
rdd.distinct().collect()

# DataFrame
df.distinct().show()
df.dropDuplicates(["user_id"]).show()  # Drop based on specific columns
```

**coalesce/repartition**: Change partition count

```python
# Reduce partitions (no shuffle, more efficient)
df.coalesce(1).write.parquet("output")

# Increase/decrease partitions (involves shuffle)
df.repartition(10).write.parquet("output")
df.repartition(10, "user_id").write.parquet("output")  # Partition by column
```

Common Actions


**collect**: Retrieve all data to driver

```python
results = rdd.collect()  # Returns list
# WARNING: Only use on small datasets that fit in driver memory
```

**count**: Count elements

```python
total = df.count()  # Number of rows
```

**first/take**: Get first N elements

```python
first_elem = rdd.first()
first_five = rdd.take(5)
```

**reduce**: Aggregate all elements

```python
total_sum = rdd.reduce(lambda a, b: a + b)
```

**foreach**: Execute function on each element

```python
# Side effects only (no return value)
rdd.foreach(lambda x: print(x))
```

**saveAsTextFile**: Write to file system

```python
rdd.saveAsTextFile("hdfs://path/to/output")
```

**show**: Display DataFrame rows (action)

```python
df.show(20, truncate=False)  # Show 20 rows, don't truncate columns
```

Structured Streaming


Process continuous data streams using DataFrame API.

Core Concepts


Streaming DataFrame:
  • Unbounded table that grows continuously
  • Same operations as batch DataFrames
  • Micro-batch processing (default) or continuous processing
Input Sources:
  • File sources (JSON, Parquet, CSV, ORC, text)
  • Kafka
  • Socket (for testing)
  • Rate source (for testing)
  • Custom sources
Output Modes:
  • Append: Only new rows added to result table
  • Complete: Entire result table written every trigger
  • Update: Only updated rows written
Output Sinks:
  • File sinks (Parquet, ORC, JSON, CSV, text)
  • Kafka
  • Console (for debugging)
  • Memory (for testing)
  • Foreach/ForeachBatch (custom logic)

Basic Streaming Example


```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("StreamingExample").getOrCreate()

# Read stream from JSON files
input_stream = (spark.readStream
    .format("json")
    .schema(schema)
    .option("maxFilesPerTrigger", 1)
    .load("input/directory"))

# Transform streaming data
processed = (input_stream
    .filter(col("value") > 10)
    .select("id", "value", "timestamp"))

# Write stream to Parquet
query = (processed.writeStream
    .format("parquet")
    .option("path", "output/directory")
    .option("checkpointLocation", "checkpoint/directory")
    .outputMode("append")
    .start())

# Wait for termination
query.awaitTermination()
```

Stream-Static Joins


Join streaming data with static reference data:
```python
# Static DataFrame (loaded once)
static_df = spark.read.parquet("reference/data")

# Streaming DataFrame
streaming_df = spark.readStream.format("kafka").load()

# Inner join (supported)
joined = streaming_df.join(static_df, "type")

# Left outer join (supported)
joined = streaming_df.join(static_df, "type", "left_outer")

# Write result
(joined.writeStream
    .format("parquet")
    .option("path", "output")
    .option("checkpointLocation", "checkpoint")
    .start())
```

Windowed Aggregations


Aggregate data over time windows:
```python
from pyspark.sql.functions import window, col, count

# 10-minute tumbling window
windowed_counts = (streaming_df
    .groupBy(window(col("timestamp"), "10 minutes"), col("word"))
    .count())

# 10-minute sliding window with 5-minute slide
windowed_counts = (streaming_df
    .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("word"))
    .count())

# Write to console for debugging
query = (windowed_counts.writeStream
    .outputMode("complete")
    .format("console")
    .option("truncate", "false")
    .start())
```

Watermarking for Late Data


Handle late-arriving data with watermarks:
```python
from pyspark.sql.functions import window

# Define watermark (10 minutes tolerance for late data)
windowed_counts = (streaming_df
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window(col("timestamp"), "10 minutes"), col("word"))
    .count())

# Data arriving more than 10 minutes late will be dropped
```


**Watermark Benefits:**
- Limit state size by dropping old aggregation state
- Handle late data within tolerance window
- Improve performance by not maintaining infinite state
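The drop rule can be stated concretely: an event is kept only if its timestamp is no older than the maximum event time seen so far minus the watermark delay. A plain-Python sketch of that rule (an analogy, not Spark internals):

```python
from datetime import datetime, timedelta

class Watermark:
    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = datetime.min

    def admit(self, event_time):
        # The watermark only ever advances
        self.max_event_time = max(self.max_event_time, event_time)
        return event_time >= self.max_event_time - self.delay

wm = Watermark(timedelta(minutes=10))
t0 = datetime(2025, 1, 1, 12, 0)
assert wm.admit(t0)                               # on-time event advances the watermark
assert wm.admit(t0 - timedelta(minutes=5))        # 5 min late: within tolerance, kept
assert not wm.admit(t0 - timedelta(minutes=15))   # 15 min late: dropped
```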


Session Windows


Group events into sessions based on inactivity gaps:
```python
from pyspark.sql.functions import session_window, when

# Dynamic session window based on user
session_window_spec = session_window(
    col("timestamp"),
    when(col("userId") == "user1", "5 seconds")
    .when(col("userId") == "user2", "20 seconds")
    .otherwise("5 minutes")
)

sessionized_counts = (streaming_df
    .withWatermark("timestamp", "10 minutes")
    .groupBy(session_window_spec, col("userId"))
    .count())
```

Stateful Stream Processing


Maintain state across micro-batches:
```python
from pyspark.sql.functions import expr

# Deduplication using state
deduplicated = (streaming_df
    .withWatermark("timestamp", "1 hour")
    .dropDuplicates(["user_id", "event_id"]))

# Stream-stream joins (stateful)
stream1 = spark.readStream.format("kafka").option("subscribe", "topic1").load()
stream2 = spark.readStream.format("kafka").option("subscribe", "topic2").load()

joined = (stream1
    .withWatermark("timestamp", "10 minutes")
    .join(
        stream2.withWatermark("timestamp", "20 minutes"),
        expr("""
            stream1.user_id = stream2.user_id AND
            stream1.timestamp >= stream2.timestamp AND
            stream1.timestamp <= stream2.timestamp + interval 15 minutes
        """),
        "inner"
    ))
```

Checkpointing

Ensure fault tolerance with checkpoints. The checkpoint location stores:

- Stream metadata (offsets, configuration)
- State information (for stateful operations)
- Write-ahead logs

```python
query = streaming_df.writeStream \
    .format("parquet") \
    .option("path", "output") \
    .option("checkpointLocation", "checkpoint/dir") \
    .start()  # checkpointLocation is REQUIRED for production

# Recovery: restart the query with the same checkpoint location;
# Spark resumes from the last committed offsets.
```

**Checkpoint Best Practices:**
- Always set checkpointLocation for production streams
- Use reliable distributed storage (HDFS, S3) for checkpoints
- Don't delete the checkpoint directory while the stream is running
- Back up checkpoints for disaster recovery

Machine Learning with MLlib

Spark's scalable machine learning library.

Core Components

MLlib Features:
  • ML Pipelines: Chain transformations and models
  • Featurization: Vector assemblers, scalers, encoders
  • Classification & Regression: Linear models, tree-based models, neural networks
  • Clustering: K-means, Gaussian Mixture, LDA
  • Collaborative Filtering: ALS (Alternating Least Squares)
  • Dimensionality Reduction: PCA, SVD
  • Model Selection: Cross-validation, train-test split, parameter tuning

ML Pipelines

Chain transformations and estimators:
```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression

# Load data
df = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

# Define pipeline stages
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="features"
)
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)
lr = LogisticRegression(
    featuresCol="scaled_features",
    labelCol="label",
    maxIter=10,
    regParam=0.01
)

# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Train model
model = pipeline.fit(train_df)

# Make predictions
predictions = model.transform(test_df)
predictions.select("label", "prediction", "probability").show()
```

Feature Engineering

Transform raw data into features:
```python
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler,
    Tokenizer, HashingTF, IDF
)

# Categorical encoding
indexer = StringIndexer(inputCol="category", outputCol="category_index")
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_vec")

# Numerical scaling
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

# Assemble features
assembler = VectorAssembler(
    inputCols=["category_vec", "numeric_feature1", "numeric_feature2"],
    outputCol="features"
)

# Text processing
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features")
```

Streaming Linear Regression

Train models on streaming data:
```python
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
from pyspark.streaming import StreamingContext

# Create StreamingContext
ssc = StreamingContext(sc, batchDuration=1)

# Define data streams
training_stream = ssc.textFileStream("training/data/path")
testing_stream = ssc.textFileStream("testing/data/path")

# Parse streams into LabeledPoint objects
def parse_point(line):
    values = [float(x) for x in line.strip().split(',')]
    return LabeledPoint(values[0], values[1:])

parsed_training = training_stream.map(parse_point)
parsed_testing = testing_stream.map(parse_point)

# Initialize model
num_features = 3
model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0] * num_features)

# Train and predict
model.trainOn(parsed_training)
predictions = model.predictOnValues(
    parsed_testing.map(lambda lp: (lp.label, lp.features))
)

# Print predictions
predictions.pprint()

# Start streaming
ssc.start()
ssc.awaitTermination()
```

Model Evaluation

Evaluate model performance:
```python
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
    RegressionEvaluator,
)

# Binary classification
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
auc = binary_evaluator.evaluate(predictions)
print(f"AUC: {auc}")

# Multiclass classification
multi_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
accuracy = multi_evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Regression
regression_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse"
)
rmse = regression_evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")
```

Hyperparameter Tuning

Optimize model parameters with cross-validation:
```python
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Build parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 50]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
    .build()

# Define evaluator
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# Cross-validation
cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=4
)

# Train
cv_model = cv.fit(train_df)

# Best model
best_model = cv_model.bestModel
print(f"Best numTrees: {best_model.getNumTrees}")
print(f"Best maxDepth: {best_model.getMaxDepth()}")

# Evaluate on test set
predictions = cv_model.transform(test_df)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy}")
```

Distributed Matrix Operations

MLlib provides distributed matrix representations:
```python
from pyspark.mllib.linalg.distributed import (
    RowMatrix, IndexedRowMatrix, IndexedRow, CoordinateMatrix, MatrixEntry
)
from pyspark.mllib.linalg import Vectors

# RowMatrix: distributed matrix without row indices
rows = sc.parallelize([
    Vectors.dense([1.0, 2.0, 3.0]),
    Vectors.dense([4.0, 5.0, 6.0]),
    Vectors.dense([7.0, 8.0, 9.0])
])
row_matrix = RowMatrix(rows)

# Compute statistics
print(f"Rows: {row_matrix.numRows()}")
print(f"Cols: {row_matrix.numCols()}")
print(f"Column means: {row_matrix.computeColumnSummaryStatistics().mean()}")

# IndexedRowMatrix: matrix with row indices
indexed_rows = sc.parallelize([
    IndexedRow(0, Vectors.dense([1.0, 2.0, 3.0])),
    IndexedRow(1, Vectors.dense([4.0, 5.0, 6.0]))
])
indexed_matrix = IndexedRowMatrix(indexed_rows)

# CoordinateMatrix: sparse matrix using (row, col, value) entries
entries = sc.parallelize([
    MatrixEntry(0, 0, 1.0),
    MatrixEntry(0, 2, 3.0),
    MatrixEntry(1, 1, 5.0)
])
coord_matrix = CoordinateMatrix(entries)
```

Stratified Sampling

Sample data while preserving class distribution:
```python
data = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("c", 6)]
rdd = sc.parallelize(data)

# Define sampling fractions per key
fractions = {"a": 0.5, "b": 0.5, "c": 0.5}

# Approximate sample (faster, one pass)
sampled_rdd = rdd.sampleByKey(withReplacement=False, fractions=fractions)
print(sampled_rdd.collect())
```

For guaranteed exact per-key sample counts, the Scala/Java RDD API offers `sampleByKeyExact` (slower, requires additional passes over the data).

Performance Tuning

Memory Management

Memory Breakdown:
  • Execution Memory: Used for shuffles, joins, sorts, aggregations
  • Storage Memory: Used for caching and broadcast variables
  • User Memory: Used for user data structures and UDFs
  • Reserved Memory: Reserved for Spark internal operations

Configuration:

```python
spark = SparkSession.builder \
    .appName("MemoryTuning") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()
# spark.memory.fraction: fraction of heap used for execution + storage
# spark.memory.storageFraction: fraction of the above reserved for storage
```

Memory Best Practices:
  • Monitor memory usage via the Spark UI
  • Use appropriate storage levels for caching
  • Avoid collecting large datasets to the driver
  • Increase executor memory for memory-intensive operations
  • Use Kryo serialization for better memory efficiency

Shuffle Optimization

Shuffles are expensive operations - minimize them:

Causes of Shuffles:
  • groupByKey, reduceByKey, aggregateByKey
  • join, cogroup
  • repartition, coalesce (when increasing partitions, which requires a shuffle)
  • distinct, intersection
  • sortByKey

Optimization Strategies:

```python
from pyspark.sql.functions import broadcast

# 1. Use reduceByKey instead of groupByKey
# Bad: groupByKey shuffles all data
word_pairs.groupByKey().mapValues(sum)
# Good: reduceByKey combines locally before the shuffle
word_pairs.reduceByKey(lambda a, b: a + b)

# 2. Broadcast small tables in joins
large_df.join(broadcast(small_df), "key")

# 3. Partition data appropriately
df.repartition(200, "user_id")  # partition by key for subsequent aggregations

# 4. Coalesce instead of repartition when reducing partitions
df.coalesce(10)  # no shuffle, just merges partitions

# 5. Tune shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 200)  # default is 200
```

**Shuffle Configuration:**
```python
spark = SparkSession.builder \
    .config("spark.sql.shuffle.partitions", 200) \
    .config("spark.default.parallelism", 200) \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()
# spark.sql.adaptive.enabled turns on Adaptive Query Execution (AQE)
```

Partitioning Strategies

Partition Count Guidelines:
  • Too few: Underutilized cluster, OOM errors
  • Too many: Task scheduling overhead
  • Sweet spot: 2-4x number of CPU cores
  • For large shuffles: 100-200+ partitions
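The guidelines above can be folded into a quick helper for picking a starting partition count (a minimal pure-Python sketch; the 3x multiplier and the 200-partition floor are assumptions drawn from the ranges above, not Spark defaults):

```python
def suggest_partitions(total_cores: int, large_shuffle: bool = False) -> int:
    """Pick a starting partition count: ~3x total cores (middle of the
    2-4x range), with a floor of 200 partitions for large shuffles."""
    suggestion = total_cores * 3
    if large_shuffle:
        suggestion = max(suggestion, 200)
    return suggestion

print(suggest_partitions(16))                      # 48
print(suggest_partitions(16, large_shuffle=True))  # 200
```

Treat the result as an initial value for `spark.sql.shuffle.partitions` and refine it against task sizes observed in the Spark UI.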
Partition by Column:
```python
from pyspark.sql.functions import col

# Partition writes by date for easy filtering
df.write.partitionBy("date", "country").parquet("output")

# Read with partition pruning (only reads relevant partitions)
spark.read.parquet("output").filter(col("date") == "2025-01-15").show()
```

**Custom Partitioning:**
```python
from pyspark.rdd import portable_hash

# Custom partitioner for a key-value RDD
def custom_partitioner(key):
    return portable_hash(key) % 100

rdd.partitionBy(100, custom_partitioner)
```

Caching Strategies

When to Cache:
```python
# Iterative algorithms (ML)
training_data.cache()
for i in range(num_iterations):
    model = train_model(training_data)

# Multiple aggregations on the same data
base_df.cache()
result1 = base_df.groupBy("country").count()
result2 = base_df.groupBy("city").avg("sales")

# Interactive analysis
df.cache()
df.filter(condition1).show()
df.filter(condition2).show()
df.groupBy("category").count().show()
```

**Storage Levels:**
```python
from pyspark import StorageLevel

# Memory only (fastest; evicted partitions are recomputed)
df.persist(StorageLevel.MEMORY_ONLY)

# Memory and disk (spill to disk if needed)
df.persist(StorageLevel.MEMORY_AND_DISK)

# Serialized in memory (more compact, slower access)
df.persist(StorageLevel.MEMORY_ONLY_SER)

# Disk only (slowest, but always available)
df.persist(StorageLevel.DISK_ONLY)

# Replicated (fault tolerance)
df.persist(StorageLevel.MEMORY_AND_DISK_2)  # 2 replicas
```

Broadcast Joins

Optimize joins with small tables:
```python
from pyspark.sql.functions import broadcast

# Automatic broadcast (tables < spark.sql.autoBroadcastJoinThreshold)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)  # 10 MB

# Explicit broadcast hint
large_df.join(broadcast(small_df), "key")
```

Benefits:
- No shuffle of the large table
- The small table is sent to every executor once
- Much faster for small dimension tables

Adaptive Query Execution (AQE)

Enable runtime query optimization:
```python
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```

AQE Benefits:
- Dynamically coalesce partitions after shuffle
- Handle skewed joins by splitting large partitions
- Optimize join strategy at runtime

Data Format Selection

Performance Comparison:
  1. Parquet (Best for analytics): Columnar, compressed, fast queries
  2. ORC (Best for Hive): Similar to Parquet, slightly better compression
  3. Avro (Best for row-oriented): Good for write-heavy workloads
  4. JSON (Slowest): Human-readable but inefficient
  5. CSV (Legacy): Compatible but slow and no schema
Recommendation:
  • Use Parquet for most analytics workloads
  • Enable compression (snappy, gzip, lzo)
  • Partition by commonly filtered columns
  • Use columnar formats for read-heavy workloads

Catalyst Optimizer

Understand query optimization:
```python
# View the logical and physical plans
df.explain(mode="extended")
```

Optimizations include:
- Predicate pushdown: Push filters to the data source
- Column pruning: Read only required columns
- Constant folding: Evaluate constant expressions at compile time
- Join reordering: Optimize join order
- Partition pruning: Skip irrelevant partitions

Production Deployment

Cluster Managers

Standalone:
  • Simple, built-in cluster manager
  • Easy setup for development and small clusters
  • No resource sharing with other frameworks

```bash
# Start master
$SPARK_HOME/sbin/start-master.sh

# Start workers
$SPARK_HOME/sbin/start-worker.sh spark://master:7077

# Submit application
spark-submit --master spark://master:7077 app.py
```

**YARN:**
- Hadoop's resource manager
- Share cluster resources with MapReduce, Hive, etc.
- Two modes: cluster (driver on YARN) and client (driver on local machine)

```bash
# Cluster mode (driver runs on YARN)
spark-submit --master yarn --deploy-mode cluster app.py

# Client mode (driver runs locally)
spark-submit --master yarn --deploy-mode client app.py
```

**Kubernetes:**
- Modern container orchestration
- Dynamic resource allocation
- Cloud-native deployments

```bash
spark-submit \
  --master k8s://https://k8s-master:443 \
  --deploy-mode cluster \
  --name spark-app \
  --conf spark.executor.instances=5 \
  --conf spark.kubernetes.container.image=spark:latest \
  app.py
```

Mesos:
  • General-purpose cluster manager
  • Fine-grained or coarse-grained resource sharing

Application Submission

Basic spark-submit:

```bash
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 10 \
  --conf spark.sql.shuffle.partitions=200 \
  --py-files dependencies.zip \
  --files config.json \
  application.py
```

Configuration Options:
  • --master: Cluster manager URL
  • --deploy-mode: Where to run the driver (client or cluster)
  • --driver-memory: Memory for the driver process
  • --executor-memory: Memory per executor
  • --executor-cores: Cores per executor
  • --num-executors: Number of executors
  • --conf: Spark configuration properties
  • --py-files: Python dependencies
  • --files: Additional files to distribute

Resource Allocation

General Guidelines:
  • Driver Memory: 1-4 GB (unless collecting large results)
  • Executor Memory: 4-16 GB per executor
  • Executor Cores: 4-5 cores per executor (diminishing returns beyond 5)
  • Number of Executors: Fill cluster capacity, leave resources for OS/other services
  • Parallelism: 2-4x total cores

Example Calculations:
Cluster: 10 nodes, 32 cores each, 128 GB RAM each

Option 1: Fewer large executors
  - 30 executors (3 per node)
  - 10 cores per executor
  - 40 GB memory per executor
  - Total: 300 cores

Option 2: More small executors (RECOMMENDED)
  - 50 executors (5 per node)
  - 5 cores per executor
  - 24 GB memory per executor
  - Total: 250 cores
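The arithmetic behind these example calculations can be sketched as a small helper (pure Python; the per-node core and memory reservations for the OS and cluster daemons are illustrative assumptions, and the function name is hypothetical):

```python
def size_executors(nodes: int, cores_per_node: int, ram_gb_per_node: int,
                   cores_per_executor: int = 5, reserved_cores: int = 2,
                   reserved_ram_gb: int = 8) -> dict:
    """Derive an executor layout from the cluster shape.

    Reserves some cores/RAM per node for the OS and cluster daemons,
    then packs executors of `cores_per_executor` cores each.
    """
    executors_per_node = (cores_per_node - reserved_cores) // cores_per_executor
    mem_per_executor = (ram_gb_per_node - reserved_ram_gb) // executors_per_node
    return {
        "num_executors": nodes * executors_per_node,
        "executor_cores": cores_per_executor,
        "executor_memory_gb": mem_per_executor,
        "total_cores": nodes * executors_per_node * cores_per_executor,
    }

# Cluster from the example: 10 nodes, 32 cores, 128 GB each
print(size_executors(nodes=10, cores_per_node=32, ram_gb_per_node=128))
# {'num_executors': 60, 'executor_cores': 5, 'executor_memory_gb': 20, 'total_cores': 300}
```

The exact numbers depend on how much you reserve per node; the point is that the 5-cores-per-executor shape falls out of the guidelines, not that these reservations are canonical.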

Dynamic Allocation

Automatically scale executors based on workload:

```python
spark = SparkSession.builder \
    .appName("DynamicAllocation") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", 2) \
    .config("spark.dynamicAllocation.maxExecutors", 100) \
    .config("spark.dynamicAllocation.initialExecutors", 10) \
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s") \
    .getOrCreate()
```

Benefits:
  • Better resource utilization
  • Automatic scaling for varying workloads
  • Reduced costs in cloud environments

Monitoring and Logging

Spark UI:
  • Web UI at http://driver:4040
  • Stages, tasks, storage, environment, executors
  • SQL query plans and execution details
  • Identify bottlenecks and performance issues

History Server:

```bash
# Start history server
$SPARK_HOME/sbin/start-history-server.sh
```

```python
# Configure event logging
spark.conf.set("spark.eventLog.enabled", "true")
spark.conf.set("spark.eventLog.dir", "hdfs://namenode/spark-logs")
```

**Metrics:**
```python
# Enable metrics collection (console sink for all instances)
spark.conf.set("spark.metrics.conf.*.sink.console.class",
               "org.apache.spark.metrics.sink.ConsoleSink")
spark.conf.set("spark.metrics.conf.*.sink.console.period", 10)
```

**Logging:**
```python
# Configure log level
spark.sparkContext.setLogLevel("WARN")  # ERROR, WARN, INFO, DEBUG

# Custom logging
import logging
logger = logging.getLogger(__name__)
logger.info("Custom log message")
```

Fault Tolerance

Automatic Recovery:
  • Task failures: Automatically retry failed tasks
  • Executor failures: Reschedule tasks on other executors
  • Driver failures: Restore from checkpoint (streaming)
  • Node failures: Recompute lost partitions from lineage

Checkpointing:

```python
# Set checkpoint directory
spark.sparkContext.setCheckpointDir("hdfs://namenode/checkpoints")

# Checkpoint an RDD (breaks lineage for very long chains)
rdd.checkpoint()

# Streaming checkpoint (required for production)
query = streaming_df.writeStream \
    .option("checkpointLocation", "hdfs://namenode/streaming-checkpoint") \
    .start()
```

**Speculative Execution:**
```python
# Enable speculative execution for slow tasks
spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.multiplier", 1.5)
spark.conf.set("spark.speculation.quantile", 0.75)
```

Data Locality

数据本地化

Optimize data placement for performance:
Locality Levels:
  1. PROCESS_LOCAL: Data in same JVM as task (fastest)
  2. NODE_LOCAL: Data on same node, different process
  3. RACK_LOCAL: Data on same rack
  4. ANY: Data on different rack (slowest)
Improve Locality:
优化数据放置以提升性能:
本地化级别:
  1. PROCESS_LOCAL:数据与任务在同一JVM(最快)
  2. NODE_LOCAL:数据在同一节点但不同进程
  3. RACK_LOCAL:数据在同一机架
  4. ANY:数据在不同机架(最慢)
提升本地化:

```python
# Increase locality wait time
spark.conf.set("spark.locality.wait", "10s")
spark.conf.set("spark.locality.wait.node", "5s")
spark.conf.set("spark.locality.wait.rack", "3s")

# Partition data to match cluster topology
df.repartition(num_nodes * cores_per_node)
```

```python
# 增加本地化等待时间
spark.conf.set("spark.locality.wait", "10s")
spark.conf.set("spark.locality.wait.node", "5s")
spark.conf.set("spark.locality.wait.rack", "3s")

# 按集群拓扑分区数据
df.repartition(num_nodes * cores_per_node)
```

Best Practices

最佳实践

Code Organization

代码组织

  1. Modular Design: Separate data loading, transformation, and output logic
  2. Configuration Management: Externalize configuration (use config files)
  3. Error Handling: Implement robust error handling and logging
  4. Testing: Unit test transformations, integration test pipelines
  5. Documentation: Document complex transformations and business logic
  1. 模块化设计:分离数据加载、转换与输出逻辑
  2. 配置管理:外部化配置(使用配置文件)
  3. 错误处理:实现健壮的错误处理与日志
  4. 测试:单元测试转换操作,集成测试管道
  5. 文档:记录复杂转换与业务逻辑
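Point 2 above (externalized configuration) can be sketched in plain Python; the file layout and key names here are hypothetical, not a Spark convention:

```python
import json
import os
import tempfile

def load_pipeline_config(path):
    """Load externalized pipeline settings (paths, Spark conf) from a JSON file."""
    with open(path) as f:
        config = json.load(f)
    # Fail fast if required keys are missing
    for key in ("input_path", "output_path", "spark_conf"):
        if key not in config:
            raise KeyError(f"Missing required config key: {key}")
    return config

# Example: write a config file, then load it back
config = {
    "input_path": "/data/raw",
    "output_path": "/data/processed",
    "spark_conf": {"spark.sql.shuffle.partitions": "400"},
}
path = os.path.join(tempfile.mkdtemp(), "pipeline.json")
with open(path, "w") as f:
    json.dump(config, f)

loaded = load_pipeline_config(path)
print(loaded["spark_conf"]["spark.sql.shuffle.partitions"])
```

The loaded `spark_conf` dict can then be applied via `SparkSession.builder.config(...)` when the session is created, keeping environment-specific values out of the job code.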

Performance

性能

  1. Avoid Shuffles: Use reduceByKey instead of groupByKey
  2. Cache Wisely: Only cache data reused multiple times
  3. Broadcast Small Tables: Use broadcast joins for small reference data
  4. Partition Appropriately: 2-4x CPU cores, partition by frequently filtered columns
  5. Use Parquet: Columnar format for analytical workloads
  6. Enable AQE: Leverage adaptive query execution for runtime optimization
  7. Tune Memory: Balance executor memory and cores
  8. Monitor: Use Spark UI to identify bottlenecks
  1. 避免Shuffle:使用reduceByKey替代groupByKey
  2. 合理缓存:仅缓存多次使用的数据
  3. 广播小表:连接小参考数据时使用广播
  4. 合理分区:CPU核心数的2-4倍,按常用过滤列分区
  5. 使用Parquet:分析工作负载使用列存储格式
  6. 启用AQE:利用自适应查询执行进行运行时优化
  7. 内存调优:平衡Executor内存与核心数
  8. 监控:使用Spark UI识别瓶颈
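A pure-Python sketch of why point 1 matters: `reduceByKey` merges values within each partition before the shuffle, so far fewer records cross the network than with `groupByKey`, which ships every individual value. This simulates two partitions of (key, value) pairs:

```python
from collections import defaultdict

def map_side_combine(partition):
    """What reduceByKey does before shuffling: one record per key per partition."""
    acc = defaultdict(int)
    for key, value in partition:
        acc[key] += value
    return list(acc.items())

partitions = [
    [("a", 1), ("b", 2), ("a", 3), ("a", 4)],  # partition 0
    [("b", 5), ("a", 6), ("b", 7)],            # partition 1
]

# groupByKey would shuffle every record: 7 in total
shuffled_group_by_key = sum(len(p) for p in partitions)

# reduceByKey shuffles only the per-partition partial sums: 4 in total
combined = [map_side_combine(p) for p in partitions]
shuffled_reduce_by_key = sum(len(c) for c in combined)

print(shuffled_group_by_key, shuffled_reduce_by_key)  # 7 4
```

With skewed real-world data the gap is much larger: the shuffled record count drops from the number of rows to roughly (distinct keys × partitions).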

Development Workflow

开发工作流

  1. Start Small: Develop with sample data locally
  2. Profile Early: Monitor performance from the start
  3. Iterate: Optimize incrementally based on metrics
  4. Test at Scale: Validate with production-sized data before deployment
  5. Version Control: Track code, configurations, and schemas
  1. 从小规模开始:本地使用样本数据开发
  2. 尽早分析性能:从开始就监控性能
  3. 迭代优化:基于指标逐步优化
  4. 规模化测试:部署前使用生产规模数据验证
  5. 版本控制:跟踪代码、配置与Schema

Data Quality

数据质量

  1. Schema Validation: Enforce schemas on read/write
  2. Null Handling: Explicitly handle null values
  3. Data Validation: Check for expected ranges, formats, constraints
  4. Deduplication: Remove duplicates based on business logic
  5. Audit Logging: Track data lineage and transformations
  1. Schema验证:读写时强制Schema
  2. 空值处理:显式处理空值
  3. 数据验证:检查预期范围、格式、约束
  4. 去重:基于业务逻辑移除重复项
  5. 审计日志:跟踪数据 lineage 与转换
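Points 2 and 3 above can be sketched without Spark; in a real job the same rules would be expressed as DataFrame filters. The column names and bounds here are hypothetical:

```python
def validate_record(record, min_amount=0, max_amount=1_000_000):
    """Return a list of rule violations for one record (empty list = valid)."""
    errors = []
    if record.get("id") is None:
        errors.append("null id")
    amount = record.get("amount")
    if amount is None:
        errors.append("null amount")
    elif not (min_amount <= amount <= max_amount):
        errors.append("amount out of range")
    return errors

records = [
    {"id": 1, "amount": 250.0},
    {"id": None, "amount": 250.0},
    {"id": 3, "amount": -5.0},
]
valid = [r for r in records if not validate_record(r)]
invalid = [r for r in records if validate_record(r)]
print(len(valid), len(invalid))  # 1 2
```

Routing `invalid` rows to a quarantine table instead of dropping them silently preserves the audit trail called for in point 5.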

Security

安全

  1. Authentication: Enable Kerberos for YARN/HDFS
  2. Authorization: Use ACLs for data access control
  3. Encryption: Encrypt data at rest and in transit
  4. Secrets Management: Use secure credential providers
  5. Audit Trails: Log data access and modifications
  1. 认证:YARN/HDFS启用Kerberos
  2. 授权:使用ACL进行数据访问控制
  3. 加密:静态与传输中数据加密
  4. 密钥管理:使用安全凭证提供者
  5. 审计跟踪:记录数据访问与修改

Cost Optimization

成本优化

  1. Right-Size Resources: Don't over-provision executors
  2. Dynamic Allocation: Scale executors based on workload
  3. Spot Instances: Use spot/preemptible instances in cloud
  4. Data Compression: Use efficient formats (Parquet, ORC)
  5. Partitioning: Prune unnecessary data reads
  6. Auto-Shutdown: Terminate idle clusters
  1. 合理分配资源:不要过度配置Executor
  2. 动态分配:根据工作负载扩展Executor
  3. 抢占式实例:云环境使用抢占式实例
  4. 数据压缩:使用高效格式(Parquet、ORC)
  5. 分区:修剪不必要的数据读取
  6. 自动关机:终止空闲集群
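Point 2 (dynamic allocation) maps to standard Spark properties. A sketch of a `spark-defaults.conf` fragment; the numeric values are illustrative, not recommendations:

```properties
spark.dynamicAllocation.enabled              true
spark.dynamicAllocation.minExecutors         2
spark.dynamicAllocation.maxExecutors         50
spark.dynamicAllocation.executorIdleTimeout  60s
spark.shuffle.service.enabled                true
```

Note that dynamic allocation needs a way to preserve shuffle data when executors are removed — classically the external shuffle service shown here, or shuffle tracking (`spark.dynamicAllocation.shuffleTracking.enabled`) on recent versions.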

Common Patterns

常见模式

ETL Pipeline Pattern

ETL管道模式

```python
from pyspark.sql.functions import col, current_date, broadcast, count, sum, avg

def etl_pipeline(spark, input_path, output_path, reference_df):
    # Extract
    raw_df = spark.read.parquet(input_path)

    # Transform
    cleaned_df = raw_df \
        .dropDuplicates(["id"]) \
        .filter(col("value").isNotNull()) \
        .withColumn("processed_date", current_date())

    # Enrich
    enriched_df = cleaned_df.join(broadcast(reference_df), "key")

    # Aggregate
    aggregated_df = enriched_df \
        .groupBy("category", "date") \
        .agg(
            count("*").alias("count"),
            sum("amount").alias("total_amount"),
            avg("value").alias("avg_value")
        )

    # Load
    aggregated_df.write \
        .partitionBy("date") \
        .mode("overwrite") \
        .parquet(output_path)
```
```python
from pyspark.sql.functions import col, current_date, broadcast, count, sum, avg

def etl_pipeline(spark, input_path, output_path, reference_df):
    # 提取
    raw_df = spark.read.parquet(input_path)

    # 转换
    cleaned_df = raw_df \
        .dropDuplicates(["id"]) \
        .filter(col("value").isNotNull()) \
        .withColumn("processed_date", current_date())

    # 增强
    enriched_df = cleaned_df.join(broadcast(reference_df), "key")

    # 聚合
    aggregated_df = enriched_df \
        .groupBy("category", "date") \
        .agg(
            count("*").alias("count"),
            sum("amount").alias("total_amount"),
            avg("value").alias("avg_value")
        )

    # 加载
    aggregated_df.write \
        .partitionBy("date") \
        .mode("overwrite") \
        .parquet(output_path)
```

Incremental Processing Pattern

增量处理模式

```python
from pyspark.sql.functions import col, max as spark_max

def incremental_process(spark, input_path, output_path, checkpoint_path):
    # Read last processed timestamp
    last_timestamp = read_checkpoint(checkpoint_path)

    # Read new data
    new_data = spark.read.parquet(input_path) \
        .filter(col("timestamp") > last_timestamp)

    # Process
    processed = transform(new_data)

    # Write
    processed.write.mode("append").parquet(output_path)

    # Update checkpoint
    max_timestamp = new_data.agg(spark_max("timestamp")).collect()[0][0]
    write_checkpoint(checkpoint_path, max_timestamp)
```
```python
from pyspark.sql.functions import col, max as spark_max

def incremental_process(spark, input_path, output_path, checkpoint_path):
    # 读取最后处理的时间戳
    last_timestamp = read_checkpoint(checkpoint_path)

    # 读取新数据
    new_data = spark.read.parquet(input_path) \
        .filter(col("timestamp") > last_timestamp)

    # 处理
    processed = transform(new_data)

    # 写入
    processed.write.mode("append").parquet(output_path)

    # 更新检查点
    max_timestamp = new_data.agg(spark_max("timestamp")).collect()[0][0]
    write_checkpoint(checkpoint_path, max_timestamp)
```
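The pattern above leaves `read_checkpoint` and `write_checkpoint` to the reader. A minimal file-based sketch (a production pipeline would use a durable store such as HDFS or a metadata table instead of a local file):

```python
import os

def write_checkpoint(checkpoint_path, timestamp):
    """Persist the high-water mark atomically via write-then-rename."""
    tmp_path = checkpoint_path + ".tmp"
    with open(tmp_path, "w") as f:
        f.write(str(timestamp))
    os.replace(tmp_path, checkpoint_path)  # atomic on POSIX filesystems

def read_checkpoint(checkpoint_path, default="1970-01-01 00:00:00"):
    """Return the last processed timestamp, or a default on the first run."""
    if not os.path.exists(checkpoint_path):
        return default
    with open(checkpoint_path) as f:
        return f.read().strip()
```

The write-then-rename step matters: a job that crashes mid-write must not leave a truncated checkpoint, or the next run would re-read or skip data.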

Slowly Changing Dimension (SCD) Pattern

缓慢变化维度(SCD)模式

```python
from pyspark.sql.functions import lit, current_date

def scd_type2_upsert(spark, dimension_df, updates_df):
    # Mark existing records as inactive if updated
    # (left_semi keeps the dimension schema; withColumn overwrites the flag columns)
    inactive_records = dimension_df \
        .join(updates_df.select("business_key"), "business_key", "left_semi") \
        .withColumn("is_active", lit(False)) \
        .withColumn("end_date", current_date())

    # Add new records (assumes updates_df carries the same attribute columns)
    new_records = updates_df \
        .withColumn("is_active", lit(True)) \
        .withColumn("start_date", current_date()) \
        .withColumn("end_date", lit(None).cast("date"))

    # Union unchanged, inactive, and new records
    result = dimension_df \
        .join(updates_df, "business_key", "left_anti") \
        .unionByName(inactive_records) \
        .unionByName(new_records)

    return result
```
```python
from pyspark.sql.functions import lit, current_date

def scd_type2_upsert(spark, dimension_df, updates_df):
    # 标记更新的现有记录为非活动
    # (left_semi保持维表schema;withColumn覆盖标志列)
    inactive_records = dimension_df \
        .join(updates_df.select("business_key"), "business_key", "left_semi") \
        .withColumn("is_active", lit(False)) \
        .withColumn("end_date", current_date())

    # 添加新记录(假设updates_df包含与维表相同的属性列)
    new_records = updates_df \
        .withColumn("is_active", lit(True)) \
        .withColumn("start_date", current_date()) \
        .withColumn("end_date", lit(None).cast("date"))

    # 合并未更改、非活动与新记录
    result = dimension_df \
        .join(updates_df, "business_key", "left_anti") \
        .unionByName(inactive_records) \
        .unionByName(new_records)

    return result
```

Window Analytics Pattern

窗口分析模式

```python
def calculate_running_metrics(df):
    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number, lag, sum, avg

    # Define window
    window_spec = Window.partitionBy("user_id").orderBy("timestamp")

    # Calculate metrics
    result = df \
        .withColumn("row_num", row_number().over(window_spec)) \
        .withColumn("prev_value", lag("value", 1).over(window_spec)) \
        .withColumn("running_total", sum("value").over(
            window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))) \
        .withColumn("moving_avg", avg("value").over(
            window_spec.rowsBetween(-2, 0)))  # current row plus 2 preceding

    return result
```
```python
def calculate_running_metrics(df):
    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number, lag, sum, avg

    # 定义窗口
    window_spec = Window.partitionBy("user_id").orderBy("timestamp")

    # 计算指标
    result = df \
        .withColumn("row_num", row_number().over(window_spec)) \
        .withColumn("prev_value", lag("value", 1).over(window_spec)) \
        .withColumn("running_total", sum("value").over(
            window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))) \
        .withColumn("moving_avg", avg("value").over(
            window_spec.rowsBetween(-2, 0)))  # 当前行及前2行

    return result
```

Troubleshooting

故障排除

Out of Memory Errors

内存不足错误

Symptoms:
  • java.lang.OutOfMemoryError
  • Executor failures
  • Slow garbage collection
Solutions:

症状:
  • java.lang.OutOfMemoryError
  • Executor失败
  • 垃圾回收缓慢
解决方案:

```python
# Increase executor memory
spark.conf.set("spark.executor.memory", "8g")

# Increase driver memory (if collecting data)
spark.conf.set("spark.driver.memory", "4g")

# Reduce memory pressure
df.persist(StorageLevel.MEMORY_AND_DISK)  # Spill to disk
df.coalesce(100)  # Reduce partition count
spark.conf.set("spark.sql.shuffle.partitions", 400)  # Increase shuffle partitions

# Avoid collect() on large datasets; use take() or limit() instead
df.take(100)
```

```python
# 增加Executor内存
spark.conf.set("spark.executor.memory", "8g")

# 增加Driver内存(如果收集数据)
spark.conf.set("spark.driver.memory", "4g")

# 降低内存压力
df.persist(StorageLevel.MEMORY_AND_DISK)  # 溢出到磁盘
df.coalesce(100)  # 减少分区数
spark.conf.set("spark.sql.shuffle.partitions", 400)  # 增加shuffle分区数

# 避免对大型数据集使用collect(),使用take()或limit()替代
df.take(100)
```

Shuffle Performance Issues

Shuffle性能问题

Symptoms:
  • Long shuffle read/write times
  • Skewed partition sizes
  • Task stragglers
Solutions:

症状:
  • Shuffle读写时间长
  • 分区大小倾斜
  • 任务执行缓慢
解决方案:

```python
# Increase shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 400)

# Handle skew with salting
df_salted = df.withColumn("salt", (rand() * 10).cast("int"))
result = df_salted.groupBy("key", "salt").agg(...)

# Use broadcast for small tables
large_df.join(broadcast(small_df), "key")

# Enable AQE for automatic optimization
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```

```python
# 增加shuffle分区数
spark.conf.set("spark.sql.shuffle.partitions", 400)

# 使用加盐处理倾斜
df_salted = df.withColumn("salt", (rand() * 10).cast("int"))
result = df_salted.groupBy("key", "salt").agg(...)

# 小表使用广播
large_df.join(broadcast(small_df), "key")

# 启用AQE进行自动优化
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```

Streaming Job Failures

流处理作业失败

Symptoms:
  • Streaming query stopped
  • Checkpoint corruption
  • Processing lag increasing
Solutions:

症状:
  • 流查询停止
  • 检查点损坏
  • 处理延迟增加
解决方案:

```python
# Increase executor memory for stateful operations
spark.conf.set("spark.executor.memory", "8g")

# Tune watermark for late data
.withWatermark("timestamp", "15 minutes")

# Increase trigger interval to reduce micro-batch overhead
.trigger(processingTime="30 seconds")

# Monitor lag and adjust parallelism
spark.conf.set("spark.sql.shuffle.partitions", 200)

# Recover from checkpoint corruption:
# delete the checkpoint directory and restart (data loss possible),
# or implement custom state recovery logic
```

```python
# 为有状态操作增加Executor内存
spark.conf.set("spark.executor.memory", "8g")

# 调优迟到数据的水印
.withWatermark("timestamp", "15 minutes")

# 增加触发间隔以减少微批开销
.trigger(processingTime="30 seconds")

# 监控延迟并调整并行度
spark.conf.set("spark.sql.shuffle.partitions", 200)

# 从检查点损坏中恢复:
# 删除检查点目录并重启(可能丢失数据),
# 或实现自定义状态恢复逻辑
```

Data Skew

数据倾斜

Symptoms:
  • Few tasks take much longer than others
  • Unbalanced partition sizes
  • Executor OOM errors
Solutions:

症状:
  • 少数任务耗时远长于其他任务
  • 分区大小不平衡
  • Executor OOM错误
解决方案:

```python
# 1. Salting technique (append a random suffix to keys)
from pyspark.sql.functions import concat, lit, rand, col, broadcast

df_salted = df.withColumn("salted_key", concat(col("key"), lit("_"), (rand() * 10).cast("int")))
result = df_salted.groupBy("salted_key").agg(...)

# 2. Repartition by skewed column
df.repartition(200, "skewed_column")

# 3. Isolate skewed keys
skewed_keys = df.groupBy("key").count().filter(col("count") > threshold).select("key")
skewed_df = df.join(broadcast(skewed_keys), "key")
normal_df = df.join(broadcast(skewed_keys), "key", "left_anti")

# Process skewed and normal keys separately, then combine
skewed_result = process_with_salting(skewed_df)
normal_result = process_normally(normal_df)
final = skewed_result.union(normal_result)
```

```python
# 1. 加盐技术(为Key追加随机后缀)
from pyspark.sql.functions import concat, lit, rand, col, broadcast

df_salted = df.withColumn("salted_key", concat(col("key"), lit("_"), (rand() * 10).cast("int")))
result = df_salted.groupBy("salted_key").agg(...)

# 2. 按倾斜列重新分区
df.repartition(200, "skewed_column")

# 3. 隔离倾斜Key
skewed_keys = df.groupBy("key").count().filter(col("count") > threshold).select("key")
skewed_df = df.join(broadcast(skewed_keys), "key")
normal_df = df.join(broadcast(skewed_keys), "key", "left_anti")

# 分别处理倾斜与正常Key,然后合并
skewed_result = process_with_salting(skewed_df)
normal_result = process_normally(normal_df)
final = skewed_result.union(normal_result)
```
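One step the salting snippets above gloss over: after aggregating on the salted key you must strip the salt and aggregate a second time, or you are left with partial results per salt bucket. A pure-Python sketch of the two stages:

```python
import random
from collections import defaultdict

random.seed(42)

data = [("hot", 1)] * 10 + [("cold", 2)] * 2  # "hot" is the skewed key

# Stage 1: aggregate on (key, salt) — spreads the hot key across many reducers
stage1 = defaultdict(int)
for key, value in data:
    salt = random.randint(0, 9)
    stage1[(key, salt)] += value

# Stage 2: drop the salt and merge the partial sums per original key
stage2 = defaultdict(int)
for (key, _salt), partial in stage1.items():
    stage2[key] += partial

print(dict(stage2))  # {'hot': 10, 'cold': 4}
```

In Spark the same shape is `groupBy("key", "salt").agg(...)` followed by `groupBy("key")` over the partial aggregates; the second stage is cheap because its input is already small.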

Context7 Code Integration

Context7代码集成

This skill integrates real-world code examples from Apache Spark's official repository. All code snippets in the EXAMPLES.md file are sourced from Context7's Apache Spark library documentation, ensuring production-ready patterns and best practices.
本技能集成了Apache Spark官方仓库中的真实代码示例。EXAMPLES.md文件中的所有代码片段均来自Context7的Apache Spark库文档,确保是生产就绪的模式与最佳实践。

Version and Compatibility

版本与兼容性

  • Apache Spark Version: 3.x (compatible with 2.4+)
  • Python: 3.7+
  • Scala: 2.12+
  • Java: 8+
  • R: 3.5+
  • Apache Spark版本:3.x(兼容2.4+)
  • Python:3.7+
  • Scala:2.12+
  • Java:8+
  • R:3.5+

References

参考资料


Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: Big Data, Distributed Computing, Data Engineering, Machine Learning
Context7 Integration: /apache/spark with 8000 tokens of documentation

技能版本:1.0.0
最后更新:2025年10月
技能分类:大数据、分布式计算、数据工程、机器学习
Context7集成:/apache/spark,包含8000个令牌的文档