MinAI - Về trang chủ
Lý thuyết
4/1340 phút
Đang tải...

FastAPI cho AI Applications

Xây dựng AI API chuyên nghiệp với FastAPI - streaming, middleware, background tasks

0

🎯 Mục tiêu bài học

TB5 min

FastAPI là lựa chọn hàng đầu cho AI APIs nhờ async support, auto-documentation, và type safety. Bài này đi sâu vào các patterns đặc thù cho AI apps.

Sau bài này, bạn sẽ:

✅ Hiểu kiến trúc AI API với FastAPI ✅ Implement streaming responses với Server-Sent Events ✅ Xây dựng conversation management cho chat API ✅ Tạo middleware cho authentication và rate limiting ✅ Sử dụng background tasks và Redis caching

1

📐 AI API Architecture

TB5 min
Diagram
Đang vẽ diagram...

Checkpoint

Bạn đã hiểu kiến trúc tổng thể của AI API với middleware, routers và services chưa?

2

⚡ Streaming Responses

TB5 min
python.py
1from fastapi import FastAPI
2from fastapi.responses import StreamingResponse
3from langchain_openai import ChatOpenAI
4from langchain_core.prompts import ChatPromptTemplate
5import json
6
7app = FastAPI()
8llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
9
10chain = (
11 ChatPromptTemplate.from_messages([
12 ("system", "Ban la AI assistant huu ich."),
13 ("human", "{message}")
14 ])
15 | llm
16)
17
18@app.post("/chat/stream")
19async def chat_stream(message: str):
20 async def generate():
21 async for chunk in chain.astream({"message": message}):
22 if hasattr(chunk, 'content') and chunk.content:
23 data = json.dumps({"content": chunk.content})
24 yield f"data: {data}\n\n"
25 yield "data: [DONE]\n\n"
26
27 return StreamingResponse(
28 generate(),
29 media_type="text/event-stream"
30 )

Checkpoint

Bạn đã hiểu cách implement streaming responses với SSE chưa?

3

💻 Conversation Management

TB5 min
python.py
1from pydantic import BaseModel
2from typing import List, Optional
3from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
4
5class Message(BaseModel):
6 role: str
7 content: str
8
9class ChatRequest(BaseModel):
10 messages: List[Message]
11 model: str = "gpt-4o-mini"
12 temperature: float = 0.7
13 max_tokens: int = 1000
14 session_id: Optional[str] = None
15
16# In-memory sessions
17sessions = {}
18
19@app.post("/chat")
20async def chat(request: ChatRequest):
21 llm = ChatOpenAI(
22 model=request.model,
23 temperature=request.temperature,
24 max_tokens=request.max_tokens
25 )
26
27 messages = []
28 for msg in request.messages:
29 if msg.role == "system":
30 messages.append(SystemMessage(content=msg.content))
31 elif msg.role == "user":
32 messages.append(HumanMessage(content=msg.content))
33 elif msg.role == "assistant":
34 messages.append(AIMessage(content=msg.content))
35
36 response = await llm.ainvoke(messages)
37
38 return {
39 "content": response.content,
40 "model": request.model,
41 "usage": response.response_metadata.get("token_usage", {})
42 }

Checkpoint

Bạn đã hiểu cách quản lý conversations và sessions trong AI API chưa?

4

🔒 Middleware

TB5 min

Authentication

python.py
1from fastapi import Request, HTTPException
2from starlette.middleware.base import BaseHTTPMiddleware
3
4class AuthMiddleware(BaseHTTPMiddleware):
5 async def dispatch(self, request: Request, call_next):
6 if request.url.path.startswith("/docs"):
7 return await call_next(request)
8
9 api_key = request.headers.get("X-API-Key")
10 if not api_key or not validate_key(api_key):
11 raise HTTPException(status_code=401, detail="Invalid API key")
12
13 response = await call_next(request)
14 return response
15
16app.add_middleware(AuthMiddleware)

Rate Limiting

python.py
1from collections import defaultdict
2import time
3
4class RateLimitMiddleware(BaseHTTPMiddleware):
5 def __init__(self, app, requests_per_minute=60):
6 super().__init__(app)
7 self.rpm = requests_per_minute
8 self.requests = defaultdict(list)
9
10 async def dispatch(self, request: Request, call_next):
11 client_ip = request.client.host
12 now = time.time()
13
14 # Clean old requests
15 self.requests[client_ip] = [
16 t for t in self.requests[client_ip] if now - t < 60
17 ]
18
19 if len(self.requests[client_ip]) >= self.rpm:
20 raise HTTPException(429, "Rate limit exceeded")
21
22 self.requests[client_ip].append(now)
23 return await call_next(request)
24
25app.add_middleware(RateLimitMiddleware, requests_per_minute=30)

Checkpoint

Bạn đã hiểu cách tạo middleware cho authentication và rate limiting chưa?

5

🛠️ Background Tasks

TB5 min
python.py
1from fastapi import BackgroundTasks
2
3async def process_batch_job(texts: list, job_id: str):
4 # Long-running batch processing
5 results = await chain.abatch([{"message": t} for t in texts])
6 # Save results to database
7 save_results(job_id, results)
8
9@app.post("/batch")
10async def create_batch(texts: list, background_tasks: BackgroundTasks):
11 job_id = generate_job_id()
12 background_tasks.add_task(process_batch_job, texts, job_id)
13 return {"job_id": job_id, "status": "processing"}
14
15@app.get("/batch/{job_id}")
16async def get_batch_status(job_id: str):
17 return get_results(job_id)

Checkpoint

Bạn đã hiểu cách sử dụng background tasks cho batch processing chưa?

6

🛠️ Error Handling

TB5 min
python.py
1from fastapi import HTTPException
2from langchain_core.exceptions import OutputParserException
3
4@app.exception_handler(Exception)
5async def global_exception_handler(request: Request, exc: Exception):
6 if isinstance(exc, OutputParserException):
7 return JSONResponse(status_code=422, content={
8 "error": "parse_error",
9 "message": "Failed to parse LLM output"
10 })
11
12 return JSONResponse(status_code=500, content={
13 "error": "internal_error",
14 "message": "An unexpected error occurred"
15 })

Checkpoint

Bạn đã hiểu cách xử lý các loại errors khác nhau trong AI API chưa?

7

⚡ Caching với Redis

TB5 min
python.py
1import redis
2import hashlib
3import json
4
5r = redis.Redis(host="localhost", port=6379, db=0)
6
7def cache_key(message: str, model: str):
8 return hashlib.md5(f"{model}:{message}".encode()).hexdigest()
9
10@app.post("/chat/cached")
11async def chat_cached(message: str, model: str = "gpt-4o-mini"):
12 key = cache_key(message, model)
13
14 # Check cache
15 cached = r.get(key)
16 if cached:
17 return json.loads(cached)
18
19 # Generate
20 response = await chain.ainvoke({"message": message})
21 result = {"content": response.content, "cached": False}
22
23 # Cache for 1 hour
24 r.setex(key, 3600, json.dumps(result))
25
26 return result

Checkpoint

Bạn đã hiểu cách implement Redis caching cho AI API chưa?

8

🎯 Tổng kết

TB5 min

Bài tập thực hành

Hands-on Exercise
  1. Build streaming chat API với SSE
  2. Implement authentication middleware
  3. Add rate limiting
  4. Setup Redis caching

Target: Production-ready AI API với auth, rate limiting, caching

Câu hỏi tự kiểm tra

  1. Server-Sent Events (SSE) hoạt động như thế nào để implement streaming responses cho AI chat API?
  2. Middleware trong FastAPI có thể được sử dụng cho những mục đích gì khi xây dựng AI applications (auth, logging, rate limit)?
  3. Redis caching giúp giảm chi phí và tăng performance cho AI API như thế nào? Giải thích flow cache hit vs cache miss.
  4. Tại sao cần tách riêng các endpoints cho Chat, Completion và Embedding trong AI API architecture?

🎉 Tuyệt vời! Bạn đã hoàn thành bài học FastAPI cho AI Applications!

Tiếp theo: Chúng ta sẽ học Docker Basics để containerize ứng dụng AI.


🚀 Bài tiếp theo

Docker Basics →