Theory
35 minutes
Lesson 1/3

Production Architecture

Architecture for GenAI applications in production

🏗️ Production Architecture for GenAI

Deploying GenAI apps to production requires a different architecture than in development. This lesson covers the key patterns and best practices.

Development vs Production

| Aspect      | Development  | Production          |
|-------------|--------------|---------------------|
| Scale       | 1 user       | 1000+ users         |
| Latency     | Seconds OK   | Milliseconds matter |
| Reliability | Can restart  | Must be resilient   |
| Cost        | Flat rate    | Per-request matters |
| Monitoring  | Console logs | Full observability  |

Architecture Patterns

1. Synchronous Pattern

Diagram
graph LR
    C[Client] --> A[API Server]
    A --> L[LLM API]
    L --> A
    A --> C

Use case: Simple Q&A, short responses
Pros: Simple, predictable
Cons: Slow for long responses

Python
from fastapi import FastAPI
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

@app.post("/chat")
async def chat(message: str):
    # Await the async client so the completion call does not block the event loop
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": message}]
    )
    return {"response": response.choices[0].message.content}

2. Streaming Pattern

Diagram
graph LR
    C[Client] --> A[API Server]
    A --> L[LLM API]
    L -.->|Stream| A
    A -.->|Stream| C

Use case: Chat interfaces, long responses
Pros: Better UX, lower perceived latency
Cons: More complex to implement

Python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

@app.post("/chat/stream")
async def chat_stream(message: str):
    async def generate():
        # Forward tokens to the client as soon as the LLM produces them
        stream = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": message}],
            stream=True
        )
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                yield f"data: {chunk.choices[0].delta.content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

3. Async Queue Pattern

Diagram
graph LR
    C[Client] --> A[API Server]
    A --> Q[Queue]
    Q --> W[Worker]
    W --> L[LLM API]
    W --> D[(Database)]
    C -.->|Poll| A

Use case: Long-running tasks, batch processing
Pros: Scalable, reliable
Cons: Not real-time

Python
from fastapi import FastAPI
from redis import Redis
import uuid

app = FastAPI()
redis = Redis()

@app.post("/tasks")
async def create_task(prompt: str):
    task_id = str(uuid.uuid4())

    # Queue task for a separate worker process
    redis.lpush("task_queue", f"{task_id}:{prompt}")

    # Return immediately
    return {"task_id": task_id, "status": "queued"}

@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
    result = redis.get(f"result:{task_id}")
    if result:
        return {"status": "completed", "result": result.decode()}
    return {"status": "processing"}
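
The API only enqueues work; a separate worker process drains the queue, calls the LLM, and writes the result back so the polling endpoint can find it. A minimal worker sketch, reusing the queue and result key names from the endpoints above (the model choice and the one-hour result TTL are assumptions):

Python
from openai import OpenAI
from redis import Redis

redis = Redis()
client = OpenAI()

def generate_response(prompt: str) -> str:
    # A blocking LLM call is fine here: the worker serves no HTTP traffic
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}]
    )
    return completion.choices[0].message.content

def worker_loop():
    """Consume tasks, call the LLM, store results for /tasks/{task_id} polling."""
    while True:
        item = redis.brpop("task_queue", timeout=5)  # blocks until a task arrives
        if item is None:
            continue
        _, payload = item
        task_id, prompt = payload.decode().split(":", 1)
        try:
            result = generate_response(prompt)
        except Exception as e:
            result = f"ERROR: {e}"
        # Keep the result for one hour (arbitrary TTL for this sketch)
        redis.setex(f"result:{task_id}", 3600, result)

if __name__ == "__main__":
    worker_loop()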

Caching Strategies

1. Exact Match Cache

Python
import hashlib
from redis import Redis

redis = Redis()

def get_cached_response(prompt: str):
    cache_key = hashlib.md5(prompt.encode()).hexdigest()
    cached = redis.get(f"llm_cache:{cache_key}")

    if cached:
        return cached.decode()
    return None

def cache_response(prompt: str, response: str, ttl: int = 3600):
    cache_key = hashlib.md5(prompt.encode()).hexdigest()
    redis.setex(f"llm_cache:{cache_key}", ttl, response)

# Usage
@app.post("/chat")
async def chat(message: str):
    # Check cache
    cached = get_cached_response(message)
    if cached:
        return {"response": cached, "cached": True}

    # Generate
    response = await generate_response(message)

    # Cache
    cache_response(message, response)

    return {"response": response, "cached": False}

2. Semantic Cache

Python
from langchain.cache import RedisSemanticCache
from langchain_openai import OpenAIEmbeddings
import langchain

# Setup semantic cache
langchain.llm_cache = RedisSemanticCache(
    redis_url="redis://localhost:6379",
    embedding=OpenAIEmbeddings(),
    score_threshold=0.95  # Similarity threshold
)

# Similar prompts will hit the cache:
# "What is Python?" and "Tell me about Python"
# might return the same cached response
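
Once the cache is registered, LangChain consults it transparently on each model call. A usage sketch, assuming the setup above and an OpenAI chat model (whether two prompts share a cache entry depends on the similarity threshold):

Python
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

# First call goes to the API and populates the semantic cache
llm.invoke("What is Python?")

# A semantically similar prompt may be answered from the cache,
# skipping the API call entirely
llm.invoke("Tell me about Python")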

Rate Limiting

Fixed-Window Rate Limiter

Python
from fastapi import FastAPI, HTTPException
from redis import Redis

app = FastAPI()
redis = Redis()

def check_rate_limit(user_id: str, limit: int = 10, window: int = 60):
    """Fixed-window limiter: at most `limit` requests per `window` seconds."""
    key = f"rate_limit:{user_id}"

    current = redis.get(key)
    if current is None:
        # First request in this window: start a counter that expires with the window
        redis.setex(key, window, 1)
        return True

    if int(current) >= limit:
        return False

    redis.incr(key)
    return True

@app.post("/chat")
async def chat(message: str, user_id: str):
    if not check_rate_limit(user_id):
        raise HTTPException(429, "Rate limit exceeded")

    # Process request
    return await generate_response(message)

Tiered Rate Limits

Python
TIER_LIMITS = {
    "free": {"requests": 10, "tokens": 10000},
    "pro": {"requests": 100, "tokens": 100000},
    "enterprise": {"requests": 1000, "tokens": 1000000}
}

def get_user_limits(user_id: str):
    tier = get_user_tier(user_id)  # From database
    return TIER_LIMITS.get(tier, TIER_LIMITS["free"])
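
To enforce a tier, feed its request limit into the fixed-window limiter from the previous snippet; one possible wiring, assuming `check_rate_limit` and `get_user_limits` as defined above:

Python
from fastapi import HTTPException

@app.post("/chat")
async def chat(message: str, user_id: str):
    # Apply this user's per-tier request limit over a one-minute window
    limits = get_user_limits(user_id)
    if not check_rate_limit(user_id, limit=limits["requests"], window=60):
        raise HTTPException(429, "Rate limit exceeded for your plan")

    return await generate_response(message)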

Error Handling

Retry with Exponential Backoff

Python
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def call_llm_with_retry(prompt: str):
    try:
        return await generate_response(prompt)
    except Exception as e:
        print(f"Retry due to: {e}")
        raise

# Usage
@app.post("/chat")
async def chat(message: str):
    try:
        response = await call_llm_with_retry(message)
        return {"response": response}
    except Exception:
        return {"error": "Service temporarily unavailable", "retry_after": 60}

Fallback Models

Python
async def generate_with_fallback(prompt: str):
    """Try primary model, fallback to secondary"""

    # Try GPT-4
    try:
        return await call_gpt4(prompt)
    except Exception as e:
        print(f"GPT-4 failed: {e}")

    # Fallback to GPT-3.5
    try:
        return await call_gpt35(prompt)
    except Exception as e:
        print(f"GPT-3.5 failed: {e}")

    # Final fallback
    return "I'm experiencing issues. Please try again later."

Scalability Patterns

Horizontal Scaling

yaml
# docker-compose.yml
services:
  api:
    build: .
    deploy:
      replicas: 3
    environment:
      - REDIS_URL=redis://redis:6379

  redis:
    image: redis:alpine

  nginx:
    image: nginx
    ports:
      - "80:80"
    depends_on:
      - api

Load Balancing

nginx
# nginx.conf
upstream api_servers {
    least_conn;
    server api1:8000;
    server api2:8000;
    server api3:8000;
}

server {
    location /api {
        proxy_pass http://api_servers;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_buffering off;  # required so SSE/streaming responses are not buffered
    }
}

Best Practices

Production Checklist
  1. Caching: Implement caching to reduce costs
  2. Rate Limiting: Protect against abuse
  3. Error Handling: Graceful degradation
  4. Monitoring: Track latency, errors, costs (see the sketch below)
  5. Scaling: Design for horizontal scale
  6. Security: API keys, authentication
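
For the monitoring item, even a small piece of middleware that records latency and status per request is a useful first step before adopting a full observability stack. A minimal sketch (the logger name and log format are assumptions for illustration):

Python
import logging
import time

from fastapi import FastAPI, Request

app = FastAPI()
logger = logging.getLogger("genai-api")

@app.middleware("http")
async def track_metrics(request: Request, call_next):
    """Log latency and status code for every request."""
    start = time.perf_counter()
    try:
        response = await call_next(request)
    except Exception:
        logger.exception("unhandled error on %s %s", request.method, request.url.path)
        raise
    latency_ms = (time.perf_counter() - start) * 1000
    logger.info("%s %s -> %d in %.1f ms",
                request.method, request.url.path, response.status_code, latency_ms)
    return response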

Practice Exercise

Hands-on Exercise

Build Production-Ready API:

  1. FastAPI server with streaming
  2. Redis caching
  3. Rate limiting
  4. Error handling with retry
  5. Basic monitoring

Target: an API that can handle 100 concurrent users
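
One way to sanity-check that target is a small async load script that fires 100 requests at once; the sketch below uses httpx (the URL and parameters are assumptions about your exercise API):

Python
import asyncio
import time

import httpx

async def one_request(client: httpx.AsyncClient, i: int) -> int:
    # Each simulated user sends one chat request (endpoint and params are assumed)
    response = await client.post("http://localhost:8000/chat",
                                 params={"message": f"ping {i}", "user_id": f"user-{i}"})
    return response.status_code

async def main():
    start = time.perf_counter()
    async with httpx.AsyncClient(timeout=60) as client:
        statuses = await asyncio.gather(*(one_request(client, i) for i in range(100)))
    elapsed = time.perf_counter() - start
    ok = sum(1 for status in statuses if status == 200)
    print(f"{ok}/100 requests succeeded in {elapsed:.1f}s")

asyncio.run(main())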


Next

Next lesson: FastAPI for AI - a deep dive into building AI APIs with FastAPI.