Data Streaming Fundamentals
1. Batch vs Stream Processing
1.1 Comparison
| Aspect | Batch Processing | Stream Processing |
|---|---|---|
| Data | Bounded, finite | Unbounded, infinite |
| Processing | Process all at once | Process as it arrives |
| Latency | Minutes to hours | Milliseconds to seconds |
| Complexity | Simpler | More complex |
| Use Case | Reports, ML training | Real-time alerts, dashboards |
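The table's core distinction, processing everything at once versus incrementally as data arrives, can be shown with a toy contrast for a simple sum:

```python
# Batch: the whole bounded dataset is available before processing starts
batch_data = [5, 3, 8, 2]
batch_total = sum(batch_data)      # one computation over everything

# Stream: events arrive one at a time; maintain a running aggregate
running_total = 0
for value in [5, 3, 8, 2]:         # pretend each value arrives separately
    running_total += value         # an up-to-date result after every event

print(batch_total, running_total)  # 18 18
```

The totals match, but the streaming version had a current answer after every event, which is what makes sub-second dashboards and alerts possible.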
1.2 When to Use Streaming
Use Streaming When

- Real-time insights needed
- Data arrives continuously
- Low latency required (under 1 second)
- Event-driven architecture
Examples:
- Fraud detection (immediate)
- Real-time recommendations
- IoT sensor monitoring
- Live dashboards
- Log analysis
2. Stream Processing Concepts
2.1 Event Time vs Processing Time
| Concept | Description | Example |
|---|---|---|
| Event Time | When the event actually occurred | 10:00, 10:01, 10:02, 10:03 |
| Processing Time | When the event was processed | 10:05, 10:06, 10:08, 10:10 |
- Events can arrive out of order!
- This affects aggregation results
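The effect on aggregation can be demonstrated by bucketing the same four events by each notion of time (the events and 5-minute buckets here are illustrative):

```python
from collections import defaultdict

# The same four events, each carrying both timestamps
events = [
    {"event_time": "10:00", "processing_time": "10:05", "value": 10},
    {"event_time": "10:01", "processing_time": "10:06", "value": 20},
    {"event_time": "10:02", "processing_time": "10:08", "value": 30},
    {"event_time": "10:03", "processing_time": "10:10", "value": 40},
]

def bucket(events, key):
    """Sum values into 5-minute buckets using the chosen time field."""
    buckets = defaultdict(int)
    for e in events:
        hour, minute = e[key].split(":")
        window_start = int(minute) // 5 * 5
        buckets[f"{hour}:{window_start:02d}"] += e["value"]
    return dict(buckets)

print(bucket(events, "event_time"))       # {'10:00': 100} - one window
print(bucket(events, "processing_time"))  # {'10:05': 60, '10:10': 40} - split
```

Same data, different answers: aggregating by processing time splits events that logically belong to one window.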
```python
# Why it matters:
# An event at 10:00 might arrive at 10:05 due to network delay.
# If you compute "sales in the last 5 minutes" at 10:06:
# - Processing time: might miss events still in transit
# - Event time: correct result based on when the sales actually happened
```
2.2 Windowing
Windowing Types
| Window Type | Description | Overlap | Example |
|---|---|---|---|
| Tumbling | Fixed size, non-overlapping | No | [10:00-10:05), [10:05-10:10) |
| Sliding | Fixed size, slides at an interval | Yes | [10:00-10:05), [10:01-10:06) |
| Session | Activity-based, closed by a gap timeout | No | New session after a 30-min gap of inactivity |
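Tumbling-window assignment is just integer arithmetic on the timestamp; a minimal sketch (window size and timestamps are illustrative, in seconds since midnight):

```python
def tumbling_window(timestamp, size_seconds=300):
    """Assign a timestamp to its 5-minute tumbling window [start, end)."""
    start = timestamp - (timestamp % size_seconds)
    return (start, start + size_seconds)

# 10:00:30 and 10:04:59 share a window; 10:05:00 starts a new one
print(tumbling_window(36030))  # (36000, 36300) -> [10:00, 10:05)
print(tumbling_window(36299))  # (36000, 36300)
print(tumbling_window(36300))  # (36300, 36600) -> [10:05, 10:10)
```

A sliding window is the same idea applied repeatedly: each event belongs to every window whose `[start, start + size)` range contains it.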
```python
# Tumbling window: 5-minute windows
# Events in [10:00-10:05) go to window 1
# Events in [10:05-10:10) go to window 2
# No overlap

# Sliding window: 5-minute window, sliding every 1 minute
# Window 1: [10:00-10:05)
# Window 2: [10:01-10:06)
# Events can be in multiple windows

# Session window: based on activity
# A gap of 30 minutes starts a new session
# User activity from 10:00-10:15, then 11:00-11:30
# = 2 separate sessions
```
2.3 Watermarks
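A common way to track a watermark (this is a toy sketch, not any framework's API) is to keep the maximum event time seen so far and subtract an allowed lateness:

```python
class WatermarkTracker:
    """Toy watermark: max event time seen minus allowed lateness (seconds)."""

    def __init__(self, allowed_lateness):
        self.allowed_lateness = allowed_lateness
        self.max_event_time = float("-inf")

    def observe(self, event_time):
        """Advance the watermark; return True if the event is on time."""
        self.max_event_time = max(self.max_event_time, event_time)
        return event_time >= self.watermark()

    def watermark(self):
        return self.max_event_time - self.allowed_lateness

wm = WatermarkTracker(allowed_lateness=600)  # 10 minutes
print(wm.observe(36000))  # event at 10:00 -> True
print(wm.observe(37200))  # event at 10:20 -> True, watermark is now 10:10
print(wm.observe(36540))  # event at 10:09, behind the watermark -> False (late)
```

The `allowed_lateness` parameter is the trade-off knob described below: larger values accept more stragglers at the cost of delaying final results.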
```python
# Watermark = how long to wait for late data

# Example: watermark of 10 minutes
# At processing time 10:20:
# - Watermark = 10:10 (10:20 - 10 minutes)
# - Events with event time before 10:10 are considered "late"
# - Events after 10:10 are still accepted

# Trade-off:
# - Longer watermark = more complete results, higher latency
# - Shorter watermark = faster results, might miss late data
```
2.4 State Management
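A minimal sketch of keyed state with naive checkpointing (the JSON snapshot stands in for whatever durable storage a real system would use):

```python
import json

class CountingState:
    """Keyed running counts with a naive checkpoint for fault tolerance."""

    def __init__(self):
        self.counts = {}

    def update(self, key):
        self.counts[key] = self.counts.get(key, 0) + 1
        return self.counts[key]

    def checkpoint(self):
        """Serialize state so a restarted processor can resume."""
        return json.dumps(self.counts)

    def restore(self, snapshot):
        self.counts = json.loads(snapshot)

state = CountingState()
for user in ["a", "b", "a", "a"]:
    state.update(user)

snapshot = state.checkpoint()   # in practice: written to durable storage
recovered = CountingState()     # simulate a restart after failure
recovered.restore(snapshot)
print(recovered.counts)         # {'a': 3, 'b': 1}
```

Real engines add the hard parts listed above: bounding state growth (e.g. with TTLs), taking checkpoints consistently across a distributed job, and partitioning state by key.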
```python
# Stream processing often needs state
# Examples:
# - Running counts
# - Session information
# - Aggregations within windows

# Challenges:
# 1. State can grow unboundedly
# 2. Need fault tolerance (checkpointing)
# 3. State must be distributed
```
3. Stream Processing Architectures
3.1 Lambda Architecture
📡 Data Source → 📦 Batch Layer (Hadoop) → 📊 Batch Views → 🖥️ Serving Layer
📡 Data Source → ⚡ Speed Layer (Storm) → 📈 Real-time Views → 🖥️ Serving Layer
Pros:
- Accurate batch results + fast real-time
- Fault tolerant
Cons:
- Two codebases to maintain
- Complex
3.2 Kappa Architecture
📡 Data Source → 📨 Stream Storage (Kafka) → ⚡ Stream Processor (Spark Streaming, Flink) → 🖥️ Serving Layer
Pros:
- Single codebase
- Simpler than Lambda
- Reprocess by replaying stream
Cons:
- Requires log-based storage (Kafka)
- May not suit all use cases
4. Exactly-Once Semantics
4.1 Delivery Guarantees
| Guarantee | Processing | Risk | Use Case |
|---|---|---|---|
| At-Most-Once | 0 or 1 time, fire and forget | Data loss possible | Metrics where some loss OK |
| At-Least-Once | 1+ times, retry on failure | Duplicates possible | Idempotent operations |
| Exactly-Once | Exactly 1 time, most complex | No data loss, no duplicates | Financial transactions |
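For example, at-least-once delivery combined with deduplication yields effectively-once results; a minimal sketch (message shape and IDs are illustrative):

```python
class DedupConsumer:
    """Turn at-least-once delivery into effectively-once processing
    by remembering message IDs that were already handled."""

    def __init__(self):
        self.seen_ids = set()
        self.total = 0

    def handle(self, message):
        if message["id"] in self.seen_ids:
            return False            # duplicate redelivery: skip it
        self.seen_ids.add(message["id"])
        self.total += message["amount"]
        return True

consumer = DedupConsumer()
deliveries = [
    {"id": "m1", "amount": 100},
    {"id": "m2", "amount": 50},
    {"id": "m1", "amount": 100},    # redelivered after a retry
]
for msg in deliveries:
    consumer.handle(msg)
print(consumer.total)               # 150, not 250
```

In production the `seen_ids` set itself must be part of checkpointed state, otherwise a restart forgets which messages were already processed.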
4.2 Achieving Exactly-Once
```python
# Strategies for achieving exactly-once:

# 1. Idempotent operations
#    The same operation applied multiple times gives the same result:
#    INSERT becomes UPSERT; "increment count" becomes "set count to X"

# 2. Transactional processing
#    Commit the input offset and the output atomically,
#    only after successful processing (using transaction IDs)

# 3. Deduplication
#    Assign a unique ID to each message
#    Skip a message if its ID was already processed
```
5. Common Streaming Patterns
5.1 Event Sourcing
```python
# Store events, not state; state = replay all events

# Example: bank account
events = [
    {"type": "deposit", "amount": 100, "time": "10:00"},
    {"type": "withdraw", "amount": 30, "time": "10:05"},
    {"type": "deposit", "amount": 50, "time": "10:10"},
]

# Current balance = replay the events
def replay(events):
    balance = 0
    for e in events:
        balance += e["amount"] if e["type"] == "deposit" else -e["amount"]
    return balance

print(replay(events))  # 0 + 100 - 30 + 50 = 120

# Benefits:
# - Complete audit trail
# - Can reconstruct state at any point in time
# - Easy debugging
```
5.2 CQRS (Command Query Responsibility Segregation)
✏️ Commands (Write) → 💾 Write Model → 📡 Events Stream → 📖 Read Model → 🔍 Queries (Read)
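A minimal sketch of the split (all names here are hypothetical): commands append events on the write side, and a projection consumes the event stream to build a query-optimized read model:

```python
# Write side: an append-only event log; never queried directly
event_log = []

def handle_command(event):
    """Command handler: validate (omitted) and append the event."""
    event_log.append(event)

# Read side: a denormalized view kept up to date from the event stream
balances = {}

def project(event):
    """Projection: fold one event into the read model."""
    delta = event["amount"] if event["type"] == "deposit" else -event["amount"]
    balances[event["account"]] = balances.get(event["account"], 0) + delta

handle_command({"type": "deposit", "account": "a1", "amount": 100})
handle_command({"type": "withdraw", "account": "a1", "amount": 30})
for event in event_log:
    project(event)
print(balances["a1"])  # 70
```

The point of the separation is that the write model and read model can be stored, scaled, and optimized independently, connected only by the event stream.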
5.3 Stream Joins
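The stream-table flavor of a join amounts to a lookup into reference data for each arriving event; a minimal sketch with illustrative names:

```python
# Hypothetical reference data: the "table" side of a stream-table join
customers = {
    "c1": {"name": "Alice", "tier": "gold"},
    "c2": {"name": "Bob", "tier": "silver"},
}

def enrich(order):
    """Stream-table join: attach customer details to each order event."""
    customer = customers.get(order["customer_id"], {})
    return {**order, "customer": customer}

order = {"order_id": "o1", "customer_id": "c1", "amount": 99}
print(enrich(order)["customer"]["tier"])  # gold
```

A stream-stream join additionally buffers events from both sides and only matches pairs whose timestamps fall within the join window.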
```python
# Stream-stream join
# Join two event streams within a time window

# Example: match clicks with impressions
# Click stream: user_id, ad_id, click_time
# Impression stream: user_id, ad_id, impression_time

# Join within a 10-minute window:
# find clicks that happened within 10 minutes of an impression

# Stream-table join
# Enrich a stream with reference data

# Example: enrich orders with customer info
# Order stream + customer table
# Every order gets the customer's details
```
6. Streaming Technologies Overview
6.1 Comparison
| Technology | Latency | Exactly-Once | SQL Support | Ecosystem |
|---|---|---|---|---|
| Kafka Streams | Low | Yes | Yes (ksqlDB) | Kafka |
| Spark Structured Streaming | Medium (micro-batch) | Yes | Yes | Spark |
| Flink | Very low | Yes | Yes | Standalone |
| Storm | Low | At-least-once (exactly-once with Trident) | No | Standalone |
6.2 When to Use What
```python
# Kafka Streams
# - Already using Kafka
# - JVM-based applications
# - Lightweight streaming

# Spark Structured Streaming
# - Already using Spark
# - Unified batch + stream
# - Complex transformations

# Apache Flink
# - True event-time processing
# - Very low latency needed
# - Complex event processing

# Managed cloud services
# - AWS Kinesis
# - Azure Stream Analytics
# - Google Cloud Dataflow
```
7. Simple Streaming Example
7.1 Concept Demo with Python
```python
import time
import random
from collections import deque
from datetime import datetime

class SimpleStreamProcessor:
    """Simulate stream processing concepts"""

    def __init__(self, window_size_seconds=5):
        self.window_size = window_size_seconds
        self.events = deque()
        self.watermark = None

    def process_event(self, event):
        """Process an incoming event"""
        # Add to the window
        self.events.append(event)

        # Evict events older than the window (sliding window over the last N seconds)
        cutoff = datetime.now().timestamp() - self.window_size
        while self.events and self.events[0]['timestamp'] < cutoff:
            self.events.popleft()

        # Compute the window aggregate
        if self.events:
            window_sum = sum(e['value'] for e in self.events)
            window_count = len(self.events)
            print(f"Window: count={window_count}, sum={window_sum}, avg={window_sum/window_count:.2f}")

    def simulate_stream(self, duration_seconds=30):
        """Simulate streaming events"""
        start = time.time()

        while time.time() - start < duration_seconds:
            # Generate a random event
            event = {
                'timestamp': datetime.now().timestamp(),
                'value': random.randint(1, 100)
            }

            print(f"\nReceived event: value={event['value']}")
            self.process_event(event)

            # Sleep for a random interval
            time.sleep(random.uniform(0.5, 1.5))


# Demo
processor = SimpleStreamProcessor(window_size_seconds=5)
processor.simulate_stream(duration_seconds=20)
```
7.2 Windowed Aggregation
```python
import time
import random
from collections import defaultdict
from datetime import datetime

class WindowedAggregator:
    """Tumbling window aggregation"""

    def __init__(self, window_minutes=1):
        self.window_minutes = window_minutes
        self.windows = defaultdict(list)

    def get_window_key(self, timestamp):
        """Get the window key for a timestamp"""
        dt = datetime.fromtimestamp(timestamp)
        # Round down to the window boundary
        minute_bucket = (dt.minute // self.window_minutes) * self.window_minutes
        window_start = dt.replace(minute=minute_bucket, second=0, microsecond=0)
        return window_start.isoformat()

    def add_event(self, event):
        """Add an event to the appropriate window"""
        window_key = self.get_window_key(event['timestamp'])
        self.windows[window_key].append(event)

        # Check whether any window should be emitted
        self.emit_completed_windows()

    def emit_completed_windows(self):
        """Emit aggregates for completed windows"""
        current_window = self.get_window_key(datetime.now().timestamp())

        for window_key in list(self.windows.keys()):
            if window_key < current_window:
                events = self.windows[window_key]
                total = sum(e['value'] for e in events)
                count = len(events)

                print(f"Window [{window_key}] completed: count={count}, sum={total}")
                del self.windows[window_key]


# Usage example
agg = WindowedAggregator(window_minutes=1)

for i in range(20):
    event = {
        'timestamp': datetime.now().timestamp(),
        'value': random.randint(10, 50),
        'event_id': i
    }
    print(f"Event {i}: value={event['value']}")
    agg.add_event(event)
    time.sleep(0.5)
```
8. Practice
Exercise 1: Design Streaming System
```python
# Scenario: e-commerce real-time analytics
#
# Events:
# - page_view: {user_id, page, timestamp}
# - add_to_cart: {user_id, product_id, timestamp}
# - purchase: {user_id, order_id, amount, timestamp}
#
# Requirements:
# 1. Real-time dashboard: active users (last 5 min)
# 2. Alert: cart abandonment (add_to_cart without purchase within 30 min)
# 3. Metric: revenue per hour
#
# Design:
# - What windowing strategy for each?
# - What state needs to be maintained?
# - What delivery guarantee is needed?

# YOUR DESIGN HERE
```
💡 See the answer
```python
"""
Streaming System Design

1. Active Users (last 5 min)
   - Window: sliding window, 5-min size, sliding every 1 min
   - State: set of unique user_ids per window
   - Guarantee: at-least-once is OK (counting unique users)
   - Implementation:
       group by window(5 min, slide 1 min)
       count distinct user_id

2. Cart Abandonment Alert
   - Window: session window with a 30-min timeout
   - State:
       - map of user_id -> last_cart_time
       - watermark of 30 minutes
   - Guarantee: exactly-once (important for alerts)
   - Implementation:
       - on add_to_cart: store user_id + timestamp
       - on purchase: remove user_id from state
       - timer: after 30 min with no purchase, emit an alert

3. Revenue per Hour
   - Window: tumbling window, 1 hour
   - State: running sum of amount
   - Guarantee: exactly-once (financial data)
   - Implementation:
       purchases
         .group_by(window(1 hour))
         .sum(amount)
"""

# Pseudo-code implementation sketch
# (SlidingWindow / TumblingWindow are assumed helper classes)
class EcommerceStreaming:
    def __init__(self):
        self.active_users_window = SlidingWindow(size=300, slide=60)  # seconds
        self.cart_sessions = {}  # user_id -> last_cart_time
        self.hourly_revenue = TumblingWindow(size=3600)

    def process_page_view(self, event):
        self.active_users_window.add(event['user_id'], event['timestamp'])

    def process_add_to_cart(self, event):
        self.cart_sessions[event['user_id']] = event['timestamp']
        # Set a timer for 30 minutes
        self.schedule_abandonment_check(event['user_id'], event['timestamp'] + 1800)

    def process_purchase(self, event):
        # Remove from cart tracking
        if event['user_id'] in self.cart_sessions:
            del self.cart_sessions[event['user_id']]
        # Add to revenue
        self.hourly_revenue.add(event['amount'], event['timestamp'])

    def check_abandonment(self, user_id, trigger_time):
        if user_id in self.cart_sessions:
            cart_time = self.cart_sessions[user_id]
            if trigger_time - cart_time >= 1800:  # 30 min
                self.emit_alert(f"Cart abandoned: {user_id}")
                del self.cart_sessions[user_id]

    def emit_metrics(self):
        print(f"Active users: {self.active_users_window.count_unique()}")
        print(f"Hourly revenue: {self.hourly_revenue.sum()}")
```
9. Summary
| Concept | Description |
|---|---|
| Event Time vs Processing Time | When event happened vs when processed |
| Windowing | Group events (Tumbling, Sliding, Session) |
| Watermarks | Handle late data |
| State | Maintain information across events |
| Delivery Guarantees | At-most/At-least/Exactly-once |
Key Architectures:
- Lambda: Batch + Speed layers
- Kappa: Stream-only with replay
Use streaming when:
- Real-time requirements
- Continuous data flow
- Event-driven systems
Next lesson: Spark Structured Streaming
