Spark SQL & Transformations
1. Spark SQL Overview
Spark SQL
Spark SQL lets you run SQL queries on Spark DataFrames, combining the power of SQL with distributed computing.
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark SQL") \
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()  # enableHiveSupport() is optional, for Hive integration
```

2. SQL Queries on DataFrames
2.1 Register DataFrame as View
```python
# Create sample DataFrame
df = spark.createDataFrame([
    (1, "Alice", "Sales", 50000),
    (2, "Bob", "IT", 60000),
    (3, "Charlie", "Sales", 55000),
    (4, "David", "IT", 65000)
], ["id", "name", "department", "salary"])

# Register as temporary view
df.createOrReplaceTempView("employees")

# Now you can use SQL
result = spark.sql("""
    SELECT department,
           COUNT(*) as count,
           AVG(salary) as avg_salary
    FROM employees
    GROUP BY department
""")
result.show()
```

2.2 Types of Views
```python
# Temporary View (session-scoped)
df.createOrReplaceTempView("temp_view")

# Global Temporary View (cross-session)
df.createOrReplaceGlobalTempView("global_view")
# Access with: global_temp.global_view
spark.sql("SELECT * FROM global_temp.global_view").show()

# Drop views
spark.catalog.dropTempView("temp_view")
spark.catalog.dropGlobalTempView("global_view")
```

2.3 Complex SQL Queries
```python
# Multiple tables
orders = spark.createDataFrame([
    (101, 1, "2024-01-15", 1000),
    (102, 2, "2024-01-16", 500),
    (103, 1, "2024-01-17", 750)
], ["order_id", "customer_id", "date", "amount"])

customers = spark.createDataFrame([
    (1, "Alice", "North"),
    (2, "Bob", "South"),
    (3, "Charlie", "East")
], ["customer_id", "name", "region"])

orders.createOrReplaceTempView("orders")
customers.createOrReplaceTempView("customers")

# Complex query with JOIN, aggregation, window
result = spark.sql("""
    WITH customer_orders AS (
        SELECT
            c.customer_id,
            c.name,
            c.region,
            o.order_id,
            o.amount,
            o.date
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id
    ),
    customer_summary AS (
        SELECT
            customer_id,
            name,
            region,
            COUNT(order_id) as total_orders,
            COALESCE(SUM(amount), 0) as total_amount,
            AVG(amount) as avg_order_value
        FROM customer_orders
        GROUP BY customer_id, name, region
    )
    SELECT
        *,
        RANK() OVER (PARTITION BY region ORDER BY total_amount DESC) as region_rank
    FROM customer_summary
    ORDER BY region, region_rank
""")
result.show()
```

3. SQL vs DataFrame API
```python
# Same logic, two ways to write it

# ===== SQL =====
spark.sql("""
    SELECT
        department,
        COUNT(*) as count,
        AVG(salary) as avg_salary,
        MAX(salary) as max_salary
    FROM employees
    WHERE salary > 50000
    GROUP BY department
    HAVING COUNT(*) > 1
    ORDER BY avg_salary DESC
""").show()

# ===== DataFrame API =====
from pyspark.sql.functions import count, avg, max as spark_max, col

df.filter(col("salary") > 50000) \
    .groupBy("department") \
    .agg(
        count("*").alias("count"),
        avg("salary").alias("avg_salary"),
        spark_max("salary").alias("max_salary")
    ) \
    .filter(col("count") > 1) \
    .orderBy(col("avg_salary").desc()) \
    .show()
```

When to use SQL vs DataFrame API?
- SQL: Familiar to SQL users; complex queries are easy to read
- DataFrame API: Type-safe, IDE autocomplete, easier chaining
- Recommendation: Use whichever makes your code more readable; both compile to the same plan, as the sketch below shows
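Both forms are compiled by Catalyst into the same optimized plan, so the choice really is about readability. A minimal sketch to verify this yourself, assuming the `employees` view and `df` from section 2.1 are still registered:

```python
from pyspark.sql.functions import avg

# SQL version
sql_version = spark.sql("""
    SELECT department, AVG(salary) AS avg_salary
    FROM employees
    GROUP BY department
""")

# DataFrame API version
api_version = df.groupBy("department").agg(avg("salary").alias("avg_salary"))

# Compare the physical plans; they should be essentially identical
sql_version.explain()
api_version.explain()
```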
4. Catalyst Optimizer
4.1 How Catalyst Works
```text
┌──────────────────────────────────────────────┐
│        User Code (SQL / DataFrame)           │
└──────────────────────┬───────────────────────┘
                       │
                       v
┌──────────────────────────────────────────────┐
│        Unresolved Logical Plan               │
│        (Abstract Syntax Tree)                │
└──────────────────────┬───────────────────────┘
                       │  Analysis
                       v
┌──────────────────────────────────────────────┐
│        Resolved Logical Plan                 │
│        (Schema resolved)                     │
└──────────────────────┬───────────────────────┘
                       │  Optimization
                       v
┌──────────────────────────────────────────────┐
│        Optimized Logical Plan                │
│        (Predicate pushdown, etc.)            │
└──────────────────────┬───────────────────────┘
                       │  Physical Planning
                       v
┌──────────────────────────────────────────────┐
│        Physical Plan                         │
│        (Best execution strategy)             │
└──────────────────────┬───────────────────────┘
                       │
                       v
┌──────────────────────────────────────────────┐
│   Code Generation (Whole-stage codegen)      │
└──────────────────────────────────────────────┘
```

4.2 View Execution Plan
```python
# Physical plan only
df.filter(col("salary") > 50000).explain(mode="simple")

# Parsed, analyzed, and optimized logical plans plus the physical plan
df.filter(col("salary") > 50000).explain(mode="extended")

# Plan with cost statistics (when available)
df.filter(col("salary") > 50000).explain(mode="cost")

# Formatted physical plan (operator overview plus details)
df.filter(col("salary") > 50000).explain(mode="formatted")
```

4.3 Catalyst Optimizations
```python
from pyspark.sql.functions import col, lit

# Predicate Pushdown - filter pushed to the data source
parquet_df = spark.read.parquet("data.parquet")
parquet_df.filter(col("year") == 2024).explain()
# Filter is pushed down to the scan, so less data is read

# Column Pruning - only read needed columns
parquet_df.select("name", "salary").explain()
# Only reads 2 columns from the Parquet file, not all of them

# Constant Folding
parquet_df.withColumn("x", lit(1) + lit(2)).explain()
# Computes 1 + 2 = 3 at planning time, not at runtime

# Predicate Simplification
parquet_df.filter((col("x") > 0) & (col("x") > 0)).explain()
# Removes the duplicate predicate
```

5. Advanced Transformations
5.1 User Defined Functions (UDF)
```python
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Define a Python function
def categorize_salary(salary):
    if salary >= 60000:
        return "High"
    elif salary >= 50000:
        return "Medium"
    else:
        return "Low"

# Register as UDF
categorize_udf = udf(categorize_salary, StringType())

# Use in DataFrame
df.withColumn("salary_category", categorize_udf(col("salary"))).show()

# Register for SQL
spark.udf.register("categorize_salary_sql", categorize_salary, StringType())
spark.sql("""
    SELECT name, salary, categorize_salary_sql(salary) as category
    FROM employees
""").show()
```

5.2 Pandas UDF (Vectorized)
```python
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import StringType
import pandas as pd

# Scalar Pandas UDF (vectorized: receives a batch of rows as a pandas Series)
@pandas_udf(StringType())
def categorize_pandas(salary: pd.Series) -> pd.Series:
    return salary.apply(
        lambda x: "High" if x >= 60000 else "Medium" if x >= 50000 else "Low"
    )

df.withColumn("category", categorize_pandas(col("salary"))).show()

# Grouped map (group-by operations) - Spark 3.x style via applyInPandas
def normalize_salary(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf["salary_normalized"] = (pdf["salary"] - pdf["salary"].mean()) / pdf["salary"].std()
    return pdf

output_schema = "id long, name string, department string, salary long, salary_normalized double"

df.groupBy("department").applyInPandas(normalize_salary, schema=output_schema).show()
```

UDF Performance
- Regular UDF: Slow due to per-row serialization between the JVM and the Python worker
- Pandas UDF: Faster; uses Apache Arrow for efficient columnar data transfer
- Built-in functions: Fastest; always prefer them when possible (see the sketch below)
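To illustrate the last point: the salary categorization from 5.1 does not actually need a UDF. A minimal sketch of the same logic with the built-in `when`/`otherwise` expressions, which Catalyst can see into and optimize:

```python
from pyspark.sql.functions import when, col

# Same result as categorize_udf, but expressed with built-in functions
df.withColumn(
    "salary_category",
    when(col("salary") >= 60000, "High")
    .when(col("salary") >= 50000, "Medium")
    .otherwise("Low")
).show()
```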
5.3 Higher-Order Functions
```python
from pyspark.sql.functions import transform, filter as array_filter, aggregate, lit, col

# Sample data with arrays
df = spark.createDataFrame([
    (1, [1, 2, 3, 4, 5]),
    (2, [10, 20, 30])
], ["id", "numbers"])

# Transform - apply a function to each element
df.withColumn(
    "doubled",
    transform(col("numbers"), lambda x: x * 2)
).show(truncate=False)

# Filter array elements
df.withColumn(
    "evens",
    array_filter(col("numbers"), lambda x: x % 2 == 0)
).show(truncate=False)

# Aggregate an array to a single value
df.withColumn(
    "sum",
    aggregate(col("numbers"), lit(0), lambda acc, x: acc + x)
).show()
```

6. Working with Complex Types
6.1 Structs
```python
from pyspark.sql.functions import struct, col

# df here is the employees DataFrame from section 2.1 (id, name, department, salary)

# Create a struct column
df = df.withColumn(
    "employee_info",
    struct(
        col("name").alias("emp_name"),
        col("department").alias("dept"),
        col("salary")
    )
)

# Access struct fields
df.select(
    col("id"),
    col("employee_info.emp_name"),
    col("employee_info.salary")
).show()

# Flatten the struct
df.select("id", "employee_info.*").show()
```

6.2 Arrays
```python
from pyspark.sql.functions import array, explode, collect_list, array_contains, size, col

# Create an array column
df = spark.createDataFrame([
    (1, "Alice", ["Python", "SQL", "Spark"]),
    (2, "Bob", ["Java", "Scala"]),
    (3, "Charlie", ["Python", "R"])
], ["id", "name", "skills"])

# Explode the array into rows
df.select("name", explode(col("skills")).alias("skill")).show()

# Array operations
df.withColumn("num_skills", size(col("skills"))).show()
df.filter(array_contains(col("skills"), "Python")).show()

# Collect back into an array
df.select("name", explode(col("skills")).alias("skill")) \
    .groupBy("skill") \
    .agg(collect_list("name").alias("people")) \
    .show(truncate=False)
```

6.3 Maps
```python
from pyspark.sql.functions import create_map, map_keys, map_values, explode, lit, col

# Create a map from columns
df = spark.createDataFrame([
    (1, "Alice", "Developer", 50000),
    (2, "Bob", "Manager", 60000)
], ["id", "name", "role", "salary"])

df = df.withColumn(
    "attributes",
    create_map(
        lit("role"), col("role"),
        lit("salary"), col("salary").cast("string")
    )
)
df.show(truncate=False)

# Access map values
df.select(
    col("name"),
    col("attributes")["role"].alias("role")
).show()

# Explode the map into key/value rows
df.select("name", explode(col("attributes"))).show()
```

7. Working with JSON
```python
from pyspark.sql.functions import from_json, to_json, get_json_object, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Sample JSON data
df = spark.createDataFrame([
    (1, '{"name": "Alice", "age": 30, "city": "NYC"}'),
    (2, '{"name": "Bob", "age": 25, "city": "LA"}')
], ["id", "json_data"])

# Parse a JSON string into a struct
schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType()),
    StructField("city", StringType())
])

df_parsed = df.withColumn("parsed", from_json(col("json_data"), schema))
df_parsed.select("id", "parsed.*").show()

# Extract specific fields
df.select(
    "id",
    get_json_object(col("json_data"), "$.name").alias("name"),
    get_json_object(col("json_data"), "$.age").alias("age")
).show()

# Convert the struct back to JSON
df_parsed.withColumn("back_to_json", to_json(col("parsed"))).show(truncate=False)
```

8. Data Skew Handling
8.1 Identifying Skew
```python
from pyspark.sql.functions import spark_partition_id, col

# Check partition sizes
df.groupBy(spark_partition_id()).count().show()

# Check value distribution
df.groupBy("key_column").count().orderBy(col("count").desc()).show(10)
```

8.2 Handling Skew
```python
# Method 1: Salting
from pyspark.sql.functions import rand, floor, concat, lit, col

# Add a random salt to the skewed key
num_salts = 10
df_salted = df.withColumn(
    "salted_key",
    concat(col("key"), lit("_"), floor(rand() * num_salts).cast("string"))
)

# Salt the lookup table too (replicate each row once per salt value)
lookup_salted = lookup_df.crossJoin(
    spark.range(num_salts).withColumnRenamed("id", "salt")
).withColumn(
    "salted_key",
    concat(col("key"), lit("_"), col("salt").cast("string"))
)

# Join on the salted key
result = df_salted.join(lookup_salted, "salted_key")

# Method 2: Broadcast join for small tables
from pyspark.sql.functions import broadcast

# Force broadcast of the small table
result = large_df.join(broadcast(small_df), "key")

# Method 3: Adaptive Query Execution (Spark 3.0+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```

9. Practice
SQL Practice
Exercise 1: Complex SQL Query
```python
# Given tables: orders, products, customers
# Write SQL to find:
# 1. Top 3 customers by total spending per region
# 2. Include their order count and average order value
# 3. Compare to the region average

orders_data = [
    (1, 101, 1, "2024-01-15", 100.0),
    (2, 101, 2, "2024-01-16", 200.0),
    (3, 102, 1, "2024-01-17", 150.0),
    (4, 103, 3, "2024-01-18", 300.0),
    (5, 101, 1, "2024-01-19", 250.0),
]
orders = spark.createDataFrame(orders_data,
    ["order_id", "customer_id", "product_id", "date", "amount"])

customers_data = [
    (101, "Alice", "North"),
    (102, "Bob", "North"),
    (103, "Charlie", "South"),
    (104, "David", "South")
]
customers = spark.createDataFrame(customers_data,
    ["customer_id", "name", "region"])

orders.createOrReplaceTempView("orders")
customers.createOrReplaceTempView("customers")

# YOUR SQL QUERY HERE
```

💡 View solution
```python
result = spark.sql("""
    WITH customer_stats AS (
        SELECT
            c.customer_id,
            c.name,
            c.region,
            COUNT(o.order_id) as order_count,
            COALESCE(SUM(o.amount), 0) as total_spending,
            COALESCE(AVG(o.amount), 0) as avg_order_value
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id
        GROUP BY c.customer_id, c.name, c.region
    ),
    region_stats AS (
        SELECT
            region,
            AVG(total_spending) as region_avg_spending
        FROM customer_stats
        GROUP BY region
    ),
    ranked AS (
        SELECT
            cs.*,
            rs.region_avg_spending,
            cs.total_spending - rs.region_avg_spending as vs_region_avg,
            RANK() OVER (PARTITION BY cs.region ORDER BY cs.total_spending DESC) as region_rank
        FROM customer_stats cs
        JOIN region_stats rs ON cs.region = rs.region
    )
    SELECT
        region,
        name,
        order_count,
        ROUND(total_spending, 2) as total_spending,
        ROUND(avg_order_value, 2) as avg_order_value,
        ROUND(region_avg_spending, 2) as region_avg,
        ROUND(vs_region_avg, 2) as vs_region_avg,
        region_rank
    FROM ranked
    WHERE region_rank <= 3
    ORDER BY region, region_rank
""")
result.show()
```

Exercise 2: Create UDF
```python
# Create a UDF to calculate the loyalty tier based on:
# - total_orders >= 10 and total_amount >= 1000: "Platinum"
# - total_orders >= 5 or total_amount >= 500: "Gold"
# - otherwise: "Silver"

# YOUR CODE HERE
```

💡 View solution
```python
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

def calculate_loyalty_tier(total_orders, total_amount):
    if total_orders is None or total_amount is None:
        return "Silver"
    if total_orders >= 10 and total_amount >= 1000:
        return "Platinum"
    elif total_orders >= 5 or total_amount >= 500:
        return "Gold"
    else:
        return "Silver"

loyalty_udf = udf(calculate_loyalty_tier, StringType())

# Test data
test_df = spark.createDataFrame([
    (1, "Alice", 15, 2000.0),
    (2, "Bob", 8, 800.0),
    (3, "Charlie", 3, 200.0),
    (4, "David", 2, 600.0)
], ["id", "name", "total_orders", "total_amount"])

test_df.withColumn(
    "loyalty_tier",
    loyalty_udf(col("total_orders"), col("total_amount"))
).show()
```

10. Summary
| Concept | Description | When to Use |
|---|---|---|
| SQL | Query DataFrames with SQL | Complex queries, SQL familiarity |
| Catalyst | Query optimizer | Automatic; understand it for tuning |
| UDF | Custom functions | When built-ins are not enough |
| Pandas UDF | Vectorized UDFs | Better performance than regular UDFs |
| Complex Types | Struct, Array, Map | Nested/complex data |
Performance Tips:
- Prefer built-in functions over UDFs
- Use Pandas UDF over regular UDF
- Enable Adaptive Query Execution
- Monitor execution plans (a minimal sketch combining these two tips follows)
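To tie the last two tips together, a minimal sketch using the standard Spark 3.x AQE settings (already touched on in section 8.2) plus a plan check against the `employees` view before shipping a job:

```python
# Adaptive Query Execution: re-optimizes plans at runtime using shuffle statistics
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # merge small shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # split skewed join partitions

# Monitor execution plans: inspect what will actually run
result = spark.sql("SELECT department, AVG(salary) FROM employees GROUP BY department")
result.explain(mode="formatted")
```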
Next lesson: Spark Performance Tuning
