Data Streaming Fundamentals
1. Batch vs Stream Processing
1.1 Comparison
| Aspect | Batch Processing | Stream Processing |
|---|---|---|
| Data | Bounded, finite | Unbounded, infinite |
| Processing | Process all at once | Process as it arrives |
| Latency | Minutes to hours | Milliseconds to seconds |
| Complexity | Simpler | More complex |
| Use Case | Reports, ML training | Real-time alerts, dashboards |
1.2 When to Use Streaming
Use Streaming When:
- Real-time insights needed
- Data arrives continuously
- Low latency required (under 1 second)
- Event-driven architecture
Examples:
- Fraud detection (immediate)
- Real-time recommendations
- IoT sensor monitoring
- Live dashboards
- Log analysis
2. Stream Processing Concepts
2.1 Event Time vs Processing Time
```text
┌─────────────────────────────────────────────────────────┐
│                        Timeline                         │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  Event Time:       [E1]----[E2]----[E3]----[E4]         │
│  (When event       10:00   10:01   10:02   10:03        │
│   occurred)                                             │
│                                                         │
│  Processing Time:  [E1]--[E3]----[E2]------[E4]         │
│  (When event       10:05 10:06   10:08    10:10         │
│   was processed)                                        │
│                                                         │
│  Note: Events can arrive out of order!                  │
│                                                         │
└─────────────────────────────────────────────────────────┘
```

```python
# Why it matters:
# An event at 10:00 might arrive at 10:05 due to network delay.
# If you compute "sales in the last 5 minutes" at 10:06:
# - Processing time: might miss events still in transit
# - Event time: correct result based on when the sales actually happened
```
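To make the difference concrete, here is a small self-contained sketch (timestamps made up) that computes the same 5-minute sum by processing time and by event time:

```python
from datetime import datetime

# (event_time, arrival_time, amount) -- the 10:01 sale arrives last
sales = [
    (datetime(2024, 1, 1, 10, 0), datetime(2024, 1, 1, 10, 5), 10),
    (datetime(2024, 1, 1, 10, 1), datetime(2024, 1, 1, 10, 8), 20),  # late
    (datetime(2024, 1, 1, 10, 2), datetime(2024, 1, 1, 10, 6), 30),
]

window_start = datetime(2024, 1, 1, 10, 0)
window_end = datetime(2024, 1, 1, 10, 5)
computed_at = datetime(2024, 1, 1, 10, 6)

# Processing-time view at 10:06: only events that have arrived so far
by_processing = sum(amt for _, arrived, amt in sales if arrived <= computed_at)

# Event-time view: everything that actually happened in [10:00, 10:05)
by_event = sum(amt for occurred, _, amt in sales
               if window_start <= occurred < window_end)

print(by_processing)  # 40 -- the late sale (20) is missing
print(by_event)       # 60 -- correct once all events have arrived
```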
2.2 Windowing

```text
┌─────────────────────────────────────────────────────────┐
│                     Windowing Types                     │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  TUMBLING WINDOW (Fixed, non-overlapping)               │
│  |----W1----|----W2----|----W3----|                     │
│  |  E1  E2  |  E3  E4  |  E5  E6  |                     │
│                                                         │
│  SLIDING WINDOW (Overlapping)                           │
│  |----W1----|                                           │
│      |----W2----|                                       │
│          |----W3----|                                   │
│                                                         │
│  SESSION WINDOW (Activity-based)                        │
│  |---Session1---| gap |--Session2--| gap |--S3--|       │
│                                                         │
└─────────────────────────────────────────────────────────┘
```

```python
# Tumbling Window: 5-minute windows
# Events in [10:00-10:05) go to Window 1
# Events in [10:05-10:10) go to Window 2
# No overlap

# Sliding Window: 5-minute window, sliding every 1 minute
# Window 1: [10:00-10:05)
# Window 2: [10:01-10:06)
# Events can be in multiple windows

# Session Window: based on activity
# A gap of 30 minutes starts a new session
# User activity from 10:00-10:15, then 11:00-11:30
# = 2 separate sessions
```
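As a rough sketch of how a single timestamp maps to windows (epoch-second timestamps; the 5-minute/1-minute sizes mirror the comments above):

```python
def tumbling_window(ts, size=300):
    """A timestamp belongs to exactly one fixed, non-overlapping window."""
    start = (ts // size) * size
    return (start, start + size)


def sliding_windows(ts, size=300, slide=60):
    """A timestamp can fall into several overlapping windows."""
    # Smallest slide-aligned start s with ts - size < s <= ts
    first = ((ts - size) // slide + 1) * slide
    return [(s, s + size) for s in range(first, ts + 1, slide)]


ts = 10 * 3600 + 3 * 60          # 10:03:00 as seconds since midnight
print(tumbling_window(ts))        # (36000, 36300) -> the [10:00, 10:05) window
print(len(sliding_windows(ts)))   # 5 -> one overlapping window per minute
```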
2.3 Watermarks

```python
# Watermark = how long to wait for late data

# Example: watermark of 10 minutes
# At processing time 10:20:
# - Watermark = 10:10 (10:20 - 10 minutes)
# - Events with event time before 10:10 are considered "late"
# - Events with event time after 10:10 are still accepted

# Trade-off:
# - Longer watermark = more complete results, higher latency
# - Shorter watermark = faster results, might miss late data
```
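A minimal sketch of the bookkeeping. Note that engines such as Flink and Spark Structured Streaming usually derive the watermark from the maximum *event* time seen so far minus the allowed lateness, which is the convention used here:

```python
class WatermarkTracker:
    """Watermark = (max event time seen) - (allowed lateness)."""

    def __init__(self, allowed_lateness_seconds=600):
        self.allowed_lateness = allowed_lateness_seconds
        self.max_event_time = 0

    def observe(self, event_time):
        self.max_event_time = max(self.max_event_time, event_time)

    @property
    def watermark(self):
        return self.max_event_time - self.allowed_lateness

    def is_late(self, event_time):
        """Events behind the watermark are dropped or sent to a side output."""
        return event_time < self.watermark


wm = WatermarkTracker(allowed_lateness_seconds=600)  # 10-minute watermark
wm.observe(event_time=36000)         # an event at 10:00 -> watermark 09:50
print(wm.is_late(event_time=35500))  # False: 09:51:40 is still accepted
wm.observe(event_time=37200)         # an event at 10:20 -> watermark 10:10
print(wm.is_late(event_time=36000))  # True: 10:00 is now "late"
```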
2.4 State Management

```python
# Stream processing often needs state
# Examples:
# - Running counts
# - Session information
# - Aggregations within windows

# Challenges:
# 1. State can grow unboundedly
# 2. Need fault tolerance (checkpointing)
# 3. State must be distributed
```
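As a minimal sketch of stateful processing with checkpointing (a JSON file stands in for a real state backend such as RocksDB; all names here are illustrative):

```python
import json


class RunningCounter:
    """Per-key running counts, checkpointed for fault tolerance."""

    def __init__(self, checkpoint_path="counts.json"):
        self.checkpoint_path = checkpoint_path
        try:
            # Recover state after a crash or restart
            with open(checkpoint_path) as f:
                self.counts = json.load(f)
        except FileNotFoundError:
            self.counts = {}

    def process(self, key):
        self.counts[key] = self.counts.get(key, 0) + 1

    def checkpoint(self):
        # Real engines do this periodically and atomically
        with open(self.checkpoint_path, "w") as f:
            json.dump(self.counts, f)


counter = RunningCounter()
for user in ["u1", "u2", "u1"]:
    counter.process(user)
counter.checkpoint()  # survives a restart: counts reload from disk
```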
3. Stream Processing Architectures
3.1 Lambda Architecture
```text
┌─────────────────────────────────────────────────────────┐
│                   Lambda Architecture                   │
├─────────────────────────────────────────────────────────┤
│                                                         │
│                    ┌──────────┐                         │
│                    │   Data   │                         │
│                    │  Source  │                         │
│                    └────┬─────┘                         │
│                         │                               │
│            ┌────────────┼────────────┐                  │
│            │            │            │                  │
│            v            │            v                  │
│      ┌──────────┐       │      ┌──────────┐             │
│      │  Batch   │       │      │  Speed   │             │
│      │  Layer   │       │      │  Layer   │             │
│      │ (Hadoop) │       │      │ (Storm)  │             │
│      └────┬─────┘       │      └────┬─────┘             │
│           │             │           │                   │
│           v             │           v                   │
│      ┌──────────┐       │      ┌──────────┐             │
│      │  Batch   │       │      │ Real-time│             │
│      │  Views   │       │      │  Views   │             │
│      └────┬─────┘       │      └────┬─────┘             │
│           │             │           │                   │
│           └─────────────┼───────────┘                   │
│                         │                               │
│                    ┌────▼────┐                          │
│                    │ Serving │                          │
│                    │  Layer  │                          │
│                    └─────────┘                          │
│                                                         │
└─────────────────────────────────────────────────────────┘
```

Pros:
- Accurate batch results + fast real-time
- Fault tolerant
Cons:
- Two codebases to maintain
- Complex
3.2 Kappa Architecture
```text
┌─────────────────────────────────────────────────────────┐
│                   Kappa Architecture                    │
├─────────────────────────────────────────────────────────┤
│                                                         │
│      ┌──────────┐                                       │
│      │   Data   │                                       │
│      │  Source  │                                       │
│      └────┬─────┘                                       │
│           │                                             │
│           v                                             │
│      ┌──────────┐                                       │
│      │  Stream  │  (Kafka)                              │
│      │ Storage  │                                       │
│      └────┬─────┘                                       │
│           │                                             │
│           v                                             │
│      ┌──────────┐                                       │
│      │  Stream  │  (Spark Streaming, Flink)             │
│      │ Processor│                                       │
│      └────┬─────┘                                       │
│           │                                             │
│           v                                             │
│      ┌──────────┐                                       │
│      │ Serving  │                                       │
│      │  Layer   │                                       │
│      └──────────┘                                       │
│                                                         │
└─────────────────────────────────────────────────────────┘
```

Pros:
- Single codebase
- Simpler than Lambda
- Reprocess by replaying stream
Cons:
- Requires log-based storage (Kafka)
- May not suit all use cases
4. Exactly-Once Semantics
4.1 Delivery Guarantees
```text
┌─────────────────────────────────────────────────────────┐
│                  Delivery Guarantees                    │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  AT-MOST-ONCE                                           │
│  - Message processed 0 or 1 time                        │
│  - Fire and forget                                      │
│  - Data loss possible                                   │
│  - Use: metrics where some loss is OK                   │
│                                                         │
│  AT-LEAST-ONCE                                          │
│  - Message processed 1 or more times                    │
│  - Retry on failure                                     │
│  - Duplicates possible                                  │
│  - Use: when operations are idempotent                  │
│                                                         │
│  EXACTLY-ONCE                                           │
│  - Message processed exactly 1 time                     │
│  - Most complex to achieve                              │
│  - No data loss, no duplicates                          │
│  - Use: financial transactions                          │
│                                                         │
└─────────────────────────────────────────────────────────┘
```

4.2 Achieving Exactly-Once
```python
# Strategies:

# 1. Idempotent Operations
#    Same operation applied multiple times = same result
#    INSERT becomes UPSERT
#    "Increment count" becomes "set count to X"

# 2. Transactional Processing
#    Producer commits offset only after successful processing
#    Uses transaction IDs

# 3. Deduplication
#    Assign a unique ID to each message
#    Check whether the ID was already processed
```
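As a minimal sketch of strategy 3 (deduplication), assuming each message carries a unique `id`; in production the seen-ID set would live in a durable, bounded store:

```python
class DedupProcessor:
    """Drop messages whose unique ID was already processed."""

    def __init__(self):
        self.seen_ids = set()  # in production: durable and bounded (e.g., TTL)

    def process(self, message):
        if message["id"] in self.seen_ids:
            return  # duplicate delivery: at-least-once becomes effectively once
        self.seen_ids.add(message["id"])
        self.apply(message)

    def apply(self, message):
        print(f"Applying {message['id']}: {message['payload']}")


p = DedupProcessor()
p.process({"id": "m1", "payload": 42})
p.process({"id": "m1", "payload": 42})  # redelivered by a retry: ignored
```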
5. Common Streaming Patterns
5.1 Event Sourcing
```python
# Store events, not state
# State = replay all events

# Example: bank account
events = [
    {"type": "deposit", "amount": 100, "time": "10:00"},
    {"type": "withdraw", "amount": 30, "time": "10:05"},
    {"type": "deposit", "amount": 50, "time": "10:10"},
]

# Current balance = replay events
# 0 + 100 - 30 + 50 = 120

# Benefits:
# - Complete audit trail
# - Can reconstruct state at any point in time
# - Easier debugging
```
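Rebuilding state is just a fold over the log. A minimal sketch using the `events` list above:

```python
def replay(event_log):
    """Rebuild the current balance by replaying the event log."""
    balance = 0
    for e in event_log:
        if e["type"] == "deposit":
            balance += e["amount"]
        elif e["type"] == "withdraw":
            balance -= e["amount"]
    return balance


print(replay(events))      # 120 -> current balance
print(replay(events[:2]))  # 70  -> state as it was at 10:05
```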
5.2 CQRS (Command Query Responsibility Segregation)

```text
┌─────────────────────────────────────────────────────────┐
│                          CQRS                           │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   Commands (Write)              Queries (Read)          │
│         │                             │                 │
│         v                             v                 │
│    ┌─────────┐                   ┌─────────┐            │
│    │  Write  │                   │  Read   │            │
│    │  Model  │                   │  Model  │            │
│    └────┬────┘                   └────▲────┘            │
│         │                             │                 │
│         │        Events Stream        │                 │
│         └─────────────────────────────┘                 │
│                                                         │
└─────────────────────────────────────────────────────────┘
```
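A minimal in-process sketch of the pattern (a plain list stands in for the event stream; in practice it would be Kafka or a similar log):

```python
class WriteModel:
    """Handles commands and appends events to the stream."""

    def __init__(self, stream):
        self.stream = stream

    def deposit(self, account, amount):  # a command
        self.stream.append({"type": "deposit", "account": account, "amount": amount})


class ReadModel:
    """Consumes events and maintains a query-optimized view."""

    def __init__(self):
        self.balances = {}  # denormalized for fast reads

    def apply(self, event):
        if event["type"] == "deposit":
            acct = event["account"]
            self.balances[acct] = self.balances.get(acct, 0) + event["amount"]


stream = []
write, read = WriteModel(stream), ReadModel()
write.deposit("alice", 100)    # commands go to the write model
for ev in stream:              # events flow to the read model
    read.apply(ev)
print(read.balances["alice"])  # queries hit the read model: 100
```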
5.3 Stream Joins

```python
# Stream-Stream Join
# Join two event streams within a time window

# Example: match clicks with impressions
# Click stream:      user_id, ad_id, click_time
# Impression stream: user_id, ad_id, impression_time

# Join within a 10-minute window:
# find clicks that happened within 10 minutes of an impression

# Stream-Table Join
# Enrich a stream with reference data

# Example: enrich orders with customer info
# Order stream + Customer table
# Every order gets customer details
```
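As a minimal sketch of the stream-table join (the customer records are made up; a real table would be a compacted topic or a lookup store):

```python
# Reference data: the "table" side of the join
customers = {
    "c1": {"name": "Alice", "tier": "gold"},
    "c2": {"name": "Bob", "tier": "silver"},
}


def enrich(order):
    """Stream-table join: attach customer details to each order."""
    customer = customers.get(order["customer_id"], {})
    return {**order, **customer}


order_stream = [
    {"order_id": "o1", "customer_id": "c1", "amount": 25.0},
    {"order_id": "o2", "customer_id": "c2", "amount": 40.0},
]

for order in order_stream:
    print(enrich(order))  # every order now carries name and tier
```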
6. Streaming Technologies Overview
6.1 Comparison
| Technology | Latency | Exactly-Once | SQL Support | Ecosystem |
|---|---|---|---|---|
| Kafka Streams | Low | Yes | KSQL | Kafka |
| Spark Streaming | Medium | Yes | Yes | Spark |
| Flink | Very Low | Yes | Yes | Standalone |
| Storm | Low | At-least-once | No | Hadoop |
6.2 When to Use What
```python
# Kafka Streams
# - Already using Kafka
# - JVM-based applications
# - Lightweight streaming

# Spark Structured Streaming
# - Already using Spark
# - Unified batch + stream
# - Complex transformations

# Apache Flink
# - True event-time processing
# - Very low latency needed
# - Complex event processing

# Cloud Services
# - AWS Kinesis
# - Azure Stream Analytics
# - Google Dataflow
```

7. Simple Streaming Example
7.1 Concept Demo with Python
```python
import time
import random
from collections import deque
from datetime import datetime


class SimpleStreamProcessor:
    """Simulate stream processing concepts"""

    def __init__(self, window_size_seconds=5):
        self.window_size = window_size_seconds
        self.events = deque()

    def process_event(self, event):
        """Process an incoming event"""
        # Add to window
        self.events.append(event)

        # Keep only events from the last window_size seconds
        # (a processing-time sliding window)
        cutoff = datetime.now().timestamp() - self.window_size
        while self.events and self.events[0]['timestamp'] < cutoff:
            self.events.popleft()

        # Compute window aggregate
        if self.events:
            window_sum = sum(e['value'] for e in self.events)
            window_count = len(self.events)
            print(f"Window: count={window_count}, sum={window_sum}, "
                  f"avg={window_sum / window_count:.2f}")

    def simulate_stream(self, duration_seconds=30):
        """Simulate streaming events"""
        start = time.time()

        while time.time() - start < duration_seconds:
            # Generate a random event
            event = {
                'timestamp': datetime.now().timestamp(),
                'value': random.randint(1, 100),
            }

            print(f"\nReceived event: value={event['value']}")
            self.process_event(event)

            # Sleep a random interval
            time.sleep(random.uniform(0.5, 1.5))


# Demo
processor = SimpleStreamProcessor(window_size_seconds=5)
processor.simulate_stream(duration_seconds=20)
```

7.2 Windowed Aggregation
```python
import time
import random
from collections import defaultdict
from datetime import datetime


class WindowedAggregator:
    """Tumbling window aggregation"""

    def __init__(self, window_minutes=1):
        self.window_minutes = window_minutes
        self.windows = defaultdict(list)

    def get_window_key(self, timestamp):
        """Get the window key for a timestamp"""
        dt = datetime.fromtimestamp(timestamp)
        # Round down to the window boundary
        minute_bucket = (dt.minute // self.window_minutes) * self.window_minutes
        window_start = dt.replace(minute=minute_bucket, second=0, microsecond=0)
        return window_start.isoformat()

    def add_event(self, event):
        """Add an event to the appropriate window"""
        window_key = self.get_window_key(event['timestamp'])
        self.windows[window_key].append(event)

        # Check whether any window can be emitted
        self.emit_completed_windows()

    def emit_completed_windows(self):
        """Emit aggregates for completed windows"""
        current_window = self.get_window_key(datetime.now().timestamp())

        for window_key in list(self.windows.keys()):
            if window_key < current_window:
                events = self.windows[window_key]
                total = sum(e['value'] for e in events)
                count = len(events)

                print(f"Window [{window_key}] completed: count={count}, sum={total}")
                del self.windows[window_key]


# Usage example: simulate a short stream of events
agg = WindowedAggregator(window_minutes=1)

for i in range(20):
    event = {
        'timestamp': datetime.now().timestamp(),
        'value': random.randint(10, 50),
        'event_id': i,
    }
    print(f"Event {i}: value={event['value']}")
    agg.add_event(event)
    time.sleep(0.5)
```

8. Practice
Exercise 1: Design Streaming System
```python
# Scenario: E-commerce real-time analytics
#
# Events:
# - page_view:   {user_id, page, timestamp}
# - add_to_cart: {user_id, product_id, timestamp}
# - purchase:    {user_id, order_id, amount, timestamp}
#
# Requirements:
# 1. Real-time dashboard: active users (last 5 min)
# 2. Alert: cart abandonment (add_to_cart without purchase within 30 min)
# 3. Metric: revenue per hour
#
# Design:
# - What windowing strategy for each?
# - What state needs to be maintained?
# - What delivery guarantee is needed?

# YOUR DESIGN HERE
```

💡 Solution
```python
"""
Streaming System Design

1. Active Users (last 5 min)
   - Window: sliding window, 5-min size, sliding every 1 min
   - State: set of unique user_ids per window
   - Guarantee: at-least-once is OK (counting unique users is idempotent)
   - Implementation:
       group by window(5-min, slide 1-min)
       count distinct user_id

2. Cart Abandonment Alert
   - Window: session window with a 30-min timeout
   - State:
     - Map of user_id -> last_cart_time
     - Watermark of 30 minutes
   - Guarantee: exactly-once (important for alerts)
   - Implementation:
     - On add_to_cart: store user_id + timestamp
     - On purchase: remove user_id from state
     - Timer: after 30 min without a purchase, emit an alert

3. Revenue per Hour
   - Window: tumbling window, 1 hour
   - State: running sum of amount
   - Guarantee: exactly-once (financial data)
   - Implementation:
       purchases
           .group_by(window(1-hour))
           .sum(amount)
"""


# Pseudo-code implementation sketch
# (SlidingWindow, TumblingWindow, schedule_abandonment_check and
#  emit_alert are assumed helpers, not real library calls)
class EcommerceStreaming:
    def __init__(self):
        self.active_users_window = SlidingWindow(size=300, slide=60)  # seconds
        self.cart_sessions = {}  # user_id -> last_cart_time
        self.hourly_revenue = TumblingWindow(size=3600)

    def process_page_view(self, event):
        self.active_users_window.add(event['user_id'], event['timestamp'])

    def process_add_to_cart(self, event):
        self.cart_sessions[event['user_id']] = event['timestamp']
        # Set a timer for 30 minutes from now
        self.schedule_abandonment_check(event['user_id'], event['timestamp'] + 1800)

    def process_purchase(self, event):
        # Remove from cart tracking
        if event['user_id'] in self.cart_sessions:
            del self.cart_sessions[event['user_id']]
        # Add to revenue
        self.hourly_revenue.add(event['amount'], event['timestamp'])

    def check_abandonment(self, user_id, trigger_time):
        if user_id in self.cart_sessions:
            cart_time = self.cart_sessions[user_id]
            if trigger_time - cart_time >= 1800:  # 30 min
                self.emit_alert(f"Cart abandoned: {user_id}")
                del self.cart_sessions[user_id]

    def emit_metrics(self):
        print(f"Active users: {self.active_users_window.count_unique()}")
        print(f"Hourly revenue: {self.hourly_revenue.sum()}")
```

9. Summary
| Concept | Description |
|---|---|
| Event Time vs Processing Time | When event happened vs when processed |
| Windowing | Group events (Tumbling, Sliding, Session) |
| Watermarks | Handle late data |
| State | Maintain information across events |
| Delivery Guarantees | At-most/At-least/Exactly-once |
Key Architectures:
- Lambda: Batch + Speed layers
- Kappa: Stream-only with replay
When to Use Streaming:
- Real-time requirements
- Continuous data flow
- Event-driven systems
Next lesson: Spark Structured Streaming
