Apache Spark Introduction
1. What is Spark?
Apache Spark is a unified analytics engine for large-scale data processing, advertised as up to 100x faster than Hadoop MapReduce in memory and up to 10x faster on disk.
1.1 Why Spark?
| Scenario | Pandas | Spark |
|---|---|---|
| Dataset size | < 10GB (fits in RAM) | TB to PB |
| Processing | Single machine | Distributed cluster |
| Speed | Fast for small data | Fast for big data |
| Real-time | Limited | Streaming support |
When to use Spark:
- Data is larger than available memory
- Parallel processing is needed
- Real-time streaming
- Machine learning at scale
1.2 Spark Ecosystem
```text
┌────────────────────────────────────────────────────────┐
│                      Applications                      │
├──────────┬──────────┬──────────┬──────────┬────────────┤
│  Spark   │  Spark   │  Spark   │  Spark   │   Spark    │
│   SQL    │ Streaming│  MLlib   │  GraphX  │ Structured │
│          │          │          │          │ Streaming  │
├──────────┴──────────┴──────────┴──────────┴────────────┤
│                    Spark Core (RDD)                    │
├────────────────────────────────────────────────────────┤
│  Standalone  │   YARN   │   Mesos   │    Kubernetes    │
├────────────────────────────────────────────────────────┤
│          Storage (HDFS, S3, Cassandra, etc.)           │
└────────────────────────────────────────────────────────┘
```
2. Spark Architecture
2.1 Master-Worker Architecture
```text
              ┌─────────────┐
              │   Driver    │
              │   Program   │
              └──────┬──────┘
                     │
              ┌──────▼──────┐
              │   Cluster   │
              │   Manager   │
              └──────┬──────┘
       ┌─────────────┼─────────────┐
       │             │             │
 ┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
 │  Worker   │ │  Worker   │ │  Worker   │
 │  Node 1   │ │  Node 2   │ │  Node 3   │
 │┌─────────┐│ │┌─────────┐│ │┌─────────┐│
 ││Executor ││ ││Executor ││ ││Executor ││
 ││  Task   ││ ││  Task   ││ ││  Task   ││
 ││  Task   ││ ││  Task   ││ ││  Task   ││
 │└─────────┘│ │└─────────┘│ │└─────────┘│
 └───────────┘ └───────────┘ └───────────┘
```
Components:
- Driver Program: the main program; it creates the SparkContext (a configuration sketch follows this list)
- Cluster Manager: allocates cluster resources (Standalone, YARN, K8s)
- Worker Node: machines that run the computations
- Executor: process that runs tasks and caches data
- Task: the smallest unit of work
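To make these roles concrete, here is a minimal, hedged sketch of how a driver program addresses a standalone cluster manager and requests executor resources. The master URL spark://master-host:7077 and the resource values are illustrative assumptions, not part of the lesson's setup.

```python
from pyspark.sql import SparkSession

# Hypothetical standalone cluster manager URL; use "local[*]" instead to run
# the driver, executor, and tasks inside a single local JVM.
spark = (
    SparkSession.builder
    .appName("ArchitectureDemo")
    .master("spark://master-host:7077")     # cluster manager the driver registers with
    .config("spark.executor.memory", "2g")  # memory per executor process
    .config("spark.executor.cores", "2")    # concurrent tasks per executor
    .config("spark.cores.max", "6")         # total cores requested from the standalone cluster
    .getOrCreate()
)

# The driver owns the SparkContext; executors on the worker nodes run the tasks.
print(spark.sparkContext.master)
spark.stop()
```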
2.2 Execution Flow
```text
1. User submits application
2. Driver creates SparkContext
3. SparkContext connects to Cluster Manager
4. Cluster Manager allocates Executors
5. Driver sends tasks to Executors
6. Executors run tasks, return results
7. Driver aggregates results
```
3. Core Concepts
3.1 RDD (Resilient Distributed Dataset)
RDD is the fundamental data structure of Spark:
- Resilient: fault-tolerant, can be recovered
- Distributed: partitioned across the cluster
- Dataset: a collection of elements
```python
from pyspark import SparkContext

sc = SparkContext("local", "RDD Example")

# Create RDD from a list
rdd = sc.parallelize([1, 2, 3, 4, 5])

# Create RDD from a file
rdd_file = sc.textFile("hdfs://path/to/file.txt")

# Transformations (lazy)
rdd2 = rdd.map(lambda x: x * 2)
rdd3 = rdd2.filter(lambda x: x > 4)

# Actions (trigger execution)
result = rdd3.collect()  # [6, 8, 10]
count = rdd3.count()     # 3
```
3.2 DataFrame & Dataset
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DataFrame Example") \
    .getOrCreate()

# Create DataFrame
df = spark.createDataFrame([
    (1, "Alice", 30),
    (2, "Bob", 25),
    (3, "Charlie", 35)
], ["id", "name", "age"])

# DataFrame operations (structured, optimized)
df.select("name", "age").filter(df.age > 25).show()
```
Comparison:
| Feature | RDD | DataFrame | Dataset |
|---|---|---|---|
| Type Safety | No | No | Yes (Scala/Java) |
| Optimization | Manual | Catalyst | Catalyst |
| Schema | No | Yes | Yes |
| API | Functional | SQL-like | Both |
| Use Case | Low-level | Most common | Type safety needed |
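Bridging the rows of this table: in PySpark you can move between the RDD and DataFrame APIs directly (the typed Dataset API applies to Scala/Java only, as noted above). A minimal sketch, assuming the spark session and df created in the 3.2 example:

```python
# DataFrame -> RDD of Row objects (drops Catalyst optimization)
row_rdd = df.rdd
print(row_rdd.map(lambda row: row.name).collect())

# RDD of tuples -> DataFrame (schema supplied as column names)
pair_rdd = spark.sparkContext.parallelize([(1, "Alice"), (2, "Bob")])
df_from_rdd = pair_rdd.toDF(["id", "name"])
df_from_rdd.show()
```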
3.3 Transformations vs Actions
```python
# TRANSFORMATIONS (lazy - not executed immediately)
# Narrow transformations (no shuffle)
rdd.map(lambda x: x * 2)          # 1-to-1
rdd.filter(lambda x: x > 0)       # select
rdd.flatMap(lambda x: x.split())  # 1-to-many

# Wide transformations (shuffle)
rdd.groupByKey()
rdd.reduceByKey(lambda a, b: a + b)
rdd.join(other_rdd)
rdd.repartition(10)

# ACTIONS (trigger execution)
rdd.collect()                   # return all elements
rdd.count()                     # count elements
rdd.first()                     # first element
rdd.take(5)                     # first 5 elements
rdd.reduce(lambda a, b: a + b)  # aggregate
rdd.saveAsTextFile("output/")   # save
```
3.4 Lazy Evaluation
```python
# Nothing happens yet (lazy)
rdd1 = sc.textFile("data.txt")
rdd2 = rdd1.filter(lambda x: "error" in x)
rdd3 = rdd2.map(lambda x: x.split(","))

# NOW execution happens (the action triggers it)
result = rdd3.collect()
```
Benefits:
- Optimization opportunities (see the explain() sketch below)
- Avoid unnecessary computation
- Pipeline operations efficiently
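To see the plan that lazy evaluation lets Spark optimize, you can print it with DataFrame.explain() before any action runs. A small self-contained sketch; the toy in-memory data is an assumption so the example runs without external files:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("LazyPlan").master("local[*]").getOrCreate()

# Toy in-memory data for illustration
df = spark.createDataFrame(
    [(1, "Alice", 30), (2, "Bob", 25), (3, "Charlie", 35)],
    ["id", "name", "age"],
)

# Transformations only: Spark just records a logical plan here
pipeline = df.filter(col("age") > 25).select("name", "age")

# Print parsed, analyzed, optimized, and physical plans without running the query
pipeline.explain(True)

spark.stop()
```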
4. Installation & Setup
4.1 Local Installation
```bash
# Install Java (required)
# Download from https://www.oracle.com/java/technologies/downloads/

# Install PySpark
pip install pyspark

# Verify
python -c "from pyspark.sql import SparkSession; print('PySpark OK')"
```
4.2 Docker Setup
```yaml
# docker-compose.yml
version: '3'
services:
  spark-master:
    image: bitnami/spark:latest
    ports:
      - "8080:8080"
      - "7077:7077"
    environment:
      - SPARK_MODE=master

  spark-worker:
    image: bitnami/spark:latest
    depends_on:
      - spark-master
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
```
4.3 Cloud Options
- Databricks: Managed Spark platform
- AWS EMR: Elastic MapReduce
- Azure Synapse: Analytics service
- Google Dataproc: Managed Hadoop/Spark
5. First Spark Application
5.1 Basic Setup
```python
from pyspark.sql import SparkSession

# Create SparkSession ("local[*]" uses all available local cores)
spark = SparkSession.builder \
    .appName("My First Spark App") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Get SparkContext
sc = spark.sparkContext

print(f"Spark Version: {spark.version}")
print(f"Application ID: {sc.applicationId}")
```
5.2 Word Count Example
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col

spark = SparkSession.builder.appName("WordCount").getOrCreate()

# RDD approach
text_rdd = spark.sparkContext.textFile("sample.txt")
word_counts = (
    text_rdd
    .flatMap(lambda line: line.split(" "))
    .map(lambda word: (word.lower(), 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: x[1], ascending=False)
)
print(word_counts.take(10))

# DataFrame approach
df = spark.read.text("sample.txt")
word_counts_df = (
    df
    .select(explode(split(col("value"), " ")).alias("word"))
    .groupBy("word")
    .count()
    .orderBy(col("count").desc())
)
word_counts_df.show(10)
```
5.3 Spark UI
```python
# Spark UI available at http://localhost:4040
# Shows:
# - Jobs
# - Stages
# - Storage (cached RDDs)
# - Environment
# - Executors

# Keep the app running to view the UI
input("Press Enter to stop...")
spark.stop()
```
6. Data Sources
6.1 Reading Data
```python
# CSV
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df = spark.read.option("header", True).option("delimiter", ";").csv("data.csv")

# JSON
df = spark.read.json("data.json")
df = spark.read.option("multiLine", True).json("data.json")

# Parquet (recommended for big data)
df = spark.read.parquet("data.parquet")

# JDBC (database)
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/db") \
    .option("dbtable", "schema.table") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

# Delta Lake
df = spark.read.format("delta").load("path/to/delta")
```
6.2 Writing Data
```python
# CSV
df.write.csv("output/csv", header=True, mode="overwrite")

# JSON
df.write.json("output/json", mode="overwrite")

# Parquet (with partitioning)
df.write.partitionBy("year", "month").parquet("output/parquet")

# Save modes: overwrite, append, ignore, error
df.write.mode("overwrite").parquet("output/")
```
7. Spark vs Pandas Comparison
```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum as spark_sum

# PANDAS
pandas_df = pd.read_csv("sales.csv")
result_pandas = (
    pandas_df
    .groupby('category')
    .agg({'revenue': 'sum', 'quantity': 'mean'})
    .reset_index()
)

# SPARK (PySpark)
spark = SparkSession.builder.appName("Comparison").getOrCreate()
spark_df = spark.read.csv("sales.csv", header=True, inferSchema=True)
result_spark = (
    spark_df
    .groupBy('category')
    .agg(
        spark_sum('revenue').alias('total_revenue'),
        avg('quantity').alias('avg_quantity')
    )
)
result_spark.show()

# Convert between Pandas and Spark
spark_df = spark.createDataFrame(pandas_df)  # Pandas -> Spark
pandas_df = spark_df.toPandas()              # Spark -> Pandas
```
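One caveat when mixing the two libraries: toPandas() collects the entire distributed DataFrame onto the driver, so it is best reserved for small or pre-aggregated results. A hedged sketch of two safer patterns, reusing spark_df from the block above (the 1000-row limit is an arbitrary illustrative choice):

```python
# Bring only a bounded preview back to the driver
preview_pdf = spark_df.limit(1000).toPandas()

# Or aggregate first so the converted result stays small
summary_pdf = (
    spark_df
    .groupBy("category")
    .count()
    .toPandas()
)
```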
8. Hands-on Practice
Hands-on Exercises
Exercise 1: Setup and Basic Operations
```python
# 1. Create a SparkSession
# 2. Create a DataFrame from a list
# 3. Perform basic operations

# YOUR CODE HERE
```
💡 View answer
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create SparkSession
spark = SparkSession.builder \
    .appName("Exercise 1") \
    .master("local[*]") \
    .getOrCreate()

# Create DataFrame
data = [
    (1, "iPhone", "Electronics", 999, 100),
    (2, "MacBook", "Electronics", 1999, 50),
    (3, "AirPods", "Electronics", 199, 200),
    (4, "T-Shirt", "Clothing", 29, 500),
    (5, "Jeans", "Clothing", 79, 300)
]
columns = ["id", "product", "category", "price", "quantity"]

df = spark.createDataFrame(data, columns)

# Show data
df.show()

# Filter
df.filter(col("price") > 100).show()

# Group by
df.groupBy("category").sum("quantity").show()

# Add calculated column
df.withColumn("revenue", col("price") * col("quantity")).show()

spark.stop()
```
Exercise 2: Word Count
```python
# Count the number of occurrences of each word in a block of text
text = """
Apache Spark is a unified analytics engine
Spark provides high-level APIs
Spark supports Java Scala Python and R
Apache Spark is fast
"""

# YOUR CODE HERE (use both RDD and DataFrame approaches)
```
💡 View answer
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, lower

spark = SparkSession.builder.appName("WordCount").getOrCreate()

text = """
Apache Spark is a unified analytics engine
Spark provides high-level APIs
Spark supports Java Scala Python and R
Apache Spark is fast
"""

# RDD approach
sc = spark.sparkContext
rdd = sc.parallelize(text.strip().split("\n"))
word_counts_rdd = (
    rdd
    .flatMap(lambda line: line.lower().split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: -x[1])
)
print("RDD Result:")
print(word_counts_rdd.collect())

# DataFrame approach
df = spark.createDataFrame([(line,) for line in text.strip().split("\n")], ["line"])
word_counts_df = (
    df
    .select(explode(split(lower(col("line")), " ")).alias("word"))
    .filter(col("word") != "")
    .groupBy("word")
    .count()
    .orderBy(col("count").desc())
)
print("\nDataFrame Result:")
word_counts_df.show()

spark.stop()
```
9. Summary
| Concept | Description |
|---|---|
| SparkSession | Entry point for Spark |
| RDD | Low-level distributed data structure |
| DataFrame | High-level structured data (recommended) |
| Transformations | Lazy operations (map, filter, groupBy) |
| Actions | Trigger execution (collect, count, save) |
| Lazy Evaluation | Optimized execution plan |
Key Takeaways:
- Spark = distributed computing engine
- Use the DataFrame API (not RDD) for most cases
- Lazy evaluation allows optimization
- Parquet is the recommended format for big data
Best Practices:
- Always stop the SparkSession when done (see the sketch below)
- Use local[*] for development
- Monitor with the Spark UI (port 4040)
- Prefer DataFrame over RDD
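A small sketch of the first practice above, wrapping the work in try/finally so the session is released even when a job fails; the toy DataFrame is just for illustration:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CleanShutdown") \
    .master("local[*]") \
    .getOrCreate()

try:
    # The real workload goes here
    df = spark.createDataFrame([(1, "ok"), (2, "failed")], ["id", "status"])
    df.groupBy("status").count().show()
finally:
    # Release driver and executor resources even if the block above raises
    spark.stop()
```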
Next lesson: PySpark DataFrame API
