🎯 Mục tiêu bài học
FastAPI là lựa chọn hàng đầu cho AI APIs nhờ async support, auto-documentation, và type safety. Bài này đi sâu vào các patterns đặc thù cho AI apps.
Sau bài này, bạn sẽ:
✅ Hiểu kiến trúc AI API với FastAPI ✅ Implement streaming responses với Server-Sent Events ✅ Xây dựng conversation management cho chat API ✅ Tạo middleware cho authentication và rate limiting ✅ Sử dụng background tasks và Redis caching
📐 AI API Architecture
Checkpoint
Bạn đã hiểu kiến trúc tổng thể của AI API với middleware, routers và services chưa?
⚡ Streaming Responses
1from fastapi import FastAPI2from fastapi.responses import StreamingResponse3from langchain_openai import ChatOpenAI4from langchain_core.prompts import ChatPromptTemplate5import json67app = FastAPI()8llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)910chain = (11 ChatPromptTemplate.from_messages([12 ("system", "Ban la AI assistant huu ich."),13 ("human", "{message}")14 ])15 | llm16)1718@app.post("/chat/stream")19async def chat_stream(message: str):20 async def generate():21 async for chunk in chain.astream({"message": message}):22 if hasattr(chunk, 'content') and chunk.content:23 data = json.dumps({"content": chunk.content})24 yield f"data: {data}\n\n"25 yield "data: [DONE]\n\n"26 27 return StreamingResponse(28 generate(),29 media_type="text/event-stream"30 )Checkpoint
Bạn đã hiểu cách implement streaming responses với SSE chưa?
💻 Conversation Management
1from pydantic import BaseModel2from typing import List, Optional3from langchain_core.messages import HumanMessage, AIMessage, SystemMessage45class Message(BaseModel):6 role: str7 content: str89class ChatRequest(BaseModel):10 messages: List[Message]11 model: str = "gpt-4o-mini"12 temperature: float = 0.713 max_tokens: int = 100014 session_id: Optional[str] = None1516# In-memory sessions17sessions = {}1819@app.post("/chat")20async def chat(request: ChatRequest):21 llm = ChatOpenAI(22 model=request.model,23 temperature=request.temperature,24 max_tokens=request.max_tokens25 )26 27 messages = []28 for msg in request.messages:29 if msg.role == "system":30 messages.append(SystemMessage(content=msg.content))31 elif msg.role == "user":32 messages.append(HumanMessage(content=msg.content))33 elif msg.role == "assistant":34 messages.append(AIMessage(content=msg.content))35 36 response = await llm.ainvoke(messages)37 38 return {39 "content": response.content,40 "model": request.model,41 "usage": response.response_metadata.get("token_usage", {})42 }Checkpoint
Bạn đã hiểu cách quản lý conversations và sessions trong AI API chưa?
🔒 Middleware
Authentication
1from fastapi import Request, HTTPException2from starlette.middleware.base import BaseHTTPMiddleware34class AuthMiddleware(BaseHTTPMiddleware):5 async def dispatch(self, request: Request, call_next):6 if request.url.path.startswith("/docs"):7 return await call_next(request)8 9 api_key = request.headers.get("X-API-Key")10 if not api_key or not validate_key(api_key):11 raise HTTPException(status_code=401, detail="Invalid API key")12 13 response = await call_next(request)14 return response1516app.add_middleware(AuthMiddleware)Rate Limiting
1from collections import defaultdict2import time34class RateLimitMiddleware(BaseHTTPMiddleware):5 def __init__(self, app, requests_per_minute=60):6 super().__init__(app)7 self.rpm = requests_per_minute8 self.requests = defaultdict(list)9 10 async def dispatch(self, request: Request, call_next):11 client_ip = request.client.host12 now = time.time()13 14 # Clean old requests15 self.requests[client_ip] = [16 t for t in self.requests[client_ip] if now - t < 6017 ]18 19 if len(self.requests[client_ip]) >= self.rpm:20 raise HTTPException(429, "Rate limit exceeded")21 22 self.requests[client_ip].append(now)23 return await call_next(request)2425app.add_middleware(RateLimitMiddleware, requests_per_minute=30)Checkpoint
Bạn đã hiểu cách tạo middleware cho authentication và rate limiting chưa?
🛠️ Background Tasks
1from fastapi import BackgroundTasks23async def process_batch_job(texts: list, job_id: str):4 # Long-running batch processing5 results = await chain.abatch([{"message": t} for t in texts])6 # Save results to database7 save_results(job_id, results)89@app.post("/batch")10async def create_batch(texts: list, background_tasks: BackgroundTasks):11 job_id = generate_job_id()12 background_tasks.add_task(process_batch_job, texts, job_id)13 return {"job_id": job_id, "status": "processing"}1415@app.get("/batch/{job_id}")16async def get_batch_status(job_id: str):17 return get_results(job_id)Checkpoint
Bạn đã hiểu cách sử dụng background tasks cho batch processing chưa?
🛠️ Error Handling
1from fastapi import HTTPException2from langchain_core.exceptions import OutputParserException34@app.exception_handler(Exception)5async def global_exception_handler(request: Request, exc: Exception):6 if isinstance(exc, OutputParserException):7 return JSONResponse(status_code=422, content={8 "error": "parse_error",9 "message": "Failed to parse LLM output"10 })11 12 return JSONResponse(status_code=500, content={13 "error": "internal_error",14 "message": "An unexpected error occurred"15 })Checkpoint
Bạn đã hiểu cách xử lý các loại errors khác nhau trong AI API chưa?
⚡ Caching với Redis
1import redis2import hashlib3import json45r = redis.Redis(host="localhost", port=6379, db=0)67def cache_key(message: str, model: str):8 return hashlib.md5(f"{model}:{message}".encode()).hexdigest()910@app.post("/chat/cached")11async def chat_cached(message: str, model: str = "gpt-4o-mini"):12 key = cache_key(message, model)13 14 # Check cache15 cached = r.get(key)16 if cached:17 return json.loads(cached)18 19 # Generate20 response = await chain.ainvoke({"message": message})21 result = {"content": response.content, "cached": False}22 23 # Cache for 1 hour24 r.setex(key, 3600, json.dumps(result))25 26 return resultCheckpoint
Bạn đã hiểu cách implement Redis caching cho AI API chưa?
🎯 Tổng kết
Bài tập thực hành
- Build streaming chat API với SSE
- Implement authentication middleware
- Add rate limiting
- Setup Redis caching
Target: Production-ready AI API với auth, rate limiting, caching
Câu hỏi tự kiểm tra
- Server-Sent Events (SSE) hoạt động như thế nào để implement streaming responses cho AI chat API?
- Middleware trong FastAPI có thể được sử dụng cho những mục đích gì khi xây dựng AI applications (auth, logging, rate limit)?
- Redis caching giúp giảm chi phí và tăng performance cho AI API như thế nào? Giải thích flow cache hit vs cache miss.
- Tại sao cần tách riêng các endpoints cho Chat, Completion và Embedding trong AI API architecture?
🎉 Tuyệt vời! Bạn đã hoàn thành bài học FastAPI cho AI Applications!
Tiếp theo: Chúng ta sẽ học Docker Basics để containerize ứng dụng AI.
