PySpark DataFrame API
1. Creating DataFrames
1.1 From a List/Dictionary
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = SparkSession.builder.appName("DataFrame API").getOrCreate()

# From list of tuples (salary as float so it matches DoubleType in the explicit schema below)
data = [
    (1, "Alice", "Sales", 50000.0),
    (2, "Bob", "IT", 60000.0),
    (3, "Charlie", "Sales", 55000.0)
]
columns = ["id", "name", "department", "salary"]
df = spark.createDataFrame(data, columns)

# From list of dictionaries
data_dict = [
    {"id": 1, "name": "Alice", "department": "Sales"},
    {"id": 2, "name": "Bob", "department": "IT"}
]
df = spark.createDataFrame(data_dict)

# With explicit schema (recommended)
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", DoubleType(), True)
])
df = spark.createDataFrame(data, schema)
```

1.2 From a Pandas DataFrame
```python
import pandas as pd

pandas_df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "salary": [50000.0, 60000.0, 55000.0]
})

# Convert Pandas -> Spark
spark_df = spark.createDataFrame(pandas_df)

# Convert Spark -> Pandas
back_to_pandas = spark_df.toPandas()
```

1.3 From Files
```python
# CSV
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# With options
df = spark.read \
    .option("header", True) \
    .option("inferSchema", True) \
    .option("delimiter", ",") \
    .option("nullValue", "NA") \
    .csv("data.csv")

# Parquet
df = spark.read.parquet("data.parquet")

# JSON
df = spark.read.json("data.json")

# Multiple files
df = spark.read.csv("data/*.csv", header=True)
```

2. DataFrame Information
```python
# Schema
df.printSchema()
# root
#  |-- id: integer (nullable = true)
#  |-- name: string (nullable = true)
#  |-- salary: double (nullable = true)

# Show data
df.show()                # First 20 rows
df.show(5)               # First 5 rows
df.show(truncate=False)  # Don't truncate long strings

# Basic info
print(f"Columns: {df.columns}")
print(f"Data types: {df.dtypes}")
print(f"Row count: {df.count()}")

# Statistics
df.describe().show()  # count, mean, stddev, min, max

# Sample
df.sample(0.1).show()  # 10% sample
df.take(5)   # First 5 rows as a list of Row objects
df.head(5)   # First 5 rows as a list of Row objects
df.first()   # First row
```

3. Column Selection
```python
from pyspark.sql.functions import col

# Single column
df.select("name").show()
df.select(df.name).show()
df.select(col("name")).show()

# Multiple columns
df.select("name", "salary").show()
df.select(["name", "salary"]).show()

# All columns except some
df.select([c for c in df.columns if c != "id"]).show()

# With expressions
df.select(
    col("name"),
    col("salary"),
    (col("salary") * 1.1).alias("salary_with_raise")
).show()

# selectExpr (SQL expressions)
df.selectExpr(
    "name",
    "salary",
    "salary * 1.1 as salary_with_raise",
    "UPPER(name) as name_upper"
).show()
```

4. Filtering Rows
```python
from pyspark.sql.functions import col

# Single condition
df.filter(col("salary") > 50000).show()
df.filter(df.salary > 50000).show()
df.filter("salary > 50000").show()  # SQL string

# Multiple conditions
df.filter(
    (col("salary") > 50000) & (col("department") == "Sales")
).show()

df.filter(
    (col("salary") > 50000) | (col("department") == "IT")
).show()

# NOT
df.filter(~(col("department") == "Sales")).show()

# IN clause
df.filter(col("department").isin(["Sales", "IT"])).show()

# LIKE / CONTAINS
df.filter(col("name").like("A%")).show()      # Starts with A
df.filter(col("name").contains("li")).show()  # Contains "li"
df.filter(col("name").startswith("A")).show()
df.filter(col("name").endswith("e")).show()

# NULL handling
df.filter(col("salary").isNull()).show()
df.filter(col("salary").isNotNull()).show()

# Between
df.filter(col("salary").between(50000, 60000)).show()

# where() is an alias of filter()
df.where(col("salary") > 50000).show()
```

5. Adding/Modifying Columns
```python
from pyspark.sql.functions import col, lit, when, concat, upper, lower
from pyspark.sql.types import StringType

# Add new column
df = df.withColumn("bonus", col("salary") * 0.1)

# Add constant column
df = df.withColumn("country", lit("USA"))

# Modify existing column
df = df.withColumn("salary", col("salary") * 1.1)

# Conditional column (CASE WHEN)
df = df.withColumn(
    "salary_grade",
    when(col("salary") >= 60000, "High")
    .when(col("salary") >= 50000, "Medium")
    .otherwise("Low")
)

# Multiple conditions
df = df.withColumn(
    "status",
    when((col("salary") > 55000) & (col("department") == "IT"), "Senior IT")
    .when(col("salary") > 55000, "Senior")
    .otherwise("Junior")
)

# String operations
df = df.withColumn("name_upper", upper(col("name")))
df = df.withColumn("name_lower", lower(col("name")))
df = df.withColumn("full_info", concat(col("name"), lit(" - "), col("department")))

# Rename column
df = df.withColumnRenamed("salary", "annual_salary")

# Drop columns
df = df.drop("bonus")
df = df.drop("col1", "col2")

# Cast data type
df = df.withColumn("salary_int", col("salary").cast("integer"))
df = df.withColumn("salary_str", col("salary").cast(StringType()))
```

6. Aggregations
```python
from pyspark.sql.functions import (
    count, countDistinct, sum, avg, mean,
    min, max, first, last, collect_list, collect_set
)

# Basic aggregations
df.agg(
    count("*").alias("total_rows"),
    countDistinct("department").alias("unique_departments"),
    sum("salary").alias("total_salary"),
    avg("salary").alias("avg_salary"),
    min("salary").alias("min_salary"),
    max("salary").alias("max_salary")
).show()

# Group By
df.groupBy("department").count().show()

df.groupBy("department").agg(
    count("*").alias("employee_count"),
    sum("salary").alias("total_salary"),
    avg("salary").alias("avg_salary"),
    min("salary").alias("min_salary"),
    max("salary").alias("max_salary")
).show()

# Multiple grouping columns
df.groupBy("department", "salary_grade").count().show()

# Collect values into list/set
df.groupBy("department").agg(
    collect_list("name").alias("employees"),
    collect_set("salary_grade").alias("grades")
).show(truncate=False)

# Pivot
df.groupBy("department").pivot("salary_grade").count().show()
```

7. Sorting
```python
from pyspark.sql.functions import col, asc, desc

# Ascending (default)
df.orderBy("salary").show()
df.orderBy(col("salary").asc()).show()

# Descending
df.orderBy(col("salary").desc()).show()
df.orderBy(df.salary.desc()).show()

# Multiple columns
df.orderBy(
    col("department").asc(),
    col("salary").desc()
).show()

# sort() is an alias of orderBy()
df.sort("department", "salary").show()

# Nulls handling
df.orderBy(col("salary").asc_nulls_first()).show()
df.orderBy(col("salary").desc_nulls_last()).show()
```

8. Joins
```python
# Sample DataFrames
employees = spark.createDataFrame([
    (1, "Alice", 101),
    (2, "Bob", 102),
    (3, "Charlie", 103),
    (4, "David", None)
], ["emp_id", "name", "dept_id"])

departments = spark.createDataFrame([
    (101, "Sales"),
    (102, "IT"),
    (104, "HR")
], ["dept_id", "dept_name"])

# Inner Join (default)
employees.join(departments, employees.dept_id == departments.dept_id).show()

# Using column name (if same in both)
employees.join(departments, "dept_id").show()

# Left Join
employees.join(departments, "dept_id", "left").show()

# Right Join
employees.join(departments, "dept_id", "right").show()

# Full Outer Join
employees.join(departments, "dept_id", "outer").show()

# Cross Join (Cartesian product)
employees.crossJoin(departments).show()

# Left Anti Join (employees without departments)
employees.join(departments, "dept_id", "left_anti").show()

# Left Semi Join (employees with departments, only left columns)
employees.join(departments, "dept_id", "left_semi").show()

# Multiple join conditions (df1, df2 are placeholder DataFrames)
df1.join(
    df2,
    (df1.col1 == df2.col1) & (df1.col2 == df2.col2),
    "inner"
).show()
```

9. Window Functions
```python
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    row_number, rank, dense_rank,
    lag, lead, sum, avg
)

# Sample data
sales = spark.createDataFrame([
    ("2024-01", "Electronics", 1000),
    ("2024-01", "Clothing", 500),
    ("2024-02", "Electronics", 1200),
    ("2024-02", "Clothing", 600),
    ("2024-03", "Electronics", 1100),
    ("2024-03", "Clothing", 550)
], ["month", "category", "revenue"])

# Define window
window_spec = Window.partitionBy("category").orderBy("month")

# Row numbering
sales.withColumn("row_num", row_number().over(window_spec)).show()

# Ranking
sales.withColumn("rank", rank().over(window_spec)).show()
sales.withColumn("dense_rank", dense_rank().over(window_spec)).show()

# Lag/Lead
sales = sales.withColumn("prev_revenue", lag("revenue", 1).over(window_spec))
sales = sales.withColumn("next_revenue", lead("revenue", 1).over(window_spec))
sales.show()

# Running totals
window_running = Window.partitionBy("category").orderBy("month").rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)
sales.withColumn("running_total", sum("revenue").over(window_running)).show()

# Moving average
window_moving = Window.partitionBy("category").orderBy("month").rowsBetween(-1, 1)
sales.withColumn("moving_avg", avg("revenue").over(window_moving)).show()
```

10. Handling Nulls
```python
from pyspark.sql.functions import col, when, coalesce, lit, isnan

# Check for nulls
df.filter(col("salary").isNull()).show()
df.filter(col("salary").isNotNull()).show()

# Drop rows with nulls
df.dropna().show()                   # Any null in any column
df.dropna(subset=["salary"]).show()  # Null in specific column
df.dropna(how="all").show()          # All columns are null
df.dropna(thresh=2).show()           # At least 2 non-null values

# Fill nulls
df.fillna(0).show()                                 # Fill all with 0
df.fillna({"salary": 0, "name": "Unknown"}).show()  # Column-specific

# Coalesce (first non-null)
df.withColumn(
    "value",
    coalesce(col("salary"), col("bonus"), lit(0))
).show()

# Replace values
df.replace("Sales", "Sales Team", subset=["department"]).show()
df.replace([1, 2], [10, 20], subset=["id"]).show()
```

11. Useful Functions
```python
from pyspark.sql.functions import (
    col,

    # String
    trim, ltrim, rtrim, length, substring,
    concat, concat_ws, split, regexp_replace, regexp_extract,

    # Date/Time
    current_date, current_timestamp, date_format,
    year, month, dayofmonth, hour, minute,
    datediff, date_add, date_sub, months_between,

    # Math
    abs, ceil, floor, round, sqrt, pow, log, exp,

    # Array
    array, array_contains, explode, size,

    # Conditional
    when, coalesce, greatest, least,

    # Null handling
    isnull, isnan, nvl  # nvl requires Spark 3.5+
)

# String examples
df.withColumn("name_len", length(col("name")))
df.withColumn("first_3", substring(col("name"), 1, 3))
df.withColumn("words", split(col("full_name"), " "))

# Date examples
df.withColumn("today", current_date())
df.withColumn("year", year(col("date_column")))
df.withColumn("formatted", date_format(col("date_column"), "yyyy-MM-dd"))

# Array examples
df.withColumn("tags_array", split(col("tags"), ","))
df.select(explode(col("tags_array")).alias("tag"))
```

12. Practice
Comprehensive Exercise
Exercise: Sales Analysis
```python
# Sample Data
sales_data = [
    ("2024-01-15", "Electronics", "Laptop", 1200, 5, "North"),
    ("2024-01-16", "Electronics", "Phone", 800, 10, "South"),
    ("2024-01-17", "Clothing", "Shirt", 50, 100, "North"),
    ("2024-01-18", "Clothing", "Pants", 80, 50, "East"),
    ("2024-01-19", "Electronics", "Tablet", 500, 15, "West"),
    ("2024-01-20", "Electronics", "Laptop", 1200, 8, "North"),
    ("2024-01-21", "Clothing", "Shirt", 50, 80, "South"),
]
columns = ["date", "category", "product", "price", "quantity", "region"]

# Tasks:
# 1. Create a DataFrame with an explicit schema
# 2. Add a column revenue = price * quantity
# 3. Add a column revenue_category: High (>5000), Medium (1000-5000), Low (<1000)
# 4. Compute total revenue by category and region
# 5. Find the top 2 products by revenue in each category
# 6. Compute the running total of revenue per category (ordered by date)

# YOUR CODE HERE
```

💡 Solution
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("Sales Analysis").getOrCreate()

# 1. Create DataFrame with explicit schema
schema = StructType([
    StructField("date", StringType(), True),
    StructField("category", StringType(), True),
    StructField("product", StringType(), True),
    StructField("price", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("region", StringType(), True)
])

sales_data = [
    ("2024-01-15", "Electronics", "Laptop", 1200, 5, "North"),
    ("2024-01-16", "Electronics", "Phone", 800, 10, "South"),
    ("2024-01-17", "Clothing", "Shirt", 50, 100, "North"),
    ("2024-01-18", "Clothing", "Pants", 80, 50, "East"),
    ("2024-01-19", "Electronics", "Tablet", 500, 15, "West"),
    ("2024-01-20", "Electronics", "Laptop", 1200, 8, "North"),
    ("2024-01-21", "Clothing", "Shirt", 50, 80, "South"),
]

df = spark.createDataFrame(sales_data, schema)
df = df.withColumn("date", to_date(col("date")))

# 2. Add revenue column
df = df.withColumn("revenue", col("price") * col("quantity"))

# 3. Add revenue category
df = df.withColumn(
    "revenue_category",
    when(col("revenue") > 5000, "High")
    .when(col("revenue") >= 1000, "Medium")
    .otherwise("Low")
)

print("DataFrame with revenue and category:")
df.show()

# 4. Total revenue by category and region
print("Revenue by category and region:")
df.groupBy("category", "region").agg(
    sum("revenue").alias("total_revenue"),
    count("*").alias("num_transactions")
).orderBy("category", "total_revenue").show()

# 5. Top 2 products per category
window_rank = Window.partitionBy("category").orderBy(col("revenue").desc())
print("Top 2 products per category:")
df.withColumn("rank", row_number().over(window_rank)) \
    .filter(col("rank") <= 2) \
    .select("category", "product", "revenue", "rank") \
    .show()

# 6. Running total by category
window_running = Window.partitionBy("category").orderBy("date").rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)
print("Running total by category:")
df.withColumn("running_total", sum("revenue").over(window_running)) \
    .select("date", "category", "product", "revenue", "running_total") \
    .orderBy("category", "date") \
    .show()

spark.stop()
```

13. Summary
| Operation | Method | Example |
|---|---|---|
| Select | select(), selectExpr() | df.select("col1", "col2") |
| Filter | filter(), where() | df.filter(col("x") > 10) |
| Add Column | withColumn() | df.withColumn("new", ...) |
| Aggregate | groupBy().agg() | df.groupBy("x").agg(sum("y")) |
| Join | join() | df1.join(df2, "key", "left") |
| Sort | orderBy(), sort() | df.orderBy(col("x").desc()) |
| Window | over(Window...) | row_number().over(w) |
Best Practices (see the sketch below):
- Use an explicit schema in production
- Prefer the DataFrame API over SQL strings
- Use col() for clarity
- Chain operations for readability
- Handle nulls explicitly
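A minimal sketch tying these practices together. The file name employees.csv and its columns (name, department, salary) are hypothetical, not part of the lesson's dataset:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, lit, sum as sum_, avg
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.appName("Best Practices Sketch").getOrCreate()

# Explicit schema instead of inferSchema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("department", StringType(), True),
    StructField("salary", DoubleType(), True)
])

result = (
    spark.read.schema(schema).option("header", True).csv("employees.csv")  # hypothetical file
    .withColumn("salary", coalesce(col("salary"), lit(0.0)))  # handle nulls explicitly
    .filter(col("department").isNotNull())
    .groupBy("department")
    .agg(sum_("salary").alias("total_salary"), avg("salary").alias("avg_salary"))
    .orderBy(col("total_salary").desc())
)
result.show()
```

Wrapping the chain in parentheses keeps each transformation on its own line without backslashes.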
Next lesson: Spark SQL & Transformations
