Theory
Lesson 7/15

PySpark DataFrame API

DataFrame operations, transformations, and best practices in PySpark


PySpark DataFrame Operations

1. Creating DataFrames

1.1 From a List/Dictionary

Python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = SparkSession.builder.appName("DataFrame API").getOrCreate()

# From a list of tuples (salary as float so it matches the DoubleType schema below)
data = [
    (1, "Alice", "Sales", 50000.0),
    (2, "Bob", "IT", 60000.0),
    (3, "Charlie", "Sales", 55000.0)
]
columns = ["id", "name", "department", "salary"]
df = spark.createDataFrame(data, columns)

# From a list of dictionaries
data_dict = [
    {"id": 1, "name": "Alice", "department": "Sales"},
    {"id": 2, "name": "Bob", "department": "IT"}
]
df = spark.createDataFrame(data_dict)

# With an explicit schema (recommended)
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", DoubleType(), True)
])
df = spark.createDataFrame(data, schema)

1.2 From a Pandas DataFrame

Python
import pandas as pd

pandas_df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "salary": [50000.0, 60000.0, 55000.0]
})

# Convert Pandas -> Spark
spark_df = spark.createDataFrame(pandas_df)

# Convert Spark -> Pandas
back_to_pandas = spark_df.toPandas()

1.3 From Files

Python
# CSV
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# With options
df = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .option("delimiter", ",") \
    .option("nullValue", "NA") \
    .csv("data.csv")

# Parquet
df = spark.read.parquet("data.parquet")

# JSON
df = spark.read.json("data.json")

# Multiple files
df = spark.read.csv("data/*.csv", header=True)

2. DataFrame Information

Python
# Schema
df.printSchema()
# root
#  |-- id: integer (nullable = true)
#  |-- name: string (nullable = true)
#  |-- salary: double (nullable = true)

# Show data
df.show()                 # First 20 rows
df.show(5)                # First 5 rows
df.show(truncate=False)   # Don't truncate long strings

# Basic info
print(f"Columns: {df.columns}")
print(f"Data types: {df.dtypes}")
print(f"Row count: {df.count()}")

# Statistics
df.describe().show()   # count, mean, stddev, min, max

# Sample
df.sample(0.1).show()   # 10% sample
df.take(5)              # First 5 rows as a list of Row objects
df.head(5)              # First 5 rows as a list of Row objects
df.first()              # First row

3. Column Selection

Python
from pyspark.sql.functions import col

# Single column
df.select("name").show()
df.select(df.name).show()
df.select(col("name")).show()

# Multiple columns
df.select("name", "salary").show()
df.select(["name", "salary"]).show()

# All columns except some
df.select([c for c in df.columns if c != "id"]).show()

# With expressions
df.select(
    col("name"),
    col("salary"),
    (col("salary") * 1.1).alias("salary_with_raise")
).show()

# selectExpr (SQL expressions)
df.selectExpr(
    "name",
    "salary",
    "salary * 1.1 as salary_with_raise",
    "UPPER(name) as name_upper"
).show()

4. Filtering Rows

Python
from pyspark.sql.functions import col

# Single condition
df.filter(col("salary") > 50000).show()
df.filter(df.salary > 50000).show()
df.filter("salary > 50000").show()   # SQL string

# Multiple conditions
df.filter(
    (col("salary") > 50000) & (col("department") == "Sales")
).show()

df.filter(
    (col("salary") > 50000) | (col("department") == "IT")
).show()

# NOT
df.filter(~(col("department") == "Sales")).show()

# IN clause
df.filter(col("department").isin(["Sales", "IT"])).show()

# LIKE / CONTAINS
df.filter(col("name").like("A%")).show()       # Starts with A
df.filter(col("name").contains("li")).show()   # Contains "li"
df.filter(col("name").startswith("A")).show()
df.filter(col("name").endswith("e")).show()

# NULL handling
df.filter(col("salary").isNull()).show()
df.filter(col("salary").isNotNull()).show()

# Between
df.filter(col("salary").between(50000, 60000)).show()

# where() is an alias of filter()
df.where(col("salary") > 50000).show()

5. Adding/Modifying Columns

Python
from pyspark.sql.functions import col, lit, when, concat, upper, lower
from pyspark.sql.types import StringType

# Add a new column
df = df.withColumn("bonus", col("salary") * 0.1)

# Add a constant column
df = df.withColumn("country", lit("USA"))

# Modify an existing column
df = df.withColumn("salary", col("salary") * 1.1)

# Conditional column (CASE WHEN)
df = df.withColumn(
    "salary_grade",
    when(col("salary") >= 60000, "High")
    .when(col("salary") >= 50000, "Medium")
    .otherwise("Low")
)

# Multiple conditions
df = df.withColumn(
    "status",
    when((col("salary") > 55000) & (col("department") == "IT"), "Senior IT")
    .when(col("salary") > 55000, "Senior")
    .otherwise("Junior")
)

# String operations
df = df.withColumn("name_upper", upper(col("name")))
df = df.withColumn("name_lower", lower(col("name")))
df = df.withColumn("full_info", concat(col("name"), lit(" - "), col("department")))

# Rename a column
df = df.withColumnRenamed("salary", "annual_salary")

# Drop columns
df = df.drop("bonus")
df = df.drop("col1", "col2")

# Cast data types
df = df.withColumn("salary_int", col("salary").cast("integer"))
df = df.withColumn("salary_str", col("salary").cast(StringType()))

6. Aggregations

Python
from pyspark.sql.functions import (
    count, countDistinct, sum, avg, mean,
    min, max, first, last, collect_list, collect_set
)

# Basic aggregations
df.agg(
    count("*").alias("total_rows"),
    countDistinct("department").alias("unique_departments"),
    sum("salary").alias("total_salary"),
    avg("salary").alias("avg_salary"),
    min("salary").alias("min_salary"),
    max("salary").alias("max_salary")
).show()

# Group By
df.groupBy("department").count().show()

df.groupBy("department").agg(
    count("*").alias("employee_count"),
    sum("salary").alias("total_salary"),
    avg("salary").alias("avg_salary"),
    min("salary").alias("min_salary"),
    max("salary").alias("max_salary")
).show()

# Multiple grouping columns
df.groupBy("department", "salary_grade").count().show()

# Collect values into a list/set
df.groupBy("department").agg(
    collect_list("name").alias("employees"),
    collect_set("salary_grade").alias("grades")
).show(truncate=False)

# Pivot
df.groupBy("department").pivot("salary_grade").count().show()

7. Sorting

Python
from pyspark.sql.functions import col, asc, desc

# Ascending (default)
df.orderBy("salary").show()
df.orderBy(col("salary").asc()).show()

# Descending
df.orderBy(col("salary").desc()).show()
df.orderBy(df.salary.desc()).show()

# Multiple columns
df.orderBy(
    col("department").asc(),
    col("salary").desc()
).show()

# sort() is an alias of orderBy()
df.sort("department", "salary").show()

# Nulls handling
df.orderBy(col("salary").asc_nulls_first()).show()
df.orderBy(col("salary").desc_nulls_last()).show()

8. Joins

Python
# Sample DataFrames
employees = spark.createDataFrame([
    (1, "Alice", 101),
    (2, "Bob", 102),
    (3, "Charlie", 103),
    (4, "David", None)
], ["emp_id", "name", "dept_id"])

departments = spark.createDataFrame([
    (101, "Sales"),
    (102, "IT"),
    (104, "HR")
], ["dept_id", "dept_name"])

# Inner Join (default)
employees.join(departments, employees.dept_id == departments.dept_id).show()

# Using the column name (if it is the same in both DataFrames)
employees.join(departments, "dept_id").show()

# Left Join
employees.join(departments, "dept_id", "left").show()

# Right Join
employees.join(departments, "dept_id", "right").show()

# Full Outer Join
employees.join(departments, "dept_id", "outer").show()

# Cross Join (Cartesian product)
employees.crossJoin(departments).show()

# Left Anti Join (employees without departments)
employees.join(departments, "dept_id", "left_anti").show()

# Left Semi Join (employees with departments, only left columns)
employees.join(departments, "dept_id", "left_semi").show()

# Multiple join conditions (df1 and df2 are placeholder DataFrames)
df1.join(
    df2,
    (df1.col1 == df2.col1) & (df1.col2 == df2.col2),
    "inner"
).show()

9. Window Functions

Python
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    row_number, rank, dense_rank,
    lag, lead, sum, avg
)

# Sample data
sales = spark.createDataFrame([
    ("2024-01", "Electronics", 1000),
    ("2024-01", "Clothing", 500),
    ("2024-02", "Electronics", 1200),
    ("2024-02", "Clothing", 600),
    ("2024-03", "Electronics", 1100),
    ("2024-03", "Clothing", 550)
], ["month", "category", "revenue"])

# Define a window
window_spec = Window.partitionBy("category").orderBy("month")

# Row numbering
sales.withColumn("row_num", row_number().over(window_spec)).show()

# Ranking
sales.withColumn("rank", rank().over(window_spec)).show()
sales.withColumn("dense_rank", dense_rank().over(window_spec)).show()

# Lag/Lead
sales = sales.withColumn("prev_revenue", lag("revenue", 1).over(window_spec))
sales = sales.withColumn("next_revenue", lead("revenue", 1).over(window_spec))
sales.show()

# Running totals
window_running = Window.partitionBy("category").orderBy("month").rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)
sales.withColumn("running_total", sum("revenue").over(window_running)).show()

# Moving average (previous, current, and next row)
window_moving = Window.partitionBy("category").orderBy("month").rowsBetween(-1, 1)
sales.withColumn("moving_avg", avg("revenue").over(window_moving)).show()

10. Handling Nulls

Python
from pyspark.sql.functions import col, lit, when, coalesce, isnan

# Check for nulls
df.filter(col("salary").isNull()).show()
df.filter(col("salary").isNotNull()).show()

# Drop rows with nulls
df.dropna().show()                    # Any null in any column
df.dropna(subset=["salary"]).show()   # Null in a specific column
df.dropna(how="all").show()           # All columns are null
df.dropna(thresh=2).show()            # At least 2 non-null values

# Fill nulls
df.fillna(0).show()                                  # Fill all with 0
df.fillna({"salary": 0, "name": "Unknown"}).show()   # Column-specific

# Coalesce (first non-null)
df.withColumn(
    "value",
    coalesce(col("salary"), col("bonus"), lit(0))
).show()

# Replace values
df.replace("Sales", "Sales Team", subset=["department"]).show()
df.replace([1, 2], [10, 20], subset=["id"]).show()

11. Useful Functions

Python
from pyspark.sql.functions import (
    # String
    trim, ltrim, rtrim, length, substring,
    concat, concat_ws, split, regexp_replace, regexp_extract,

    # Date/Time
    current_date, current_timestamp, date_format,
    year, month, dayofmonth, hour, minute,
    datediff, date_add, date_sub, months_between,

    # Math
    abs, ceil, floor, round, sqrt, pow, log, exp,

    # Array
    array, array_contains, explode, size,

    # Conditional
    when, coalesce, greatest, least,

    # Null handling (nvl requires a recent Spark version; use coalesce on older ones)
    isnull, isnan, nvl
)

# String examples
df.withColumn("name_len", length(col("name")))
df.withColumn("first_3", substring(col("name"), 1, 3))
df.withColumn("words", split(col("full_name"), " "))

# Date examples
df.withColumn("today", current_date())
df.withColumn("year", year(col("date_column")))
df.withColumn("formatted", date_format(col("date_column"), "yyyy-MM-dd"))

# Array examples
df.withColumn("tags_array", split(col("tags"), ","))
df.select(explode(col("tags_array")).alias("tag"))

12. Practice

Comprehensive Exercise

Exercise: Sales Analysis

Python
# Sample Data
sales_data = [
    ("2024-01-15", "Electronics", "Laptop", 1200, 5, "North"),
    ("2024-01-16", "Electronics", "Phone", 800, 10, "South"),
    ("2024-01-17", "Clothing", "Shirt", 50, 100, "North"),
    ("2024-01-18", "Clothing", "Pants", 80, 50, "East"),
    ("2024-01-19", "Electronics", "Tablet", 500, 15, "West"),
    ("2024-01-20", "Electronics", "Laptop", 1200, 8, "North"),
    ("2024-01-21", "Clothing", "Shirt", 50, 80, "South"),
]
columns = ["date", "category", "product", "price", "quantity", "region"]

# Tasks:
# 1. Create a DataFrame with an explicit schema
# 2. Add a revenue column = price * quantity
# 3. Add a revenue_category column: High (>5000), Medium (1000-5000), Low (<1000)
# 4. Compute total revenue by category and region
# 5. Find the top 2 products with the highest revenue in each category
# 6. Compute a running total of revenue per category (ordered by date)

# YOUR CODE HERE
💡 View solution
Python
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Sales Analysis").getOrCreate()

# 1. Create DataFrame with explicit schema
schema = StructType([
    StructField("date", StringType(), True),
    StructField("category", StringType(), True),
    StructField("product", StringType(), True),
    StructField("price", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("region", StringType(), True)
])

sales_data = [
    ("2024-01-15", "Electronics", "Laptop", 1200, 5, "North"),
    ("2024-01-16", "Electronics", "Phone", 800, 10, "South"),
    ("2024-01-17", "Clothing", "Shirt", 50, 100, "North"),
    ("2024-01-18", "Clothing", "Pants", 80, 50, "East"),
    ("2024-01-19", "Electronics", "Tablet", 500, 15, "West"),
    ("2024-01-20", "Electronics", "Laptop", 1200, 8, "North"),
    ("2024-01-21", "Clothing", "Shirt", 50, 80, "South"),
]

df = spark.createDataFrame(sales_data, schema)
df = df.withColumn("date", to_date(col("date")))

# 2. Add revenue column
df = df.withColumn("revenue", col("price") * col("quantity"))

# 3. Add revenue category
df = df.withColumn(
    "revenue_category",
    when(col("revenue") > 5000, "High")
    .when(col("revenue") >= 1000, "Medium")
    .otherwise("Low")
)

print("DataFrame with revenue and category:")
df.show()

# 4. Total revenue by category and region
print("Revenue by category and region:")
df.groupBy("category", "region").agg(
    sum("revenue").alias("total_revenue"),
    count("*").alias("num_transactions")
).orderBy("category", "total_revenue").show()

# 5. Top 2 products per category
window_rank = Window.partitionBy("category").orderBy(col("revenue").desc())
print("Top 2 products per category:")
df.withColumn("rank", row_number().over(window_rank)) \
    .filter(col("rank") <= 2) \
    .select("category", "product", "revenue", "rank") \
    .show()

# 6. Running total by category
window_running = Window.partitionBy("category").orderBy("date").rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)
print("Running total by category:")
df.withColumn("running_total", sum("revenue").over(window_running)) \
    .select("date", "category", "product", "revenue", "running_total") \
    .orderBy("category", "date") \
    .show()

spark.stop()

13. Summary

Operation    | Method                  | Example
Select       | select(), selectExpr()  | df.select("col1", "col2")
Filter       | filter(), where()       | df.filter(col("x") > 10)
Add Column   | withColumn()            | df.withColumn("new", ...)
Aggregate    | groupBy().agg()         | df.groupBy("x").agg(sum("y"))
Join         | join()                  | df1.join(df2, "key", "left")
Sort         | orderBy(), sort()       | df.orderBy(col("x").desc())
Window       | over(Window...)         | row_number().over(w)

Best Practices:

  • Use an explicit schema in production
  • Prefer the DataFrame API over SQL strings
  • Use col() for clarity
  • Chain operations for readability
  • Handle nulls explicitly (a short sketch combining these practices follows below)
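
A minimal sketch pulling these practices together, assuming a hypothetical employees.csv file with name, department, and salary columns (not part of the lesson's sample data):

Python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, coalesce, lit

spark = SparkSession.builder.appName("Best Practices Sketch").getOrCreate()

# Explicit schema instead of inferSchema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", DoubleType(), True)
])

# Chained DataFrame operations using col() with explicit null handling
result = (
    spark.read.schema(schema).option("header", True).csv("employees.csv")
        .withColumn("salary", coalesce(col("salary"), lit(0.0)))   # replace null salaries with 0.0
        .filter(col("department").isNotNull())
        .withColumn("bonus", col("salary") * 0.1)
        .orderBy(col("salary").desc())
)
result.show()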

Next lesson: Spark SQL & Transformations