Theory
Lesson 8/15

Spark SQL & Transformations

SQL queries in Spark, the Catalyst optimizer, and advanced transformations

Spark SQL Data Transformations

1. Spark SQL Overview

Spark SQL

Spark SQL lets you run SQL queries directly on Spark DataFrames, combining the expressiveness of SQL with distributed computing.

Python
from pyspark.sql import SparkSession

# enableHiveSupport() is optional and only needed for Hive integration
spark = SparkSession.builder \
    .appName("Spark SQL") \
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

2. SQL Queries trên DataFrames

2.1 Register DataFrame as View

Python
# Create sample DataFrame
df = spark.createDataFrame([
    (1, "Alice", "Sales", 50000),
    (2, "Bob", "IT", 60000),
    (3, "Charlie", "Sales", 55000),
    (4, "David", "IT", 65000)
], ["id", "name", "department", "salary"])

# Register as a temporary view
df.createOrReplaceTempView("employees")

# Now you can use SQL
result = spark.sql("""
    SELECT department,
           COUNT(*) AS count,
           AVG(salary) AS avg_salary
    FROM employees
    GROUP BY department
""")
result.show()

2.2 Types of Views

Python
# Temporary view (session-scoped)
df.createOrReplaceTempView("temp_view")

# Global temporary view (cross-session)
df.createOrReplaceGlobalTempView("global_view")
# Access it via the global_temp database: global_temp.global_view
spark.sql("SELECT * FROM global_temp.global_view").show()

# Drop views
spark.catalog.dropTempView("temp_view")
spark.catalog.dropGlobalTempView("global_view")

2.3 Complex SQL Queries

Python
# Multiple tables
orders = spark.createDataFrame([
    (101, 1, "2024-01-15", 1000),
    (102, 2, "2024-01-16", 500),
    (103, 1, "2024-01-17", 750)
], ["order_id", "customer_id", "date", "amount"])

customers = spark.createDataFrame([
    (1, "Alice", "North"),
    (2, "Bob", "South"),
    (3, "Charlie", "East")
], ["customer_id", "name", "region"])

orders.createOrReplaceTempView("orders")
customers.createOrReplaceTempView("customers")

# Complex query with JOIN, aggregation and a window function
result = spark.sql("""
    WITH customer_orders AS (
        SELECT
            c.customer_id,
            c.name,
            c.region,
            o.order_id,
            o.amount,
            o.date
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id
    ),
    customer_summary AS (
        SELECT
            customer_id,
            name,
            region,
            COUNT(order_id) AS total_orders,
            COALESCE(SUM(amount), 0) AS total_amount,
            AVG(amount) AS avg_order_value
        FROM customer_orders
        GROUP BY customer_id, name, region
    )
    SELECT
        *,
        RANK() OVER (PARTITION BY region ORDER BY total_amount DESC) AS region_rank
    FROM customer_summary
    ORDER BY region, region_rank
""")
result.show()

3. SQL vs DataFrame API

Python
# Same logic, written two ways

# ===== SQL =====
spark.sql("""
    SELECT
        department,
        COUNT(*) AS count,
        AVG(salary) AS avg_salary,
        MAX(salary) AS max_salary
    FROM employees
    WHERE salary > 50000
    GROUP BY department
    HAVING COUNT(*) > 1
    ORDER BY avg_salary DESC
""").show()

# ===== DataFrame API =====
from pyspark.sql.functions import count, avg, max as spark_max, col

df.filter(col("salary") > 50000) \
    .groupBy("department") \
    .agg(
        count("*").alias("count"),
        avg("salary").alias("avg_salary"),
        spark_max("salary").alias("max_salary")
    ) \
    .filter(col("count") > 1) \
    .orderBy(col("avg_salary").desc()) \
    .show()
When should you use SQL vs the DataFrame API?
  • SQL: familiar to SQL users; complex queries are easy to read
  • DataFrame API: type-safe, IDE autocomplete, easier chaining
  • Recommendation: use whichever makes your code more readable; both compile to the same plan (see the sketch below)
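
Since Catalyst (section 4) compiles both styles into the same optimized plan, the choice is mostly about readability. A minimal sketch comparing the plans, assuming the employees DataFrame and view from section 2.1 are still in scope:

Python
from pyspark.sql.functions import avg

sql_df = spark.sql("""
    SELECT department, AVG(salary) AS avg_salary
    FROM employees
    GROUP BY department
""")

api_df = df.groupBy("department").agg(avg("salary").alias("avg_salary"))

# Both should print essentially the same optimized physical plan
sql_df.explain()
api_df.explain()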

4. Catalyst Optimizer

4.1 How Catalyst Works

Text
┌──────────────────────────────────────────────────────────┐
│                User Code (SQL / DataFrame)               │
└────────────────────────────┬─────────────────────────────┘
                             │
                             v
┌──────────────────────────────────────────────────────────┐
│                 Unresolved Logical Plan                  │
│                 (Abstract Syntax Tree)                   │
└────────────────────────────┬─────────────────────────────┘
                             │ Analysis
                             v
┌──────────────────────────────────────────────────────────┐
│                  Resolved Logical Plan                   │
│                    (Schema resolved)                     │
└────────────────────────────┬─────────────────────────────┘
                             │ Optimization
                             v
┌──────────────────────────────────────────────────────────┐
│                  Optimized Logical Plan                  │
│                (Predicate pushdown, etc.)                │
└────────────────────────────┬─────────────────────────────┘
                             │ Physical Planning
                             v
┌──────────────────────────────────────────────────────────┐
│                      Physical Plan                       │
│                 (Best execution strategy)                │
└────────────────────────────┬─────────────────────────────┘
                             │
                             v
┌──────────────────────────────────────────────────────────┐
│          Code Generation (Whole-stage codegen)           │
└──────────────────────────────────────────────────────────┘
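
The stages in the diagram map directly onto the sections printed by explain() with extended output. A minimal sketch, assuming the employees view from section 2.1:

Python
# Prints one section per Catalyst stage:
#   == Parsed Logical Plan ==    (unresolved)
#   == Analyzed Logical Plan ==  (schema resolved)
#   == Optimized Logical Plan ==
#   == Physical Plan ==
spark.sql("""
    SELECT department, AVG(salary) AS avg_salary
    FROM employees
    GROUP BY department
""").explain(extended=True)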

4.2 View Execution Plan

Python
from pyspark.sql.functions import col

# "simple": physical plan only
df.filter(col("salary") > 50000).explain(mode="simple")

# "extended": parsed, analyzed and optimized logical plans plus the physical plan
df.filter(col("salary") > 50000).explain(mode="extended")

# "cost": logical plan with statistics, if available
df.filter(col("salary") > 50000).explain(mode="cost")

# "formatted": physical plan outline plus node details
df.filter(col("salary") > 50000).explain(mode="formatted")

4.3 Catalyst Optimizations

Python
from pyspark.sql.functions import col, lit

# Predicate pushdown: the filter is pushed down to the data source
df = spark.read.parquet("data.parquet")
df.filter(col("year") == 2024).explain()
# The filter appears in the scan, so less data is read

# Column pruning: only the needed columns are read
df.select("name", "salary").explain()
# Only 2 columns are read from the Parquet files, not all

# Constant folding
df.withColumn("x", lit(1) + lit(2)).explain()
# 1 + 2 = 3 is computed at planning time, not per row

# Predicate simplification
df.filter((col("x") > 0) & (col("x") > 0)).explain()
# The duplicate predicate is removed

5. Advanced Transformations

5.1 User Defined Functions (UDF)

Python
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Define a Python function
def categorize_salary(salary):
    if salary >= 60000:
        return "High"
    elif salary >= 50000:
        return "Medium"
    else:
        return "Low"

# Register it as a UDF
categorize_udf = udf(categorize_salary, StringType())

# Use it on a DataFrame
df.withColumn("salary_category", categorize_udf(col("salary"))).show()

# Register it for SQL
spark.udf.register("categorize_salary_sql", categorize_salary, StringType())
spark.sql("""
    SELECT name, salary, categorize_salary_sql(salary) AS category
    FROM employees
""").show()

5.2 Pandas UDF (Vectorized)

Python
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import StringType
import pandas as pd

# Scalar Pandas UDF (vectorized: receives a batch of rows as a pandas Series)
@pandas_udf(StringType())
def categorize_pandas(salary: pd.Series) -> pd.Series:
    return salary.apply(lambda x: "High" if x >= 60000 else "Medium" if x >= 50000 else "Low")

df.withColumn("category", categorize_pandas(col("salary"))).show()

# Grouped map (per-group transformations): use groupBy().applyInPandas() in Spark 3.x
def normalize_salary(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf["salary_normalized"] = (pdf["salary"] - pdf["salary"].mean()) / pdf["salary"].std()
    return pdf

# The output schema must include the added column
output_schema = "id LONG, name STRING, department STRING, salary LONG, salary_normalized DOUBLE"
df.groupBy("department").applyInPandas(normalize_salary, schema=output_schema).show()
UDF Performance
  • Regular UDF: Slow due to Python serialization
  • Pandas UDF: Faster, uses Arrow for zero-copy transfer
  • Built-in functions: fastest; always prefer them when possible (see the sketch below)
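
When a built-in expression can do the job, it usually beats both UDF flavors because it runs entirely inside Spark's engine and stays visible to Catalyst. A minimal sketch of the salary categorization from 5.1 rewritten with when/otherwise:

Python
from pyspark.sql.functions import when, col

# Same categorization as the UDF versions above, using only built-in expressions
df.withColumn(
    "salary_category",
    when(col("salary") >= 60000, "High")
    .when(col("salary") >= 50000, "Medium")
    .otherwise("Low")
).show()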

5.3 Higher-Order Functions

Python
from pyspark.sql.functions import transform, filter as array_filter, aggregate, col, lit

# Sample data with arrays
df = spark.createDataFrame([
    (1, [1, 2, 3, 4, 5]),
    (2, [10, 20, 30])
], ["id", "numbers"])

# transform: apply a function to each element
df.withColumn(
    "doubled",
    transform(col("numbers"), lambda x: x * 2)
).show(truncate=False)

# filter: keep only matching elements
df.withColumn(
    "evens",
    array_filter(col("numbers"), lambda x: x % 2 == 0)
).show(truncate=False)

# aggregate: reduce the array to a single value
df.withColumn(
    "sum",
    aggregate(col("numbers"), lit(0), lambda acc, x: acc + x)
).show()

6. Working with Complex Types

6.1 Structs

Python
from pyspark.sql.functions import struct, col

# Work again with the employees DataFrame from section 2.1 (id, name, department, salary)
df = spark.createDataFrame([
    (1, "Alice", "Sales", 50000),
    (2, "Bob", "IT", 60000),
    (3, "Charlie", "Sales", 55000),
    (4, "David", "IT", 65000)
], ["id", "name", "department", "salary"])

# Create a struct column
df = df.withColumn(
    "employee_info",
    struct(
        col("name").alias("emp_name"),
        col("department").alias("dept"),
        col("salary")
    )
)

# Access struct fields
df.select(
    col("id"),
    col("employee_info.emp_name"),
    col("employee_info.salary")
).show()

# Flatten the struct
df.select("id", "employee_info.*").show()

6.2 Arrays

Python
from pyspark.sql.functions import explode, collect_list, array_contains, size, col

# Create an array column
df = spark.createDataFrame([
    (1, "Alice", ["Python", "SQL", "Spark"]),
    (2, "Bob", ["Java", "Scala"]),
    (3, "Charlie", ["Python", "R"])
], ["id", "name", "skills"])

# Explode the array into rows
df.select("name", explode(col("skills")).alias("skill")).show()

# Array operations
df.withColumn("num_skills", size(col("skills"))).show()
df.filter(array_contains(col("skills"), "Python")).show()

# Collect back into an array
df.select("name", explode(col("skills")).alias("skill")) \
    .groupBy("skill") \
    .agg(collect_list("name").alias("people")) \
    .show(truncate=False)

6.3 Maps

Python
from pyspark.sql.functions import create_map, map_keys, map_values, explode, col, lit

# Create a map from columns
df = spark.createDataFrame([
    (1, "Alice", "Developer", 50000),
    (2, "Bob", "Manager", 60000)
], ["id", "name", "role", "salary"])

df = df.withColumn(
    "attributes",
    create_map(
        lit("role"), col("role"),
        lit("salary"), col("salary").cast("string")
    )
)
df.show(truncate=False)

# Access map values
df.select(
    col("name"),
    col("attributes")["role"].alias("role")
).show()

# Explode the map into key/value rows
df.select("name", explode(col("attributes"))).show()

7. Working with JSON

Python
from pyspark.sql.functions import from_json, to_json, get_json_object, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Sample JSON data
df = spark.createDataFrame([
    (1, '{"name": "Alice", "age": 30, "city": "NYC"}'),
    (2, '{"name": "Bob", "age": 25, "city": "LA"}')
], ["id", "json_data"])

# Parse the JSON string into a struct
schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType()),
    StructField("city", StringType())
])

df_parsed = df.withColumn("parsed", from_json(col("json_data"), schema))
df_parsed.select("id", "parsed.*").show()

# Extract specific fields
df.select(
    "id",
    get_json_object(col("json_data"), "$.name").alias("name"),
    get_json_object(col("json_data"), "$.age").alias("age")
).show()

# Convert the struct back to JSON
df_parsed.withColumn("back_to_json", to_json(col("parsed"))).show(truncate=False)

8. Data Skew Handling

8.1 Identifying Skew

Python
from pyspark.sql.functions import spark_partition_id, col

# Check partition sizes
df.groupBy(spark_partition_id()).count().show()

# Check the value distribution of a candidate key
df.groupBy("key_column").count().orderBy(col("count").desc()).show(10)

8.2 Handling Skew

Python
# Method 1: salting
from pyspark.sql.functions import rand, floor, concat, lit, col

# Add a random salt to the skewed key
num_salts = 10
df_salted = df.withColumn(
    "salted_key",
    concat(col("key"), lit("_"), floor(rand() * num_salts).cast("string"))
)

# Salt the lookup table too (replicate each row once per salt value)
lookup_salted = lookup_df.crossJoin(
    spark.range(num_salts).withColumnRenamed("id", "salt")
).withColumn(
    "salted_key",
    concat(col("key"), lit("_"), col("salt").cast("string"))
)

# Join on the salted key
result = df_salted.join(lookup_salted, "salted_key")

# Method 2: broadcast join for small tables
from pyspark.sql.functions import broadcast

# Force broadcast of the small table
result = large_df.join(broadcast(small_df), "key")

# Method 3: Adaptive Query Execution (Spark 3.0+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

9. Practice

SQL Practice

Exercise 1: Complex SQL Query

Python
# Given tables: orders, products, customers
# Write SQL to find:
# 1. The top 3 customers by total spending per region
# 2. Include their order count and average order value
# 3. Compare against the region average

orders_data = [
    (1, 101, 1, "2024-01-15", 100.0),
    (2, 101, 2, "2024-01-16", 200.0),
    (3, 102, 1, "2024-01-17", 150.0),
    (4, 103, 3, "2024-01-18", 300.0),
    (5, 101, 1, "2024-01-19", 250.0),
]
orders = spark.createDataFrame(orders_data,
    ["order_id", "customer_id", "product_id", "date", "amount"])

customers_data = [
    (101, "Alice", "North"),
    (102, "Bob", "North"),
    (103, "Charlie", "South"),
    (104, "David", "South")
]
customers = spark.createDataFrame(customers_data,
    ["customer_id", "name", "region"])

orders.createOrReplaceTempView("orders")
customers.createOrReplaceTempView("customers")

# YOUR SQL QUERY HERE
💡 Show answer
Python
result = spark.sql("""
    WITH customer_stats AS (
        SELECT
            c.customer_id,
            c.name,
            c.region,
            COUNT(o.order_id) AS order_count,
            COALESCE(SUM(o.amount), 0) AS total_spending,
            COALESCE(AVG(o.amount), 0) AS avg_order_value
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id
        GROUP BY c.customer_id, c.name, c.region
    ),
    region_stats AS (
        SELECT
            region,
            AVG(total_spending) AS region_avg_spending
        FROM customer_stats
        GROUP BY region
    ),
    ranked AS (
        SELECT
            cs.*,
            rs.region_avg_spending,
            cs.total_spending - rs.region_avg_spending AS vs_region_avg,
            RANK() OVER (PARTITION BY cs.region ORDER BY cs.total_spending DESC) AS region_rank
        FROM customer_stats cs
        JOIN region_stats rs ON cs.region = rs.region
    )
    SELECT
        region,
        name,
        order_count,
        ROUND(total_spending, 2) AS total_spending,
        ROUND(avg_order_value, 2) AS avg_order_value,
        ROUND(region_avg_spending, 2) AS region_avg,
        ROUND(vs_region_avg, 2) AS vs_region_avg,
        region_rank
    FROM ranked
    WHERE region_rank <= 3
    ORDER BY region, region_rank
""")
result.show()

Exercise 2: Create UDF

Python
# Create a UDF that calculates a loyalty tier based on:
# - total_orders >= 10 and total_amount >= 1000: "Platinum"
# - total_orders >= 5 or total_amount >= 500: "Gold"
# - otherwise: "Silver"

# YOUR CODE HERE
💡 Show answer
Python
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

def calculate_loyalty_tier(total_orders, total_amount):
    if total_orders is None or total_amount is None:
        return "Silver"
    if total_orders >= 10 and total_amount >= 1000:
        return "Platinum"
    elif total_orders >= 5 or total_amount >= 500:
        return "Gold"
    else:
        return "Silver"

loyalty_udf = udf(calculate_loyalty_tier, StringType())

# Test data
test_df = spark.createDataFrame([
    (1, "Alice", 15, 2000.0),
    (2, "Bob", 8, 800.0),
    (3, "Charlie", 3, 200.0),
    (4, "David", 2, 600.0)
], ["id", "name", "total_orders", "total_amount"])

test_df.withColumn(
    "loyalty_tier",
    loyalty_udf(col("total_orders"), col("total_amount"))
).show()

10. Summary

Concept        | Description                 | When to Use
SQL            | Query DataFrames with SQL   | Complex queries, SQL familiarity
Catalyst       | Query optimizer             | Automatic; understand it for tuning
UDF            | Custom functions            | When built-in functions are not enough
Pandas UDF     | Vectorized UDFs             | Better performance than regular UDFs
Complex Types  | Struct, Array, Map          | Nested/complex data

Performance Tips:

  • Prefer built-in functions over UDFs
  • Prefer Pandas UDFs over regular UDFs
  • Enable Adaptive Query Execution (Spark 3.0+)
  • Review execution plans with explain()

Next lesson: Spark Performance Tuning