🎯 Mục tiêu bài học
Khi xử lý hàng ngàn documents, batch processing và pipeline optimization là yếu tố then chốt. Bài này hướng dẫn cách scale text processing hiệu quả.
Sau bài này, bạn sẽ:
✅ Implement batch processing với concurrency control ✅ Sử dụng async processing để tối ưu hiệu suất ✅ Build cost tracking và rate limiting systems ✅ Tạo resilient pipelines với error recovery
🔍 Batch Architecture
Checkpoint
Bạn đã hiểu kiến trúc tổng quát của batch processing chưa?
💻 Basic Batch Processing
1from langchain_openai import ChatOpenAI2from langchain_core.prompts import ChatPromptTemplate3from langchain_core.output_parsers import StrOutputParser45llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)67chain = (8 ChatPromptTemplate.from_messages([9 ("system", "Tom tat text trong 2 cau."),10 ("human", "{text}")11 ])12 | llm13 | StrOutputParser()14)1516# Batch of documents17documents = [f"Document content {i}..." for i in range(100)]1819# Process in batches20results = chain.batch(21 [{"text": doc} for doc in documents],22 config={"max_concurrency": 10} # 10 parallel requests23)Checkpoint
Bạn đã hiểu cách sử dụng batch processing cơ bản chưa?
⚡ Async Processing
1import asyncio2from typing import List34async def process_documents(chain, documents: List[str], batch_size=20):5 results = []6 for i in range(0, len(documents), batch_size):7 batch = documents[i:i+batch_size]8 batch_results = await chain.abatch(9 [{"text": doc} for doc in batch],10 config={"max_concurrency": 10}11 )12 results.extend(batch_results)13 print(f"Processed {min(i+batch_size, len(documents))}/{len(documents)}")14 return results1516# Run17results = asyncio.run(process_documents(chain, documents))Checkpoint
Bạn đã hiểu cách sử dụng async processing để tối ưu hiệu suất chưa?
🛠️ Rate Limiting
1import time2from functools import wraps34class RateLimiter:5 def __init__(self, requests_per_minute=60):6 self.rpm = requests_per_minute7 self.interval = 60 / requests_per_minute8 self.last_request = 09 10 async def wait(self):11 now = time.time()12 elapsed = now - self.last_request13 if elapsed < self.interval:14 await asyncio.sleep(self.interval - elapsed)15 self.last_request = time.time()1617limiter = RateLimiter(requests_per_minute=100)1819async def rate_limited_process(chain, documents):20 results = []21 for doc in documents:22 await limiter.wait()23 result = await chain.ainvoke({"text": doc})24 results.append(result)25 return resultsCheckpoint
Bạn đã hiểu cách implement rate limiting để tránh API limits chưa?
📐 Cost Tracking
1class CostTracker:2 PRICING = {3 "gpt-4o-mini": {"input": 0.15, "output": 0.60}, # per 1M tokens4 "gpt-4o": {"input": 2.50, "output": 10.00},5 }6 7 def __init__(self, model="gpt-4o-mini"):8 self.model = model9 self.total_input_tokens = 010 self.total_output_tokens = 011 12 def track(self, response):13 if hasattr(response, 'response_metadata'):14 usage = response.response_metadata.get('token_usage', {})15 self.total_input_tokens += usage.get('prompt_tokens', 0)16 self.total_output_tokens += usage.get('completion_tokens', 0)17 18 @property19 def total_cost(self):20 pricing = self.PRICING[self.model]21 input_cost = (self.total_input_tokens / 1_000_000) * pricing["input"]22 output_cost = (self.total_output_tokens / 1_000_000) * pricing["output"]23 return input_cost + output_cost24 25 def report(self):26 print(f"Input tokens: {self.total_input_tokens:,}")27 print(f"Output tokens: {self.total_output_tokens:,}")28 print(f"Total cost: ${self.total_cost:.4f}")2930tracker = CostTracker()Checkpoint
Bạn đã hiểu cách track chi phí khi xử lý batch documents chưa?
💻 Document Processing Pipeline
1from langchain_core.runnables import RunnableParallel, RunnablePassthrough2import pandas as pd34# Complete pipeline5def build_pipeline():6 summarize = ChatPromptTemplate.from_messages([7 ("system", "Tom tat trong 3 cau."),8 ("human", "{text}")9 ]) | llm | StrOutputParser()10 11 classify = ChatPromptTemplate.from_messages([12 ("system", "Classify vao: tech/business/health/other. Tra ve 1 tu."),13 ("human", "{text}")14 ]) | llm | StrOutputParser()15 16 extract_keywords = ChatPromptTemplate.from_messages([17 ("system", "Extract 5 keywords, separated by commas."),18 ("human", "{text}")19 ]) | llm | StrOutputParser()20 21 pipeline = RunnableParallel(22 summary=summarize,23 category=classify,24 keywords=extract_keywords,25 original=RunnablePassthrough()26 )27 28 return pipeline2930pipeline = build_pipeline()3132# Process batch33documents = ["Doc 1 content...", "Doc 2 content...", "Doc 3 content..."]34results = pipeline.batch(35 [{"text": doc} for doc in documents],36 config={"max_concurrency": 5}37)3839# To DataFrame40df = pd.DataFrame([41 {42 "text": r["original"]["text"][:100],43 "summary": r["summary"],44 "category": r["category"],45 "keywords": r["keywords"]46 }47 for r in results48])Checkpoint
Bạn đã hiểu cách build document processing pipeline hoàn chỉnh chưa?
🛠️ Error Recovery
1async def resilient_batch(chain, items, max_retries=3):2 results = []3 failed = []4 5 for i, item in enumerate(items):6 for attempt in range(max_retries):7 try:8 result = await chain.ainvoke(item)9 results.append({"index": i, "result": result, "status": "success"})10 break11 except Exception as e:12 if attempt == max_retries - 1:13 results.append({"index": i, "error": str(e), "status": "failed"})14 failed.append(i)15 else:16 await asyncio.sleep(2 ** attempt)17 18 print(f"Success: {len(results) - len(failed)}, Failed: {len(failed)}")19 return resultsCheckpoint
Bạn đã hiểu cách implement error recovery cho batch processing chưa?
🎯 Tổng kết
- Model selection: Dùng gpt-4o-mini cho batch tasks (rẻ hơn 17x)
- Concurrency: max_concurrency=10-20 cho OpenAI
- Caching: Cache kết quả để tránh duplicate calls
- Batching: Group requests để giảm overhead
- Async: Dùng async cho I/O-bound tasks
- Streaming: Stream results cho real-time feedback
- Build batch document processing pipeline
- Implement cost tracking và reporting
- Add rate limiting và error recovery
- Process 100+ documents với parallel analysis
Target: Pipeline xử lý 1000 documents/giờ với cost tracking
Câu hỏi tự kiểm tra
- Khi nào nên sử dụng batch processing thay vì xử lý từng document một?
- Async processing giúp tối ưu hiệu suất batch pipeline như thế nào?
- Tại sao cần implement rate limiting và error recovery trong batch processing?
- Có những chiến lược nào để giảm chi phí khi xử lý hàng ngàn documents với LLMs?
🎉 Tuyệt vời! Bạn đã hoàn thành bài học Batch Processing và Pipelines!
Tiếp theo: Hãy tổng hợp tất cả kiến thức vào Capstone Project — xây dựng Text Processing Platform hoàn chỉnh!
