Theory
Lesson 12/15

Kafka & Message Queues

Apache Kafka fundamentals, producers, consumers, and integration patterns


1. Introduction to Message Queues

1.1 What is a Message Queue?

Message Queue

A message queue is an intermediary system that lets applications communicate asynchronously by sending and receiving messages.

Text
──────────────────────────────────────────────────────────
 Without Message Queue
──────────────────────────────────────────────────────────
 [App A] ──────────────────────────────> [App B]
 Synchronous, tightly coupled
 A must wait for B
──────────────────────────────────────────────────────────
 With Message Queue
──────────────────────────────────────────────────────────
 [App A] ──> [Message Queue] ──> [App B]
 Asynchronous, decoupled
 A doesn't wait, B consumes when ready
──────────────────────────────────────────────────────────

1.2 Benefits of Message Queues

Benefit             Description
Decoupling          Services don't need to know about each other
Scalability         Add more consumers to handle load
Reliability         Messages persist, survive crashes
Async Processing    Non-blocking operations
Load Leveling       Buffer spikes in traffic
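
To make the decoupling and async-processing benefits concrete, here is a minimal in-process sketch using Python's standard queue.Queue and a worker thread (no Kafka involved): the producer hands off work and returns immediately, while the consumer drains the queue at its own pace.

Python
import queue
import threading
import time

buffer = queue.Queue()  # stands in for the message queue

def producer():
    # Hand off work and return immediately; no waiting on the consumer
    for i in range(5):
        buffer.put({'order_id': i})
        print(f"produced order {i}")

def consumer():
    while True:
        msg = buffer.get()       # blocks until a message is available
        time.sleep(0.5)          # simulate slow processing
        print(f"consumed order {msg['order_id']}")
        buffer.task_done()

threading.Thread(target=consumer, daemon=True).start()
producer()       # finishes almost instantly, even though processing lags behind
buffer.join()    # wait until the consumer has drained the queue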

2. Apache Kafka Overview

2.1 What is Kafka?

Apache Kafka = Distributed streaming platform

  • High-throughput, low-latency
  • Fault-tolerant, distributed
  • Stores streams of records durably
  • Processes streams in real-time

2.2 Kafka Architecture

Text
                    Kafka Cluster
  ┌──────────┐       ┌──────────┐       ┌──────────┐
  │ Broker 1 │       │ Broker 2 │       │ Broker 3 │
  └──────────┘       └──────────┘       └──────────┘
        │                  │                  │
        └──────────────────┼──────────────────┘
                           │
              ┌────────────┴─────────────┐
              │        ZooKeeper         │
              │ (metadata, coordination) │
              └──────────────────────────┘

        ▲                        │
        │                        ▼
  ┌──────────┐             ┌──────────┐
  │ Producer │             │ Consumer │
  │ (writes) │             │ (reads)  │
  └──────────┘             └──────────┘

2.3 Core Concepts

Text
TOPIC (logical channel)
├── Partition 0
│   ├── Offset 0: Message A
│   ├── Offset 1: Message B
│   └── Offset 2: Message C
├── Partition 1
│   ├── Offset 0: Message D
│   └── Offset 1: Message E
└── Partition 2
    └── Offset 0: Message F

KEY CONCEPTS:
- Topic: Named feed of messages
- Partition: Ordered, immutable sequence
- Offset: Position in partition
- Broker: Kafka server
- Producer: Publishes messages
- Consumer: Subscribes and reads
- Consumer Group: Group of consumers sharing work
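
How a keyed message ends up in a particular partition can be shown with a tiny sketch. This is illustrative only: Kafka's default partitioner hashes the serialized key with murmur2, not MD5, but the principle of hashing the key and taking it modulo the partition count is the same, and it explains why all messages with the same key keep their relative order.

Python
import hashlib

NUM_PARTITIONS = 3

def pick_partition(key: str) -> int:
    # Hash the key and map it onto one of the partitions
    digest = hashlib.md5(key.encode('utf-8')).hexdigest()
    return int(digest, 16) % NUM_PARTITIONS

for key in ['user_123', 'user_456', 'user_123']:
    print(f"{key} -> partition {pick_partition(key)}")
# 'user_123' maps to the same partition every time, which is what gives
# per-key ordering inside a topic.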

3. Kafka Setup

3.1 Docker Compose

yaml
# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Bash
# Start Kafka
docker-compose up -d

# Verify
docker-compose ps

3.2 Python Client Setup

Bash
pip install kafka-python
# or
pip install confluent-kafka   # more performant C-based client
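
Optionally, a quick smoke test confirms that the client can reach the broker started with the Docker Compose file above (this assumes it is listening on localhost:9092):

Python
from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable

try:
    consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'])
    print("Connected. Existing topics:", consumer.topics())
    consumer.close()
except NoBrokersAvailable:
    print("Broker not reachable on localhost:9092 -- is docker-compose up?")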

4. Kafka Producer

4.1 Basic Producer

Python
from kafka import KafkaProducer
import json

# Create producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Send message (async)
future = producer.send(
    topic='my-topic',
    value={'user_id': 123, 'action': 'click'},
    key='user_123'
)

# Wait for confirmation (optional)
result = future.get(timeout=10)
print(f"Sent to partition {result.partition} at offset {result.offset}")

# Flush and close
producer.flush()
producer.close()

4.2 Producer with Callbacks

Python
from kafka import KafkaProducer
import json

def on_send_success(record_metadata):
    print(f"Success: topic={record_metadata.topic}, "
          f"partition={record_metadata.partition}, "
          f"offset={record_metadata.offset}")

def on_send_error(ex):
    print(f"Error: {ex}")

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',         # Wait for all in-sync replicas
    retries=3,          # Retry on failure
    linger_ms=10,       # Batch messages for up to 10 ms
    batch_size=16384    # Batch size in bytes
)

# Send with callbacks
for i in range(100):
    message = {'id': i, 'data': f'message_{i}'}
    producer.send('my-topic', value=message) \
        .add_callback(on_send_success) \
        .add_errback(on_send_error)

producer.flush()

4.3 Producer Configuration

Python
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],

    # Serialization
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,

    # Reliability
    acks='all',                # 0, 1, 'all'
    retries=5,
    retry_backoff_ms=100,

    # Performance
    batch_size=16384,          # bytes
    linger_ms=5,               # wait time for batching
    buffer_memory=33554432,    # total buffer memory

    # Compression
    compression_type='gzip',   # None, 'gzip', 'snappy', 'lz4', 'zstd'

    # Idempotence (exactly-once on the producer side;
    # requires a kafka-python release that supports this option)
    enable_idempotence=True
)

5. Kafka Consumer

5.1 Basic Consumer

Python
from kafka import KafkaConsumer
import json

# Create consumer
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',   # Start from beginning
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='my-consumer-group'
)

# Poll messages
for message in consumer:
    print(f"Topic: {message.topic}")
    print(f"Partition: {message.partition}")
    print(f"Offset: {message.offset}")
    print(f"Key: {message.key}")
    print(f"Value: {message.value}")
    print("---")

consumer.close()

5.2 Consumer with Manual Commit

Python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,   # Manual commit
    group_id='my-consumer-group'
)

try:
    for message in consumer:
        # Process message (process_message = your own handling logic)
        process_message(message)

        # Commit offset after successful processing
        consumer.commit()

except Exception as e:
    print(f"Error: {e}")
finally:
    consumer.close()

5.3 Consumer Group

Python
# Consumer Group: multiple consumers sharing the work.
# Each partition is assigned to exactly one consumer in the group.

"""
Topic with 4 partitions, consumer group with 2 consumers:

    Partition 0 ──> Consumer 1
    Partition 1 ──> Consumer 1
    Partition 2 ──> Consumer 2
    Partition 3 ──> Consumer 2

2 consumers, 4 partitions = 2 partitions each.

If Consumer 2 fails:
Consumer 1 gets all 4 partitions (rebalancing).
"""

# Multiple consumers with the same group_id
consumer1 = KafkaConsumer('my-topic', group_id='my-group', ...)
consumer2 = KafkaConsumer('my-topic', group_id='my-group', ...)
# Partitions are distributed between consumer1 and consumer2
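
The sketch below (a rough illustration, not part of the lesson's original code) starts two consumers with the same group_id in separate threads and prints which partitions each one ends up with. It assumes my-topic already exists with more than one partition, and the exact split depends on when the group rebalance happens.

Python
import threading
from kafka import KafkaConsumer

def run_consumer(name):
    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers=['localhost:9092'],
        group_id='my-group',
        auto_offset_reset='earliest'
    )
    # poll() triggers the group join and partition assignment
    consumer.poll(timeout_ms=5000)
    assigned = sorted(tp.partition for tp in consumer.assignment())
    print(f"{name} assigned partitions: {assigned}")
    consumer.close()

threads = [threading.Thread(target=run_consumer, args=(f"consumer-{i}",)) for i in (1, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()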

5.4 Consumer Configuration

Python
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],

    # Consumer group
    group_id='my-consumer-group',

    # Offset management
    auto_offset_reset='earliest',   # earliest, latest, none
    enable_auto_commit=True,
    auto_commit_interval_ms=5000,

    # Polling
    max_poll_records=500,
    max_poll_interval_ms=300000,

    # Session
    session_timeout_ms=10000,
    heartbeat_interval_ms=3000,

    # Deserialization
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda m: m.decode('utf-8') if m else None
)

6. Kafka Admin Operations

6.1 Topic Management

Python
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create topic
topic = NewTopic(
    name='new-topic',
    num_partitions=3,
    replication_factor=1,
    topic_configs={
        'retention.ms': '86400000',   # 1 day
        'cleanup.policy': 'delete'
    }
)

admin.create_topics([topic])

# List topics
topics = admin.list_topics()
print(topics)

# Delete topic
admin.delete_topics(['old-topic'])

admin.close()

6.2 CLI Commands

Bash
# Create topic
kafka-topics.sh --create --topic my-topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1

# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe topic
kafka-topics.sh --describe --topic my-topic \
  --bootstrap-server localhost:9092

# Produce messages (CLI)
kafka-console-producer.sh --topic my-topic \
  --bootstrap-server localhost:9092

# Consume messages (CLI)
kafka-console-consumer.sh --topic my-topic \
  --bootstrap-server localhost:9092 --from-beginning

# Consumer groups
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
kafka-consumer-groups.sh --describe --group my-group \
  --bootstrap-server localhost:9092

7. Integration Patterns

7.1 Kafka with Spark Streaming

Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder \
    .appName("Kafka-Spark") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
    .getOrCreate()

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "my-topic") \
    .option("startingOffsets", "earliest") \
    .load()

# Parse JSON
schema = StructType([
    StructField("user_id", StringType()),
    StructField("action", StringType()),
    StructField("timestamp", TimestampType())
])

parsed = df \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

# Process and write back to Kafka
output = parsed \
    .groupBy(window(col("timestamp"), "5 minutes"), col("action")) \
    .count()

# Aggregations need update/complete output mode (or a watermark for append mode)
query = output \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "processed-topic") \
    .option("checkpointLocation", "/checkpoints/kafka") \
    .outputMode("update") \
    .start()

query.awaitTermination()

7.2 Event-Driven Architecture

Python
from datetime import datetime

# Assumes a producer configured with a JSON value_serializer and a str
# key_serializer, as in section 4.

# Producer: Order Service
def publish_order_event(order):
    event = {
        'event_type': 'ORDER_CREATED',
        'order_id': order['id'],
        'customer_id': order['customer_id'],
        'items': order['items'],
        'total': order['total'],
        'timestamp': datetime.now().isoformat()
    }
    producer.send('orders', key=str(order['id']), value=event)

# Consumer: Inventory Service
def process_order_for_inventory(message):
    event = message.value
    if event['event_type'] == 'ORDER_CREATED':
        for item in event['items']:
            reserve_inventory(item['product_id'], item['quantity'])

# Consumer: Notification Service
def process_order_for_notification(message):
    event = message.value
    if event['event_type'] == 'ORDER_CREATED':
        send_confirmation_email(event['customer_id'], event['order_id'])

# Each service has its own consumer group
inventory_consumer = KafkaConsumer('orders', group_id='inventory-service')
notification_consumer = KafkaConsumer('orders', group_id='notification-service')
# Both receive all messages (different groups)

7.3 Dead Letter Queue

Python
from kafka import KafkaProducer, KafkaConsumer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

consumer = KafkaConsumer(
    'main-topic',
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    enable_auto_commit=False
)

MAX_RETRIES = 3

for message in consumer:
    retries = 0
    while retries < MAX_RETRIES:
        try:
            process_message(message.value)   # your processing logic (raises on failure)
            consumer.commit()
            break
        except Exception as e:
            retries += 1
            if retries == MAX_RETRIES:
                # Give up: send to the Dead Letter Queue and move on
                producer.send(
                    'main-topic-dlq',
                    value={
                        'original_message': message.value,
                        'error': str(e),
                        'original_topic': message.topic,
                        'original_partition': message.partition,
                        'original_offset': message.offset
                    }
                )
                consumer.commit()

8. Production Best Practices

8.1 Partitioning Strategy

Python
# Key-based partitioning (default)
# The same key always goes to the same partition
producer.send('orders', key='customer_123', value=order)
# All orders for customer_123 land in the same partition = ordered

# Custom partitioner: kafka-python accepts a callable
# partitioner(key_bytes, all_partitions, available_partitions)
def custom_partitioner(key_bytes, all_partitions, available_partitions):
    # Custom logic: hash the key, fall back to the first partition for keyless messages
    if key_bytes:
        return all_partitions[hash(key_bytes) % len(all_partitions)]
    return all_partitions[0]

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    partitioner=custom_partitioner
)

8.2 Monitoring

Python
# Producer metrics (kafka-python exposes its internal metrics as a dict)
metrics = producer.metrics()
print(metrics)

# Consumer lag: easiest to inspect via the CLI
# kafka-consumer-groups.sh --describe --group my-group \
#   --bootstrap-server localhost:9092
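
Lag can also be checked programmatically with kafka-python. A minimal sketch (assuming the group has already committed offsets for my-topic) compares the group's committed offset against the broker's latest offset per partition:

Python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    enable_auto_commit=False
)

topic = 'my-topic'
partitions = consumer.partitions_for_topic(topic) or set()
tps = [TopicPartition(topic, p) for p in sorted(partitions)]

end_offsets = consumer.end_offsets(tps)        # latest offset per partition
for tp in tps:
    committed = consumer.committed(tp) or 0    # group's last committed offset
    print(f"partition {tp.partition}: lag = {end_offsets[tp] - committed}")

consumer.close()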

8.3 Error Handling

Python
import time

from kafka.errors import KafkaError, NoBrokersAvailable

def safe_produce(producer, topic, value, retries=3):
    for attempt in range(retries):
        try:
            future = producer.send(topic, value=value)
            result = future.get(timeout=10)
            return result
        except NoBrokersAvailable:
            print(f"No brokers available, retry {attempt + 1}")
            time.sleep(1)
        except KafkaError as e:
            print(f"Kafka error: {e}")
            if attempt == retries - 1:
                raise
    raise Exception("Failed to produce after retries")

9. Practice

Kafka Exercise

Exercise: Build Event Pipeline

Python
# Scenario: Real-time clickstream processing
#
# 1. Producer: Generate click events
#    - user_id, page_url, timestamp, session_id
#
# 2. Consumer 1: Count clicks per page (last 5 min)
#
# 3. Consumer 2: Detect suspicious activity
#    - Alert if user has >100 clicks in 1 minute
#
# Requirements:
# - Use consumer groups
# - Handle errors with DLQ
# - Include proper serialization

# YOUR CODE HERE
💡 Solution
Python
# producer.py
from kafka import KafkaProducer
import json
import random
import time
from datetime import datetime

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8')
)

pages = ['/home', '/products', '/cart', '/checkout', '/profile']
users = [f'user_{i}' for i in range(1, 101)]

def generate_click_event():
    user_id = random.choice(users)
    return {
        'user_id': user_id,
        'page_url': random.choice(pages),
        'timestamp': datetime.now().isoformat(),
        'session_id': f'{user_id}_session_{random.randint(1, 5)}'
    }

print("Starting producer...")
while True:
    event = generate_click_event()
    producer.send('clicks', key=event['user_id'], value=event)
    print(f"Sent: {event['user_id']} -> {event['page_url']}")
    time.sleep(random.uniform(0.01, 0.1))  # Simulate varied traffic


# consumer_page_counts.py
from kafka import KafkaConsumer
from collections import defaultdict
from datetime import datetime, timedelta
import json

consumer = KafkaConsumer(
    'clicks',
    bootstrap_servers=['localhost:9092'],
    group_id='page-counter-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='latest'
)

# Simple in-memory window (in production, use Redis or similar)
window_counts = defaultdict(int)
window_start = datetime.now()
WINDOW_SIZE = timedelta(minutes=5)

print("Starting page counter...")
for message in consumer:
    event = message.value
    page = event['page_url']

    # Check if window expired
    if datetime.now() - window_start > WINDOW_SIZE:
        print("\n=== Window Summary ===")
        for p, count in sorted(window_counts.items(), key=lambda x: -x[1]):
            print(f"  {p}: {count} clicks")
        window_counts.clear()
        window_start = datetime.now()

    window_counts[page] += 1


# consumer_fraud_detection.py
from kafka import KafkaConsumer, KafkaProducer
from collections import defaultdict
from datetime import datetime, timedelta
import json

consumer = KafkaConsumer(
    'clicks',
    bootstrap_servers=['localhost:9092'],
    group_id='fraud-detection-group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='latest'
)

alert_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Track clicks per user (sliding window simulation)
user_clicks = defaultdict(list)
THRESHOLD = 100
WINDOW_MINUTES = 1

print("Starting fraud detector...")
for message in consumer:
    event = message.value
    user_id = event['user_id']
    event_time = datetime.fromisoformat(event['timestamp'])

    # Add to user's click history
    user_clicks[user_id].append(event_time)

    # Remove old clicks (outside window)
    cutoff = event_time - timedelta(minutes=WINDOW_MINUTES)
    user_clicks[user_id] = [t for t in user_clicks[user_id] if t > cutoff]

    # Check threshold
    click_count = len(user_clicks[user_id])
    if click_count > THRESHOLD:
        alert = {
            'alert_type': 'SUSPICIOUS_ACTIVITY',
            'user_id': user_id,
            'click_count': click_count,
            'window_minutes': WINDOW_MINUTES,
            'timestamp': datetime.now().isoformat()
        }
        alert_producer.send('alerts', value=alert)
        print(f"🚨 ALERT: {user_id} has {click_count} clicks in {WINDOW_MINUTES} min!")
        user_clicks[user_id] = []  # Reset after alert

10. Summary

Concept           Description
Topic             Logical channel for messages
Partition         Ordered, immutable log segment
Consumer Group    Load balancing across consumers
Offset            Message position in partition
Producer          Publishes messages to topics
Consumer          Subscribes and processes messages

Key Configurations:

  • acks: Reliability (0, 1, all)
  • auto.offset.reset: Where to start (earliest, latest)
  • enable.auto.commit: Offset management

Best Practices:

  • Use consumer groups for scalability
  • Implement idempotent consumers (see the sketch after this list)
  • Handle failures with DLQ
  • Monitor consumer lag
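
As a closing illustration, here is a minimal idempotent-consumer sketch for the order events from section 7.2. It deduplicates on order_id with an in-memory set; in production the dedup store would be a database or Redis, and handle_event is a placeholder for your own processing.

Python
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    group_id='inventory-service',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

processed_ids = set()   # stands in for a durable dedup store (DB/Redis)

for message in consumer:
    event = message.value
    event_id = event.get('order_id')
    if event_id in processed_ids:
        continue        # duplicate delivery (e.g. after a rebalance): safe to skip
    # handle_event(event)   # placeholder for the actual processing logic
    processed_ids.add(event_id)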

Next lesson: Text Processing & NLP Basics