Kafka & Message Queues
1. Introduction to Message Queues
1.1 What is a Message Queue?
A message queue is an intermediary system that lets applications communicate asynchronously by sending and receiving messages.
Without vs With Message Queue
| Pattern | Flow | Characteristics |
|---|---|---|
| Without MQ | App A → App B (direct) | Synchronous, tightly coupled, A must wait for B |
| With MQ | App A → Message Queue → App B | Asynchronous, decoupled, A doesn't wait, B consumes when ready |
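The difference is easy to see with a toy sketch (plain Python standard library, not Kafka): App A keeps producing without waiting, while App B drains the queue at its own pace.

```python
import queue
import threading
import time

mq = queue.Queue()  # stand-in for the message queue / broker

def app_b():
    # Consumer: reads at its own pace
    while True:
        msg = mq.get()
        time.sleep(0.5)  # simulate slow processing
        print(f"App B processed order {msg['order_id']}")
        mq.task_done()

threading.Thread(target=app_b, daemon=True).start()

# App A: producer, never blocks on App B
for i in range(5):
    mq.put({'order_id': i})
    print(f"App A sent order {i}")

mq.join()  # wait for App B to finish before exiting
```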
1.2 Benefits of Message Queues
| Benefit | Description |
|---|---|
| Decoupling | Services don't need to know about each other |
| Scalability | Add more consumers to handle load |
| Reliability | Messages persist, survive crashes |
| Async Processing | Non-blocking operations |
| Load Leveling | Buffer spikes in traffic |
2. Apache Kafka Overview
2.1 What is Kafka?
Apache Kafka = Distributed streaming platform
- High-throughput, low-latency
- Fault-tolerant, distributed
- Stores streams of records durably
- Processes streams in real-time
2.2 Kafka Architecture
```text
📤 Producer (writes)
        │
        ▼
🖥️ Kafka Cluster
   ├─ Broker 1
   ├─ Broker 2
   └─ Broker 3
        │
        ▼
📥 Consumer (reads)

🔧 ZooKeeper: metadata & coordination
```
2.3 Core Concepts
```text
📋 Topic (logical channel)
 ├─ 📦 Partition 0
 │     ✉️ Offset 0: Message A
 │     ✉️ Offset 1: Message B
 │     ✉️ Offset 2: Message C
 ├─ 📦 Partition 1
 │     ✉️ Offset 0: Message D
 │     ✉️ Offset 1: Message E
 └─ 📦 Partition 2
       ✉️ Offset 0: Message F
```
Key Concepts
| Concept | Description |
|---|---|
| Topic | Named feed of messages |
| Partition | Ordered, immutable sequence |
| Offset | Position in partition |
| Broker | Kafka server |
| Producer | Publishes messages |
| Consumer | Subscribes and reads |
| Consumer Group | Group of consumers sharing work |
3. Kafka Setup
3.1 Docker Compose
```yaml
# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```
```bash
# Start Kafka
docker-compose up -d

# Verify
docker-compose ps
```
3.2 Python Client Setup
```bash
pip install kafka-python
# or
pip install confluent-kafka  # more performant
```
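Before writing any producer code, it can help to verify the broker is reachable from Python. A minimal sketch using kafka-python (it raises NoBrokersAvailable if the cluster from 3.1 isn't up):

```python
from kafka.admin import KafkaAdminClient

# Connects to the broker started in 3.1
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
print(admin.list_topics())  # e.g. ['__consumer_offsets', ...]
admin.close()
```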
4. Kafka Producer
4.1 Basic Producer
```python
from kafka import KafkaProducer
import json

# Create producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send message (async)
future = producer.send(
    topic='my-topic',
    value={'user_id': 123, 'action': 'click'},
    key='user_123'
)

# Wait for confirmation (optional)
result = future.get(timeout=10)
print(f"Sent to partition {result.partition} at offset {result.offset}")

# Flush and close
producer.flush()
producer.close()
```
4.2 Producer with Callbacks
```python
from kafka import KafkaProducer
import json

def on_send_success(record_metadata):
    print(f"Success: topic={record_metadata.topic}, "
          f"partition={record_metadata.partition}, "
          f"offset={record_metadata.offset}")

def on_send_error(ex):
    print(f"Error: {ex}")

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',        # Wait for all replicas
    retries=3,         # Retry on failure
    linger_ms=10,      # Batch messages for 10ms
    batch_size=16384   # Batch size in bytes
)

# Send with callbacks
for i in range(100):
    message = {'id': i, 'data': f'message_{i}'}
    producer.send('my-topic', value=message) \
        .add_callback(on_send_success) \
        .add_errback(on_send_error)

producer.flush()
```
4.3 Producer Configuration
```python
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],

    # Serialization
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,

    # Reliability
    acks='all',              # 0, 1, 'all'
    retries=5,
    retry_backoff_ms=100,

    # Performance
    batch_size=16384,        # Bytes
    linger_ms=5,             # Wait time for batching
    buffer_memory=33554432,  # Total buffer memory

    # Compression
    compression_type='gzip', # none, gzip, snappy, lz4, zstd

    # Idempotence: avoid duplicates from retries
    # (requires a recent kafka-python release)
    enable_idempotence=True
)
```
5. Kafka Consumer
5.1 Basic Consumer
```python
from kafka import KafkaConsumer
import json

# Create consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # Start from beginning
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='my-consumer-group'
)

# Poll messages
for message in consumer:
    print(f"Topic: {message.topic}")
    print(f"Partition: {message.partition}")
    print(f"Offset: {message.offset}")
    print(f"Key: {message.key}")
    print(f"Value: {message.value}")
    print("---")

consumer.close()
```
5.2 Consumer with Manual Commit
```python
from kafka import KafkaConsumer

def process_message(message):
    ...  # your processing logic goes here

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # Manual commit
    group_id='my-consumer-group'
)

try:
    for message in consumer:
        # Process message
        process_message(message)

        # Commit offset after successful processing
        consumer.commit()

except Exception as e:
    print(f"Error: {e}")
finally:
    consumer.close()
```
5.3 Consumer Group
```python
# Consumer Group: multiple consumers sharing work.
# Each partition is assigned to exactly one consumer in the group.

"""
Topic with 4 partitions:
┌───────────────┬───────────────┐
│ Partition 0   │ Partition 1   │
│      ↓        │      ↓        │
│  Consumer 1   │  Consumer 1   │
├───────────────┼───────────────┤
│ Partition 2   │ Partition 3   │
│      ↓        │      ↓        │
│  Consumer 2   │  Consumer 2   │
└───────────────┴───────────────┘

2 consumers, 4 partitions = 2 partitions each

If Consumer 2 fails:
Consumer 1 gets all 4 partitions (rebalancing)
"""

# Multiple consumers with the same group_id
consumer1 = KafkaConsumer('my-topic', group_id='my-group', ...)
consumer2 = KafkaConsumer('my-topic', group_id='my-group', ...)
# Partitions are distributed between consumer1 and consumer2
```
5.4 Consumer Configuration
```python
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],

    # Consumer group
    group_id='my-consumer-group',

    # Offset management
    auto_offset_reset='earliest',  # earliest, latest, none
    enable_auto_commit=True,
    auto_commit_interval_ms=5000,

    # Polling
    max_poll_records=500,
    max_poll_interval_ms=300000,

    # Session
    session_timeout_ms=10000,
    heartbeat_interval_ms=3000,

    # Deserialization
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda m: m.decode('utf-8') if m else None
)
```
6. Kafka Admin Operations
6.1 Topic Management
```python
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create topic
topic = NewTopic(
    name='new-topic',
    num_partitions=3,
    replication_factor=1,
    topic_configs={
        'retention.ms': '86400000',  # 1 day
        'cleanup.policy': 'delete'
    }
)

admin.create_topics([topic])

# List topics
topics = admin.list_topics()
print(topics)

# Delete topic
admin.delete_topics(['old-topic'])

admin.close()
```
6.2 CLI Commands
```bash
# Create topic
kafka-topics.sh --create --topic my-topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1

# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe topic
kafka-topics.sh --describe --topic my-topic \
  --bootstrap-server localhost:9092

# Produce messages (CLI)
kafka-console-producer.sh --topic my-topic \
  --bootstrap-server localhost:9092

# Consume messages (CLI)
kafka-console-consumer.sh --topic my-topic \
  --bootstrap-server localhost:9092 --from-beginning

# Consumer groups
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
kafka-consumer-groups.sh --describe --group my-group \
  --bootstrap-server localhost:9092
```
7. Integration Patterns
7.1 Kafka with Spark Streaming
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("Kafka-Spark") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
    .getOrCreate()

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "my-topic") \
    .option("startingOffsets", "earliest") \
    .load()

# Parse JSON
schema = StructType([
    StructField("user_id", StringType()),
    StructField("action", StringType()),
    StructField("timestamp", TimestampType())
])

parsed = df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# Process and write back to Kafka
output = parsed \
    .groupBy(window(col("timestamp"), "5 minutes"), col("action")) \
    .count()

query = output \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "processed-topic") \
    .option("checkpointLocation", "/checkpoints/kafka") \
    .outputMode("update") \
    .start()

query.awaitTermination()
```
7.2 Event-Driven Architecture
```python
from datetime import datetime
from kafka import KafkaConsumer

# Assumes a configured `producer` (see 4.1) and service-side helpers
# `reserve_inventory` and `send_confirmation_email` defined elsewhere.

# Producer: Order Service
def publish_order_event(order):
    event = {
        'event_type': 'ORDER_CREATED',
        'order_id': order['id'],
        'customer_id': order['customer_id'],
        'items': order['items'],
        'total': order['total'],
        'timestamp': datetime.now().isoformat()
    }
    producer.send('orders', key=str(order['id']), value=event)

# Consumer: Inventory Service
def process_order_for_inventory(message):
    event = message.value
    if event['event_type'] == 'ORDER_CREATED':
        for item in event['items']:
            reserve_inventory(item['product_id'], item['quantity'])

# Consumer: Notification Service
def process_order_for_notification(message):
    event = message.value
    if event['event_type'] == 'ORDER_CREATED':
        send_confirmation_email(event['customer_id'], event['order_id'])

# Each service has its own consumer group
inventory_consumer = KafkaConsumer('orders', group_id='inventory-service')
notification_consumer = KafkaConsumer('orders', group_id='notification-service')
# Both receive all messages (different groups)
```
7.3 Dead Letter Queue
```python
from kafka import KafkaProducer, KafkaConsumer
import json

def process_message(value):
    ...  # your processing logic; raises on failure

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

consumer = KafkaConsumer(
    'main-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    enable_auto_commit=False
)

MAX_RETRIES = 3

for message in consumer:
    retries = 0
    while retries < MAX_RETRIES:
        try:
            process_message(message.value)
            consumer.commit()
            break
        except Exception as e:
            retries += 1
            if retries == MAX_RETRIES:
                # Send to Dead Letter Queue
                producer.send(
                    'main-topic-dlq',
                    value={
                        'original_message': message.value,
                        'error': str(e),
                        'original_topic': message.topic,
                        'original_partition': message.partition,
                        'original_offset': message.offset
                    }
                )
                consumer.commit()
```
8. Production Best Practices
8.1 Partitioning Strategy
```python
# Key-based partitioning (default)
# The same key always goes to the same partition
producer.send('orders', key='customer_123', value=order)
# All orders for customer_123 land in one partition => ordered

# Custom partitioner: kafka-python accepts a callable invoked
# (after key serialization) as partitioner(key_bytes, all_partitions, available)
def custom_partitioner(key_bytes, all_partitions, available_partitions):
    if key_bytes:
        return all_partitions[hash(key_bytes) % len(all_partitions)]
    return all_partitions[0]

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    partitioner=custom_partitioner
)
```
8.2 Monitoring
```python
# Producer metrics
metrics = producer.metrics()
print(metrics)

# Consumer lag is easiest to check via the CLI:
# kafka-consumer-groups.sh --describe --group my-group \
#   --bootstrap-server localhost:9092
```
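Lag can also be computed programmatically by comparing each partition's end offset with the group's committed offset. A minimal sketch with kafka-python, assuming the topic and consumer group from earlier sections exist:

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    enable_auto_commit=False
)

# One TopicPartition per partition of the topic
partitions = [TopicPartition('my-topic', p)
              for p in consumer.partitions_for_topic('my-topic')]

end_offsets = consumer.end_offsets(partitions)  # latest offset per partition
for tp in partitions:
    committed = consumer.committed(tp) or 0     # group's committed position
    print(f"{tp.topic}[{tp.partition}] lag = {end_offsets[tp] - committed}")

consumer.close()
```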
8.3 Error Handling
```python
import time

from kafka.errors import KafkaError, NoBrokersAvailable

def safe_produce(producer, topic, value, retries=3):
    for attempt in range(retries):
        try:
            future = producer.send(topic, value=value)
            result = future.get(timeout=10)
            return result
        except NoBrokersAvailable:
            print(f"No brokers available, retry {attempt + 1}")
            time.sleep(1)
        except KafkaError as e:
            print(f"Kafka error: {e}")
            if attempt == retries - 1:
                raise
    raise Exception("Failed to produce after retries")
```
9. Hands-on Practice
Exercise: Build Event Pipeline
```python
# Scenario: Real-time clickstream processing
#
# 1. Producer: Generate click events
#    - user_id, page_url, timestamp, session_id
#
# 2. Consumer 1: Count clicks per page (last 5 min)
#
# 3. Consumer 2: Detect suspicious activity
#    - Alert if a user has >100 clicks in 1 minute
#
# Requirements:
# - Use consumer groups
# - Handle errors with a DLQ
# - Include proper serialization

# YOUR CODE HERE
```
💡 View Solution
```python
# producer.py
from kafka import KafkaProducer
import json
import random
import time
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8')
)

pages = ['/home', '/products', '/cart', '/checkout', '/profile']
users = [f'user_{i}' for i in range(1, 101)]

def generate_click_event():
    user_id = random.choice(users)
    return {
        'user_id': user_id,
        'page_url': random.choice(pages),
        'timestamp': datetime.now().isoformat(),
        'session_id': f'{user_id}_session_{random.randint(1, 5)}'
    }

print("Starting producer...")
while True:
    event = generate_click_event()
    producer.send('clicks', key=event['user_id'], value=event)
    print(f"Sent: {event['user_id']} -> {event['page_url']}")
    time.sleep(random.uniform(0.01, 0.1))  # Simulate varied traffic


# consumer_page_counts.py
from kafka import KafkaConsumer
from collections import defaultdict
from datetime import datetime, timedelta
import json

consumer = KafkaConsumer(
    'clicks',
    bootstrap_servers=['localhost:9092'],
    group_id='page-counter-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='latest'
)

# Simple in-memory window (in production, use Redis or similar)
window_counts = defaultdict(int)
window_start = datetime.now()
WINDOW_SIZE = timedelta(minutes=5)

print("Starting page counter...")
for message in consumer:
    event = message.value
    page = event['page_url']

    # Check if window expired
    if datetime.now() - window_start > WINDOW_SIZE:
        print("\n=== Window Summary ===")
        for p, count in sorted(window_counts.items(), key=lambda x: -x[1]):
            print(f"  {p}: {count} clicks")
        window_counts.clear()
        window_start = datetime.now()

    window_counts[page] += 1


# consumer_fraud_detection.py
from kafka import KafkaConsumer, KafkaProducer
from collections import defaultdict
from datetime import datetime, timedelta
import json

consumer = KafkaConsumer(
    'clicks',
    bootstrap_servers=['localhost:9092'],
    group_id='fraud-detection-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='latest'
)

alert_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Track clicks per user (sliding window simulation)
user_clicks = defaultdict(list)
THRESHOLD = 100
WINDOW_MINUTES = 1

print("Starting fraud detector...")
for message in consumer:
    event = message.value
    user_id = event['user_id']
    event_time = datetime.fromisoformat(event['timestamp'])

    # Add to user's click history
    user_clicks[user_id].append(event_time)

    # Remove old clicks (outside window)
    cutoff = event_time - timedelta(minutes=WINDOW_MINUTES)
    user_clicks[user_id] = [t for t in user_clicks[user_id] if t > cutoff]

    # Check threshold
    click_count = len(user_clicks[user_id])
    if click_count > THRESHOLD:
        alert = {
            'alert_type': 'SUSPICIOUS_ACTIVITY',
            'user_id': user_id,
            'click_count': click_count,
            'window_minutes': WINDOW_MINUTES,
            'timestamp': datetime.now().isoformat()
        }
        alert_producer.send('alerts', value=alert)
        print(f"🚨 ALERT: {user_id} has {click_count} clicks in {WINDOW_MINUTES} min!")
        user_clicks[user_id] = []  # Reset after alert
```
10. Summary
| Concept | Description |
|---|---|
| Topic | Logical channel for messages |
| Partition | Ordered, immutable log segment |
| Consumer Group | Load balancing across consumers |
| Offset | Message position in partition |
| Producer | Publishes messages to topics |
| Consumer | Subscribes and processes messages |
Key Configurations:
- `acks`: reliability (0, 1, all)
- `auto.offset.reset`: where to start (earliest, latest)
- `enable.auto.commit`: offset management
Best Practices:
- Use consumer groups for scalability
- Implement idempotent consumers (see the sketch after this list)
- Handle failures with DLQ
- Monitor consumer lag
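Since Kafka delivers at-least-once by default, an idempotent consumer is what keeps redeliveries harmless. A minimal sketch, assuming the manual-commit consumer from 5.2 and a `handle` function of your own; in production the processed-ID set would live in a durable store (database, Redis), not in memory:

```python
# Idempotent consumer sketch: skip messages we have already applied.
# `consumer` is the manual-commit consumer from 5.2; `handle` is your logic.
processed = set()

for message in consumer:
    msg_id = (message.topic, message.partition, message.offset)
    if msg_id in processed:
        continue  # redelivered message: safe to skip
    handle(message.value)
    processed.add(msg_id)
    consumer.commit()
```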
Next lesson: Text Processing & NLP Basics
