Spark Structured Streaming
1. Introduction
Structured Streaming
Structured Streaming is built on top of Spark SQL, letting you write streaming queries the same way you write batch queries on static DataFrames.
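To make that concrete, here is a minimal sketch; the `events/` folder of JSON files and the `action` column are illustrative assumptions. The aggregation is written identically for batch and streaming, only the read and write endpoints differ.
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

spark = SparkSession.builder.appName("BatchVsStream").getOrCreate()

# Batch: one-off query over a static folder of JSON files
batch_df = spark.read.format("json").load("events/")
batch_df.groupBy("action").agg(count("*").alias("n")).show()

# Streaming: the same query, but the source is unbounded and the
# result is continuously refreshed on the console
stream_df = spark.readStream.format("json").schema(batch_df.schema).load("events/")
query = (stream_df.groupBy("action").agg(count("*").alias("n"))
         .writeStream
         .outputMode("complete")
         .format("console")
         .start())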
1.1 Key Concept: Unbounded Table
Text
┌─────────────────────────────────────────────────────────┐
│              Structured Streaming Model                 │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  Input Stream treated as unbounded table:               │
│                                                         │
│  Time 0:  | data1 |                                     │
│  Time 1:  | data1 | data2 |                             │
│  Time 2:  | data1 | data2 | data3 |                     │
│  Time 3:  | data1 | data2 | data3 | data4 |             │
│           └─────────────────────────────────┘           │
│                   Unbounded Table                       │
│                                                         │
│  Query runs on entire table                             │
│  Output = Result as of that time                        │
│                                                         │
└─────────────────────────────────────────────────────────┘
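A minimal runnable sketch of this model, using the built-in `rate` source (it emits `timestamp` and `value` columns): each trigger logically re-runs the aggregation over every row received so far, so the printed count keeps growing.
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

spark = SparkSession.builder.appName("UnboundedTableDemo").getOrCreate()

# The rate source keeps appending rows to the "unbounded table"
stream = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

# The query is defined once, over the whole (unbounded) table
rows_so_far = stream.agg(count("*").alias("rows_so_far"))

# complete mode prints the full result table after every trigger
query = (rows_so_far.writeStream
         .outputMode("complete")
         .format("console")
         .start())

query.awaitTermination()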
2. Basic Structured Streaming
2.1 Reading from Stream Source
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("Structured Streaming") \
    .getOrCreate()

# Read from socket (for testing)
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Read from files (new files trigger processing)
df = spark.readStream \
    .format("csv") \
    .option("header", True) \
    .schema(your_schema) \
    .load("path/to/input/folder/")

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "topic_name") \
    .load()
2.2 Transformations
Python
# Same as batch DataFrame operations!
from pyspark.sql.functions import col, from_json, window, count, sum

# Parse JSON from the Kafka value column
schema = StructType([
    StructField("user_id", StringType()),
    StructField("action", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("amount", DoubleType())
])

events = df \
    .selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")

# Filter
filtered = events.filter(col("action") == "purchase")

# Aggregate
aggregated = events \
    .groupBy("user_id") \
    .agg(
        count("*").alias("event_count"),
        sum("amount").alias("total_amount")
    )

# Windowed aggregation
windowed = events \
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),
        col("action")
    ) \
    .count()
2.3 Writing to Sink
Python
# Output Modes:
# - append: Only new rows (default; aggregations require a watermark)
# - complete: All result rows, recomputed each trigger (for aggregations)
# - update: Only rows that changed since the last trigger

# Write to console (for debugging)
query = aggregated.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", False) \
    .start()

# Write to files (Parquet)
query = events.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "output/events") \
    .option("checkpointLocation", "checkpoints/events") \
    .start()

# Write to Kafka
query = events.writeStream \
    .outputMode("append") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output_topic") \
    .option("checkpointLocation", "checkpoints/kafka") \
    .start()

# Wait for termination
query.awaitTermination()
3. Windowed Operations
3.1 Tumbling Window
Python
from pyspark.sql.functions import window

# 10-minute tumbling windows
tumbling = events \
    .groupBy(
        window(col("timestamp"), "10 minutes")
    ) \
    .agg(
        count("*").alias("count"),
        sum("amount").alias("total")
    )

# Result has columns: window (struct with start, end), count, total
3.2 Sliding Window
Python
# 10-minute window, sliding every 5 minutes
sliding = events \
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes")
    ) \
    .agg(count("*").alias("count"))

# Each event appears in 2 windows
3.3 Session Window (Spark 3.2+)
Python
from pyspark.sql.functions import session_window

# Session window with 10-minute gap timeout
sessions = events \
    .groupBy(
        session_window(col("timestamp"), "10 minutes"),
        col("user_id")
    ) \
    .agg(
        count("*").alias("events_in_session"),
        first("action").alias("first_action"),
        last("action").alias("last_action")
    )
4. Watermarks & Late Data
4.1 Why Watermarks?
Python
# Without watermark: State grows forever
# With watermark: System knows when to discard old state

# Watermark = "I don't expect data older than X"
# Data older than watermark may be dropped
4.2 Setting Watermark
Python
# Accept data up to 10 minutes late
events_with_watermark = events \
    .withWatermark("timestamp", "10 minutes")

# Now aggregation with append mode works
windowed = events_with_watermark \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("user_id")
    ) \
    .count()

query = windowed.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
4.3 How Watermark Works
Text
Timeline:
─────────────────────────────────────────────────────────
Event time:         10:00   10:05   10:10   10:15   10:20
                      │       │       │       │       │
Watermark (10min):    -       -     10:00   10:05   10:10
                      │       │       │       │       │
─────────────────────────────────────────────────────────

At processing time when max event time is 10:20:
- Watermark = 10:10 (10:20 - 10 minutes)
- Events before 10:10 might be dropped
- Window [10:00-10:05) can be finalized and output
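The current watermark can also be observed at runtime: each progress report from a running query carries an `eventTime` block whose `watermark` field is the value sketched in the timeline above. A small sketch, assuming `query` is the running windowed query from section 4.2:
Python
import time

# Assumes `query` is the windowed streaming query started in section 4.2
while query.isActive:
    progress = query.lastProgress
    if progress and "eventTime" in progress:
        # The engine reports the current event-time watermark as an ISO-8601 timestamp
        print("watermark:", progress["eventTime"].get("watermark"))
    time.sleep(10)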
5. Joins in Streaming
5.1 Stream-Static Join
Python
# Static DataFrame (lookup table)
products = spark.read.parquet("products.parquet")

# Stream
orders = spark.readStream.format("kafka")...

# Join: Enrich orders with product info
enriched = orders.join(
    products,
    orders.product_id == products.product_id,
    "left"
)
5.2 Stream-Stream Join
Python
# Two streams
impressions = spark.readStream.format("kafka") \
    .option("subscribe", "impressions") \
    .load() \
    .withWatermark("timestamp", "2 hours")

clicks = spark.readStream.format("kafka") \
    .option("subscribe", "clicks") \
    .load() \
    .withWatermark("timestamp", "3 hours")

# Join within time range
joined = impressions.join(
    clicks,
    expr("""
        impressions.ad_id = clicks.ad_id AND
        clicks.timestamp >= impressions.timestamp AND
        clicks.timestamp <= impressions.timestamp + interval 1 hour
    """),
    "leftOuter"  # inner, leftOuter, rightOuter supported
)
6. Stateful Operations
6.1 Deduplication
Python
# Deduplicate within watermark
deduped = events \
    .withWatermark("timestamp", "10 minutes") \
    .dropDuplicates(["event_id"])  # or ["event_id", "timestamp"]
6.2 Arbitrary Stateful Processing
Python
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

# Define state and output types
state_schema = StructType([
    StructField("count", IntegerType()),
    StructField("sum", DoubleType())
])

output_schema = StructType([
    StructField("user_id", StringType()),
    StructField("avg", DoubleType())
])

def update_state(key, events, state: GroupState):
    """Custom stateful processing"""
    # Get current state or initialize
    if state.exists:
        current = state.get
    else:
        current = {"count": 0, "sum": 0.0}

    # Update with new events
    for event in events:
        current["count"] += 1
        current["sum"] += event.amount

    # Save state
    state.update(current)

    # Output result
    if current["count"] > 0:
        avg = current["sum"] / current["count"]
        yield (key[0], avg)

# Apply
result = events \
    .groupByKey(lambda x: x.user_id) \
    .flatMapGroupsWithState(
        output_schema,
        state_schema,
        GroupStateTimeout.ProcessingTimeTimeout,
        update_state
    )
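The snippet above mirrors the Scala/Java `Dataset.groupByKey(...).flatMapGroupsWithState(...)` API; PySpark does not expose those methods on DataFrames. The closest PySpark equivalent (Spark 3.4+) is `groupBy(...).applyInPandasWithState`, which hands each key's rows to the function as pandas DataFrames. A minimal sketch under that assumption, reusing the `events` stream and its `amount` column from section 2.2:
Python
import pandas as pd
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

def running_avg(key, pdf_iter, state: GroupState):
    # Restore (count, sum) state, or start fresh for a new user
    count, total = state.get if state.exists else (0, 0.0)

    # Fold each micro-batch chunk for this key into the state
    for pdf in pdf_iter:
        count += len(pdf)
        total += float(pdf["amount"].sum())
    state.update((count, total))

    # Emit one row per trigger: (user_id, running average)
    yield pd.DataFrame({"user_id": [key[0]], "avg": [total / count if count else 0.0]})

result = events.groupBy("user_id").applyInPandasWithState(
    running_avg,
    outputStructType="user_id string, avg double",
    stateStructType="count int, sum double",
    outputMode="update",
    timeoutConf=GroupStateTimeout.NoTimeout
)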
6.3 MapGroupsWithState (Simpler)
Python
def update_user_state(user_id, events, state: GroupState):
    """Update state for each user"""
    # Get or initialize state
    if state.exists:
        total = state.get
    else:
        total = 0

    # Process new events
    for event in events:
        total += event.amount

    # Update state
    state.update(total)

    # Return single output
    return (user_id, total)

result = events \
    .groupByKey(lambda x: x.user_id) \
    .mapGroupsWithState(
        output_schema,
        state_schema,
        GroupStateTimeout.NoTimeout,
        update_user_state
    )
7. Monitoring & Debugging
7.1 Query Progress
Python
import time

# Get query status
query = df.writeStream...start()

print(query.status)          # Current status
print(query.lastProgress)    # Last batch statistics
print(query.recentProgress)  # Recent batches

# Programmatic monitoring
while query.isActive:
    progress = query.lastProgress
    if progress:
        print(f"Batch: {progress['batchId']}")
        print(f"Input rows: {progress['numInputRows']}")
        print(f"Processing time: {progress['durationMs']}")
    time.sleep(10)
7.2 Checkpointing
Python
# Checkpoint stores:
# - Offsets (where we are in the source)
# - State (for aggregations)
# - Metadata

query = df.writeStream \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start()

# Benefits:
# - Fault tolerance (restart from checkpoint)
# - Exactly-once semantics (with replayable sources and idempotent sinks)
# - Required for production

# Note: Don't change the query between restarts without clearing the checkpoint
7.3 Spark UI for Streaming
Python
# Access Spark UI at http://localhost:4040
# Streaming tab shows:
# - Input rate
# - Processing time
# - Batch duration
# - State size
8. Production Best Practices
8.1 Configuration
Python
spark = (
    SparkSession.builder
    .appName("Production Streaming")

    # Shuffle partitions (adjust based on data volume)
    .config("spark.sql.shuffle.partitions", "200")

    # State store backed by RocksDB (Spark 3.2+)
    .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

    # Default checkpoint location
    .config("spark.sql.streaming.checkpointLocation", "/checkpoints")

    # Streaming metrics
    .config("spark.sql.streaming.metricsEnabled", "true")

    .getOrCreate()
)
8.2 Error Handling
Python
# Capture corrupt input records to a side location
# (badRecordsPath is a Databricks option for file-based sources)
df = spark.readStream \
    .format("csv") \
    .schema(your_schema) \
    .option("badRecordsPath", "/path/to/bad_records") \
    .load("path/to/input/folder/")

# Graceful shutdown
import signal

def shutdown_handler(signum, frame):
    query.stop()

signal.signal(signal.SIGTERM, shutdown_handler)

# Monitor for failures
try:
    query.awaitTermination()
except Exception as e:
    print(f"Stream failed: {e}")
    # Send alert, restart, etc.
8.3 Trigger Modes
Python
# Default: Process as fast as possible
query = df.writeStream.start()

# Fixed interval: Every 10 seconds
query = df.writeStream \
    .trigger(processingTime="10 seconds") \
    .start()

# Once: Process all available data and stop
query = df.writeStream \
    .trigger(once=True) \
    .start()

# Available now: Process all available data, then stop (Spark 3.3+)
query = df.writeStream \
    .trigger(availableNow=True) \
    .start()

# Continuous (low latency, experimental)
query = df.writeStream \
    .trigger(continuous="1 second") \
    .start()
9. Complete Example
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Initialize
spark = SparkSession.builder \
    .appName("E-commerce Streaming") \
    .getOrCreate()

# Schema for events
event_schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("product_id", StringType()),
    StructField("event_type", StringType()),  # view, cart, purchase
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType())
])

# Read from Kafka
events = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "ecommerce_events") \
    .option("startingOffsets", "latest") \
    .load() \
    .select(
        from_json(col("value").cast("string"), event_schema).alias("event")
    ) \
    .select("event.*") \
    .withWatermark("timestamp", "10 minutes")

# Real-time metrics: Revenue per 5-minute window
revenue_by_window = events \
    .filter(col("event_type") == "purchase") \
    .groupBy(
        window(col("timestamp"), "5 minutes")
    ) \
    .agg(
        count("*").alias("purchase_count"),
        sum("amount").alias("total_revenue"),
        approx_count_distinct("user_id").alias("unique_buyers")
    )

# Real-time metrics: Active users per minute
active_users = events \
    .groupBy(
        window(col("timestamp"), "1 minute")
    ) \
    .agg(
        approx_count_distinct("user_id").alias("active_users"),
        count("*").alias("total_events")
    )

# Write revenue metrics to console
revenue_query = revenue_by_window.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .trigger(processingTime="30 seconds") \
    .start()

# Write active users to Kafka
users_query = active_users \
    .select(
        to_json(struct("*")).alias("value")
    ) \
    .writeStream \
    .outputMode("update") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "active_users_metrics") \
    .option("checkpointLocation", "/checkpoints/active_users") \
    .start()

# Wait for termination
spark.streams.awaitAnyTermination()
10. Practice
Streaming Exercise
Exercise: Build Real-time Dashboard Metrics
Python
# Scenario: Streaming clickstream data
#
# Input schema:
# - user_id: string
# - page: string
# - action: string (click, scroll, hover)
# - timestamp: timestamp
#
# Tasks:
# 1. Count page views per page per 5-minute window
# 2. Calculate unique users per page per 10-minute sliding window (slide 2 min)
# 3. Find top 3 pages by views in each 15-minute window
#
# Use a watermark of 10 minutes for late data

# YOUR CODE HERE
💡 Show solution
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.appName("Clickstream").getOrCreate()

# Schema of the real input (in production, read from Kafka and parse JSON with it)
schema = StructType([
    StructField("user_id", StringType()),
    StructField("page", StringType()),
    StructField("action", StringType()),
    StructField("timestamp", TimestampType())
])

# Read stream (simulated with the rate source for the demo)
clicks = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .load() \
    .withColumn("user_id", (rand() * 1000).cast("int").cast("string")) \
    .withColumn("page", concat(lit("/page_"), (rand() * 20).cast("int").cast("string"))) \
    .withColumn("action",
        when(rand() < 0.7, "click")
        .when(rand() < 0.9, "scroll")
        .otherwise("hover")
    ) \
    .withWatermark("timestamp", "10 minutes")

# 1. Page views per page per 5-minute tumbling window
page_views = clicks \
    .filter(col("action") == "click") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("page")
    ) \
    .count() \
    .withColumnRenamed("count", "page_views")

# 2. Unique users per page per 10-minute sliding window (slide 2 minutes)
unique_users = clicks \
    .groupBy(
        window(col("timestamp"), "10 minutes", "2 minutes"),
        col("page")
    ) \
    .agg(
        approx_count_distinct("user_id").alias("unique_users")
    )

# 3. Top 3 pages per 15-minute window
from pyspark.sql.window import Window

page_counts = clicks \
    .filter(col("action") == "click") \
    .groupBy(
        window(col("timestamp"), "15 minutes"),
        col("page")
    ) \
    .count()

# Ranking window functions (row_number) are not supported directly on streaming
# DataFrames, so compute the top 3 inside foreachBatch on each micro-batch output
def top_pages_per_batch(batch_df, batch_id):
    window_spec = Window.partitionBy("window").orderBy(col("count").desc())
    top_pages = batch_df \
        .withColumn("rank", row_number().over(window_spec)) \
        .filter(col("rank") <= 3)
    top_pages.show(truncate=False)

# Output to console
query1 = page_views.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .queryName("page_views") \
    .start()

query2 = unique_users.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .queryName("unique_users") \
    .start()

query3 = page_counts.writeStream \
    .outputMode("complete") \
    .foreachBatch(top_pages_per_batch) \
    .queryName("top_pages") \
    .start()

spark.streams.awaitAnyTermination()
11. Summary
| Concept | Description |
|---|---|
| Unbounded Table | Stream as continuously appending table |
| Output Modes | append, complete, update |
| Windowing | Tumbling, Sliding, Session |
| Watermarks | Handle late data, bound state |
| Checkpointing | Fault tolerance, exactly-once |
| Stateful Ops | Aggregations, dedup, custom state |
Key Points:
- Same API as batch DataFrames
- Watermarks essential for production
- Always use checkpoints
- Monitor with Spark UI
Next lesson: Kafka & Message Queues
