apache-spark-data-processing
Apache Spark Data Processing
A comprehensive skill for mastering Apache Spark data processing, from basic RDD operations to advanced streaming, SQL, and machine learning workflows. Learn to build scalable, distributed data pipelines and analytics systems.
When to Use This Skill
Use Apache Spark when you need to:
- Process Large-Scale Data: Handle datasets too large for single-machine processing (TB to PB scale)
- Perform Distributed Computing: Execute parallel computations across cluster nodes
- Real-Time Stream Processing: Process continuous data streams with low latency
- Complex Data Analytics: Run sophisticated analytics, aggregations, and transformations
- Machine Learning at Scale: Train ML models on massive datasets
- ETL/ELT Pipelines: Build robust data transformation and loading workflows
- Interactive Data Analysis: Perform exploratory analysis on large datasets
- Unified Data Processing: Combine batch and streaming workloads in one framework
Not Ideal For:
- Small datasets (<100 GB) that fit in memory on a single machine
- Simple CRUD operations (use traditional databases)
- Ultra-low latency requirements (<10ms) where specialized stream processors excel
- Workflows requiring strong ACID transactions across distributed data
Core Concepts
Resilient Distributed Datasets (RDDs)
RDDs are Spark's fundamental data abstraction - immutable, distributed collections of objects that can be processed in parallel.
Key Characteristics:
- Resilient: Fault-tolerant through lineage tracking
- Distributed: Partitioned across cluster nodes
- Immutable: Transformations create new RDDs, not modify existing ones
- Lazy Evaluation: Transformations build computation graph; actions trigger execution
- In-Memory Computing: Cache intermediate results for iterative algorithms
RDD Operations:
- Transformations: Lazy operations that return new RDDs (map, filter, flatMap, reduceByKey)
- Actions: Operations that trigger computation and return values (collect, count, reduce, saveAsTextFile)
When to Use RDDs:
- Low-level control over data distribution and partitioning
- Custom partitioning schemes required
- Working with unstructured data (text files, binary data)
- Migrating legacy code from early Spark versions
Prefer DataFrames/Datasets when possible - they provide automatic optimization via Catalyst optimizer.
DataFrames and Datasets
DataFrames are distributed collections of data organized into named columns - similar to a database table or pandas DataFrame, but with powerful optimizations.
DataFrames:
- Structured data with schema
- Automatic query optimization (Catalyst)
- Cross-language support (Python, Scala, Java, R)
- Rich API for SQL-like operations
Datasets (Scala/Java only):
- Typed DataFrames with compile-time type safety
- Best performance in Scala due to JVM optimization
- Combine RDD type safety with DataFrame optimizations
Key Advantages Over RDDs:
- Query Optimization: Catalyst optimizer rewrites queries for efficiency
- Tungsten Execution: Optimized CPU and memory usage
- Columnar Storage: Efficient data representation
- Code Generation: Runtime bytecode generation (whole-stage codegen) for faster execution
Lazy Evaluation
Spark uses lazy evaluation to optimize execution:
- Transformations build a Directed Acyclic Graph (DAG) of operations
- Actions trigger execution of the DAG
- Spark's optimizer analyzes the entire DAG and creates an optimized execution plan
- Work is distributed across cluster nodes
Benefits:
- Minimize data movement across network
- Combine multiple operations into single stage
- Eliminate unnecessary computations
- Optimize memory usage
Partitioning
Data is divided into partitions for parallel processing:
- Default Partitioning: Typically based on HDFS block size or input source
- Hash Partitioning: Distribute data by key hash (used by groupByKey, reduceByKey)
- Range Partitioning: Distribute data by key ranges (useful for sorted data)
- Custom Partitioning: Define your own partitioning logic
Partition Count Considerations:
- Too few partitions: Underutilized cluster, large task execution time
- Too many partitions: Scheduling overhead, small task execution time
- General rule: 2-4 partitions per CPU core in cluster
- Use repartition() or coalesce() to adjust the partition count
Caching and Persistence
Cache frequently accessed data in memory for performance:

```python
# Cache DataFrame in memory
df.cache()  # Shorthand for persist(StorageLevel.MEMORY_AND_DISK)

# Different storage levels
df.persist(StorageLevel.MEMORY_ONLY)      # Fast but may lose data if evicted
df.persist(StorageLevel.MEMORY_AND_DISK)  # Spill to disk if memory full
df.persist(StorageLevel.DISK_ONLY)        # Store only on disk
df.persist(StorageLevel.MEMORY_ONLY_SER)  # Serialized in memory (more compact)

# Unpersist when done
df.unpersist()
```

**When to Cache:**
- Data used multiple times in a workflow
- Iterative algorithms (ML training)
- Interactive analysis sessions
- Expensive transformations reused downstream

**When Not to Cache:**
- Data used only once
- Very large datasets that exceed cluster memory
- Streaming applications with continuous new data
Spark SQL
Spark SQL allows you to query structured data using SQL or DataFrame API:
- Execute SQL queries on DataFrames and tables
- Register DataFrames as temporary views
- Join structured and semi-structured data
- Connect to Hive metastore for table metadata
- Support for various data sources (Parquet, ORC, JSON, CSV, JDBC)
Performance Features:
- Catalyst Optimizer: Rule-based and cost-based query optimization
- Tungsten Execution Engine: Whole-stage code generation, vectorized processing
- Adaptive Query Execution (AQE): Runtime optimization based on statistics
- Dynamic Partition Pruning: Skip irrelevant partitions during execution
Broadcast Variables and Accumulators
Shared variables for efficient distributed computing:

**Broadcast Variables:**
- Read-only variables cached on each node
- Efficient for sharing large read-only data (lookup tables, ML models)
- Avoid sending large data with every task

```python
# Broadcast a lookup table
lookup_table = {"key1": "value1", "key2": "value2"}
broadcast_lookup = sc.broadcast(lookup_table)

# Use in transformations
rdd.map(lambda x: broadcast_lookup.value.get(x, "default"))
```

**Accumulators:**
- Write-only variables for aggregating values across tasks
- Used for counters and sums in distributed operations
- Only the driver can read the final accumulated value

```python
# Create an accumulator
error_count = sc.accumulator(0)

# Increment in tasks
rdd.foreach(lambda x: error_count.add(1) if is_error(x) else None)

# Read the final value in the driver
print(f"Total errors: {error_count.value}")
```
Spark SQL Deep Dive
DataFrame Creation
Create DataFrames from various sources:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()

# From structured data
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
columns = ["name", "id"]
df = spark.createDataFrame(data, columns)

# From files
df_json = spark.read.json("path/to/file.json")
df_parquet = spark.read.parquet("path/to/file.parquet")
df_csv = spark.read.option("header", "true").csv("path/to/file.csv")

# From JDBC sources
df_jdbc = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:port/database") \
    .option("dbtable", "table_name") \
    .option("user", "username") \
    .option("password", "password") \
    .load()
```
DataFrame Operations
Common DataFrame transformations:

```python
from pyspark.sql.functions import col, lit

# Select columns
df.select("name", "age").show()

# Filter rows
df.filter(df.age > 21).show()
df.where(df["age"] > 21).show()  # Alternative syntax

# Add/modify columns
df.withColumn("age_plus_10", col("age") + 10).show()
df.withColumn("country", lit("USA")).show()

# Aggregations
df.groupBy("department").count().show()
df.groupBy("department").agg({"salary": "avg", "age": "max"}).show()

# Sorting
df.orderBy("age").show()
df.orderBy(col("age").desc()).show()

# Joins
df1.join(df2, df1.id == df2.user_id, "inner").show()
df1.join(df2, "id", "left_outer").show()

# Unions
df1.union(df2).show()
```
SQL Queries
Execute SQL on DataFrames:

```python
# Register DataFrame as a temporary view
df.createOrReplaceTempView("people")

# Run SQL queries
sql_result = spark.sql("SELECT name FROM people WHERE age > 21")
sql_result.show()

# Complex queries
result = spark.sql("""
    SELECT
        department,
        COUNT(*) AS employee_count,
        AVG(salary) AS avg_salary,
        MAX(age) AS max_age
    FROM people
    WHERE age > 25
    GROUP BY department
    HAVING COUNT(*) > 5
    ORDER BY avg_salary DESC
""")
result.show()
```
Data Sources
Spark SQL supports multiple data formats:

**Parquet** (Recommended for Analytics):
- Columnar storage format
- Excellent compression and query performance
- Schema embedded in the file
- Supports predicate pushdown and column pruning

```python
from pyspark.sql.functions import col

# Write
df.write.parquet("output/path", mode="overwrite", compression="snappy")

# Read with partition pruning
df = spark.read.parquet("output/path").filter(col("date") == "2025-01-01")
```

**ORC** (Optimized Row Columnar):
- Similar to Parquet with slightly better compression
- Preferred for Hive integration
- Built-in indexes for faster queries

```python
df.write.orc("output/path", mode="overwrite")
df = spark.read.orc("output/path")
```

**JSON** (Semi-Structured Data):
- Human-readable but less efficient
- Schema inference on read
- Good for nested/complex data

```python
# Read with an explicit schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.read.schema(schema).json("data.json")
```

**CSV** (Legacy/Simple Data):
- Widely compatible but slow
- Requires header inference or an explicit schema
- Minimal compression benefits

```python
df.write.csv("output.csv", header=True, mode="overwrite")
df = spark.read.option("header", "true").option("inferSchema", "true").csv("data.csv")
```

Window Functions
Advanced analytics with window functions:

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, sum, avg, col

# Define window specification
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())

# Ranking functions
df.withColumn("rank", rank().over(window_spec)).show()
df.withColumn("row_num", row_number().over(window_spec)).show()
df.withColumn("dense_rank", dense_rank().over(window_spec)).show()

# Aggregate functions over a window
df.withColumn("dept_avg_salary", avg("salary").over(window_spec)).show()
df.withColumn("running_total", sum("salary").over(
    window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))).show()

# Offset functions
df.withColumn("prev_salary", lag("salary", 1).over(window_spec)).show()
df.withColumn("next_salary", lead("salary", 1).over(window_spec)).show()
```
User-Defined Functions (UDFs)
Create custom transformations:

```python
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, IntegerType

# Python UDF (slower due to serialization overhead)
def categorize_age(age):
    if age < 18:
        return "Minor"
    elif age < 65:
        return "Adult"
    else:
        return "Senior"

categorize_udf = udf(categorize_age, StringType())
df.withColumn("age_category", categorize_udf(col("age"))).show()

# Pandas UDF (vectorized, faster for large datasets)
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(IntegerType())
def square(series: pd.Series) -> pd.Series:
    return series ** 2

df.withColumn("age_squared", square(col("age"))).show()
```

**UDF Performance Tips:**
- Use built-in Spark functions when possible (always faster)
- Prefer Pandas UDFs over Python UDFs for better performance
- Use Scala UDFs for maximum performance (no serialization overhead)
- Cache DataFrames before applying UDFs if used multiple times
Transformations and Actions
Common Transformations
**map**: Apply a function to each element

```python
# RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
doubled = rdd.map(lambda x: x * 2)  # [2, 4, 6, 8, 10]

# DataFrame (use select with functions)
from pyspark.sql.functions import col
df.select(col("value") * 2).show()
```

**filter**: Select elements matching a predicate

```python
# RDD
rdd.filter(lambda x: x > 2).collect()  # [3, 4, 5]

# DataFrame
df.filter(col("age") > 25).show()
```

**flatMap**: Map and flatten results

```python
# RDD - split text into words
lines = sc.parallelize(["hello world", "apache spark"])
words = lines.flatMap(lambda line: line.split(" "))  # ["hello", "world", "apache", "spark"]
```

**reduceByKey**: Aggregate values by key

```python
# Word count example
words = sc.parallelize(["apple", "banana", "apple", "cherry", "banana", "apple"])
word_pairs = words.map(lambda word: (word, 1))
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)
# Result: [("apple", 3), ("banana", 2), ("cherry", 1)]
```

**groupByKey**: Group values by key (avoid when possible - use reduceByKey instead)

```python
# Less efficient than reduceByKey
word_pairs.groupByKey().mapValues(list).collect()
# Result: [("apple", [1, 1, 1]), ("banana", [1, 1]), ("cherry", [1])]
```

**join**: Combine datasets by key

```python
# RDD join
users = sc.parallelize([("user1", "Alice"), ("user2", "Bob")])
orders = sc.parallelize([("user1", 100), ("user2", 200), ("user1", 150)])
users.join(orders).collect()
# Result: [("user1", ("Alice", 100)), ("user1", ("Alice", 150)), ("user2", ("Bob", 200))]

# DataFrame join (more efficient)
df_users.join(df_orders, "user_id", "inner").show()
```

**distinct**: Remove duplicates

```python
# RDD
rdd.distinct().collect()

# DataFrame
df.distinct().show()
df.dropDuplicates(["user_id"]).show()  # Drop based on specific columns
```

**coalesce/repartition**: Change partition count

```python
# Reduce partitions (no shuffle, more efficient)
df.coalesce(1).write.parquet("output")

# Increase/decrease partitions (involves shuffle)
df.repartition(10).write.parquet("output")
df.repartition(10, "user_id").write.parquet("output")  # Partition by column
```
Common Actions
**collect**: Retrieve all data to the driver

```python
results = rdd.collect()  # Returns a list
# WARNING: only use on small datasets that fit in driver memory
```

**count**: Count elements

```python
total = df.count()  # Number of rows
```

**first/take**: Get the first N elements

```python
first_elem = rdd.first()
first_five = rdd.take(5)
```

**reduce**: Aggregate all elements

```python
total_sum = rdd.reduce(lambda a, b: a + b)
```

**foreach**: Execute a function on each element

```python
# Side effects only (no return value)
rdd.foreach(lambda x: print(x))
```

**saveAsTextFile**: Write to a file system

```python
rdd.saveAsTextFile("hdfs://path/to/output")
```

**show**: Display DataFrame rows (action)

```python
df.show(20, truncate=False)  # Show 20 rows, don't truncate columns
```

Structured Streaming
Process continuous data streams using DataFrame API.
Core Concepts
Streaming DataFrame:
- Unbounded table that grows continuously
- Same operations as batch DataFrames
- Micro-batch processing (default) or continuous processing
Input Sources:
- File sources (JSON, Parquet, CSV, ORC, text)
- Kafka
- Socket (for testing)
- Rate source (for testing)
- Custom sources
Output Modes:
- Append: Only new rows added to result table
- Complete: Entire result table written every trigger
- Update: Only updated rows written
Output Sinks:
- File sinks (Parquet, ORC, JSON, CSV, text)
- Kafka
- Console (for debugging)
- Memory (for testing)
- Foreach/ForeachBatch (custom logic)
Basic Streaming Example
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("StreamingExample").getOrCreate()

# Read stream from JSON files
input_stream = spark.readStream \
    .format("json") \
    .schema(schema) \
    .option("maxFilesPerTrigger", 1) \
    .load("input/directory")

# Transform streaming data
processed = input_stream \
    .filter(col("value") > 10) \
    .select("id", "value", "timestamp")

# Write stream to Parquet
query = processed.writeStream \
    .format("parquet") \
    .option("path", "output/directory") \
    .option("checkpointLocation", "checkpoint/directory") \
    .outputMode("append") \
    .start()

# Wait for termination
query.awaitTermination()
```
Stream-Static Joins
Join streaming data with static reference data:

```python
# Static DataFrame (loaded once)
static_df = spark.read.parquet("reference/data")

# Streaming DataFrame
streaming_df = spark.readStream.format("kafka").load()

# Inner join (supported)
joined = streaming_df.join(static_df, "type")

# Left outer join (supported)
joined = streaming_df.join(static_df, "type", "left_outer")

# Write result
joined.writeStream \
    .format("parquet") \
    .option("path", "output") \
    .option("checkpointLocation", "checkpoint") \
    .start()
```
Windowed Aggregations
Aggregate data over time windows:

```python
from pyspark.sql.functions import window, col, count

# 10-minute tumbling window
windowed_counts = streaming_df \
    .groupBy(window(col("timestamp"), "10 minutes"), col("word")) \
    .count()

# 10-minute sliding window with a 5-minute slide
windowed_counts = streaming_df \
    .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("word")) \
    .count()

# Write to console for debugging
query = windowed_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()
```
Watermarking for Late Data
Handle late-arriving data with watermarks:

```python
from pyspark.sql.functions import window

# Define watermark (10 minutes tolerance for late data)
windowed_counts = streaming_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window(col("timestamp"), "10 minutes"), col("word")) \
    .count()

# Data arriving more than 10 minutes late will be dropped
```

**Watermark Benefits:**
- Limit state size by dropping old aggregation state
- Handle late data within the tolerance window
- Improve performance by not maintaining infinite state

Session Windows
Group events into sessions based on inactivity gaps:
```python
from pyspark.sql.functions import col, session_window, when

# Dynamic session gap per user
session_window_spec = session_window(
    col("timestamp"),
    when(col("userId") == "user1", "5 seconds")
    .when(col("userId") == "user2", "20 seconds")
    .otherwise("5 minutes")
)

sessionized_counts = streaming_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(session_window_spec, col("userId")) \
    .count()
```
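The grouping rule behind session windows is simple to state in plain Python (an illustrative sketch, not Spark's implementation): consecutive events belong to the same session until the gap between them exceeds the inactivity threshold.

```python
def sessionize(timestamps, gap=5.0):
    """Group sorted event times into sessions split at inactivity gaps."""
    sessions, current = [], []
    for ts in sorted(timestamps):
        if current and ts - current[-1] > gap:
            sessions.append(current)  # gap exceeded: close the session
            current = []
        current.append(ts)
    if current:
        sessions.append(current)
    return sessions

# Events at 0-2s form one session; 10-11s form another (gap > 5s)
print(sessionize([0.0, 1.0, 2.0, 10.0, 11.0]))
```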
undefinedStateful Stream Processing
有状态流处理
Maintain state across micro-batches:
```python
# Deduplication using state
deduplicated = streaming_df \
    .withWatermark("timestamp", "1 hour") \
    .dropDuplicates(["user_id", "event_id"])
```
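Conceptually, stateful deduplication keeps a set of seen keys and lets the watermark expire old entries so state stays bounded. A plain-Python sketch of that idea (toy timestamps in seconds; not Spark's implementation):

```python
def dedup_stream(events, ttl=3600.0):
    """Emit each (user_id, event_id) once; expire state older than ttl."""
    seen = {}  # (user_id, event_id) -> last event time
    out = []
    for ts, user_id, event_id in events:
        # Expire old keys so state stays bounded (the watermark's job in Spark)
        seen = {k: t for k, t in seen.items() if ts - t <= ttl}
        if (user_id, event_id) not in seen:
            out.append((ts, user_id, event_id))
        seen[(user_id, event_id)] = ts
    return out

events = [(0.0, "u1", "e1"), (1.0, "u1", "e1"), (2.0, "u2", "e1")]
print(dedup_stream(events))
```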
```python
from pyspark.sql.functions import expr

# Stream-stream joins (stateful)
stream1 = spark.readStream.format("kafka").option("subscribe", "topic1").load()
stream2 = spark.readStream.format("kafka").option("subscribe", "topic2").load()

# Alias both sides so the join condition can reference them by name
joined = stream1.alias("stream1") \
    .withWatermark("timestamp", "10 minutes") \
    .join(
        stream2.alias("stream2").withWatermark("timestamp", "20 minutes"),
        expr("""
            stream1.user_id = stream2.user_id AND
            stream1.timestamp >= stream2.timestamp AND
            stream1.timestamp <= stream2.timestamp + interval 15 minutes
        """),
        "inner"
    )
```
Checkpointing
Ensure fault tolerance with checkpoints:
```python
# The checkpoint location stores:
# - Stream metadata (offsets, configuration)
# - State information (for stateful operations)
# - Write-ahead logs

query = streaming_df.writeStream \
    .format("parquet") \
    .option("path", "output") \
    .option("checkpointLocation", "checkpoint/dir") \
    .start()  # checkpointLocation is REQUIRED for production

# Recovery: restart the query with the same checkpoint location;
# Spark will resume from the last committed offset
```
**Checkpoint Best Practices:**
- Always set checkpointLocation for production streams
- Use reliable distributed storage (HDFS, S3) for checkpoints
- Don't delete checkpoint directory while stream is running
- Back up checkpoints for disaster recovery
Machine Learning with MLlib
Spark's scalable machine learning library.
Core Components
MLlib Features:
- ML Pipelines: Chain transformations and models
- Featurization: Vector assemblers, scalers, encoders
- Classification & Regression: Linear models, tree-based models, neural networks
- Clustering: K-means, Gaussian Mixture, LDA
- Collaborative Filtering: ALS (Alternating Least Squares)
- Dimensionality Reduction: PCA, SVD
- Model Selection: Cross-validation, train-test split, parameter tuning
ML Pipelines
Chain transformations and estimators:
```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression

# Load data
df = spark.read.format("libsvm").load("data/sample_libsvm_data.txt")

# Define pipeline stages
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="features"
)
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)
lr = LogisticRegression(
    featuresCol="scaled_features",
    labelCol="label",
    maxIter=10,
    regParam=0.01
)

# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Train model
model = pipeline.fit(train_df)

# Make predictions
predictions = model.transform(test_df)
predictions.select("label", "prediction", "probability").show()
```
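The fit/transform contract that makes pipelines composable can be sketched in plain Python (a toy analogue with hypothetical stage classes, not MLlib's implementation): each stage is fitted on the output of the stages before it, then transforms the data for the next stage.

```python
class Scale:
    """Toy transformer: multiplies every value by a constant."""
    def __init__(self, factor):
        self.factor = factor
    def fit(self, data):
        return self  # stateless: nothing to learn
    def transform(self, data):
        return [x * self.factor for x in data]

class MeanCenter:
    """Toy estimator: learns the mean on fit, subtracts it on transform."""
    def fit(self, data):
        self.mean = sum(data) / len(data)
        return self
    def transform(self, data):
        return [x - self.mean for x in data]

class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages
    def fit(self, data):
        fitted = []
        for stage in self.stages:
            stage = stage.fit(data)      # fit on the output of prior stages
            data = stage.transform(data)
            fitted.append(stage)
        return fitted

fitted = ToyPipeline([Scale(2), MeanCenter()]).fit([1.0, 2.0, 3.0])
print(fitted[1].mean)  # mean learned AFTER scaling: 4.0
```

This is why stage order matters in a real Pipeline: the scaler here changes what the next estimator learns.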
Feature Engineering
Transform raw data into features:
```python
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler,
    Tokenizer, HashingTF, IDF
)

# Categorical encoding
indexer = StringIndexer(inputCol="category", outputCol="category_index")
encoder = OneHotEncoder(inputCol="category_index", outputCol="category_vec")

# Numerical scaling
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")

# Assemble features
assembler = VectorAssembler(
    inputCols=["category_vec", "numeric_feature1", "numeric_feature2"],
    outputCol="features"
)

# Text processing
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=10000)
idf = IDF(inputCol="raw_features", outputCol="features")
```
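The hashing trick behind HashingTF needs no vocabulary dictionary: each token is hashed into one of a fixed number of buckets. A plain-Python sketch with a toy hash function (Spark's HashingTF uses MurmurHash3, not this):

```python
def hashing_tf(words, num_features=16):
    """Toy term-frequency vector via the hashing trick."""
    vec = [0] * num_features
    for w in words:
        # Stable toy hash for illustration; Spark uses MurmurHash3
        idx = sum(ord(c) for c in w) % num_features
        vec[idx] += 1
    return vec

vec = hashing_tf(["spark", "spark", "rdd"])
print(sum(vec))  # total term count is preserved: 3
```

The trade-off: a fixed-size vector and one pass over the data, at the cost of occasional hash collisions between distinct terms.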
Streaming Linear Regression
Train models on streaming data:
```python
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
from pyspark.streaming import StreamingContext

# Create a StreamingContext with 1-second batches (legacy DStream API)
ssc = StreamingContext(sc, batchDuration=1)

# Define data streams
training_stream = ssc.textFileStream("training/data/path")
testing_stream = ssc.textFileStream("testing/data/path")

# Parse streams into LabeledPoint objects
def parse_point(line):
    values = [float(x) for x in line.strip().split(',')]
    return LabeledPoint(values[0], values[1:])

parsed_training = training_stream.map(parse_point)
parsed_testing = testing_stream.map(parse_point)

# Initialize the model with zero weights
num_features = 3
model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0] * num_features)

# Train and predict
model.trainOn(parsed_training)
predictions = model.predictOnValues(
    parsed_testing.map(lambda lp: (lp.label, lp.features))
)

# Print predictions
predictions.pprint()

# Start streaming
ssc.start()
ssc.awaitTermination()
```
Model Evaluation
Evaluate model performance:
```python
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
    RegressionEvaluator,
)

# Binary classification
binary_evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc = binary_evaluator.evaluate(predictions)
print(f"AUC: {auc}")

# Multiclass classification
multi_evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = multi_evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Regression
regression_evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="rmse"
)
rmse = regression_evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")
```
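The metrics themselves are simple; a plain-Python check on toy labels and predictions shows exactly what the accuracy and RMSE evaluators compute:

```python
import math

labels =      [1.0, 0.0, 1.0, 1.0]
predictions = [1.0, 0.0, 0.0, 1.0]

# Accuracy: fraction of exact matches
accuracy = sum(l == p for l, p in zip(labels, predictions)) / len(labels)

# RMSE: square root of the mean squared error
rmse = math.sqrt(
    sum((l - p) ** 2 for l, p in zip(labels, predictions)) / len(labels)
)

print(accuracy, rmse)  # 0.75 0.5
```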
Hyperparameter Tuning
Optimize model parameters with cross-validation:
```python
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define model
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Build parameter grid
param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 50]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
    .build()

# Define evaluator
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# Cross-validation
cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    parallelism=4
)

# Train
cv_model = cv.fit(train_df)

# Best model
best_model = cv_model.bestModel
print(f"Best numTrees: {best_model.getNumTrees}")
print(f"Best maxDepth: {best_model.getMaxDepth()}")

# Evaluate on the test set
predictions = cv_model.transform(test_df)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy}")
```
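Cross-validation's mechanics — split into k folds, average a score per parameter setting, keep the best — can be sketched in plain Python (a toy "model" that predicts a constant, with a hypothetical negative-squared-error score; not MLlib's implementation):

```python
def k_fold_grid_search(data, params, num_folds=3):
    """Toy grid search: score each constant guess by k-fold validation error."""
    best_param, best_score = None, float("-inf")
    for guess in params:
        fold_scores = []
        for f in range(num_folds):
            # Hold out every num_folds-th element as the validation fold
            valid = [x for i, x in enumerate(data) if i % num_folds == f]
            score = -sum((x - guess) ** 2 for x in valid) / len(valid)
            fold_scores.append(score)
        avg = sum(fold_scores) / num_folds
        if avg > best_score:
            best_param, best_score = guess, avg
    return best_param

print(k_fold_grid_search([1.0, 2.0, 3.0, 2.0, 2.0, 2.0], params=[0.0, 2.0, 5.0]))
```

With a 3x3x3 grid and 5 folds, the real CrossValidator above fits 27 x 5 = 135 models, which is why the `parallelism` setting matters.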
Distributed Matrix Operations
MLlib provides distributed matrix representations:
```python
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import (
    RowMatrix, IndexedRowMatrix, IndexedRow, CoordinateMatrix, MatrixEntry
)

# RowMatrix: distributed matrix without row indices
rows = sc.parallelize([
    Vectors.dense([1.0, 2.0, 3.0]),
    Vectors.dense([4.0, 5.0, 6.0]),
    Vectors.dense([7.0, 8.0, 9.0])
])
row_matrix = RowMatrix(rows)

# Compute statistics
print(f"Rows: {row_matrix.numRows()}")
print(f"Cols: {row_matrix.numCols()}")
print(f"Column means: {row_matrix.computeColumnSummaryStatistics().mean()}")

# IndexedRowMatrix: matrix with row indices
indexed_rows = sc.parallelize([
    IndexedRow(0, Vectors.dense([1.0, 2.0, 3.0])),
    IndexedRow(1, Vectors.dense([4.0, 5.0, 6.0]))
])
indexed_matrix = IndexedRowMatrix(indexed_rows)

# CoordinateMatrix: sparse matrix of (row, col, value) entries
entries = sc.parallelize([
    MatrixEntry(0, 0, 1.0),
    MatrixEntry(0, 2, 3.0),
    MatrixEntry(1, 1, 5.0)
])
coord_matrix = CoordinateMatrix(entries)
```
Stratified Sampling
Sample data while preserving class distribution:
```python
data = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("c", 6)]
rdd = sc.parallelize(data)

# Define a sampling fraction per key
fractions = {"a": 0.5, "b": 0.5, "c": 0.5}

# Approximate sample (fast, one pass)
sampled_rdd = rdd.sampleByKey(withReplacement=False, fractions=fractions)
print(sampled_rdd.collect())

# Exact per-stratum counts (slower, multiple passes) are available via
# sampleByKeyExact in the Scala/Java API
```
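The per-key Bernoulli sampling that sampleByKey performs in one pass looks like this in plain Python (a simplified sketch, seeded for reproducibility; Spark seeds per partition):

```python
import random

def sample_by_key(pairs, fractions, seed=42):
    """Keep each (key, value) with its key's probability: one pass, approximate."""
    rng = random.Random(seed)
    return [(k, v) for k, v in pairs if rng.random() < fractions[k]]

data = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("c", 6)]
sample = sample_by_key(data, {"a": 0.5, "b": 0.5, "c": 0.5})
print(sample)
```

Because each record is an independent coin flip, per-key counts only approximate `fraction * count`; guaranteeing exact counts requires extra passes, which is why the exact variant is slower.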
Performance Tuning
Memory Management
Memory Breakdown:
- Execution Memory: used for shuffles, joins, sorts, and aggregations
- Storage Memory: used for caching and broadcast variables
- User Memory: used for user data structures and UDFs
- Reserved Memory: reserved for Spark internal operations
Configuration:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MemoryTuning") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()

# spark.memory.fraction: share of heap for execution + storage
# spark.memory.storageFraction: share of that region protected for storage
```
Memory Best Practices:
- Monitor memory usage via the Spark UI
- Use appropriate storage levels for caching
- Avoid collecting large datasets to the driver
- Increase executor memory for memory-intensive operations
- Use Kryo serialization for better memory efficiency
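The unified-memory split above is easy to compute by hand. A sketch using the documented formula (the unified region is `spark.memory.fraction` of the heap minus a 300 MB reserve):

```python
def memory_breakdown(executor_memory_mb, fraction=0.6, storage_fraction=0.5):
    """Approximate Spark's unified memory split for one executor (in MB)."""
    reserved = 300  # MB reserved for Spark internals
    usable = executor_memory_mb - reserved
    unified = usable * fraction           # execution + storage pool
    storage = unified * storage_fraction  # portion protected for caching
    execution = unified - storage
    user = usable - unified               # user data structures, UDFs
    return {"reserved": reserved, "storage": storage,
            "execution": execution, "user": user}

# 4 GB executor with the defaults used above
print(memory_breakdown(4096))
```

Execution and storage can borrow from each other at runtime; `storageFraction` only sets the portion of the pool that cached blocks are protected within.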
Shuffle Optimization
Shuffles are expensive operations - minimize them.
Causes of Shuffles:
- groupByKey, reduceByKey, aggregateByKey
- join, cogroup
- repartition, coalesce (when increasing partition count)
- distinct, intersection
- sortByKey
Optimization Strategies:
undefined1. Use reduceByKey instead of groupByKey
1. 使用reduceByKey替代groupByKey
Bad: groupByKey shuffles all data
不佳:groupByKey会shuffle所有数据
word_pairs.groupByKey().mapValues(sum)
word_pairs.groupByKey().mapValues(sum)
Good: reduceByKey combines locally before shuffle
推荐:reduceByKey先在本地合并再shuffle
word_pairs.reduceByKey(lambda a, b: a + b)
word_pairs.reduceByKey(lambda a, b: a + b)
2. Broadcast small tables in joins
2. 在连接中广播小表
from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), "key")
from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), "key")
3. Partition data appropriately
3. 合理分区数据
df.repartition(200, "user_id") # Partition by key for subsequent aggregations
df.repartition(200, "user_id") # 后续聚合按Key分区
4. Coalesce instead of repartition when reducing partitions
4. 减少分区时使用coalesce而非repartition
df.coalesce(10) # No shuffle, just merge partitions
df.coalesce(10) # 无shuffle,仅合并分区
5. Tune shuffle partitions
5. 调优shuffle分区数
spark.conf.set("spark.sql.shuffle.partitions", 200) # Default is 200
**Shuffle Configuration:**
```python
spark = SparkSession.builder \
.config("spark.sql.shuffle.partitions", 200) \
.config("spark.default.parallelism", 200) \
.config("spark.sql.adaptive.enabled", "true") # Enable AQE \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.getOrCreate()spark.conf.set("spark.sql.shuffle.partitions", 200) # 默认是200
**Shuffle配置:**
```python
spark = SparkSession.builder \
.config("spark.sql.shuffle.partitions", 200) \
.config("spark.default.parallelism", 200) \
.config("spark.sql.adaptive.enabled", "true") # 启用AQE \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
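Why reduceByKey shuffles less: it merges values per key on each partition before anything crosses the network. A plain-Python sketch of the map-side combine (illustrative only):

```python
def map_side_combine(partition, reduce_fn):
    """Pre-aggregate one partition's (key, value) pairs before the shuffle."""
    combined = {}
    for k, v in partition:
        combined[k] = reduce_fn(combined[k], v) if k in combined else v
    return list(combined.items())

partition = [("spark", 1), ("rdd", 1), ("spark", 1), ("spark", 1)]
# groupByKey would shuffle all 4 records; reduceByKey shuffles only 2
print(map_side_combine(partition, lambda a, b: a + b))
```

The savings grow with key repetition: the shuffled volume is bounded by the number of distinct keys per partition, not the number of records.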
Partitioning Strategies
Partition Count Guidelines:
- Too few: underutilized cluster, OOM errors
- Too many: task-scheduling overhead
- Sweet spot: 2-4x the number of CPU cores
- For large shuffles: 100-200+ partitions
Partition by Column:
```python
from pyspark.sql.functions import col

# Partition writes by date for easy filtering
df.write.partitionBy("date", "country").parquet("output")

# Read with partition pruning (only relevant partitions are scanned)
spark.read.parquet("output").filter(col("date") == "2025-01-15").show()
```
**Custom Partitioning:**
```python
from pyspark.rdd import portable_hash

# Custom partitioner for a key-value RDD
def custom_partitioner(key):
    return portable_hash(key) % 100

rdd.partitionBy(100, custom_partitioner)
```
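The property hash partitioning guarantees — the same key always lands in the same partition, so all values for a key can be aggregated without a second shuffle — is visible in a plain-Python sketch:

```python
def assign_partition(key, num_partitions):
    """Deterministic hash partitioning: same key, same partition."""
    return hash(key) % num_partitions

keys = ["user1", "user2", "user1", "user3", "user1"]
placement = [assign_partition(k, 4) for k in keys]
print(placement)  # every "user1" record gets the same partition id
```

(Python randomizes string hashes per process unless PYTHONHASHSEED is set, which is exactly why PySpark ships `portable_hash` for cross-worker consistency.)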
Caching Strategies
When to Cache:
```python
# Iterative algorithms (ML)
training_data.cache()
for i in range(num_iterations):
    model = train_model(training_data)

# Multiple aggregations on the same data
base_df.cache()
result1 = base_df.groupBy("country").count()
result2 = base_df.groupBy("city").avg("sales")

# Interactive analysis
df.cache()
df.filter(condition1).show()
df.filter(condition2).show()
df.groupBy("category").count().show()
```
**Storage Levels:**
```python
from pyspark import StorageLevel

# Memory only (fastest; partitions that don't fit are recomputed on demand)
df.persist(StorageLevel.MEMORY_ONLY)

# Memory and disk (spill to disk if needed)
df.persist(StorageLevel.MEMORY_AND_DISK)

# Disk only (slowest, but always available)
df.persist(StorageLevel.DISK_ONLY)

# Replicated (fault tolerance)
df.persist(StorageLevel.MEMORY_AND_DISK_2)  # 2 replicas

# Note: serialized levels such as MEMORY_ONLY_SER belong to the Scala/Java
# API; PySpark always stores cached data serialized
```
Broadcast Joins
Optimize joins with small tables:
```python
from pyspark.sql.functions import broadcast

# Automatic broadcast (tables smaller than spark.sql.autoBroadcastJoinThreshold)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10 * 1024 * 1024)  # 10 MB

# Explicit broadcast hint
large_df.join(broadcast(small_df), "key")
```
Benefits:
- No shuffle of the large table
- The small table is sent to each executor once
- Much faster for small dimension tables
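Mechanically, a broadcast join builds a hash map from the small table on every executor and streams the large table through it. A plain-Python sketch of the probe phase (toy rows, not Spark's implementation):

```python
def broadcast_hash_join(large_rows, small_rows, key):
    """Inner join by probing a hash map built from the broadcast (small) side."""
    lookup = {row[key]: row for row in small_rows}  # built once per executor
    joined = []
    for row in large_rows:  # large side streams through, no shuffle
        match = lookup.get(row[key])
        if match:
            joined.append({**row, **match})
    return joined

orders = [{"key": 1, "amount": 10}, {"key": 2, "amount": 7}, {"key": 9, "amount": 3}]
countries = [{"key": 1, "country": "US"}, {"key": 2, "country": "CN"}]
print(broadcast_hash_join(orders, countries, "key"))
```

This is why the small-table size threshold matters: the lookup map must fit in each executor's memory.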
Adaptive Query Execution (AQE)
Enable runtime query optimization:
```python
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```
AQE Benefits:
- Dynamically coalesces partitions after shuffles
- Handles skewed joins by splitting large partitions
- Optimizes the join strategy at runtime
Data Format Selection
Performance Comparison:
- Parquet (best for analytics): columnar, compressed, fast queries
- ORC (best for Hive): similar to Parquet, slightly better compression
- Avro (best for row-oriented data): good for write-heavy workloads
- JSON (slowest): human-readable but inefficient
- CSV (legacy): widely compatible but slow and schema-less
Recommendation:
- Use Parquet for most analytics workloads
- Enable compression (snappy, gzip, lzo)
- Partition by commonly filtered columns
- Use columnar formats for read-heavy workloads
Catalyst Optimizer
Understand query optimization:
```python
# View the logical and physical plans
df.explain(mode="extended")
```
Optimizations include:
- Predicate pushdown: push filters down to the data source
- Column pruning: read only the required columns
- Constant folding: evaluate constant expressions at plan time
- Join reordering: optimize the join order
- Partition pruning: skip irrelevant partitions
Production Deployment
Cluster Managers
Standalone:
- Simple, built-in cluster manager
- Easy setup for development and small clusters
- No resource sharing with other frameworks
```bash
# Start the master
$SPARK_HOME/sbin/start-master.sh

# Start workers
$SPARK_HOME/sbin/start-worker.sh spark://master:7077

# Submit an application
spark-submit --master spark://master:7077 app.py
```
**YARN:**
- Hadoop's resource manager
- Shares cluster resources with MapReduce, Hive, etc.
- Two modes: cluster (driver on YARN) and client (driver on the submitting machine)
```bash
# Cluster mode (driver runs on YARN)
spark-submit --master yarn --deploy-mode cluster app.py

# Client mode (driver runs locally)
spark-submit --master yarn --deploy-mode client app.py
```
**Kubernetes:**
- Modern container orchestration
- Dynamic resource allocation
- Cloud-native deployments
```bash
spark-submit \
  --master k8s://https://k8s-master:443 \
  --deploy-mode cluster \
  --name spark-app \
  --conf spark.executor.instances=5 \
  --conf spark.kubernetes.container.image=spark:latest \
  app.py
```
Mesos:
- General-purpose cluster manager
- Fine-grained or coarse-grained resource sharing
Application Submission
Basic spark-submit:
```bash
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 10 \
  --conf spark.sql.shuffle.partitions=200 \
  --py-files dependencies.zip \
  --files config.json \
  application.py
```
Configuration Options:
- --master: cluster manager URL
- --deploy-mode: where the driver runs (client or cluster)
- --driver-memory: memory for the driver process
- --executor-memory: memory per executor
- --executor-cores: cores per executor
- --num-executors: number of executors
- --conf: Spark configuration properties
- --py-files: Python dependencies to distribute
- --files: additional files to distribute
Resource Allocation
General Guidelines:
- Driver Memory: 1-4 GB (unless collecting large results)
- Executor Memory: 4-16 GB per executor
- Executor Cores: 4-5 cores per executor (diminishing returns beyond 5)
- Number of Executors: fill cluster capacity, leaving resources for the OS and other services
- Parallelism: 2-4x total cores
Example Calculation:
Cluster: 10 nodes, each with 32 cores and 128 GB RAM
Option 1: Fewer, larger executors
- 30 executors (3 per node)
- 10 cores per executor
- 40 GB memory per executor
- Total: 300 cores
Option 2: More, smaller executors (RECOMMENDED)
- 50 executors (5 per node)
- 5 cores per executor
- 24 GB memory per executor
- Total: 250 cores
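The arithmetic behind such sizing is easy to automate. A back-of-the-envelope sketch (the per-node overheads for the OS and cluster daemons are illustrative assumptions, not fixed rules, so the result differs from the hand-worked options above):

```python
def size_executors(nodes, cores_per_node, ram_gb_per_node,
                   cores_per_executor=5, os_cores=1, os_ram_gb=8):
    """Back-of-the-envelope executor sizing for a uniform cluster."""
    usable_cores = cores_per_node - os_cores          # leave cores for the OS
    executors_per_node = usable_cores // cores_per_executor
    mem_per_executor = (ram_gb_per_node - os_ram_gb) // executors_per_node
    return {
        "executors": nodes * executors_per_node,
        "cores_per_executor": cores_per_executor,
        "memory_gb_per_executor": mem_per_executor,
        "total_cores": nodes * executors_per_node * cores_per_executor,
    }

# 10 nodes, 32 cores, 128 GB each
print(size_executors(10, 32, 128))
```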
Dynamic Allocation
Automatically scale executors based on workload:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DynamicAllocation") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", 2) \
    .config("spark.dynamicAllocation.maxExecutors", 100) \
    .config("spark.dynamicAllocation.initialExecutors", 10) \
    .config("spark.dynamicAllocation.executorIdleTimeout", "60s") \
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true") \
    .getOrCreate()

# shuffleTracking (or an external shuffle service) is needed so executors
# still holding shuffle data are not removed prematurely
```
Benefits:
- Better resource utilization
- Automatic scaling for varying workloads
- Reduced costs in cloud environments
Monitoring and Logging
Spark UI:
- Web UI at http://driver:4040
- Stages, tasks, storage, environment, and executors
- SQL query plans and execution details
- Identify bottlenecks and performance issues
History Server:
```bash
# Start the history server
$SPARK_HOME/sbin/start-history-server.sh
```
```python
# Configure event logging (set before the session starts,
# e.g. in spark-defaults.conf or via spark-submit --conf)
spark.conf.set("spark.eventLog.enabled", "true")
spark.conf.set("spark.eventLog.dir", "hdfs://namenode/spark-logs")
```
**Metrics:**
```python
# Enable a console metrics sink (the '*' instance applies to all components)
spark.conf.set("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink")
spark.conf.set("spark.metrics.conf.*.sink.console.period", 10)
```
**Logging:**
```python
import logging

# Configure Spark's log level
spark.sparkContext.setLogLevel("WARN")  # ERROR, WARN, INFO, DEBUG

# Custom application logging
logger = logging.getLogger(__name__)
logger.info("Custom log message")
```
Fault Tolerance
Automatic Recovery:
- Task failures: Automatically retry failed tasks
- Executor failures: Reschedule tasks on other executors
- Driver failures: Restore from checkpoint (streaming)
- Node failures: Recompute lost partitions from lineage
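Lineage-based recovery can be illustrated outside Spark with a toy dataset object that remembers its parent and the transformation that produced it, so a lost partition can be recomputed on demand. This is a simplified pure-Python sketch of the idea, not Spark's actual implementation:

```python
class LineageDataset:
    """Toy illustration of lineage: each dataset remembers how it was derived."""
    def __init__(self, partitions, parent=None, transform=None):
        self.partitions = partitions      # list of lists (one list per partition)
        self.parent = parent              # upstream dataset, if derived
        self.transform = transform        # function applied to each parent partition

    def map_partitions(self, fn):
        return LineageDataset([fn(p) for p in self.partitions], parent=self, transform=fn)

    def lose_partition(self, i):
        self.partitions[i] = None         # simulate an executor/node failure

    def get_partition(self, i):
        if self.partitions[i] is None and self.parent is not None:
            # Recompute the lost partition from the parent via the recorded transform
            self.partitions[i] = self.transform(self.parent.get_partition(i))
        return self.partitions[i]

base = LineageDataset([[1, 2], [3, 4]])
squared = base.map_partitions(lambda p: [x * x for x in p])
squared.lose_partition(1)
recovered = squared.get_partition(1)   # recomputed from lineage: [9, 16]
```

Checkpointing (below) exists precisely because this recomputation walks the whole chain: a checkpoint materializes the data so recovery does not have to replay a very long lineage.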
Checkpointing:
```python
# Set checkpoint directory
spark.sparkContext.setCheckpointDir("hdfs://namenode/checkpoints")

# Checkpoint RDD (breaks lineage for very long chains)
rdd.checkpoint()

# Streaming checkpoint (required for production)
query = streaming_df.writeStream \
    .option("checkpointLocation", "hdfs://namenode/streaming-checkpoint") \
    .start()
```

**Speculative Execution:**
```python
# Enable speculative execution for slow tasks
spark.conf.set("spark.speculation", "true")
spark.conf.set("spark.speculation.multiplier", 1.5)
spark.conf.set("spark.speculation.quantile", 0.75)
```
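The quantile and multiplier settings can be read as a simple rule: once `spark.speculation.quantile` of a stage's tasks have finished, any still-running task whose elapsed time exceeds `multiplier` times the median finished duration becomes a candidate for a speculative copy. A toy sketch of that rule (a hypothetical helper, not Spark's scheduler code):

```python
import statistics

def speculatable(finished_durations, running_elapsed, total_tasks,
                 multiplier=1.5, quantile=0.75):
    """Return indices of running tasks that qualify for a speculative re-launch."""
    # Speculation only kicks in after `quantile` of the stage's tasks finished
    if len(finished_durations) < quantile * total_tasks:
        return []
    threshold = multiplier * statistics.median(finished_durations)
    return [i for i, elapsed in enumerate(running_elapsed) if elapsed > threshold]

# 8 of 10 tasks finished around 10s; two stragglers have run 25s and 12s so far.
# Median is 10s, threshold 15s, so only the 25s task gets a speculative copy.
candidates = speculatable([9, 10, 10, 11, 10, 9, 10, 11], [25, 12], 10)
```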
Data Locality
Optimize data placement for performance:
Locality Levels:
- PROCESS_LOCAL: Data in same JVM as task (fastest)
- NODE_LOCAL: Data on same node, different process
- RACK_LOCAL: Data on same rack
- ANY: Data on different rack (slowest)
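The scheduler walks these levels from fastest to slowest, waiting `spark.locality.wait` at each level before falling back to a worse one. A pure-Python sketch of that fallback order (an illustration of the policy, not Spark's scheduler code):

```python
LOCALITY_LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]

def pick_locality(available_levels, waited_seconds, wait_per_level=3):
    """Choose the best locality level the task is currently allowed to run at.

    Each `wait_per_level` seconds of waiting unlocks one more (worse) level,
    mirroring how spark.locality.wait relaxes the scheduler's preference.
    """
    unlocked = min(int(waited_seconds // wait_per_level) + 1, len(LOCALITY_LEVELS))
    for level in LOCALITY_LEVELS[:unlocked]:
        if level in available_levels:
            return level
    return None  # keep waiting in hope of a better slot

# The data's node is busy: only RACK_LOCAL slots are free right now
choice_early = pick_locality({"RACK_LOCAL"}, waited_seconds=0)   # None: still waiting
choice_later = pick_locality({"RACK_LOCAL"}, waited_seconds=7)   # falls back to RACK_LOCAL
```

This is why raising `spark.locality.wait` trades scheduling delay for better data placement.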
Improve Locality:
```python
# Increase locality wait time
spark.conf.set("spark.locality.wait", "10s")
spark.conf.set("spark.locality.wait.node", "5s")
spark.conf.set("spark.locality.wait.rack", "3s")

# Partition data to match cluster topology
df.repartition(num_nodes * cores_per_node)
```
Best Practices

Code Organization
- Modular Design: Separate data loading, transformation, and output logic
- Configuration Management: Externalize configuration (use config files)
- Error Handling: Implement robust error handling and logging
- Testing: Unit test transformations, integration test pipelines
- Documentation: Document complex transformations and business logic
Performance
- Avoid Shuffles: Use reduceByKey instead of groupByKey
- Cache Wisely: Only cache data reused multiple times
- Broadcast Small Tables: Use broadcast joins for small reference data
- Partition Appropriately: 2-4x CPU cores, partition by frequently filtered columns
- Use Parquet: Columnar format for analytical workloads
- Enable AQE: Leverage adaptive query execution for runtime optimization
- Tune Memory: Balance executor memory and cores
- Monitor: Use Spark UI to identify bottlenecks
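The reduceByKey-over-groupByKey advice comes down to map-side combining: reduceByKey pre-aggregates within each partition before the shuffle, so far fewer records cross the network. A Spark-free sketch of the difference, using plain lists as partitions:

```python
from collections import defaultdict

partitions = [[("a", 1), ("a", 1), ("b", 1)], [("a", 1), ("b", 1), ("b", 1)]]

# groupByKey-style: every record is shuffled across the network
shuffled_without_combine = sum(len(p) for p in partitions)   # 6 records

# reduceByKey-style: combine per partition first, then shuffle only partial sums
partials = []
for part in partitions:
    local = defaultdict(int)
    for key, value in part:
        local[key] += value              # map-side combine
    partials.extend(local.items())
shuffled_with_combine = len(partials)    # 4 records (2 keys x 2 partitions)

# The final reduce over the partials gives the same totals either way
totals = defaultdict(int)
for key, value in partials:
    totals[key] += value
```

On real data the gap is much larger: shuffle volume drops from one record per input row to one record per key per partition.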
Development Workflow
- Start Small: Develop with sample data locally
- Profile Early: Monitor performance from the start
- Iterate: Optimize incrementally based on metrics
- Test at Scale: Validate with production-sized data before deployment
- Version Control: Track code, configurations, and schemas
Data Quality
- Schema Validation: Enforce schemas on read/write
- Null Handling: Explicitly handle null values
- Data Validation: Check for expected ranges, formats, constraints
- Deduplication: Remove duplicates based on business logic
- Audit Logging: Track data lineage and transformations
Security
- Authentication: Enable Kerberos for YARN/HDFS
- Authorization: Use ACLs for data access control
- Encryption: Encrypt data at rest and in transit
- Secrets Management: Use secure credential providers
- Audit Trails: Log data access and modifications
Cost Optimization
- Right-Size Resources: Don't over-provision executors
- Dynamic Allocation: Scale executors based on workload
- Spot Instances: Use spot/preemptible instances in cloud
- Data Compression: Use efficient formats (Parquet, ORC)
- Partitioning: Prune unnecessary data reads
- Auto-Shutdown: Terminate idle clusters
Common Patterns
ETL Pipeline Pattern
```python
from pyspark.sql.functions import col, current_date, broadcast, count, sum, avg

def etl_pipeline(spark, input_path, output_path, reference_df):
    # Extract
    raw_df = spark.read.parquet(input_path)

    # Transform
    cleaned_df = raw_df \
        .dropDuplicates(["id"]) \
        .filter(col("value").isNotNull()) \
        .withColumn("processed_date", current_date())

    # Enrich with a broadcast join against the small reference table
    enriched_df = cleaned_df.join(broadcast(reference_df), "key")

    # Aggregate
    aggregated_df = enriched_df \
        .groupBy("category", "date") \
        .agg(
            count("*").alias("count"),
            sum("amount").alias("total_amount"),
            avg("value").alias("avg_value")
        )

    # Load
    aggregated_df.write \
        .partitionBy("date") \
        .mode("overwrite") \
        .parquet(output_path)
```
Incremental Processing Pattern
```python
from pyspark.sql.functions import col, max as spark_max

def incremental_process(spark, input_path, output_path, checkpoint_path):
    # Read last processed timestamp
    last_timestamp = read_checkpoint(checkpoint_path)

    # Read only data newer than the checkpoint
    new_data = spark.read.parquet(input_path) \
        .filter(col("timestamp") > last_timestamp)

    # Process
    processed = transform(new_data)

    # Write
    processed.write.mode("append").parquet(output_path)

    # Update checkpoint
    max_timestamp = new_data.agg(spark_max("timestamp")).collect()[0][0]
    write_checkpoint(checkpoint_path, max_timestamp)
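`read_checkpoint` and `write_checkpoint` are left undefined above. A minimal file-based version might look like the following (hypothetical helpers, assuming the checkpoint is a single timestamp stored as text):

```python
import os

def read_checkpoint(path, default=0):
    """Return the last processed timestamp, or `default` on the first run."""
    if not os.path.exists(path):
        return default
    with open(path) as f:
        return int(f.read().strip())

def write_checkpoint(path, timestamp):
    """Atomically replace the checkpoint so a crash cannot leave it half-written."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        f.write(str(timestamp))
    os.replace(tmp, path)
```

In production the checkpoint usually lives on durable shared storage (HDFS, S3) rather than the local filesystem, so the helpers would use the corresponding filesystem API instead of `open`.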
Slowly Changing Dimension (SCD) Pattern
```python
from pyspark.sql.functions import lit, current_date

def scd_type2_upsert(spark, dimension_df, updates_df):
    # Close out existing records that received an update
    inactive_records = dimension_df \
        .join(updates_df, "business_key") \
        .select(
            dimension_df["*"],
            lit(False).alias("is_active"),
            current_date().alias("end_date")
        )

    # Add incoming rows as the new active versions
    new_records = updates_df \
        .withColumn("is_active", lit(True)) \
        .withColumn("start_date", current_date()) \
        .withColumn("end_date", lit(None).cast("date"))

    # Union unchanged, inactive, and new records
    result = dimension_df \
        .join(updates_df, "business_key", "left_anti") \
        .union(inactive_records) \
        .union(new_records)
    return result
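The three record sets above (untouched rows, closed-out rows, new active rows) are easier to see on plain Python dicts. A Spark-free sketch of the same SCD type 2 merge, with a fixed date for reproducibility:

```python
def scd2_merge(dimension, updates, today="2025-01-01"):
    """dimension/updates: lists of dicts, each identified by 'business_key'."""
    updated_keys = {u["business_key"] for u in updates}

    # Rows untouched by this batch pass through unchanged
    unchanged = [row for row in dimension if row["business_key"] not in updated_keys]

    # Matching current rows are closed out: deactivated and end-dated
    closed = [{**row, "is_active": False, "end_date": today}
              for row in dimension
              if row["business_key"] in updated_keys and row["is_active"]]

    # Incoming rows become the new active versions
    fresh = [{**u, "is_active": True, "start_date": today, "end_date": None}
             for u in updates]
    return unchanged + closed + fresh

dim = [{"business_key": "c1", "city": "NYC", "is_active": True,
        "start_date": "2024-01-01", "end_date": None}]
result = scd2_merge(dim, [{"business_key": "c1", "city": "LA"}])
```

After the merge, the customer's history is preserved: one inactive NYC row with an end date, and one active LA row.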
Window Analytics Pattern
```python
def calculate_running_metrics(df):
    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number, lag, sum, avg

    # Define window
    window_spec = Window.partitionBy("user_id").orderBy("timestamp")

    # Calculate metrics
    result = df \
        .withColumn("row_num", row_number().over(window_spec)) \
        .withColumn("prev_value", lag("value", 1).over(window_spec)) \
        .withColumn("running_total", sum("value").over(
            window_spec.rowsBetween(Window.unboundedPreceding, Window.currentRow))) \
        .withColumn("moving_avg", avg("value").over(window_spec.rowsBetween(-2, 0)))
    return result
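The two frame specs are easiest to check by hand on a small list: `rowsBetween(unboundedPreceding, currentRow)` is a running total, and `rowsBetween(-2, 0)` is a trailing average over the current row plus its two predecessors. A plain-Python sketch of both:

```python
def running_metrics(values, window=3):
    """Per-element running total and trailing moving average (current + 2 preceding)."""
    totals, moving = [], []
    acc = 0
    for i, v in enumerate(values):
        acc += v
        totals.append(acc)                              # rowsBetween(unboundedPreceding, 0)
        tail = values[max(0, i - window + 1): i + 1]    # rowsBetween(-2, 0)
        moving.append(sum(tail) / len(tail))
    return totals, moving

totals, moving = running_metrics([10, 20, 30, 40])
# totals -> [10, 30, 60, 100]; moving -> [10.0, 15.0, 20.0, 30.0]
```

Note the early rows average over fewer than three values, exactly as the Spark frame does at the start of each partition.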
Troubleshooting
Out of Memory Errors
Symptoms:
- `java.lang.OutOfMemoryError` in executor or driver logs
- Executor failures
- Slow garbage collection

Solutions:
```python
# Increase executor memory (set before the application starts)
spark.conf.set("spark.executor.memory", "8g")

# Increase driver memory (if collecting data)
spark.conf.set("spark.driver.memory", "4g")

# Reduce memory pressure
df.persist(StorageLevel.MEMORY_AND_DISK)  # Spill to disk
df.coalesce(100)  # Reduce partition count
spark.conf.set("spark.sql.shuffle.partitions", 400)  # Increase shuffle partitions

# Avoid collect() on large datasets; use take() or limit() instead
df.take(100)
```
Shuffle Performance Issues
Symptoms:
- Long shuffle read/write times
- Skewed partition sizes
- Task stragglers
Solutions:
```python
# Increase shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", 400)

# Handle skew with salting
df_salted = df.withColumn("salt", (rand() * 10).cast("int"))
result = df_salted.groupBy("key", "salt").agg(...)

# Use broadcast for small tables
large_df.join(broadcast(small_df), "key")

# Enable AQE for automatic optimization
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```
Streaming Job Failures
Symptoms:
- Streaming query stopped
- Checkpoint corruption
- Processing lag increasing
Solutions:
```python
# Increase executor memory for stateful operations
spark.conf.set("spark.executor.memory", "8g")

# Tune watermark for late data
.withWatermark("timestamp", "15 minutes")

# Increase trigger interval to reduce micro-batch overhead
.trigger(processingTime="30 seconds")

# Monitor lag and adjust parallelism
spark.conf.set("spark.sql.shuffle.partitions", 200)

# Recover from checkpoint corruption: delete the checkpoint directory
# and restart (data loss possible), or implement custom state recovery logic
```
Data Skew
Symptoms:
- Few tasks take much longer than others
- Unbalanced partition sizes
- Executor OOM errors
Solutions:
```python
from pyspark.sql.functions import concat, lit, rand, col

# 1. Salting technique (add random suffix to keys)
df_salted = df.withColumn("salted_key", concat(col("key"), lit("_"), (rand() * 10).cast("int")))
result = df_salted.groupBy("salted_key").agg(...)

# 2. Repartition by skewed column
df.repartition(200, "skewed_column")

# 3. Isolate skewed keys and process them separately
skewed_keys = df.groupBy("key").count().filter(col("count") > threshold).select("key")
skewed_df = df.join(broadcast(skewed_keys), "key")
normal_df = df.join(broadcast(skewed_keys), "key", "left_anti")

skewed_result = process_with_salting(skewed_df)
normal_result = process_normally(normal_df)
final = skewed_result.union(normal_result)
```
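Salting works because the two-stage aggregation (salted key first, then real key) preserves the totals while splitting one hot key across many tasks. A Spark-free sketch of why the result is unchanged (a toy illustration with a deterministic seed):

```python
import random
from collections import Counter

random.seed(42)
records = [("hot", 1)] * 1000 + [("cold", 1)] * 10   # one heavily skewed key

# Stage 1: aggregate by (key, salt) -- the hot key is spread over up to 10 buckets,
# so no single task has to process all 1000 hot records
stage1 = Counter()
for key, value in records:
    stage1[(key, random.randrange(10))] += value

# Stage 2: strip the salt and aggregate the partial sums by the real key
stage2 = Counter()
for (key, _salt), partial in stage1.items():
    stage2[key] += partial

hot_buckets = sum(1 for (key, _s) in stage1 if key == "hot")
```

`stage2` matches the unsalted aggregation exactly; the only cost is the extra second-stage reduce over a handful of partial sums.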
Context7 Code Integration
This skill integrates real-world code examples from Apache Spark's official repository. All code snippets in the EXAMPLES.md file are sourced from Context7's Apache Spark library documentation, ensuring production-ready patterns and best practices.
Version and Compatibility
- Apache Spark Version: 3.x (compatible with 2.4+)
- Python: 3.7+
- Scala: 2.12+
- Java: 8+
- R: 3.5+
References
- Official Documentation: https://spark.apache.org/docs/latest/
- API Reference: https://spark.apache.org/docs/latest/api.html
- GitHub Repository: https://github.com/apache/spark
- Databricks Blog: https://databricks.com/blog
- Context7 Library: /apache/spark
Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: Big Data, Distributed Computing, Data Engineering, Machine Learning
Context7 Integration: /apache/spark with 8000 tokens of documentation