Theory
Lesson 10/15

Data Streaming Fundamentals

Concepts of real-time data processing, stream processing architectures


1. Batch vs Stream Processing

1.1 Comparison

Aspect     | Batch Processing     | Stream Processing
-----------|----------------------|------------------------------
Data       | Bounded, finite      | Unbounded, infinite
Processing | Process all at once  | Process as it arrives
Latency    | Minutes to hours     | Milliseconds to seconds
Complexity | Simpler              | More complex
Use Case   | Reports, ML training | Real-time alerts, dashboards

1.2 When to Use Streaming

Use Streaming When
  • Real-time insights needed
  • Data arrives continuously
  • Low latency required (under 1 second)
  • Event-driven architecture

Examples:

  • Fraud detection (immediate)
  • Real-time recommendations
  • IoT sensor monitoring
  • Live dashboards
  • Log analysis

2. Stream Processing Concepts

2.1 Event Time vs Processing Time

Text
┌─────────────────────────────────────────────────────────┐
│                        Timeline                         │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  Event Time:       [E1]----[E2]----[E3]----[E4]         │
│  (When event       10:00   10:01   10:02   10:03        │
│   occurred)                                             │
│                                                         │
│  Processing Time:  [E1]--[E3]----[E2]------[E4]         │
│  (When event       10:05 10:06   10:08     10:10        │
│   was processed)                                        │
│                                                         │
│  Note: Events can arrive out of order!                  │
│                                                         │
└─────────────────────────────────────────────────────────┘
Python
# Why it matters:
# An event at 10:00 might arrive at 10:05 due to network delay
# If you compute "sales in last 5 minutes" at 10:06:
# - Processing time: might miss events still in transit
# - Event time: correct result based on when sales actually happened
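The difference above can be made concrete with a small sketch. Timestamps are in seconds, the window is a hypothetical 60-second bucket, and the names (`bucket`, `by_event_time`) are illustrative, not from any particular engine:

```python
from collections import defaultdict

# Events listed in arrival order; E2 occurred before E3 but arrived later.
events = [
    {"id": "E1", "event_time": 10, "arrival_time": 70},
    {"id": "E3", "event_time": 70, "arrival_time": 80},
    {"id": "E2", "event_time": 50, "arrival_time": 130},  # late arrival
]

def bucket(ts, width=60):
    """Start of the fixed window containing timestamp ts."""
    return ts // width * width

by_event_time = defaultdict(list)
by_processing_time = defaultdict(list)
for e in events:
    by_event_time[bucket(e["event_time"])].append(e["id"])
    by_processing_time[bucket(e["arrival_time"])].append(e["id"])

# Event time groups E1 and E2 together (both occurred in [0, 60)).
# Processing time wrongly splits them apart, because E2 arrived late.
print(dict(by_event_time))       # {0: ['E1', 'E2'], 60: ['E3']}
print(dict(by_processing_time))  # {60: ['E1', 'E3'], 120: ['E2']}
```

Grouping by event time gives the same answer no matter how delayed the delivery is; grouping by processing time depends on network timing.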

2.2 Windowing

Text
┌─────────────────────────────────────────────────────────┐
│                     Windowing Types                     │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  TUMBLING WINDOW (Fixed, non-overlapping)               │
│  |----W1----|----W2----|----W3----|                     │
│  |  E1  E2  |  E3  E4  |  E5  E6  |                     │
│                                                         │
│  SLIDING WINDOW (Overlapping)                           │
│  |----W1----|                                           │
│      |----W2----|                                       │
│          |----W3----|                                   │
│                                                         │
│  SESSION WINDOW (Activity-based)                        │
│  |---Session1---| gap |--Session2--| gap |--S3--|       │
│                                                         │
└─────────────────────────────────────────────────────────┘
Python
# Tumbling Window: 5-minute windows
# Events in [10:00-10:05) go to Window 1
# Events in [10:05-10:10) go to Window 2
# No overlap

# Sliding Window: 5-minute window, sliding every 1 minute
# Window 1: [10:00-10:05)
# Window 2: [10:01-10:06)
# Events can be in multiple windows

# Session Window: Based on activity
# Gap of 30 minutes creates a new session
# User activity from 10:00-10:15, then 11:00-11:30
# = 2 separate sessions
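The three window types can be sketched as small assignment functions. Timestamps are in seconds and all names are illustrative; real engines apply the same arithmetic internally:

```python
def tumbling_windows(ts, size):
    """A timestamp belongs to exactly one tumbling window."""
    start = ts // size * size
    return [(start, start + size)]

def sliding_windows(ts, size, slide):
    """A timestamp can belong to several overlapping sliding windows."""
    wins = []
    start = ts // slide * slide          # latest window start containing ts
    while start > ts - size:             # walk back while ts is still inside
        wins.append((start, start + size))
        start -= slide
    return sorted(wins)

def session_windows(timestamps, gap):
    """Group sorted timestamps into sessions separated by more than `gap`."""
    sessions, current = [], [timestamps[0]]
    for ts in timestamps[1:]:
        if ts - current[-1] <= gap:
            current.append(ts)           # still within the same session
        else:
            sessions.append(current)     # inactivity gap: close the session
            current = [ts]
    sessions.append(current)
    return sessions

print(tumbling_windows(250, size=300))            # [(0, 300)]
print(sliding_windows(250, size=300, slide=60))   # 5 overlapping windows
print(session_windows([0, 10, 100, 105], gap=30)) # [[0, 10], [100, 105]]
```

Note that a 5-minute window sliding every minute places each event in five windows, which is why sliding windows cost more state than tumbling ones.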

2.3 Watermarks

Python
# Watermark = How long to wait for late data

# Example: Watermark of 10 minutes
# At processing time 10:20:
# - Watermark = 10:10 (10:20 - 10 minutes)
# - Events before 10:10 are considered "late"
# - Events after 10:10 are still accepted

# Trade-off:
# - Longer watermark = More complete results, higher latency
# - Shorter watermark = Faster results, might miss late data
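Engines such as Flink typically derive the watermark from the maximum *event time* seen so far minus an allowed lateness, rather than from the wall clock. A minimal sketch under that assumption (`WatermarkTracker` is an illustrative name, not a real API):

```python
class WatermarkTracker:
    """Watermark = (max event time seen so far) - allowed lateness."""

    def __init__(self, allowed_lateness):
        self.allowed_lateness = allowed_lateness
        self.max_event_time = None

    def observe(self, event_time):
        """Advance the watermark as newer events arrive."""
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.allowed_lateness

    def is_late(self, event_time):
        """An event older than the watermark is considered late."""
        return self.watermark is not None and event_time < self.watermark

# 10-minute (600 s) allowed lateness
wt = WatermarkTracker(allowed_lateness=600)
wt.observe(1200)            # newest event seen: t=1200
print(wt.watermark)         # 600
print(wt.is_late(500))      # True  - window for t=500 has already closed
print(wt.is_late(700))      # False - still accepted
```

The `allowed_lateness` knob is exactly the trade-off above: raising it waits longer (more complete, higher latency), lowering it emits sooner but may drop stragglers.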

2.4 State Management

Python
# Stream processing often needs state
# Examples:
# - Running counts
# - Session information
# - Aggregations within windows

# Challenges:
# 1. State can grow without bound
# 2. Need fault tolerance (checkpointing)
# 3. State must be distributed
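A running count with a naive checkpoint/restore illustrates challenges 1 and 2. This is a toy sketch: a real engine serializes state to durable storage (HDFS, S3, ...) on a schedule, while here the "storage" is just an in-memory JSON string and `CountingProcessor` is an illustrative name:

```python
import json

class CountingProcessor:
    """Per-key running counts with a toy checkpoint/restore cycle."""

    def __init__(self):
        self.counts = {}          # state: grows with the number of distinct keys
        self._checkpoint = None   # stands in for durable storage

    def process(self, key):
        self.counts[key] = self.counts.get(key, 0) + 1

    def checkpoint(self):
        """Snapshot the state; real systems write this to durable storage."""
        self._checkpoint = json.dumps(self.counts)

    def restore(self):
        """After a crash, reload the last snapshot and lose uncheckpointed work."""
        self.counts = json.loads(self._checkpoint) if self._checkpoint else {}

p = CountingProcessor()
p.process("a"); p.process("a"); p.process("b")
p.checkpoint()                 # snapshot: {"a": 2, "b": 1}
p.process("c")                 # processed after the snapshot...
p.restore()                    # ...so a "crash" here loses it
print(p.counts)                # {'a': 2, 'b': 1}
```

Events processed after the last checkpoint are replayed from the source on recovery, which is why checkpointing pairs with a replayable log such as Kafka.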

3. Stream Processing Architectures

3.1 Lambda Architecture

Text
┌─────────────────────────────────────────────────────────┐
│                   Lambda Architecture                   │
├─────────────────────────────────────────────────────────┤
│                                                         │
│                    ┌──────────┐                         │
│                    │   Data   │                         │
│                    │  Source  │                         │
│                    └────┬─────┘                         │
│                         │                               │
│           ┌─────────────┼───────────┐                   │
│           │             │           │                   │
│           v             │           v                   │
│      ┌──────────┐       │      ┌──────────┐             │
│      │  Batch   │       │      │  Speed   │             │
│      │  Layer   │       │      │  Layer   │             │
│      │ (Hadoop) │       │      │ (Storm)  │             │
│      └────┬─────┘       │      └────┬─────┘             │
│           │             │           │                   │
│           v             │           v                   │
│      ┌──────────┐       │      ┌──────────┐             │
│      │  Batch   │       │      │ Real-time│             │
│      │  Views   │       │      │  Views   │             │
│      └────┬─────┘       │      └────┬─────┘             │
│           │             │           │                   │
│           └─────────────┼───────────┘                   │
│                         │                               │
│                    ┌────▼────┐                          │
│                    │ Serving │                          │
│                    │  Layer  │                          │
│                    └─────────┘                          │
│                                                         │
└─────────────────────────────────────────────────────────┘

Pros:

  • Accurate batch results + fast real-time
  • Fault tolerant

Cons:

  • Two codebases to maintain
  • Complex

3.2 Kappa Architecture

Text
┌─────────────────────────────────────────────────────────┐
│                   Kappa Architecture                    │
├─────────────────────────────────────────────────────────┤
│                                                         │
│                    ┌──────────┐                         │
│                    │   Data   │                         │
│                    │  Source  │                         │
│                    └────┬─────┘                         │
│                         │                               │
│                         v                               │
│                    ┌──────────┐                         │
│                    │  Stream  │  (Kafka)                │
│                    │  Storage │                         │
│                    └────┬─────┘                         │
│                         │                               │
│                         v                               │
│                    ┌──────────┐                         │
│                    │  Stream  │  (Spark Streaming,      │
│                    │ Processor│   Flink)                │
│                    └────┬─────┘                         │
│                         │                               │
│                         v                               │
│                    ┌──────────┐                         │
│                    │ Serving  │                         │
│                    │  Layer   │                         │
│                    └──────────┘                         │
│                                                         │
└─────────────────────────────────────────────────────────┘

Pros:

  • Single codebase
  • Simpler than Lambda
  • Reprocess by replaying stream

Cons:

  • Requires log-based storage (Kafka)
  • May not suit all use cases

4. Exactly-Once Semantics

4.1 Delivery Guarantees

Text
┌─────────────────────────────────────────────────────────┐
│                   Delivery Guarantees                   │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  AT-MOST-ONCE                                           │
│  - Message processed 0 or 1 time                        │
│  - Fire and forget                                      │
│  - Data loss possible                                   │
│  - Use: Metrics where some loss is OK                   │
│                                                         │
│  AT-LEAST-ONCE                                          │
│  - Message processed 1 or more times                    │
│  - Retry on failure                                     │
│  - Duplicates possible                                  │
│  - Use: When operations are idempotent                  │
│                                                         │
│  EXACTLY-ONCE                                           │
│  - Message processed exactly 1 time                     │
│  - Most complex to achieve                              │
│  - No data loss, no duplicates                          │
│  - Use: Financial transactions                          │
│                                                         │
└─────────────────────────────────────────────────────────┘

4.2 Achieving Exactly-Once

Python
# Strategies:

# 1. Idempotent Operations
# Same operation applied multiple times = same result
# INSERT becomes UPSERT
# Count becomes "set count to X"

# 2. Transactional Processing
# Producer commits offset only after successful processing
# Uses transaction IDs

# 3. Deduplication
# Assign unique ID to each message
# Check if ID already processed
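Strategy 3 fits in a few lines: with at-least-once delivery plus deduplication by message ID, the downstream result behaves as if each message were processed exactly once. `DedupSink` is an illustrative name; a production version would keep the seen-ID set in durable, bounded storage rather than an in-memory `set`:

```python
class DedupSink:
    """At-least-once input + dedup by ID = effectively exactly-once output."""

    def __init__(self):
        self.seen = set()   # IDs already processed
        self.total = 0      # the aggregate we are protecting from duplicates

    def apply(self, message):
        if message["id"] in self.seen:
            return False            # duplicate delivery: ignore it
        self.seen.add(message["id"])
        self.total += message["amount"]
        return True

sink = DedupSink()
sink.apply({"id": "m1", "amount": 100})
sink.apply({"id": "m2", "amount": 50})
sink.apply({"id": "m1", "amount": 100})  # retried delivery of m1: skipped
print(sink.total)  # 150
```

The retry of `m1` is absorbed by the ID check, so the total stays correct even though the message was delivered twice.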

5. Common Streaming Patterns

5.1 Event Sourcing

Python
# Store events, not state
# State = replay all events

# Example: Bank Account
events = [
    {"type": "deposit", "amount": 100, "time": "10:00"},
    {"type": "withdraw", "amount": 30, "time": "10:05"},
    {"type": "deposit", "amount": 50, "time": "10:10"},
]

# Current balance = replay the events
balance = 0
for e in events:
    balance += e["amount"] if e["type"] == "deposit" else -e["amount"]
print(balance)  # 0 + 100 - 30 + 50 = 120

# Benefits:
# - Complete audit trail
# - Can reconstruct state at any point
# - Easy debugging

5.2 CQRS (Command Query Responsibility Segregation)

Text
┌─────────────────────────────────────────────────────────┐
│                          CQRS                           │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   Commands (Write)             Queries (Read)           │
│         │                             │                 │
│         v                             v                 │
│    ┌─────────┐                   ┌─────────┐            │
│    │  Write  │                   │  Read   │            │
│    │  Model  │                   │  Model  │            │
│    └────┬────┘                   └────▲────┘            │
│         │                             │                 │
│         │         Event Stream        │                 │
│         └─────────────────────────────┘                 │
│                                                         │
└─────────────────────────────────────────────────────────┘
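A minimal sketch of the diagram: commands append events to a log, and the read model is a projection built by consuming that log. The class names and the bank-account domain are illustrative, chosen to match the event sourcing example above:

```python
class WriteModel:
    """Command side: handlers validate commands and append events to a log."""

    def __init__(self, log):
        self.log = log

    def handle_deposit(self, account, amount):
        self.log.append({"account": account, "type": "deposit", "amount": amount})

    def handle_withdraw(self, account, amount):
        self.log.append({"account": account, "type": "withdraw", "amount": amount})

class ReadModel:
    """Query side: a projection (current balances) built from the event stream."""

    def __init__(self):
        self.balances = {}

    def apply(self, event):
        delta = event["amount"] if event["type"] == "deposit" else -event["amount"]
        acct = event["account"]
        self.balances[acct] = self.balances.get(acct, 0) + delta

# The event stream connects the two sides.
log = []
write_model = WriteModel(log)
read_model = ReadModel()

write_model.handle_deposit("acc1", 100)
write_model.handle_withdraw("acc1", 30)
for event in log:          # in production, a stream consumer does this
    read_model.apply(event)

print(read_model.balances)  # {'acc1': 70}
```

Because the two models only share the event stream, the read side can be rebuilt from scratch by replaying the log, and each side can be scaled and stored independently.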

5.3 Stream Joins

Python
# Stream-Stream Join
# Join two event streams within a time window

# Example: Match clicks with impressions
# Click stream: user_id, ad_id, click_time
# Impression stream: user_id, ad_id, impression_time

# Join within a 10-minute window
# Find clicks that happened within 10 minutes of an impression

# Stream-Table Join
# Enrich a stream with reference data

# Example: Enrich orders with customer info
# Order stream + Customer table
# Every order gets customer details
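The click/impression join above can be sketched as a brute-force interval join over two small batches. Real engines index by key and window instead of the nested loop here, and `interval_join` is an illustrative name (times in seconds):

```python
impressions = [
    {"user_id": "u1", "ad_id": "ad1", "time": 0},
    {"user_id": "u1", "ad_id": "ad2", "time": 0},
]
clicks = [
    {"user_id": "u1", "ad_id": "ad1", "time": 300},  # 5 min after impression
    {"user_id": "u1", "ad_id": "ad2", "time": 900},  # 15 min: outside window
]

def interval_join(impressions, clicks, max_gap):
    """Match clicks to impressions with the same (user_id, ad_id)
    when the click happens within max_gap seconds of the impression."""
    matches = []
    for imp in impressions:
        for clk in clicks:
            same_key = (imp["user_id"], imp["ad_id"]) == (clk["user_id"], clk["ad_id"])
            if same_key and 0 <= clk["time"] - imp["time"] <= max_gap:
                matches.append((imp["ad_id"], clk["time"] - imp["time"]))
    return matches

# 10-minute (600 s) join window: only the ad1 click qualifies
print(interval_join(impressions, clicks, max_gap=600))  # [('ad1', 300)]
```

The `max_gap` bound is what makes a stream-stream join feasible: without it the engine would have to buffer every impression forever in case a matching click eventually arrived.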

6. Streaming Technologies Overview

6.1 Comparison

Technology      | Latency  | Exactly-Once  | SQL Support | Ecosystem
----------------|----------|---------------|-------------|-----------
Kafka Streams   | Low      | Yes           | KSQL        | Kafka
Spark Streaming | Medium   | Yes           | Yes         | Spark
Flink           | Very Low | Yes           | Yes         | Standalone
Storm           | Low      | At-least-once | No          | Hadoop

6.2 When to Use What

Python
# Kafka Streams
# - Already using Kafka
# - JVM-based applications
# - Lightweight streaming

# Spark Structured Streaming
# - Already using Spark
# - Unified batch + stream
# - Complex transformations

# Apache Flink
# - True event-time processing
# - Very low latency needed
# - Complex event processing

# Cloud Services
# - AWS Kinesis
# - Azure Stream Analytics
# - Google Dataflow

7. Simple Streaming Example

7.1 Concept Demo with Python

Python
import time
import random
from collections import deque
from datetime import datetime

class SimpleStreamProcessor:
    """Simulate stream processing concepts"""

    def __init__(self, window_size_seconds=5):
        self.window_size = window_size_seconds
        self.events = deque()

    def process_event(self, event):
        """Process an incoming event"""
        # Add to window
        self.events.append(event)

        # Evict events older than the window
        # (a sliding window over the last N seconds of processing time)
        cutoff = datetime.now().timestamp() - self.window_size
        while self.events and self.events[0]['timestamp'] < cutoff:
            self.events.popleft()

        # Compute window aggregate
        if self.events:
            window_sum = sum(e['value'] for e in self.events)
            window_count = len(self.events)
            print(f"Window: count={window_count}, sum={window_sum}, avg={window_sum/window_count:.2f}")

    def simulate_stream(self, duration_seconds=30):
        """Simulate streaming events"""
        start = time.time()

        while time.time() - start < duration_seconds:
            # Generate a random event
            event = {
                'timestamp': datetime.now().timestamp(),
                'value': random.randint(1, 100)
            }

            print(f"\nReceived event: value={event['value']}")
            self.process_event(event)

            # Sleep a random interval between events
            time.sleep(random.uniform(0.5, 1.5))


# Demo
processor = SimpleStreamProcessor(window_size_seconds=5)
processor.simulate_stream(duration_seconds=20)

7.2 Windowed Aggregation

Python
import time
import random
from datetime import datetime
from collections import defaultdict

class WindowedAggregator:
    """Tumbling window aggregation keyed by window start time"""

    def __init__(self, window_minutes=1):
        self.window_minutes = window_minutes
        self.windows = defaultdict(list)

    def get_window_key(self, timestamp):
        """Map a timestamp to the start of its tumbling window"""
        dt = datetime.fromtimestamp(timestamp)
        # Round down to the window boundary
        minute_bucket = (dt.minute // self.window_minutes) * self.window_minutes
        window_start = dt.replace(minute=minute_bucket, second=0, microsecond=0)
        return window_start.isoformat()

    def add_event(self, event):
        """Add an event to its window, then emit any completed windows"""
        window_key = self.get_window_key(event['timestamp'])
        self.windows[window_key].append(event)
        self.emit_completed_windows()

    def emit_completed_windows(self):
        """Emit aggregates for windows that have ended"""
        current_window = self.get_window_key(datetime.now().timestamp())

        # ISO-formatted keys sort chronologically, so string comparison works
        for window_key in list(self.windows.keys()):
            if window_key < current_window:
                events = self.windows[window_key]
                total = sum(e['value'] for e in events)
                count = len(events)

                print(f"Window [{window_key}] completed: count={count}, sum={total}")
                del self.windows[window_key]


# Usage example: simulate a stream of events
agg = WindowedAggregator(window_minutes=1)

for i in range(20):
    event = {
        'timestamp': datetime.now().timestamp(),
        'value': random.randint(10, 50),
        'event_id': i
    }
    print(f"Event {i}: value={event['value']}")
    agg.add_event(event)
    time.sleep(0.5)

8. Practice


Exercise 1: Design Streaming System

Python
# Scenario: E-commerce real-time analytics
#
# Events:
# - page_view: {user_id, page, timestamp}
# - add_to_cart: {user_id, product_id, timestamp}
# - purchase: {user_id, order_id, amount, timestamp}
#
# Requirements:
# 1. Real-time dashboard: Active users (last 5 min)
# 2. Alert: Cart abandonment (add_to_cart without purchase in 30 min)
# 3. Metric: Revenue per hour
#
# Design:
# - What windowing strategy for each?
# - What state needs to be maintained?
# - What delivery guarantee is needed?

# YOUR DESIGN HERE
💡 View answer
Python
"""
Streaming System Design

1. Active Users (last 5 min)
   - Window: Sliding window, 5-min size, sliding every 1 min
   - State: Set of unique user_ids per window
   - Guarantee: At-least-once OK (counting unique users)
   - Implementation:
       group by window(5-min, slide 1-min)
       count distinct user_id

2. Cart Abandonment Alert
   - Window: Session window with 30-min timeout
   - State:
       - Map of user_id -> last_cart_time
       - Watermark of 30 minutes
   - Guarantee: Exactly-once (important for alerts)
   - Implementation:
       - On add_to_cart: store user_id + timestamp
       - On purchase: remove user_id from state
       - Timer: after 30 min with no purchase, emit alert

3. Revenue per Hour
   - Window: Tumbling window, 1 hour
   - State: Running sum of amount
   - Guarantee: Exactly-once (financial data)
   - Implementation:
       purchases
         .group_by(window(1-hour))
         .sum(amount)
"""

# Pseudo-code implementation sketch
class EcommerceStreaming:
    def __init__(self):
        self.active_users_window = SlidingWindow(size=300, slide=60)  # seconds
        self.cart_sessions = {}  # user_id -> last_cart_time
        self.hourly_revenue = TumblingWindow(size=3600)

    def process_page_view(self, event):
        self.active_users_window.add(event['user_id'], event['timestamp'])

    def process_add_to_cart(self, event):
        self.cart_sessions[event['user_id']] = event['timestamp']
        # Set a timer for 30 minutes from now
        self.schedule_abandonment_check(event['user_id'], event['timestamp'] + 1800)

    def process_purchase(self, event):
        # Remove from cart tracking
        if event['user_id'] in self.cart_sessions:
            del self.cart_sessions[event['user_id']]
        # Add to revenue
        self.hourly_revenue.add(event['amount'], event['timestamp'])

    def check_abandonment(self, user_id, trigger_time):
        if user_id in self.cart_sessions:
            cart_time = self.cart_sessions[user_id]
            if trigger_time - cart_time >= 1800:  # 30 min
                self.emit_alert(f"Cart abandoned: {user_id}")
                del self.cart_sessions[user_id]

    def emit_metrics(self):
        print(f"Active users: {self.active_users_window.count_unique()}")
        print(f"Hourly revenue: {self.hourly_revenue.sum()}")

9. Summary

Concept                       | Description
------------------------------|-------------------------------------------
Event Time vs Processing Time | When event happened vs when processed
Windowing                     | Group events (Tumbling, Sliding, Session)
Watermarks                    | Handle late data
State                         | Maintain information across events
Delivery Guarantees           | At-most/At-least/Exactly-once

Key Architectures:

  • Lambda: Batch + Speed layers
  • Kappa: Stream-only with replay

When Streaming:

  • Real-time requirements
  • Continuous data flow
  • Event-driven systems

Next lesson: Spark Structured Streaming