MinAI - Về trang chủ
Lý thuyết
11/1335 phút
Đang tải...

Batch Processing và Pipelines

Xử lý văn bản hàng loạt với LLMs - batch processing, async, cost optimization

0

🎯 Mục tiêu bài học

TB5 min

Khi xử lý hàng ngàn documents, batch processing và pipeline optimization là yếu tố then chốt. Bài này hướng dẫn cách scale text processing hiệu quả.

Sau bài này, bạn sẽ:

✅ Implement batch processing với concurrency control ✅ Sử dụng async processing để tối ưu hiệu suất ✅ Build cost tracking và rate limiting systems ✅ Tạo resilient pipelines với error recovery

1

🔍 Batch Architecture

TB5 min
Diagram
Đang vẽ diagram...

Checkpoint

Bạn đã hiểu kiến trúc tổng quát của batch processing chưa?

2

💻 Basic Batch Processing

TB5 min
python.py
1from langchain_openai import ChatOpenAI
2from langchain_core.prompts import ChatPromptTemplate
3from langchain_core.output_parsers import StrOutputParser
4
5llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
6
7chain = (
8 ChatPromptTemplate.from_messages([
9 ("system", "Tom tat text trong 2 cau."),
10 ("human", "{text}")
11 ])
12 | llm
13 | StrOutputParser()
14)
15
16# Batch of documents
17documents = [f"Document content {i}..." for i in range(100)]
18
19# Process in batches
20results = chain.batch(
21 [{"text": doc} for doc in documents],
22 config={"max_concurrency": 10} # 10 parallel requests
23)

Checkpoint

Bạn đã hiểu cách sử dụng batch processing cơ bản chưa?

3

⚡ Async Processing

TB5 min
python.py
1import asyncio
2from typing import List
3
4async def process_documents(chain, documents: List[str], batch_size=20):
5 results = []
6 for i in range(0, len(documents), batch_size):
7 batch = documents[i:i+batch_size]
8 batch_results = await chain.abatch(
9 [{"text": doc} for doc in batch],
10 config={"max_concurrency": 10}
11 )
12 results.extend(batch_results)
13 print(f"Processed {min(i+batch_size, len(documents))}/{len(documents)}")
14 return results
15
16# Run
17results = asyncio.run(process_documents(chain, documents))

Checkpoint

Bạn đã hiểu cách sử dụng async processing để tối ưu hiệu suất chưa?

4

🛠️ Rate Limiting

TB5 min
python.py
1import time
2from functools import wraps
3
4class RateLimiter:
5 def __init__(self, requests_per_minute=60):
6 self.rpm = requests_per_minute
7 self.interval = 60 / requests_per_minute
8 self.last_request = 0
9
10 async def wait(self):
11 now = time.time()
12 elapsed = now - self.last_request
13 if elapsed < self.interval:
14 await asyncio.sleep(self.interval - elapsed)
15 self.last_request = time.time()
16
17limiter = RateLimiter(requests_per_minute=100)
18
19async def rate_limited_process(chain, documents):
20 results = []
21 for doc in documents:
22 await limiter.wait()
23 result = await chain.ainvoke({"text": doc})
24 results.append(result)
25 return results

Checkpoint

Bạn đã hiểu cách implement rate limiting để tránh API limits chưa?

5

📐 Cost Tracking

TB5 min
python.py
1class CostTracker:
2 PRICING = {
3 "gpt-4o-mini": {"input": 0.15, "output": 0.60}, # per 1M tokens
4 "gpt-4o": {"input": 2.50, "output": 10.00},
5 }
6
7 def __init__(self, model="gpt-4o-mini"):
8 self.model = model
9 self.total_input_tokens = 0
10 self.total_output_tokens = 0
11
12 def track(self, response):
13 if hasattr(response, 'response_metadata'):
14 usage = response.response_metadata.get('token_usage', {})
15 self.total_input_tokens += usage.get('prompt_tokens', 0)
16 self.total_output_tokens += usage.get('completion_tokens', 0)
17
18 @property
19 def total_cost(self):
20 pricing = self.PRICING[self.model]
21 input_cost = (self.total_input_tokens / 1_000_000) * pricing["input"]
22 output_cost = (self.total_output_tokens / 1_000_000) * pricing["output"]
23 return input_cost + output_cost
24
25 def report(self):
26 print(f"Input tokens: {self.total_input_tokens:,}")
27 print(f"Output tokens: {self.total_output_tokens:,}")
28 print(f"Total cost: ${self.total_cost:.4f}")
29
30tracker = CostTracker()

Checkpoint

Bạn đã hiểu cách track chi phí khi xử lý batch documents chưa?

6

💻 Document Processing Pipeline

TB5 min
python.py
1from langchain_core.runnables import RunnableParallel, RunnablePassthrough
2import pandas as pd
3
4# Complete pipeline
5def build_pipeline():
6 summarize = ChatPromptTemplate.from_messages([
7 ("system", "Tom tat trong 3 cau."),
8 ("human", "{text}")
9 ]) | llm | StrOutputParser()
10
11 classify = ChatPromptTemplate.from_messages([
12 ("system", "Classify vao: tech/business/health/other. Tra ve 1 tu."),
13 ("human", "{text}")
14 ]) | llm | StrOutputParser()
15
16 extract_keywords = ChatPromptTemplate.from_messages([
17 ("system", "Extract 5 keywords, separated by commas."),
18 ("human", "{text}")
19 ]) | llm | StrOutputParser()
20
21 pipeline = RunnableParallel(
22 summary=summarize,
23 category=classify,
24 keywords=extract_keywords,
25 original=RunnablePassthrough()
26 )
27
28 return pipeline
29
30pipeline = build_pipeline()
31
32# Process batch
33documents = ["Doc 1 content...", "Doc 2 content...", "Doc 3 content..."]
34results = pipeline.batch(
35 [{"text": doc} for doc in documents],
36 config={"max_concurrency": 5}
37)
38
39# To DataFrame
40df = pd.DataFrame([
41 {
42 "text": r["original"]["text"][:100],
43 "summary": r["summary"],
44 "category": r["category"],
45 "keywords": r["keywords"]
46 }
47 for r in results
48])

Checkpoint

Bạn đã hiểu cách build document processing pipeline hoàn chỉnh chưa?

7

🛠️ Error Recovery

TB5 min
python.py
1async def resilient_batch(chain, items, max_retries=3):
2 results = []
3 failed = []
4
5 for i, item in enumerate(items):
6 for attempt in range(max_retries):
7 try:
8 result = await chain.ainvoke(item)
9 results.append({"index": i, "result": result, "status": "success"})
10 break
11 except Exception as e:
12 if attempt == max_retries - 1:
13 results.append({"index": i, "error": str(e), "status": "failed"})
14 failed.append(i)
15 else:
16 await asyncio.sleep(2 ** attempt)
17
18 print(f"Success: {len(results) - len(failed)}, Failed: {len(failed)}")
19 return results

Checkpoint

Bạn đã hiểu cách implement error recovery cho batch processing chưa?

8

🎯 Tổng kết

TB5 min
Performance Tips
  1. Model selection: Dùng gpt-4o-mini cho batch tasks (rẻ hơn 17x)
  2. Concurrency: max_concurrency=10-20 cho OpenAI
  3. Caching: Cache kết quả để tránh duplicate calls
  4. Batching: Group requests để giảm overhead
  5. Async: Dùng async cho I/O-bound tasks
  6. Streaming: Stream results cho real-time feedback
Hands-on Exercise
  1. Build batch document processing pipeline
  2. Implement cost tracking và reporting
  3. Add rate limiting và error recovery
  4. Process 100+ documents với parallel analysis

Target: Pipeline xử lý 1000 documents/giờ với cost tracking

Câu hỏi tự kiểm tra

  1. Khi nào nên sử dụng batch processing thay vì xử lý từng document một?
  2. Async processing giúp tối ưu hiệu suất batch pipeline như thế nào?
  3. Tại sao cần implement rate limiting và error recovery trong batch processing?
  4. Có những chiến lược nào để giảm chi phí khi xử lý hàng ngàn documents với LLMs?

🎉 Tuyệt vời! Bạn đã hoàn thành bài học Batch Processing và Pipelines!

Tiếp theo: Hãy tổng hợp tất cả kiến thức vào Capstone Project — xây dựng Text Processing Platform hoàn chỉnh!


🚀 Bài tiếp theo

Capstone Project - Text Processing Platform →