Theory
Lesson 6/15

Apache Spark Introduction

An overview of Apache Spark: architecture, components, and when to use it

1. What is Spark?

Apache Spark is a unified analytics engine for large-scale data processing, up to 100x faster than Hadoop MapReduce for in-memory workloads and up to 10x faster on disk.

1.1 Why Spark?

Scenario     | Pandas                | Spark
Dataset size | < 10 GB (fits in RAM) | TB to PB
Processing   | Single machine        | Distributed cluster
Speed        | Fast for small data   | Fast for big data
Real-time    | Limited               | Streaming support

When to use Spark:

  • Data is larger than the available memory
  • Parallel processing is needed
  • Real-time streaming
  • Machine learning at scale

1.2 Spark Ecosystem

Text
┌────────────────────────────────────────────────────────┐
│                      Applications                      │
├──────────┬──────────┬──────────┬──────────┬────────────┤
│  Spark   │  Spark   │  Spark   │  Spark   │   Spark    │
│   SQL    │ Streaming│  MLlib   │  GraphX  │ Structured │
│          │          │          │          │ Streaming  │
├──────────┴──────────┴──────────┴──────────┴────────────┤
│                    Spark Core (RDD)                    │
├────────────────────────────────────────────────────────┤
│  Standalone  │   YARN   │    Mesos   │   Kubernetes    │
├────────────────────────────────────────────────────────┤
│          Storage (HDFS, S3, Cassandra, etc.)           │
└────────────────────────────────────────────────────────┘
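
All of these layers are driven through the same SparkSession. Below is a minimal sketch that touches Spark SQL and MLlib from PySpark (GraphX has no Python API, and Structured Streaming uses readStream/writeStream on the same DataFrames); names like "EcosystemTour" are purely illustrative:

Python
from pyspark.sql import SparkSession            # Spark SQL / DataFrame API
from pyspark.ml.feature import VectorAssembler  # MLlib (DataFrame-based ML)

spark = SparkSession.builder.appName("EcosystemTour").master("local[*]").getOrCreate()

# Spark SQL: register a DataFrame as a view and query it with SQL
df = spark.range(1, 6).withColumnRenamed("id", "x")
df.createOrReplaceTempView("numbers")
spark.sql("SELECT x, x * x AS x_squared FROM numbers").show()

# MLlib: assemble numeric columns into a feature vector (a typical preprocessing step)
assembled = VectorAssembler(inputCols=["x"], outputCol="features").transform(df)
assembled.show()

spark.stop()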

2. Spark Architecture

2.1 Master-Worker Architecture

Text
                ┌─────────────┐
                │   Driver    │
                │   Program   │
                └──────┬──────┘
                       │
                ┌──────▼──────┐
                │   Cluster   │
                │   Manager   │
                └──────┬──────┘
                       │
      ┌────────────────┼────────────────┐
      │                │                │
┌─────▼──────┐   ┌─────▼──────┐   ┌─────▼──────┐
│   Worker   │   │   Worker   │   │   Worker   │
│   Node 1   │   │   Node 2   │   │   Node 3   │
│ ┌────────┐ │   │ ┌────────┐ │   │ ┌────────┐ │
│ │Executor│ │   │ │Executor│ │   │ │Executor│ │
│ │  Task  │ │   │ │  Task  │ │   │ │  Task  │ │
│ │  Task  │ │   │ │  Task  │ │   │ │  Task  │ │
│ └────────┘ │   │ └────────┘ │   │ └────────┘ │
└────────────┘   └────────────┘   └────────────┘

Components:

  • Driver Program: the main program; creates the SparkContext
  • Cluster Manager: manages resources (Standalone, YARN, K8s)
  • Worker Node: machine that runs the computations
  • Executor: process that runs tasks and caches data (resource settings are sketched below)
  • Task: the smallest unit of work
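
From the application side, these components map to a handful of configuration options. A minimal sizing sketch; the master URL and all resource numbers are placeholders, and the exact semantics of each option depend on your cluster manager:

Python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ClusterSizingSketch")
    .master("spark://spark-master:7077")       # Cluster Manager (standalone URL here)
    .config("spark.executor.instances", "3")   # Executors, e.g. one per Worker Node
    .config("spark.executor.cores", "4")       # concurrent Tasks per Executor
    .config("spark.executor.memory", "8g")     # memory per Executor (compute + cache)
    .config("spark.driver.memory", "2g")       # memory for the Driver Program
    .getOrCreate()
)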

2.2 Execution Flow

Text
1. User submits application
2. Driver creates SparkContext
3. SparkContext connects to Cluster Manager
4. Cluster Manager allocates Executors
5. Driver sends tasks to Executors
6. Executors run tasks, return results
7. Driver aggregates results
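
The same flow can be observed locally. In this minimal sketch the transformation only builds the plan; the action (sum) is what makes the driver split the work into tasks and ship them to executors:

Python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ExecutionFlowDemo").master("local[2]").getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize(range(1_000_000), numSlices=8)  # 8 partitions -> 8 tasks per stage
squared = rdd.map(lambda x: x * x)                   # transformation: nothing runs yet
total = squared.sum()                                # action: driver schedules tasks on executors
print(total)

spark.stop()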

3. Core Concepts

3.1 RDD (Resilient Distributed Dataset)

RDD is Spark's fundamental data structure:

  • Resilient: fault-tolerant; lost partitions can be recomputed
  • Distributed: partitioned across the cluster
  • Dataset: a collection of elements
Python
from pyspark import SparkContext

sc = SparkContext("local", "RDD Example")

# Create an RDD from a list
rdd = sc.parallelize([1, 2, 3, 4, 5])

# Create an RDD from a file
text_rdd = sc.textFile("hdfs://path/to/file.txt")

# Transformations (lazy)
rdd2 = rdd.map(lambda x: x * 2)
rdd3 = rdd2.filter(lambda x: x > 4)

# Actions (trigger execution)
result = rdd3.collect()  # [6, 8, 10]
count = rdd3.count()     # 3

3.2 DataFrame & Dataset

Python
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DataFrame Example") \
    .getOrCreate()

# Create DataFrame
df = spark.createDataFrame([
    (1, "Alice", 30),
    (2, "Bob", 25),
    (3, "Charlie", 35)
], ["id", "name", "age"])

# DataFrame operations (structured, optimized)
df.select("name", "age").filter(df.age > 25).show()

Comparison:

Feature      | RDD        | DataFrame   | Dataset
Type Safety  | No         | No          | Yes (Scala/Java)
Optimization | Manual     | Catalyst    | Catalyst
Schema       | No         | Yes         | Yes
API          | Functional | SQL-like    | Both
Use Case     | Low-level  | Most common | Type safety needed
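
In PySpark you mostly move between RDDs and DataFrames (the typed Dataset API exists only in Scala/Java). A small conversion sketch with illustrative column names:

Python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RddDfConversion").master("local[*]").getOrCreate()

rdd = spark.sparkContext.parallelize([(1, "Alice"), (2, "Bob")])

# RDD -> DataFrame: attach column names and let Spark infer the types
df = spark.createDataFrame(rdd, ["id", "name"])
df.show()

# DataFrame -> RDD of Row objects
print(df.rdd.take(2))

spark.stop()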

3.3 Transformations vs Actions

Python
# TRANSFORMATIONS (lazy - not executed immediately)
# Narrow transformations (no shuffle)
rdd.map(lambda x: x * 2)              # 1-to-1
rdd.filter(lambda x: x > 0)           # select
rdd.flatMap(lambda x: x.split())      # 1-to-many

# Wide transformations (shuffle)
rdd.groupByKey()
rdd.reduceByKey(lambda a, b: a + b)
rdd.join(other_rdd)
rdd.repartition(10)

# ACTIONS (trigger execution)
rdd.collect()                         # return all elements
rdd.count()                           # count elements
rdd.first()                           # first element
rdd.take(5)                           # first 5 elements
rdd.reduce(lambda a, b: a + b)        # aggregate
rdd.saveAsTextFile("output/")         # save

3.4 Lazy Evaluation

Python
# Nothing happens yet (lazy)
rdd1 = sc.textFile("data.txt")
rdd2 = rdd1.filter(lambda x: "error" in x)
rdd3 = rdd2.map(lambda x: x.split(","))

# NOW execution happens (an action triggers it)
result = rdd3.collect()

Benefits:

  • Optimization opportunities
  • Avoid unnecessary computation
  • Pipeline operations efficiently
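
With DataFrames you can see what lazy evaluation buys you: explain() prints the plan Catalyst produces before any data is processed. A sketch, assuming a hypothetical events.csv with a status column:

Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("LazyPlan").master("local[*]").getOrCreate()

df = spark.read.csv("events.csv", header=True, inferSchema=True)  # hypothetical file
filtered = df.filter(col("status") == "error").select("status")

filtered.explain(True)  # parsed, analyzed, optimized, and physical plans
filtered.show(5)        # the action that actually triggers execution

spark.stop()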

4. Installation & Setup

4.1 Local Installation

Bash
# Install Java (required)
# Download from https://www.oracle.com/java/technologies/downloads/

# Install PySpark
pip install pyspark

# Verify
python -c "from pyspark.sql import SparkSession; print('PySpark OK')"

4.2 Docker Setup

yaml
# docker-compose.yml
version: '3'
services:
  spark-master:
    image: bitnami/spark:latest
    ports:
      - "8080:8080"
      - "7077:7077"
    environment:
      - SPARK_MODE=master

  spark-worker:
    image: bitnami/spark:latest
    depends_on:
      - spark-master
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
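
After docker compose up -d, the master web UI is at http://localhost:8080 and a driver can point at the published port 7077. Whether a driver on the host can actually run jobs depends on your network setup (executors must be able to reach the driver back), so treat this as a sketch:

Python
from pyspark.sql import SparkSession

# Assumes the docker-compose setup above, with port 7077 published on localhost.
spark = (
    SparkSession.builder
    .appName("ComposeClusterCheck")
    .master("spark://localhost:7077")
    .getOrCreate()
)

print(spark.range(10).count())  # tiny sanity check that jobs run on the cluster
spark.stop()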

4.3 Cloud Options

  • Databricks: Managed Spark platform
  • AWS EMR: Elastic MapReduce
  • Azure Synapse: Analytics service
  • Google Dataproc: Managed Hadoop/Spark

5. First Spark Application

5.1 Basic Setup

Python
from pyspark.sql import SparkSession

# Create SparkSession (local[*] uses all available cores)
spark = SparkSession.builder \
    .appName("My First Spark App") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# Get SparkContext
sc = spark.sparkContext

print(f"Spark Version: {spark.version}")
print(f"Application ID: {sc.applicationId}")

5.2 Word Count Example

Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col

spark = SparkSession.builder.appName("WordCount").getOrCreate()

# RDD approach
text_rdd = spark.sparkContext.textFile("sample.txt")
word_counts = (
    text_rdd
    .flatMap(lambda line: line.split(" "))
    .map(lambda word: (word.lower(), 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: x[1], ascending=False)
)
print(word_counts.take(10))

# DataFrame approach
df = spark.read.text("sample.txt")
word_counts_df = (
    df
    .select(explode(split(col("value"), " ")).alias("word"))
    .groupBy("word")
    .count()
    .orderBy(col("count").desc())
)
word_counts_df.show(10)

5.3 Spark UI

Python
# Spark UI available at http://localhost:4040
# Shows:
# - Jobs
# - Stages
# - Storage (cached RDDs)
# - Environment
# - Executors

# Keep the app running to view the UI
input("Press Enter to stop...")
spark.stop()

6. Data Sources

6.1 Reading Data

Python
# CSV
df = spark.read.csv("data.csv", header=True, inferSchema=True)
df = spark.read.option("header", True).option("delimiter", ";").csv("data.csv")

# JSON
df = spark.read.json("data.json")
df = spark.read.option("multiLine", True).json("data.json")

# Parquet (recommended for big data)
df = spark.read.parquet("data.parquet")

# JDBC (database)
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/db") \
    .option("dbtable", "schema.table") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

# Delta Lake
df = spark.read.format("delta").load("path/to/delta")
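
Note that inferSchema=True costs an extra pass over the file just to guess column types; for large inputs it is usually better to declare the schema explicitly. A sketch with illustrative column names:

Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Hypothetical columns; adjust names and types to match your file.
schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("product", StringType(), nullable=True),
    StructField("price", DoubleType(), nullable=True),
])

df = spark.read.csv("data.csv", header=True, schema=schema)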

6.2 Writing Data

Python
# CSV
df.write.csv("output/csv", header=True, mode="overwrite")

# JSON
df.write.json("output/json", mode="overwrite")

# Parquet (with partitioning)
df.write.partitionBy("year", "month").parquet("output/parquet")

# Save modes: overwrite, append, ignore, error
df.write.mode("overwrite").parquet("output/")

7. Spark vs Pandas Comparison

Python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, sum as spark_sum

# PANDAS
pandas_df = pd.read_csv("sales.csv")
result_pandas = (
    pandas_df
    .groupby('category')
    .agg({'revenue': 'sum', 'quantity': 'mean'})
    .reset_index()
)

# SPARK (PySpark)
spark = SparkSession.builder.appName("Comparison").getOrCreate()
spark_df = spark.read.csv("sales.csv", header=True, inferSchema=True)
result_spark = (
    spark_df
    .groupBy('category')
    .agg(
        spark_sum('revenue').alias('total_revenue'),
        avg('quantity').alias('avg_quantity')
    )
)
result_spark.show()

# Convert between Pandas and Spark
spark_df = spark.createDataFrame(pandas_df)  # Pandas -> Spark
pandas_df = spark_df.toPandas()              # Spark -> Pandas (collects everything to the driver)
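
If you want to keep pandas-style syntax while running on Spark, Spark 3.2+ also ships a pandas API on Spark (pyspark.pandas). A minimal sketch, assuming the same sales.csv:

Python
import pyspark.pandas as ps

psdf = ps.read_csv("sales.csv")                     # distributed, but pandas-like DataFrame
result = psdf.groupby("category")["revenue"].sum()  # familiar pandas syntax, Spark execution
print(result.head())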

8. Hands-on Practice

Hands-on Exercise

Exercise 1: Setup và Basic Operations

Python
# 1. Create a SparkSession
# 2. Create a DataFrame from a list
# 3. Perform basic operations

# YOUR CODE HERE
💡 View solution
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create SparkSession
spark = SparkSession.builder \
    .appName("Exercise 1") \
    .master("local[*]") \
    .getOrCreate()

# Create DataFrame
data = [
    (1, "iPhone", "Electronics", 999, 100),
    (2, "MacBook", "Electronics", 1999, 50),
    (3, "AirPods", "Electronics", 199, 200),
    (4, "T-Shirt", "Clothing", 29, 500),
    (5, "Jeans", "Clothing", 79, 300)
]
columns = ["id", "product", "category", "price", "quantity"]

df = spark.createDataFrame(data, columns)

# Show data
df.show()

# Filter
df.filter(col("price") > 100).show()

# Group by
df.groupBy("category").sum("quantity").show()

# Add a calculated column
df.withColumn("revenue", col("price") * col("quantity")).show()

spark.stop()

Exercise 2: Word Count

Python
# Count how many times each word appears in the text below
text = """
Apache Spark is a unified analytics engine
Spark provides high-level APIs
Spark supports Java Scala Python and R
Apache Spark is fast
"""

# YOUR CODE HERE (use both RDD and DataFrame approaches)
💡 View solution
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col, lower

spark = SparkSession.builder.appName("WordCount").getOrCreate()

text = """
Apache Spark is a unified analytics engine
Spark provides high-level APIs
Spark supports Java Scala Python and R
Apache Spark is fast
"""

# RDD approach
sc = spark.sparkContext
rdd = sc.parallelize(text.strip().split("\n"))
word_counts_rdd = (
    rdd
    .flatMap(lambda line: line.lower().split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda x: -x[1])
)
print("RDD Result:")
print(word_counts_rdd.collect())

# DataFrame approach
df = spark.createDataFrame([(line,) for line in text.strip().split("\n")], ["line"])
word_counts_df = (
    df
    .select(explode(split(lower(col("line")), " ")).alias("word"))
    .filter(col("word") != "")
    .groupBy("word")
    .count()
    .orderBy(col("count").desc())
)
print("\nDataFrame Result:")
word_counts_df.show()

spark.stop()

9. Summary

Concept         | Description
SparkSession    | Entry point for Spark
RDD             | Low-level distributed data structure
DataFrame       | High-level structured data (recommended)
Transformations | Lazy operations (map, filter, groupBy)
Actions         | Trigger execution (collect, count, save)
Lazy Evaluation | Optimized execution plan

Key Takeaways:

  • Spark = distributed computing engine
  • Use the DataFrame API (not RDD) for most cases
  • Lazy evaluation allows optimization
  • Parquet is the recommended format for big data

Best Practices:

  • Always stop the SparkSession when done (see the sketch after this list)
  • Use local[*] for development
  • Monitor with the Spark UI (port 4040)
  • Prefer DataFrame over RDD
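
A small pattern sketch for the first two practices (the job body here is just a placeholder):

Python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AnyJob").master("local[*]").getOrCreate()
try:
    spark.range(100).selectExpr("sum(id) AS total").show()  # placeholder for your job
finally:
    spark.stop()  # always release the session, even if the job fails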

Next lesson: PySpark DataFrame API