Theory
Lesson 9/15

Spark Performance Tuning

Optimization techniques, caching, partitioning, and troubleshooting Spark jobs

Spark Performance Optimization

1. Understanding Spark Jobs

1.1 Job, Stage, Task

Text
┌─────────────────────────────────────────────────────────┐
│                           JOB                           │
│    (Triggered by an Action: collect, count, save...)    │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   ┌─────────────┐       ┌─────────────┐                 │
│   │   STAGE 1   │  -->  │   STAGE 2   │  -->  ...       │
│   │  (Narrow)   │       │   (Wide)    │                 │
│   └─────────────┘       └─────────────┘                 │
│         │                     │                         │
│     ┌───┴───┐             ┌───┴───┐                     │
│     │ Task  │             │ Task  │ (one per partition) │
│     │ Task  │             │ Task  │                     │
│     │ Task  │             │ Task  │                     │
│     └───────┘             └───────┘                     │
│                                                         │
└─────────────────────────────────────────────────────────┘

Key Concepts:

  • Job: Complete computation triggered by an action (see the sketch below)
  • Stage: Set of tasks that can run in parallel (stage boundaries are shuffles)
  • Task: Single unit of work on one partition
  • Shuffle: Data redistribution across the cluster (expensive!)
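
A minimal sketch of how these concepts map onto code (the DataFrame here is hypothetical; exact stage counts can vary, e.g. with AQE):

Python
from pyspark.sql.functions import col

# Transformations are lazy - nothing runs until an action is called
df = spark.range(1_000_000)                        # no job yet
doubled = df.withColumn("x2", col("id") * 2)       # narrow: stays in the same stage
buckets = doubled.groupBy(col("id") % 10).count()  # wide: shuffle -> new stage
buckets.collect()                                  # ACTION: triggers a job (~2 stages, 1 task per partition)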

1.2 Narrow vs Wide Transformations

Python
from pyspark.sql.functions import col

# NARROW (no shuffle - fast)
# Each input partition → one output partition
df.filter(col("x") > 0)               # Filter
df.select("col1", "col2")             # Select
df.withColumn("y", col("x") * 2)      # Map

# WIDE (shuffle required - slow)
# Input partitions → multiple output partitions
df.groupBy("key").count()             # GroupBy
df.orderBy("col")                     # Sort
df.join(other_df, "key")              # Join
df.repartition(100)                   # Repartition
df.distinct()                         # Distinct

2. Memory Management

2.1 Spark Memory Model

Text
┌──────────────────────────────────────────────┐
│            Executor Memory (heap)            │
├──────────────────────────────────────────────┤
│  ┌────────────────────────────────────────┐  │
│  │ Unified Spark Memory (~60%)            │  │
│  │  - Execution (shuffles, joins, sorts)  │  │
│  │  - Storage (cached data, broadcasts)   │  │
│  │  soft boundary: storageFraction = 0.5  │  │
│  └────────────────────────────────────────┘  │
│                                              │
│  ┌────────────────────────────────────────┐  │
│  │ User Memory (~40%)                     │  │
│  │ (user data structures)                 │  │
│  └────────────────────────────────────────┘  │
│                                              │
│  ┌────────────────────────────────────────┐  │
│  │ Reserved Memory (300MB, fixed)         │  │
│  └────────────────────────────────────────┘  │
└──────────────────────────────────────────────┘
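
A worked example of how those regions are carved out of the heap, assuming spark.executor.memory = 8g and the default fractions (spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5):

Python
# All figures in MB
executor_heap = 8 * 1024                  # 8192 MB (spark.executor.memory = 8g)
reserved      = 300                       # fixed reserved memory
usable        = executor_heap - reserved  # 7892 MB
unified       = 0.6 * usable              # ~4735 MB shared by execution and storage
storage_floor = 0.5 * unified             # ~2368 MB of cached data protected from eviction
user_memory   = 0.4 * usable              # ~3157 MB for user data structures
print(unified, storage_floor, user_memory)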

2.2 Memory Configuration

Python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Memory Tuning") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()

3. Caching & Persistence

3.1 When to Cache

Python
# Cache when:
# - DataFrame used multiple times
# - After expensive transformation
# - Before iterative algorithms

df = spark.read.parquet("large_data.parquet")
df_processed = df.filter(...).groupBy(...).agg(...)

# Cache it!
df_processed.cache()  # or df_processed.persist()

# Multiple uses (cache helps here)
df_processed.count()
df_processed.show()
df_processed.write.parquet("output/")

# Clean up when done
df_processed.unpersist()

3.2 Storage Levels

Python
from pyspark import StorageLevel

# MEMORY_ONLY (default for RDD.cache(); DataFrame.cache() uses MEMORY_AND_DISK)
df.persist(StorageLevel.MEMORY_ONLY)

# MEMORY_AND_DISK (spill to disk if needed)
df.persist(StorageLevel.MEMORY_AND_DISK)

# DISK_ONLY
df.persist(StorageLevel.DISK_ONLY)

# Serialized storage (less memory, more CPU)
# Note: MEMORY_ONLY_SER exists only in the Scala/Java API;
# PySpark always stores cached data serialized
# df.persist(StorageLevel.MEMORY_ONLY_SER)   # Scala/Java only

# With replication (_2 suffix)
df.persist(StorageLevel.MEMORY_AND_DISK_2)  # 2 copies

Cache Best Practice
  • MEMORY_AND_DISK is a safe default
  • Use serialized levels (Scala/Java API) when memory constrained
  • Always unpersist() when done

3.3 Check Cached Data

Python
# Check what's cached
spark.catalog.isCached("table_name")

# In the Spark UI
# go to the Storage tab to see cached RDDs/DataFrames

# Clear all cache
spark.catalog.clearCache()

4. Partitioning

4.1 Understanding Partitions

Python
# Check number of partitions
df.rdd.getNumPartitions()

# When reading files, the partition count follows the input splits/blocks;
# after a shuffle it is spark.sql.shuffle.partitions (default 200)
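
To see how evenly rows are spread across those partitions, a small diagnostic sketch (each partition is materialized as a list on its executor, so use it on data that fits comfortably in executor memory):

Python
# Rows per partition: glom() turns each partition into a list on the executor,
# len() counts it, and only the counts are collected to the driver
rows_per_partition = df.rdd.glom().map(len).collect()
print(rows_per_partition)  # large imbalances here indicate skew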

4.2 Repartition vs Coalesce

Python
# REPARTITION: full shuffle, can increase or decrease
df.repartition(100)            # Exactly 100 partitions
df.repartition("column")       # Partition by column values
df.repartition(100, "column")  # Both

# COALESCE: no shuffle, can only decrease
df.coalesce(10)  # Reduce to 10 partitions (merge)

# When to use:
# - Increase partitions: repartition()
# - Decrease partitions: coalesce() (more efficient)
# - Partition by key for joins: repartition("key")

4.3 Optimal Partition Size

Python
# Rule of thumb:
# - Target 128MB - 1GB per partition
# - 2-3x the number of cores available

# Calculate optimal partitions
data_size_bytes = 50 * 1024 * 1024 * 1024  # 50GB
target_partition_size = 128 * 1024 * 1024  # 128MB
optimal_partitions = data_size_bytes // target_partition_size  # ~400

# Set shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", optimal_partitions)

# Adaptive Query Execution (Spark 3.0+) - auto-adjusts!
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

5. Join Optimization

5.1 Join Types & Performance

Text
Performance (fast to slow):
1. Broadcast Hash Join - small table broadcast to every executor
2. Shuffle Hash Join   - build a hash table after the shuffle
3. Sort Merge Join     - sort both sides (default for large tables)
4. Cartesian Join      - AVOID!

5.2 Broadcast Join

Python
from pyspark.sql.functions import broadcast

# Small table (< 10MB by default)
small_df = spark.read.parquet("small_lookup.parquet")
large_df = spark.read.parquet("large_facts.parquet")

# Force broadcast (override size check)
result = large_df.join(broadcast(small_df), "key")

# Configure broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)  # 100MB

5.3 Join Strategies

Python
# Check join plan
df1.join(df2, "key").explain()

# Hint for a specific join type
df1.hint("broadcast").join(df2, "key")     # Broadcast
df1.hint("merge").join(df2, "key")         # Sort-merge
df1.hint("shuffle_hash").join(df2, "key")  # Shuffle hash

# Optimize large-to-large joins
# 1. Filter before join
df1_filtered = df1.filter(col("date") >= "2024-01-01")
result = df1_filtered.join(df2, "key")

# 2. Select only needed columns
df1_slim = df1.select("key", "value1")
result = df1_slim.join(df2.select("key", "value2"), "key")

# 3. Pre-partition by join key
df1_partitioned = df1.repartition("key")
df2_partitioned = df2.repartition("key")
result = df1_partitioned.join(df2_partitioned, "key")

6. Shuffle Optimization

6.1 Reduce Shuffle

Python
# 1. Use reduceByKey instead of groupByKey (RDD)
# Bad
rdd.groupByKey().mapValues(sum)
# Good
rdd.reduceByKey(lambda a, b: a + b)

# 2. Aggregate before join
# Bad
df1.join(df2, "key").groupBy("category").sum("value")
# Good
df1_agg = df1.groupBy("key", "category").sum("value")
df1_agg.join(df2, "key")

# 3. Use coalesce instead of repartition to reduce partitions
df.coalesce(10)     # No shuffle
df.repartition(10)  # Full shuffle

6.2 Shuffle Configuration

Python
# Number of shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Shuffle compression
spark.conf.set("spark.shuffle.compress", "true")
spark.conf.set("spark.shuffle.spill.compress", "true")

# Shuffle file buffer
spark.conf.set("spark.shuffle.file.buffer", "64k")

7. Data Serialization

7.1 Kryo Serialization

Python
spark = SparkSession.builder \
    .appName("Kryo") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrationRequired", "false") \
    .getOrCreate()

# Register custom classes for better performance
# spark.conf.set("spark.kryo.classesToRegister", "com.example.MyClass")

7.2 Data Types Optimization

Python
# Use appropriate types
# Bad
df.withColumn("id", col("id").cast("string"))   # String takes more space

# Good
df.withColumn("id", col("id").cast("integer"))  # Compact

# Use IntegerType instead of LongType where possible
# Use FloatType instead of DoubleType where precision allows
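
One way to act on the two comments above is to declare an explicit schema with compact types instead of letting Spark infer wide ones. A minimal sketch, with hypothetical column names and file path:

Python
from pyspark.sql.types import FloatType, IntegerType, StringType, StructField, StructType

# Explicit, compact schema: skips inference and avoids over-wide Long/Double columns
schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("amount", FloatType(), nullable=True),
    StructField("category", StringType(), nullable=True),
])

df = spark.read.schema(schema).csv("transactions.csv", header=True)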

8. File Format Optimization

8.1 Parquet Best Practices

Python
# Write with compression
df.write \
    .option("compression", "snappy") \
    .parquet("output/")

# Compression options: snappy (default), gzip, lz4, zstd

# Partition by frequently filtered columns
df.write \
    .partitionBy("year", "month") \
    .parquet("output/")

# Z-ordering (Delta Lake)
# Optimizes data layout for common filter columns (see the sketch after this block)
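
For the Z-ordering note above, a minimal sketch assuming the data is stored as a Delta Lake table (requires the delta-spark package and a Delta-enabled session; the path and columns are placeholders):

Python
# Rewrites the table files so rows with similar values in the listed columns
# are co-located, improving data skipping for filters on those columns
spark.sql("OPTIMIZE delta.`/data/events` ZORDER BY (event_date, user_id)")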

8.2 Predicate Pushdown

Python
# Filter pushdown to Parquet (automatic with Spark SQL)
df = spark.read.parquet("data/")
df.filter(col("year") == 2024).explain()
# Shows: PushedFilters: [IsNotNull(year), EqualTo(year,2024)]

# Works best with:
# - Parquet files
# - Partitioned data
# - Pruned columns

9. Monitoring & Debugging

9.1 Spark UI

Python
# Access at http://<driver-host>:4040

# Key tabs:
# - Jobs: Overview of all jobs
# - Stages: Details of stages (look for slow stages)
# - Storage: Cached data
# - Environment: Configuration
# - Executors: Resource usage
# - SQL: Query execution plans
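
The UI at port 4040 only exists while the application is running. To inspect finished jobs through the Spark History Server, event logging can be enabled at session creation - a minimal sketch, with the log directory as a placeholder for shared storage:

Python
from pyspark.sql import SparkSession

# Event log settings must be supplied at session creation, not changed at runtime
spark = SparkSession.builder \
    .appName("With Event Logging") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.eventLog.dir", "hdfs:///spark-logs") \
    .getOrCreate()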

9.2 Common Issues

Python
# 1. OutOfMemoryError
# Solutions:
# - Increase memory at session creation / spark-submit time
#   (spark.executor.memory=8g, spark.driver.memory=4g);
#   these cannot be changed at runtime with spark.conf.set
df.persist(StorageLevel.MEMORY_AND_DISK)  # Spill to disk

# 2. Shuffle taking too long
# Solutions:
spark.conf.set("spark.sql.shuffle.partitions", "500")
# Use broadcast join for small tables
# Reduce data before shuffle

# 3. Skewed data
# Solutions:
# - Salting technique (see the sketch after this block)
# - Adaptive Query Execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# 4. Too many small files
# Solutions:
df.coalesce(100).write.parquet("output/")
# Or compact after write
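
A minimal sketch of the salting technique referenced above, assuming a large skewed DataFrame df1 joined to a smaller df2 on a column named key (all names hypothetical):

Python
from pyspark.sql.functions import array, explode, floor, lit, rand

NUM_SALTS = 8  # hypothetical; tune to the observed skew

# Add a random salt to the large, skewed side
df1_salted = df1.withColumn("salt", floor(rand() * NUM_SALTS).cast("int"))

# Replicate each row of the smaller side once per salt value
df2_salted = df2.withColumn("salt", explode(array(*[lit(i) for i in range(NUM_SALTS)])))

# Join on the original key plus the salt, then drop the helper column
result = df1_salted.join(df2_salted, ["key", "salt"]).drop("salt")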

9.3 Logging & Metrics

Python
# Set log level
spark.sparkContext.setLogLevel("WARN")  # DEBUG, INFO, WARN, ERROR

# Custom metrics
from pyspark.sql.functions import col, count, sum as spark_sum

# Track row counts through the pipeline
df1 = spark.read.parquet("input/")
print(f"Input rows: {df1.count()}")

df2 = df1.filter(col("status") == "active")
print(f"After filter: {df2.count()}")

df3 = df2.groupBy("category").agg(spark_sum("amount"))
print(f"After groupby: {df3.count()}")

10. Configuration Checklist

Python
from pyspark.sql import SparkSession

# Production configuration template
# (parentheses allow blank lines and comments inside the builder chain)
spark = (
    SparkSession.builder
    .appName("Production Job")

    # Executor configuration
    .config("spark.executor.instances", "10")
    .config("spark.executor.cores", "4")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")

    # Driver configuration
    .config("spark.driver.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")

    # Shuffle configuration
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.shuffle.compress", "true")

    # Serialization
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    # Adaptive Query Execution (Spark 3.0+)
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")

    # Dynamic allocation
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "20")

    .getOrCreate()
)

11. Practice

Performance Exercise

Exercise: Optimize Slow Query

Python
from pyspark.sql.functions import col

# This query is slow. Identify the problems and optimize it.

# Sample data (imagine these are large)
transactions = spark.createDataFrame([
    (1, "2024-01-15", 100.0, "A"),
    (2, "2024-01-16", 200.0, "B"),
    # ... millions of rows
], ["id", "date", "amount", "category"])

categories = spark.createDataFrame([
    ("A", "Electronics"),
    ("B", "Clothing"),
], ["category", "category_name"])

# SLOW QUERY - find the problems
result = transactions \
    .join(categories, "category") \
    .filter(col("date") >= "2024-01-01") \
    .groupBy("category_name") \
    .agg({"amount": "sum", "*": "count"}) \
    .orderBy(col("sum(amount)").desc())

# YOUR OPTIMIZED VERSION HERE
💡 View answer
Python
# Problems identified:
# 1. Filter after join (should filter first)
# 2. Small table not broadcasted
# 3. No caching for reuse

# OPTIMIZED VERSION
from pyspark.sql.functions import broadcast, col, count, sum as spark_sum

# 1. Filter BEFORE join
transactions_filtered = transactions.filter(col("date") >= "2024-01-01")

# 2. Broadcast the small lookup table
result_optimized = transactions_filtered \
    .join(broadcast(categories), "category") \
    .groupBy("category_name") \
    .agg(
        spark_sum("amount").alias("total_amount"),
        count("*").alias("transaction_count")
    ) \
    .orderBy(col("total_amount").desc())

# 3. If the result is used multiple times, cache it
result_optimized.cache()

result_optimized.show()
result_optimized.write.parquet("output/")

result_optimized.unpersist()

# Check the execution plan
result_optimized.explain(mode="formatted")

12. Summary

Optimization   | Technique                  | Impact
---------------|----------------------------|-------------------
Memory         | Proper sizing, caching     | Avoid OOM
Partitioning   | Right number & size        | Better parallelism
Joins          | Broadcast, filter first    | Reduce shuffle
Serialization  | Kryo                       | Faster transfer
File Format    | Parquet, compression       | Less I/O
AQE            | Enable adaptive execution  | Auto-optimize

Quick Wins:

  1. Enable Adaptive Query Execution
  2. Broadcast small tables
  3. Filter early
  4. Use appropriate partitions
  5. Cache reused DataFrames

Next lesson: Data Streaming Fundamentals