Kafka & Message Queues
1. Introduction to Message Queues
1.1 What is a Message Queue?
Message Queue
A message queue is an intermediary system that lets applications communicate asynchronously by sending and receiving messages.
```text
┌─────────────────────────────────────────────────────────┐
│                  Without Message Queue                  │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  [App A] ──────────────────────────────> [App B]        │
│           Synchronous, tightly coupled                  │
│           A must wait for B                             │
│                                                         │
├─────────────────────────────────────────────────────────┤
│                   With Message Queue                    │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  [App A] ──> [Message Queue] ──> [App B]                │
│           Asynchronous, decoupled                       │
│           A doesn't wait, B consumes when ready         │
│                                                         │
└─────────────────────────────────────────────────────────┘
```
1.2 Benefits of Message Queues
| Benefit | Description |
|---|---|
| Decoupling | Services don't need to know about each other |
| Scalability | Add more consumers to handle load |
| Reliability | Messages persist, survive crashes |
| Async Processing | Non-blocking operations |
| Load Leveling | Buffer spikes in traffic |
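To make decoupling and asynchronous processing concrete before introducing Kafka, here is a minimal in-process sketch using Python's standard `queue.Queue` as a stand-in for a real message queue (no Kafka involved): the producer hands work off and never waits for the consumer.

```python
import queue
import threading
import time

# In-process stand-in for a message queue: producer and consumer
# only share the queue, not each other.
q = queue.Queue()

def producer():
    for i in range(5):
        q.put({"event": i})          # hand-off; producer does not wait
        print(f"produced event {i}")

def consumer():
    while True:
        msg = q.get()
        time.sleep(0.5)              # simulate slow processing
        print(f"consumed {msg}")
        q.task_done()

threading.Thread(target=consumer, daemon=True).start()
producer()
q.join()  # wait until the consumer has drained the queue
```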
2. Apache Kafka Overview
2.1 What is Kafka?
Apache Kafka = Distributed streaming platform
- High-throughput, low-latency
- Fault-tolerant, distributed
- Stores streams of records durably
- Processes streams in real-time
2.2 Kafka Architecture
```text
┌─────────────────────────────────────────────────────────┐
│                      Kafka Cluster                      │
├─────────────────────────────────────────────────────────┤
│                                                         │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐          │
│   │ Broker 1 │    │ Broker 2 │    │ Broker 3 │          │
│   └──────────┘    └──────────┘    └──────────┘          │
│        │               │               │                │
│        └───────────────┼───────────────┘                │
│                        │                                │
│   ┌────────────────────┴────────────────────┐           │
│   │                ZooKeeper                │           │
│   │        (metadata, coordination)         │           │
│   └─────────────────────────────────────────┘           │
│                                                         │
└─────────────────────────────────────────────────────────┘
      ▲                                     │
      │                                     v
┌──────────┐                          ┌──────────┐
│ Producer │                          │ Consumer │
│ (writes) │                          │ (reads)  │
└──────────┘                          └──────────┘
```
2.3 Core Concepts
```text
TOPIC (logical channel)
├── Partition 0
│   ├── Offset 0: Message A
│   ├── Offset 1: Message B
│   └── Offset 2: Message C
├── Partition 1
│   ├── Offset 0: Message D
│   └── Offset 1: Message E
└── Partition 2
    └── Offset 0: Message F

KEY CONCEPTS:
- Topic: Named feed of messages
- Partition: Ordered, immutable sequence
- Offset: Position in partition
- Broker: Kafka server
- Producer: Publishes messages
- Consumer: Subscribes and reads
- Consumer Group: Group of consumers sharing work
```
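A small kafka-python sketch that makes partitions and offsets tangible (assuming a broker on localhost:9092 and an existing topic named `my-topic`): the consumer is pinned to partition 0 and seeks to a specific offset instead of joining a consumer group.

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])

# Manually attach to partition 0 of the topic and jump to offset 2
tp = TopicPartition('my-topic', 0)
consumer.assign([tp])      # manual assignment: no group, no rebalancing
consumer.seek(tp, 2)       # start reading from a specific offset

# poll() returns {TopicPartition: [records]}
for records in consumer.poll(timeout_ms=1000).values():
    for record in records:
        print(f"partition={record.partition} offset={record.offset} value={record.value}")

consumer.close()
```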
3. Kafka Setup
3.1 Docker Compose
```yaml
# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```
```bash
# Start Kafka
docker-compose up -d

# Verify
docker-compose ps
```
3.2 Python Client Setup
```bash
pip install kafka-python
# or
pip install confluent-kafka  # More performant
```
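As a quick sanity check (a minimal sketch, assuming the Docker setup above is running on localhost:9092), you can list the cluster's topics with kafka-python's admin client before writing any producer code:

```python
from kafka import KafkaAdminClient

# If this call succeeds, the broker is reachable from your client.
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
print(admin.list_topics())   # list of topic names (may be empty on a fresh cluster)
admin.close()
```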
4. Kafka Producer
4.1 Basic Producer
```python
from kafka import KafkaProducer
import json

# Create producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send message (async)
future = producer.send(
    topic='my-topic',
    value={'user_id': 123, 'action': 'click'},
    key='user_123'
)

# Wait for confirmation (optional)
result = future.get(timeout=10)
print(f"Sent to partition {result.partition} at offset {result.offset}")

# Flush and close
producer.flush()
producer.close()
```
4.2 Producer with Callbacks
```python
from kafka import KafkaProducer
import json

def on_send_success(record_metadata):
    print(f"Success: topic={record_metadata.topic}, "
          f"partition={record_metadata.partition}, "
          f"offset={record_metadata.offset}")

def on_send_error(ex):
    print(f"Error: {ex}")

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',          # Wait for all replicas
    retries=3,           # Retry on failure
    linger_ms=10,        # Batch messages for 10ms
    batch_size=16384     # Batch size in bytes
)

# Send with callbacks
for i in range(100):
    message = {'id': i, 'data': f'message_{i}'}
    producer.send('my-topic', value=message) \
        .add_callback(on_send_success) \
        .add_errback(on_send_error)

producer.flush()
```
4.3 Producer Configuration
```python
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],

    # Serialization
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,

    # Reliability
    acks='all',                   # 0, 1, 'all'
    retries=5,
    retry_backoff_ms=100,

    # Performance
    batch_size=16384,             # Bytes
    linger_ms=5,                  # Wait time for batching
    buffer_memory=33554432,       # Total buffer memory

    # Compression
    compression_type='gzip',      # none, gzip, snappy, lz4, zstd

    # Idempotence (exactly-once)
    enable_idempotence=True
)
```
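For comparison, a roughly equivalent setup with the confluent-kafka client mentioned in section 3.2 could look like the sketch below (librdkafka uses dotted config keys and you serialize by hand); treat the values as illustrative rather than tuned settings.

```python
import json
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    # Reliability
    'acks': 'all',
    'retries': 5,
    # Performance
    'batch.size': 16384,
    'linger.ms': 5,
    # Compression and idempotence
    'compression.type': 'gzip',
    'enable.idempotence': True,
})

def delivery_report(err, msg):
    # Called for each message from poll()/flush()
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

producer.produce(
    'my-topic',
    key='user_123',
    value=json.dumps({'user_id': 123, 'action': 'click'}).encode('utf-8'),
    callback=delivery_report,
)
producer.flush()
```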
5. Kafka Consumer
5.1 Basic Consumer
```python
from kafka import KafkaConsumer
import json

# Create consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',   # Start from beginning
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='my-consumer-group'
)

# Poll messages
for message in consumer:
    print(f"Topic: {message.topic}")
    print(f"Partition: {message.partition}")
    print(f"Offset: {message.offset}")
    print(f"Key: {message.key}")
    print(f"Value: {message.value}")
    print("---")

consumer.close()
```
5.2 Consumer with Manual Commit
```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,   # Manual commit
    group_id='my-consumer-group'
)

try:
    for message in consumer:
        # Process message
        process_message(message)

        # Commit offset after successful processing
        consumer.commit()

except Exception as e:
    print(f"Error: {e}")
finally:
    consumer.close()
```
5.3 Consumer Group
```python
# Consumer Group: Multiple consumers sharing work
# Each partition assigned to exactly one consumer in group

"""
Topic with 4 partitions:
┌───────────────┬───────────────┐
│  Partition 0  │  Partition 1  │
│       ↓       │       ↓       │
│  Consumer 1   │  Consumer 1   │
├───────────────┼───────────────┤
│  Partition 2  │  Partition 3  │
│       ↓       │       ↓       │
│  Consumer 2   │  Consumer 2   │
└───────────────┴───────────────┘

2 consumers, 4 partitions = 2 partitions each

If Consumer 2 fails:
Consumer 1 gets all 4 partitions (rebalancing)
"""

# Multiple consumers with same group_id
consumer1 = KafkaConsumer('my-topic', group_id='my-group', ...)
consumer2 = KafkaConsumer('my-topic', group_id='my-group', ...)
# Partitions distributed between consumer1 and consumer2
```
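A minimal runnable sketch of the same idea (assuming the local broker and a `my-topic` with more than one partition, plus a hypothetical `demo-group`): two consumers join the same group and each prints the partitions it was assigned after its first poll. Note that assignments can shift as members join, so the printed split is a snapshot.

```python
import threading
from kafka import KafkaConsumer

def run_member(name):
    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers=['localhost:9092'],
        group_id='demo-group',
        auto_offset_reset='earliest',
    )
    consumer.poll(timeout_ms=5000)   # joining the group happens on poll
    print(f"{name} assigned partitions: {consumer.assignment()}")
    consumer.close()

# Start two members of the same group; Kafka splits the partitions between them.
threads = [threading.Thread(target=run_member, args=(f"member-{i}",)) for i in (1, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```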
5.4 Consumer Configuration
```python
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],

    # Consumer group
    group_id='my-consumer-group',

    # Offset management
    auto_offset_reset='earliest',   # earliest, latest, none
    enable_auto_commit=True,
    auto_commit_interval_ms=5000,

    # Polling
    max_poll_records=500,
    max_poll_interval_ms=300000,

    # Session
    session_timeout_ms=10000,
    heartbeat_interval_ms=3000,

    # Deserialization
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda m: m.decode('utf-8') if m else None
)
```
6. Kafka Admin Operations
6.1 Topic Management
```python
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create topic
topic = NewTopic(
    name='new-topic',
    num_partitions=3,
    replication_factor=1,
    topic_configs={
        'retention.ms': '86400000',   # 1 day
        'cleanup.policy': 'delete'
    }
)

admin.create_topics([topic])

# List topics
topics = admin.list_topics()
print(topics)

# Delete topic
admin.delete_topics(['old-topic'])

admin.close()
```
6.2 CLI Commands
```bash
# Create topic
kafka-topics.sh --create --topic my-topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1

# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe topic
kafka-topics.sh --describe --topic my-topic \
  --bootstrap-server localhost:9092

# Produce messages (CLI)
kafka-console-producer.sh --topic my-topic \
  --bootstrap-server localhost:9092

# Consume messages (CLI)
kafka-console-consumer.sh --topic my-topic \
  --bootstrap-server localhost:9092 --from-beginning

# Consumer groups
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
kafka-consumer-groups.sh --describe --group my-group \
  --bootstrap-server localhost:9092
```
7. Integration Patterns
7.1 Kafka with Spark Streaming
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("Kafka-Spark") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
    .getOrCreate()

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "my-topic") \
    .option("startingOffsets", "earliest") \
    .load()

# Parse JSON
schema = StructType([
    StructField("user_id", StringType()),
    StructField("action", StringType()),
    StructField("timestamp", TimestampType())
])

parsed = df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# Process and write back to Kafka
output = parsed \
    .groupBy(window(col("timestamp"), "5 minutes"), col("action")) \
    .count()

query = output \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .outputMode("update") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "processed-topic") \
    .option("checkpointLocation", "/checkpoints/kafka") \
    .start()

query.awaitTermination()
```
7.2 Event-Driven Architecture
```python
from datetime import datetime

# Producer: Order Service
def publish_order_event(order):
    event = {
        'event_type': 'ORDER_CREATED',
        'order_id': order['id'],
        'customer_id': order['customer_id'],
        'items': order['items'],
        'total': order['total'],
        'timestamp': datetime.now().isoformat()
    }
    producer.send('orders', key=str(order['id']), value=event)

# Consumer: Inventory Service
def process_order_for_inventory(message):
    event = message.value
    if event['event_type'] == 'ORDER_CREATED':
        for item in event['items']:
            reserve_inventory(item['product_id'], item['quantity'])

# Consumer: Notification Service
def process_order_for_notification(message):
    event = message.value
    if event['event_type'] == 'ORDER_CREATED':
        send_confirmation_email(event['customer_id'], event['order_id'])

# Each service has its own consumer group
inventory_consumer = KafkaConsumer('orders', group_id='inventory-service')
notification_consumer = KafkaConsumer('orders', group_id='notification-service')
# Both receive all messages (different groups)
```
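Wiring one of these services end to end could look like the sketch below (a minimal version of the inventory consumer, assuming the broker at localhost:9092 and a `reserve_inventory` function provided elsewhere by the service):

```python
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='inventory-service',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
)

for message in consumer:
    event = message.value
    if event.get('event_type') == 'ORDER_CREATED':
        for item in event['items']:
            # reserve_inventory is the service's own business logic (hypothetical here)
            reserve_inventory(item['product_id'], item['quantity'])
```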
7.3 Dead Letter Queue
```python
from kafka import KafkaProducer, KafkaConsumer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

consumer = KafkaConsumer(
    'main-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    # Deserialize so the failed payload can be re-serialized into the DLQ message
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    enable_auto_commit=False
)

MAX_RETRIES = 3

for message in consumer:
    retries = 0
    while retries < MAX_RETRIES:
        try:
            process_message(message.value)
            consumer.commit()
            break
        except Exception as e:
            retries += 1
            if retries == MAX_RETRIES:
                # Send to Dead Letter Queue
                producer.send(
                    'main-topic-dlq',
                    value={
                        'original_message': message.value,
                        'error': str(e),
                        'original_topic': message.topic,
                        'original_partition': message.partition,
                        'original_offset': message.offset
                    }
                )
                consumer.commit()
```
8. Production Best Practices
8.1 Partitioning Strategy
```python
# Key-based partitioning (default)
# Same key always goes to same partition
producer.send('orders', key='customer_123', value=order)
# All orders for customer_123 in same partition = ordered

# Custom partitioner
# kafka-python accepts a callable: partitioner(key_bytes, all_partitions, available_partitions)
def custom_partitioner(key_bytes, all_partitions, available_partitions):
    # Custom logic: hash the key; keyless messages go to the first partition
    if key_bytes:
        return all_partitions[hash(key_bytes) % len(all_partitions)]
    return all_partitions[0]

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    partitioner=custom_partitioner
)
```
8.2 Monitoring
```python
# Producer metrics
metrics = producer.metrics()
print(metrics)

# Consumer lag is easiest to check via the CLI:
# kafka-consumer-groups.sh --describe --group my-group \
#   --bootstrap-server localhost:9092
```
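If you want the lag programmatically, one possible sketch (assuming kafka-python and the `my-group` consumer group used throughout) compares each partition's committed offset with its current end offset:

```python
from kafka import KafkaAdminClient, KafkaConsumer

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])

# Committed offsets for the group: {TopicPartition: OffsetAndMetadata}
committed = admin.list_consumer_group_offsets('my-group')

# Latest offsets for the same partitions
end_offsets = consumer.end_offsets(list(committed.keys()))

for tp, meta in committed.items():
    lag = end_offsets[tp] - meta.offset
    print(f"{tp.topic}[{tp.partition}] lag={lag}")

admin.close()
consumer.close()
```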
8.3 Error Handling
```python
import time
from kafka.errors import KafkaError, NoBrokersAvailable

def safe_produce(producer, topic, value, retries=3):
    for attempt in range(retries):
        try:
            future = producer.send(topic, value=value)
            result = future.get(timeout=10)
            return result
        except NoBrokersAvailable:
            print(f"No brokers available, retry {attempt + 1}")
            time.sleep(1)
        except KafkaError as e:
            print(f"Kafka error: {e}")
            if attempt == retries - 1:
                raise
    raise Exception("Failed to produce after retries")
```
9. Practice
Kafka Exercise
Exercise: Build Event Pipeline
```python
# Scenario: Real-time clickstream processing
#
# 1. Producer: Generate click events
#    - user_id, page_url, timestamp, session_id
#
# 2. Consumer 1: Count clicks per page (last 5 min)
#
# 3. Consumer 2: Detect suspicious activity
#    - Alert if user has >100 clicks in 1 minute
#
# Requirements:
# - Use consumer groups
# - Handle errors with DLQ
# - Include proper serialization

# YOUR CODE HERE
```
💡 View solution
```python
# producer.py
from kafka import KafkaProducer
import json
import random
import time
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8')
)

pages = ['/home', '/products', '/cart', '/checkout', '/profile']
users = [f'user_{i}' for i in range(1, 101)]

def generate_click_event():
    user_id = random.choice(users)
    return {
        'user_id': user_id,
        'page_url': random.choice(pages),
        'timestamp': datetime.now().isoformat(),
        'session_id': f'{user_id}_session_{random.randint(1, 5)}'
    }

print("Starting producer...")
while True:
    event = generate_click_event()
    producer.send('clicks', key=event['user_id'], value=event)
    print(f"Sent: {event['user_id']} -> {event['page_url']}")
    time.sleep(random.uniform(0.01, 0.1))  # Simulate varied traffic


# consumer_page_counts.py
from kafka import KafkaConsumer
from collections import defaultdict
from datetime import datetime, timedelta
import json

consumer = KafkaConsumer(
    'clicks',
    bootstrap_servers=['localhost:9092'],
    group_id='page-counter-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='latest'
)

# Simple in-memory window (in production, use Redis or similar)
window_counts = defaultdict(int)
window_start = datetime.now()
WINDOW_SIZE = timedelta(minutes=5)

print("Starting page counter...")
for message in consumer:
    event = message.value
    page = event['page_url']

    # Check if window expired
    if datetime.now() - window_start > WINDOW_SIZE:
        print("\n=== Window Summary ===")
        for p, count in sorted(window_counts.items(), key=lambda x: -x[1]):
            print(f"  {p}: {count} clicks")
        window_counts.clear()
        window_start = datetime.now()

    window_counts[page] += 1


# consumer_fraud_detection.py
from kafka import KafkaConsumer, KafkaProducer
from collections import defaultdict
from datetime import datetime, timedelta
import json

consumer = KafkaConsumer(
    'clicks',
    bootstrap_servers=['localhost:9092'],
    group_id='fraud-detection-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='latest'
)

alert_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Track clicks per user (sliding window simulation)
user_clicks = defaultdict(list)
THRESHOLD = 100
WINDOW_MINUTES = 1

print("Starting fraud detector...")
for message in consumer:
    event = message.value
    user_id = event['user_id']
    event_time = datetime.fromisoformat(event['timestamp'])

    # Add to user's click history
    user_clicks[user_id].append(event_time)

    # Remove old clicks (outside window)
    cutoff = event_time - timedelta(minutes=WINDOW_MINUTES)
    user_clicks[user_id] = [t for t in user_clicks[user_id] if t > cutoff]

    # Check threshold
    click_count = len(user_clicks[user_id])
    if click_count > THRESHOLD:
        alert = {
            'alert_type': 'SUSPICIOUS_ACTIVITY',
            'user_id': user_id,
            'click_count': click_count,
            'window_minutes': WINDOW_MINUTES,
            'timestamp': datetime.now().isoformat()
        }
        alert_producer.send('alerts', value=alert)
        print(f"🚨 ALERT: {user_id} has {click_count} clicks in {WINDOW_MINUTES} min!")
        user_clicks[user_id] = []  # Reset after alert
```
10. Summary
| Concept | Description |
|---|---|
| Topic | Logical channel for messages |
| Partition | Ordered, immutable log segment |
| Consumer Group | Load balancing across consumers |
| Offset | Message position in partition |
| Producer | Publishes messages to topics |
| Consumer | Subscribes and processes messages |
Key Configurations:
- acks: Reliability (0, 1, all)
- auto.offset.reset: Where to start (earliest, latest)
- enable.auto.commit: Offset management
Best Practices:
- Use consumer groups for scalability
- Implement idempotent consumers (see the sketch after this list)
- Handle failures with DLQ
- Monitor consumer lag
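One possible sketch of an idempotent consumer (assuming the `orders` topic from section 7.2, a hypothetical `handle_order` business function, and an in-memory `processed_ids` set standing in for a durable dedup store such as a database table or Redis):

```python
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='inventory-service',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    enable_auto_commit=False,
)

processed_ids = set()   # hypothetical dedup store; use a durable store in production

for message in consumer:
    event = message.value
    order_id = event['order_id']

    # Skip duplicates: reprocessing after a retry or rebalance becomes a no-op
    if order_id not in processed_ids:
        handle_order(event)          # hypothetical business logic
        processed_ids.add(order_id)

    consumer.commit()
```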
Next lesson: Text Processing & NLP Basics
