Theory
Lesson 11/15

Spark Structured Streaming

Real-time data processing with the Spark Structured Streaming API

1. Introduction

Structured Streaming

Structured Streaming is built on top of Spark SQL, letting you write streaming queries the same way you write batch queries on static DataFrames.
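
For example, a word count reads almost the same in batch and in streaming. The sketch below is only illustrative; it assumes a folder of text files for the batch case and a socket source on localhost:9999 for the streaming case.

Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("WordCount").getOrCreate()

# Batch: a static DataFrame read from files
batch_lines = spark.read.text("data/")
batch_counts = batch_lines \
    .select(explode(split("value", " ")).alias("word")) \
    .groupBy("word").count()
batch_counts.show()

# Streaming: the same query on an unbounded input
stream_lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()
stream_counts = stream_lines \
    .select(explode(split("value", " ")).alias("word")) \
    .groupBy("word").count()
stream_counts.writeStream.outputMode("complete").format("console").start()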

1.1 Key Concept: Unbounded Table

Text
┌─────────────────────────────────────────────────────────┐
│               Structured Streaming Model                 │
├─────────────────────────────────────────────────────────┤
│                                                          │
│  Input stream treated as an unbounded table:             │
│                                                          │
│  Time 0: | data1 |                                       │
│  Time 1: | data1 | data2 |                               │
│  Time 2: | data1 | data2 | data3 |                       │
│  Time 3: | data1 | data2 | data3 | data4 |               │
│          └───────────────────────────────┘               │
│                  Unbounded Table                         │
│                                                          │
│  The query runs on the entire table;                     │
│  the output is the result as of that time.               │
│                                                          │
└─────────────────────────────────────────────────────────┘

2. Basic Structured Streaming

2.1 Reading from Stream Source

Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("Structured Streaming") \
    .getOrCreate()

# Read from socket (for testing)
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Read from files (new files trigger processing)
df = spark.readStream \
    .format("csv") \
    .option("header", True) \
    .schema(your_schema) \
    .load("path/to/input/folder/")

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_name") \
    .load()

2.2 Transformations

Python
# Same as batch DataFrame operations!
from pyspark.sql.functions import col, explode, split, window

# Parse JSON from the Kafka value
schema = StructType([
    StructField("user_id", StringType()),
    StructField("action", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("amount", DoubleType())
])

events = df \
    .selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")

# Filter
filtered = events.filter(col("action") == "purchase")

# Aggregate
aggregated = events \
    .groupBy("user_id") \
    .agg(
        count("*").alias("event_count"),
        sum("amount").alias("total_amount")
    )

# Windowed aggregation
windowed = events \
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),
        col("action")
    ) \
    .count()

2.3 Writing to Sink

Python
# Output Modes:
# - append: Only new rows (default; with aggregations only when a watermark is set)
# - complete: All rows of the result table (for aggregations)
# - update: Only rows that changed since the last trigger

# Write to console (for debugging)
query = aggregated.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", False) \
    .start()

# Write to files (Parquet)
query = events.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "output/events") \
    .option("checkpointLocation", "checkpoints/events") \
    .start()

# Write to Kafka (the Kafka sink expects a "value" column, optionally "key")
query = events \
    .select(to_json(struct("*")).alias("value")) \
    .writeStream \
    .outputMode("append") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output_topic") \
    .option("checkpointLocation", "checkpoints/kafka") \
    .start()

# Wait for termination
query.awaitTermination()
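
Besides the built-in sinks, foreachBatch exposes each micro-batch as a regular (static) DataFrame, so any batch writer or transformation can be reused. A minimal sketch (the output and checkpoint paths are placeholders):

Python
def write_batch(batch_df, batch_id):
    # batch_id increases monotonically and can be used for idempotent writes
    batch_df.write.mode("append").parquet("output/events_batches")

query = events.writeStream \
    .foreachBatch(write_batch) \
    .option("checkpointLocation", "checkpoints/foreach_batch") \
    .start()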

3. Windowed Operations

3.1 Tumbling Window

Python
from pyspark.sql.functions import window

# 10-minute tumbling windows
tumbling = events \
    .groupBy(
        window(col("timestamp"), "10 minutes")
    ) \
    .agg(
        count("*").alias("count"),
        sum("amount").alias("total")
    )

# Result has columns: window (struct with start, end), count, total
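
Since window is a struct, it can be flattened into explicit start/end timestamps when needed:

Python
# Flatten the window struct into start/end columns
flat = tumbling.select(
    col("window.start").alias("window_start"),
    col("window.end").alias("window_end"),
    col("count"),
    col("total")
)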

3.2 Sliding Window

Python
# 10-minute window, sliding every 5 minutes
sliding = events \
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes")
    ) \
    .agg(count("*").alias("count"))

# Each event appears in 2 windows

3.3 Session Window (Spark 3.2+)

Python
from pyspark.sql.functions import session_window

# Session window with a 10-minute gap timeout
sessions = events \
    .groupBy(
        session_window(col("timestamp"), "10 minutes"),
        col("user_id")
    ) \
    .agg(
        count("*").alias("events_in_session"),
        first("action").alias("first_action"),
        last("action").alias("last_action")
    )

4. Watermarks & Late Data

4.1 Why Watermarks?

Python
# Without a watermark: state grows forever
# With a watermark: the system knows when to discard old state

# Watermark = "I don't expect data older than X"
# Data older than the watermark may be dropped

4.2 Setting Watermark

Python
# Accept data up to 10 minutes late
events_with_watermark = events \
    .withWatermark("timestamp", "10 minutes")

# Now aggregation with append mode works
windowed = events_with_watermark \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("user_id")
    ) \
    .count()

query = windowed.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

4.3 How Watermark Works

Text
Timeline:
─────────────────────────────────────────────────────────
Event time:          10:00   10:05   10:10   10:15   10:20
                       │       │       │       │       │
Watermark (10 min):    -       -     10:00   10:05   10:10
                       │       │       │       │       │
─────────────────────────────────────────────────────────

At the processing time when the max event time seen is 10:20:
- Watermark = 10:10 (10:20 - 10 minutes)
- Events with event time before 10:10 may be dropped
- Window [10:00, 10:05) can be finalized and output
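
For a running event-time query, the current watermark is reported in the progress information, so it can also be inspected programmatically (assuming a started query object named query):

Python
# The "eventTime" block of the progress report includes the current watermark
progress = query.lastProgress
if progress and "eventTime" in progress:
    print(progress["eventTime"].get("watermark"))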

5. Joins in Streaming

5.1 Stream-Static Join

Python
# Static DataFrame (lookup table)
products = spark.read.parquet("products.parquet")

# Stream
orders = spark.readStream.format("kafka")...

# Join: enrich orders with product info
enriched = orders.join(
    products,
    orders.product_id == products.product_id,
    "left"
)

5.2 Stream-Stream Join

Python
# Two streams (assume ad_id has already been parsed from the Kafka value,
# as in section 2.2; .alias() lets the join condition qualify column names)
impressions = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "impressions") \
    .load() \
    .withWatermark("timestamp", "2 hours") \
    .alias("impressions")

clicks = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "clicks") \
    .load() \
    .withWatermark("timestamp", "3 hours") \
    .alias("clicks")

# Join within a time range
joined = impressions.join(
    clicks,
    expr("""
        impressions.ad_id = clicks.ad_id AND
        clicks.timestamp >= impressions.timestamp AND
        clicks.timestamp <= impressions.timestamp + interval 1 hour
    """),
    "leftOuter"  # inner, leftOuter, rightOuter are supported
)

6. Stateful Operations

6.1 Deduplication

Python
# Deduplicate within the watermark
deduped = events \
    .withWatermark("timestamp", "10 minutes") \
    .dropDuplicates(["event_id", "timestamp"])  # include the event-time column so the watermark can clean up old state

6.2 Arbitrary Stateful Processing

Python
# NOTE: groupByKey / flatMapGroupsWithState is the Scala/Java Dataset API;
# the code below shows the pattern in Python-like pseudocode.
# In PySpark (3.4+) the equivalent is applyInPandasWithState -- see the sketch after 6.3.
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout  # Spark 3.4+

# Define state and output types
state_schema = StructType([
    StructField("count", IntegerType()),
    StructField("sum", DoubleType())
])

output_schema = StructType([
    StructField("user_id", StringType()),
    StructField("avg", DoubleType())
])

def update_state(key, events, state: GroupState):
    """Custom stateful processing"""
    # Get the current state or initialize it
    if state.exists:
        current = state.get
    else:
        current = {"count": 0, "sum": 0.0}

    # Update with new events
    for event in events:
        current["count"] += 1
        current["sum"] += event.amount

    # Save state
    state.update(current)

    # Output result
    if current["count"] > 0:
        avg = current["sum"] / current["count"]
        yield (key[0], avg)

# Apply
result = events \
    .groupByKey(lambda x: x.user_id) \
    .flatMapGroupsWithState(
        output_schema,
        state_schema,
        GroupStateTimeout.ProcessingTimeTimeout,
        update_state
    )

6.3 MapGroupsWithState (Simpler)

Python
# Same caveat as 6.2: mapGroupsWithState mirrors the Scala/Java Dataset API.
def update_user_state(user_id, events, state: GroupState):
    """Update state for each user"""
    # Get or initialize state
    if state.exists:
        total = state.get
    else:
        total = 0

    # Process new events
    for event in events:
        total += event.amount

    # Update state
    state.update(total)

    # Return a single output row per group
    return (user_id, total)

result = events \
    .groupByKey(lambda x: x.user_id) \
    .mapGroupsWithState(
        output_schema,
        state_schema,
        GroupStateTimeout.NoTimeout,
        update_user_state
    )
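
In PySpark itself, the equivalent capability is applyInPandasWithState (Spark 3.4+). A minimal sketch of a per-user running total, assuming the events stream from section 2.2:

Python
import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

def running_total(key, pdf_iter, state: GroupState):
    # key is a tuple of the grouping values, e.g. ("user_1",)
    (total,) = state.get if state.exists else (0.0,)
    for pdf in pdf_iter:
        total += float(pdf["amount"].sum())
    state.update((total,))
    yield pd.DataFrame({"user_id": [key[0]], "total_amount": [total]})

result = events.groupBy("user_id").applyInPandasWithState(
    running_total,
    outputStructType="user_id string, total_amount double",
    stateStructType="total_amount double",
    outputMode="update",
    timeoutConf=GroupStateTimeout.NoTimeout
)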

7. Monitoring & Debugging

7.1 Query Progress

Python
import time

# Get query status
query = df.writeStream...start()

print(query.status)          # Current status
print(query.lastProgress)    # Last batch statistics
print(query.recentProgress)  # Recent batches

# Programmatic monitoring
while query.isActive:
    progress = query.lastProgress
    if progress:
        print(f"Batch: {progress['batchId']}")
        print(f"Input rows: {progress['numInputRows']}")
        print(f"Processing time: {progress['durationMs']}")
    time.sleep(10)

7.2 Checkpointing

Python
# The checkpoint stores:
# - Offsets (where we are in the source)
# - State (for aggregations)
# - Metadata

query = df.writeStream \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start()

# Benefits:
# - Fault tolerance (restart from the checkpoint)
# - Exactly-once semantics
# - Required for production

# Note: Don't change the query between restarts without clearing the checkpoint

7.3 Spark UI for Streaming

Python
# Access the Spark UI at http://localhost:4040
# The Streaming tab shows:
# - Input rate
# - Processing time
# - Batch duration
# - State size
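
Beyond the UI, the same metrics can be consumed programmatically with a StreamingQueryListener (available in PySpark since 3.4); a minimal sketch:

Python
from pyspark.sql.streaming import StreamingQueryListener

class ProgressLogger(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"Query started: {event.id}")

    def onQueryProgress(self, event):
        # event.progress mirrors what query.lastProgress returns
        print(f"Batch {event.progress.batchId}: {event.progress.numInputRows} rows")

    def onQueryTerminated(self, event):
        print(f"Query terminated: {event.id}")

spark.streams.addListener(ProgressLogger())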

8. Production Best Practices

8.1 Configuration

Python
spark = (
    SparkSession.builder
    .appName("Production Streaming")

    # Shuffle partitions (adjust based on data volume)
    .config("spark.sql.shuffle.partitions", "200")

    # State store backed by RocksDB (Spark 3.2+)
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

    # Default checkpoint location
    .config("spark.sql.streaming.checkpointLocation", "/checkpoints")

    # Report streaming metrics through the Spark metrics system
    .config("spark.sql.streaming.metricsEnabled", "true")

    .getOrCreate()
)

8.2 Error Handling

Python
# Handle corrupt input records
# (badRecordsPath is a Databricks-specific read option; on open-source Spark,
# use the reader's "mode" / "columnNameOfCorruptRecord" options instead)
df = spark.readStream \
    .format("json") \
    .schema(your_schema) \
    .option("badRecordsPath", "/path/to/bad_records") \
    .load("input/")

# Graceful shutdown
import signal

def shutdown_handler(signum, frame):
    query.stop()

signal.signal(signal.SIGTERM, shutdown_handler)

# Monitor for failures
try:
    query.awaitTermination()
except Exception as e:
    print(f"Stream failed: {e}")
    # Send alert, restart, etc.

8.3 Trigger Modes

Python
# Default: process as fast as possible
query = df.writeStream.start()

# Fixed interval: every 10 seconds
query = df.writeStream \
    .trigger(processingTime="10 seconds") \
    .start()

# Once: process all available data and stop
query = df.writeStream \
    .trigger(once=True) \
    .start()

# Available now: process all available data, then stop (Spark 3.3+)
query = df.writeStream \
    .trigger(availableNow=True) \
    .start()

# Continuous (low latency, experimental)
query = df.writeStream \
    .trigger(continuous="1 second") \
    .start()

9. Complete Example

Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Initialize
spark = SparkSession.builder \
    .appName("E-commerce Streaming") \
    .getOrCreate()

# Schema for events
event_schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("product_id", StringType()),
    StructField("event_type", StringType()),  # view, cart, purchase
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType())
])

# Read from Kafka
events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "ecommerce_events") \
    .option("startingOffsets", "latest") \
    .load() \
    .select(
        from_json(col("value").cast("string"), event_schema).alias("event")
    ) \
    .select("event.*") \
    .withWatermark("timestamp", "10 minutes")

# Real-time metrics: revenue per 5-minute window
revenue_by_window = events \
    .filter(col("event_type") == "purchase") \
    .groupBy(
        window(col("timestamp"), "5 minutes")
    ) \
    .agg(
        count("*").alias("purchase_count"),
        sum("amount").alias("total_revenue"),
        approx_count_distinct("user_id").alias("unique_buyers")
    )

# Real-time metrics: active users per minute
active_users = events \
    .groupBy(
        window(col("timestamp"), "1 minute")
    ) \
    .agg(
        approx_count_distinct("user_id").alias("active_users"),
        count("*").alias("total_events")
    )

# Write revenue metrics to console
revenue_query = revenue_by_window.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .trigger(processingTime="30 seconds") \
    .start()

# Write active users to Kafka
users_query = active_users \
    .select(
        to_json(struct("*")).alias("value")
    ) \
    .writeStream \
    .outputMode("update") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "active_users_metrics") \
    .option("checkpointLocation", "/checkpoints/active_users") \
    .start()

# Wait for termination
spark.streams.awaitAnyTermination()

10. Practice

Streaming Exercise

Exercise: Build Real-time Dashboard Metrics

Python
# Scenario: Streaming clickstream data
#
# Input schema:
# - user_id: string
# - page: string
# - action: string (click, scroll, hover)
# - timestamp: timestamp
#
# Tasks:
# 1. Count page views per page per 5-minute window
# 2. Calculate unique users per page per 10-minute sliding window (slide 2 min)
# 3. Find the top 3 pages by views in each 15-minute window
#
# Use a watermark of 10 minutes for late data

# YOUR CODE HERE
💡 View solution
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName("Clickstream").getOrCreate()

# Schema of the real clickstream (in production, read from Kafka and parse with from_json)
schema = StructType([
    StructField("user_id", StringType()),
    StructField("page", StringType()),
    StructField("action", StringType()),
    StructField("timestamp", TimestampType())
])

# Read stream (simulated with the rate source for the demo)
clicks = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .load() \
    .withColumn("user_id", (rand() * 1000).cast("int").cast("string")) \
    .withColumn("page", concat(lit("/page_"), (rand() * 20).cast("int").cast("string"))) \
    .withColumn("action",
        when(rand() < 0.7, "click")
        .when(rand() < 0.9, "scroll")
        .otherwise("hover")
    ) \
    .withWatermark("timestamp", "10 minutes")

# 1. Page views per page per 5-minute tumbling window
page_views = clicks \
    .filter(col("action") == "click") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("page")
    ) \
    .count() \
    .withColumnRenamed("count", "page_views")

# 2. Unique users per page per 10-minute sliding window (slide every 2 minutes)
unique_users = clicks \
    .groupBy(
        window(col("timestamp"), "10 minutes", "2 minutes"),
        col("page")
    ) \
    .agg(
        approx_count_distinct("user_id").alias("unique_users")
    )

# 3. Top 3 pages per 15-minute window
# Ranking window functions (row_number) are not supported directly on a streaming
# DataFrame, so rank inside foreachBatch, where each micro-batch is a static DataFrame.
from pyspark.sql.window import Window

page_counts = clicks \
    .filter(col("action") == "click") \
    .groupBy(
        window(col("timestamp"), "15 minutes"),
        col("page")
    ) \
    .count()

def show_top_pages(batch_df, batch_id):
    window_spec = Window.partitionBy("window").orderBy(col("count").desc())
    batch_df \
        .withColumn("rank", row_number().over(window_spec)) \
        .filter(col("rank") <= 3) \
        .show(truncate=False)

# Output to console
query1 = page_views.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .queryName("page_views") \
    .start()

query2 = unique_users.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .queryName("unique_users") \
    .start()

query3 = page_counts.writeStream \
    .outputMode("complete") \
    .foreachBatch(show_top_pages) \
    .queryName("top_pages") \
    .start()

spark.streams.awaitAnyTermination()

11. Summary

Concept           Description
Unbounded Table   Stream treated as a continuously appended table
Output Modes      append, complete, update
Windowing         Tumbling, sliding, session
Watermarks        Handle late data, bound state size
Checkpointing     Fault tolerance, exactly-once
Stateful Ops      Aggregations, dedup, custom state

Key Points:

  • Same API as batch DataFrames
  • Watermarks are essential in production
  • Always configure a checkpoint location
  • Monitor with the Spark UI and query progress

Next lesson: Kafka & Message Queues