Spark Performance Tuning
1. Understanding Spark Jobs
1.1 Job, Stage, Task
```text
┌──────────────────────────────────────────────────────────┐
│                           JOB                             │
│    (Triggered by an Action: collect, count, save...)      │
├──────────────────────────────────────────────────────────┤
│                                                           │
│   ┌─────────────┐       ┌─────────────┐                   │
│   │   STAGE 1   │  -->  │   STAGE 2   │  -->  ...         │
│   │  (Narrow)   │       │   (Wide)    │                   │
│   └─────────────┘       └─────────────┘                   │
│          │                     │                          │
│      ┌───┴───┐             ┌───┴───┐                      │
│      │ Task  │             │ Task  │  (one per partition) │
│      │ Task  │             │ Task  │                      │
│      │ Task  │             │ Task  │                      │
│      └───────┘             └───────┘                      │
│                                                           │
└──────────────────────────────────────────────────────────┘
```

Key Concepts:
- Job: the complete computation triggered by an action
- Stage: a set of tasks that can run in parallel; stages are split at shuffle boundaries
- Task: a single unit of work on one partition
- Shuffle: redistribution of data across the cluster (expensive!)
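To make these terms concrete, here is a minimal sketch (the app name, sizes, and column expressions are arbitrary): the filter is narrow and stays in the first stage, the groupBy forces a shuffle into a second stage, and nothing runs until the action at the end triggers the job.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("JobStageTask").getOrCreate()

df = spark.range(1_000_000)                                         # partitioned input
evens = df.filter(col("id") % 2 == 0)                               # narrow: same stage
buckets = evens.groupBy((col("id") % 10).alias("bucket")).count()   # wide: shuffle -> new stage

# The transformations above are lazy; this action triggers ONE job,
# split into stages at the shuffle, with one task per partition in each stage.
buckets.collect()
```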
1.2 Narrow vs Wide Transformations
```python
# NARROW (No shuffle - fast)
# Each input partition → one output partition
df.filter(col("x") > 0)            # Filter
df.select("col1", "col2")          # Select
df.withColumn("y", col("x") * 2)   # Map

# WIDE (Shuffle required - slow)
# Input partitions → multiple output partitions
df.groupBy("key").count()          # GroupBy
df.orderBy("col")                  # Sort
df.join(other_df, "key")           # Join
df.repartition(100)                # Repartition
df.distinct()                      # Distinct
```
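A quick way to check which category a transformation falls into is to look at the physical plan: wide transformations introduce an `Exchange` (shuffle) operator, narrow ones do not. A minimal sketch, reusing the generic `df` from the block above:

```python
# Narrow: no Exchange appears in the physical plan
df.filter(col("x") > 0).explain()

# Wide: the plan contains an Exchange (the shuffle boundary)
df.groupBy("key").count().explain()
```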
2. Memory Management
2.1 Spark Memory Model
```text
┌──────────────────────────────────────────────┐
│               Executor Memory                │
├──────────────────────────────────────────────┤
│  ┌──────────────────────────────────────┐    │
│  │      Execution Memory (60%)          │    │
│  │      (Shuffles, Joins, Sorts)        │    │
│  ├──────────────────────────────────────┤    │
│  │      Storage Memory (40%)            │    │
│  │      (Cached Data, Broadcast)        │    │
│  └──────────────────────────────────────┘    │
│                                              │
│  ┌──────────────────────────────────────┐    │
│  │      Reserved Memory (300MB)         │    │
│  └──────────────────────────────────────┘    │
│                                              │
│  ┌──────────────────────────────────────┐    │
│  │      User Memory                     │    │
│  │      (User data structures)          │    │
│  └──────────────────────────────────────┘    │
└──────────────────────────────────────────────┘
```

2.2 Memory Configuration
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Memory Tuning") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.memory.storageFraction", "0.5") \
    .getOrCreate()
```
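As a rough sanity check on those settings, this is how the 8 GB heap is carved up under Spark's unified memory model; a back-of-the-envelope sketch, not exact JVM accounting:

```python
heap_mb = 8 * 1024                       # spark.executor.memory = 8g
reserved_mb = 300                        # fixed reserved memory
usable_mb = heap_mb - reserved_mb        # ~7892 MB
unified_mb = usable_mb * 0.6             # spark.memory.fraction: execution + storage, ~4735 MB
storage_mb = unified_mb * 0.5            # spark.memory.storageFraction, ~2367 MB
execution_mb = unified_mb - storage_mb   # ~2367 MB, and execution can borrow unused storage memory
user_mb = usable_mb - unified_mb         # ~3157 MB left for user data structures
# spark.executor.memoryOverhead (2g) is requested on top of this heap for off-heap/container overhead.
```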
3. Caching & Persistence
3.1 When to Cache
```python
# Cache when:
# - DataFrame used multiple times
# - After expensive transformation
# - Before iterative algorithms

df = spark.read.parquet("large_data.parquet")
df_processed = df.filter(...).groupBy(...).agg(...)

# Cache it!
df_processed.cache()  # or df_processed.persist()

# Multiple uses (cache helps here)
df_processed.count()
df_processed.show()
df_processed.write.parquet("output/")

# Clean up when done
df_processed.unpersist()
```

3.2 Storage Levels
```python
from pyspark import StorageLevel

# MEMORY_ONLY (default for RDD.cache(); DataFrame.cache() defaults to MEMORY_AND_DISK)
df.persist(StorageLevel.MEMORY_ONLY)

# MEMORY_AND_DISK (spill to disk if needed)
df.persist(StorageLevel.MEMORY_AND_DISK)

# DISK_ONLY
df.persist(StorageLevel.DISK_ONLY)

# MEMORY_ONLY_SER (serialized - less memory, more CPU)
# In the Scala/Java API: df.persist(StorageLevel.MEMORY_ONLY_SER)
# PySpark always stores data serialized, so MEMORY_ONLY already covers this in Python

# With replication (_2 suffix)
df.persist(StorageLevel.MEMORY_AND_DISK_2)  # 2 copies
```

Cache Best Practice
- MEMORY_AND_DISK is a safe default
- MEMORY_ONLY_SER when memory is constrained
- Always unpersist() when done
3.3 Check Cached Data
```python
# Check what's cached
spark.catalog.isCached("table_name")

# In the Spark UI:
# go to the Storage tab to see cached RDDs/DataFrames

# Clear all cache
spark.catalog.clearCache()
```

4. Partitioning
4.1 Understanding Partitions
```python
# Check the number of partitions
df.rdd.getNumPartitions()

# After a shuffle, the default is spark.sql.shuffle.partitions (200);
# when reading files, it follows the number of input file blocks/splits
```

4.2 Repartition vs Coalesce
```python
# REPARTITION: Full shuffle, can increase or decrease
df.repartition(100)            # Exactly 100 partitions
df.repartition("column")       # Partition by column values
df.repartition(100, "column")  # Both

# COALESCE: No shuffle, can only decrease
df.coalesce(10)  # Reduce to 10 partitions (merge)

# When to use:
# - Increase partitions: repartition()
# - Decrease partitions: coalesce() (more efficient)
# - Partition by key for joins: repartition("key")
```

4.3 Optimal Partition Size
```python
# Rule of thumb:
# - Target 128MB - 1GB per partition
# - 2-3x number of cores available

# Calculate optimal partitions
data_size_bytes = 50 * 1024 * 1024 * 1024       # 50GB
target_partition_size = 128 * 1024 * 1024       # 128MB
optimal_partitions = data_size_bytes // target_partition_size  # ~400

# Set shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", optimal_partitions)

# Adaptive Query Execution (Spark 3.0+) - auto-adjusts!
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
```

5. Join Optimization
5.1 Join Types & Performance
```text
Performance (fast to slow):
1. Broadcast Hash Join - Small table broadcasted
2. Shuffle Hash Join   - Build hash table
3. Sort Merge Join     - Sort both sides (default for large)
4. Cartesian Join      - AVOID!
```

5.2 Broadcast Join
```python
from pyspark.sql.functions import broadcast

# Small table (< 10MB default)
small_df = spark.read.parquet("small_lookup.parquet")
large_df = spark.read.parquet("large_facts.parquet")

# Force broadcast (override size check)
result = large_df.join(broadcast(small_df), "key")

# Configure broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)  # 100MB
```

5.3 Join Strategies
```python
# Check join plan
df1.join(df2, "key").explain()

# Hint for specific join type
df1.hint("broadcast").join(df2, "key")     # Broadcast
df1.hint("merge").join(df2, "key")         # Sort-merge
df1.hint("shuffle_hash").join(df2, "key")  # Shuffle hash

# Optimize large-to-large joins
# 1. Filter before join
df1_filtered = df1.filter(col("date") >= "2024-01-01")
result = df1_filtered.join(df2, "key")

# 2. Select only needed columns
df1_slim = df1.select("key", "value1")
result = df1_slim.join(df2.select("key", "value2"), "key")

# 3. Pre-partition by join key
df1_partitioned = df1.repartition("key")
df2_partitioned = df2.repartition("key")
result = df1_partitioned.join(df2_partitioned, "key")
```

6. Shuffle Optimization
6.1 Reduce Shuffle
```python
# 1. Use reduceByKey instead of groupByKey (RDD)
# Bad
rdd.groupByKey().mapValues(sum)
# Good
rdd.reduceByKey(lambda a, b: a + b)

# 2. Aggregate before join
# Bad
df1.join(df2, "key").groupBy("category").sum("value")
# Good
df1_agg = df1.groupBy("key", "category").sum("value")
df1_agg.join(df2, "key")

# 3. Use coalesce instead of repartition to reduce partitions
df.coalesce(10)      # No shuffle
df.repartition(10)   # Full shuffle
```

6.2 Shuffle Configuration
```python
# Number of shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Shuffle compression
spark.conf.set("spark.shuffle.compress", "true")
spark.conf.set("spark.shuffle.spill.compress", "true")

# Shuffle file buffer
spark.conf.set("spark.shuffle.file.buffer", "64k")
```

7. Data Serialization
7.1 Kryo Serialization
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Kryo") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrationRequired", "false") \
    .getOrCreate()

# Register custom classes for better performance
# spark.conf.set("spark.kryo.classesToRegister", "com.example.MyClass")
```

7.2 Data Types Optimization
```python
# Use appropriate types
# Bad
df.withColumn("id", col("id").cast("string"))   # String takes more space

# Good
df.withColumn("id", col("id").cast("integer"))  # Compact

# Use IntegerType instead of LongType where possible
# Use FloatType instead of DoubleType where precision allows
```
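One practical place to apply this is when defining schemas explicitly instead of relying on inference; a small sketch with hypothetical column names and input path:

```python
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

schema = StructType([
    StructField("id", IntegerType(), False),     # 4 bytes instead of LongType's 8
    StructField("price", FloatType(), True),     # FloatType when the precision is acceptable
    StructField("category", StringType(), True),
])

df = spark.read.schema(schema).csv("input/", header=True)
```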
8. File Format Optimization
8.1 Parquet Best Practices
```python
# Write with compression
df.write \
    .option("compression", "snappy") \
    .parquet("output/")

# Compression options: snappy (default), gzip, lz4, zstd

# Partition by frequently filtered columns
df.write \
    .partitionBy("year", "month") \
    .parquet("output/")

# Z-ordering (Delta Lake)
# Optimizes data layout for common filter columns
```
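The Z-ordering note above applies only when the table is stored with Delta Lake rather than plain Parquet. A hedged sketch of what that looks like, assuming the delta-spark package is configured and `events` is an existing Delta table:

```python
# Compact files and co-locate rows on the columns you filter by most often
spark.sql("OPTIMIZE events ZORDER BY (year, month)")
```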
8.2 Predicate Pushdown

```python
# Filter pushdown to Parquet (automatic with Spark SQL)
df = spark.read.parquet("data/")
df.filter(col("year") == 2024).explain()
# Shows: PushedFilters: [IsNotNull(year), EqualTo(year,2024)]

# Works best with:
# - Parquet files
# - Partitioned data
# - Pruned columns
```

9. Monitoring & Debugging
9.1 Spark UI
```python
# Access the Spark UI at http://<driver-host>:4040

# Key tabs:
# - Jobs: Overview of all jobs
# - Stages: Details of stages (look for slow stages)
# - Storage: Cached data
# - Environment: Configuration
# - Executors: Resource usage
# - SQL: Query execution plans
```
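If you also want some of this information programmatically (for example to log it from the driver), PySpark exposes a status tracker on the SparkContext; a small sketch, assuming the `statusTracker()` API is available in your PySpark version:

```python
tracker = spark.sparkContext.statusTracker()
print("Active jobs:  ", tracker.getActiveJobsIds())
print("Active stages:", tracker.getActiveStageIds())
```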
9.2 Common Issues

```python
from pyspark import StorageLevel

# 1. OutOfMemoryError
# Solutions:
spark.conf.set("spark.executor.memory", "8g")  # must be set before the application starts
spark.conf.set("spark.driver.memory", "4g")
df.persist(StorageLevel.MEMORY_AND_DISK)       # spill to disk instead of failing

# 2. Shuffle taking too long
# Solutions:
spark.conf.set("spark.sql.shuffle.partitions", "500")
# Use broadcast joins for small tables
# Reduce data before the shuffle

# 3. Skewed data
# Solutions:
# - Salting technique (see the sketch below)
# - Adaptive Query Execution
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# 4. Too many small files
# Solutions:
df.coalesce(100).write.parquet("output/")
# Or compact after writing
```
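The salting technique listed above is worth a concrete sketch: append a random suffix to the join key on the large, skewed side and replicate the small side once per suffix, so a single hot key is spread across many partitions. Illustrative only; `large_df`, `small_df`, the column names, and the salt factor are assumptions:

```python
from pyspark.sql.functions import array, col, concat_ws, explode, floor, lit, rand

SALT_BUCKETS = 8  # tune to the degree of skew

# Large, skewed side: append a random salt 0..7 to the join key
salted_large = large_df.withColumn(
    "salted_key",
    concat_ws("_", col("key"), floor(rand() * SALT_BUCKETS).cast("string")),
)

# Small side: replicate each row once per salt value so every salted key finds a match
salted_small = small_df.withColumn(
    "salt", explode(array([lit(i) for i in range(SALT_BUCKETS)]))
).withColumn(
    "salted_key", concat_ws("_", col("key"), col("salt").cast("string"))
)

result = salted_large.join(salted_small, "salted_key")
```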
9.3 Logging & Metrics

```python
# Set the log level
spark.sparkContext.setLogLevel("WARN")  # DEBUG, INFO, WARN, ERROR

# Custom metrics
from pyspark.sql.functions import col, count, sum as spark_sum

# Track row counts through the pipeline (note: each count() triggers a job)
df1 = spark.read.parquet("input/")
print(f"Input rows: {df1.count()}")

df2 = df1.filter(col("status") == "active")
print(f"After filter: {df2.count()}")

df3 = df2.groupBy("category").agg(spark_sum("amount"))
print(f"After groupby: {df3.count()}")
```

10. Configuration Checklist
```python
from pyspark.sql import SparkSession

# Production configuration template
spark = (
    SparkSession.builder
    .appName("Production Job")

    # Executor configuration
    .config("spark.executor.instances", "10")
    .config("spark.executor.cores", "4")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")

    # Driver configuration
    .config("spark.driver.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")

    # Shuffle configuration
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.shuffle.compress", "true")

    # Serialization
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    # Adaptive Query Execution (Spark 3.0+)
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")

    # Dynamic allocation
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "20")

    .getOrCreate()
)
```

11. Practice
Performance Exercise
Exercise: Optimize Slow Query
```python
# This query is slow. Identify the problems and optimize it.

# Sample data (imagine these are large)
transactions = spark.createDataFrame([
    (1, "2024-01-15", 100.0, "A"),
    (2, "2024-01-16", 200.0, "B"),
    # ... millions of rows
], ["id", "date", "amount", "category"])

categories = spark.createDataFrame([
    ("A", "Electronics"),
    ("B", "Clothing"),
], ["category", "category_name"])

# SLOW QUERY - Find the problems
result = transactions \
    .join(categories, "category") \
    .filter(col("date") >= "2024-01-01") \
    .groupBy("category_name") \
    .agg({"amount": "sum", "*": "count"}) \
    .orderBy(col("sum(amount)").desc())

# YOUR OPTIMIZED VERSION HERE
```

💡 View solution
```python
# Problems identified:
# 1. Filter after join (should filter first)
# 2. Small table not broadcasted
# 3. No caching for reuse

# OPTIMIZED VERSION
from pyspark.sql.functions import broadcast, col, count, sum as spark_sum

# 1. Filter BEFORE the join
transactions_filtered = transactions.filter(col("date") >= "2024-01-01")

# 2. Broadcast the small lookup table
result_optimized = transactions_filtered \
    .join(broadcast(categories), "category") \
    .groupBy("category_name") \
    .agg(
        spark_sum("amount").alias("total_amount"),
        count("*").alias("transaction_count")
    ) \
    .orderBy(col("total_amount").desc())

# 3. If the result is used multiple times, cache it
result_optimized.cache()

result_optimized.show()
result_optimized.write.parquet("output/")

result_optimized.unpersist()

# Check the execution plan
result_optimized.explain(mode="formatted")
```

12. Summary
| Optimization | Technique | Impact |
|---|---|---|
| Memory | Proper sizing, caching | Avoid OOM |
| Partitioning | Right number & size | Better parallelism |
| Joins | Broadcast, filter first | Reduce shuffle |
| Serialization | Kryo | Faster transfer |
| File Format | Parquet, compression | Less I/O |
| AQE | Enable adaptive | Auto-optimize |
Quick Wins:
- Enable Adaptive Query Execution
- Broadcast small tables
- Filter early
- Use appropriate partitions
- Cache reused DataFrames
Next lesson: Data Streaming Fundamentals
