🎯 Lesson Objectives
Apply everything you have learned so far to build a complete Document Q&A System with production-level quality.
After this lesson, you will be able to:
- ✅ Build an end-to-end RAG pipeline
- ✅ Process documents with chunking strategies
- ✅ Combine hybrid search with reranking
- ✅ Evaluate quality with RAGAS
🔍 Project Overview
System Requirements
Build a Vietnamese Document Q&A System that can:
- Load and process multiple document types (PDF, DOCX, TXT)
- Chunk documents with metadata enrichment
- Run hybrid search (BM25 + semantic)
- Rerank results with a Cross-Encoder
- Generate answers with citations
- Evaluate quality with RAGAS
Project Setup
```bash
# Create project
mkdir rag-qa-system && cd rag-qa-system

# Install dependencies
pip install langchain langchain-openai langchain-chroma
pip install chromadb sentence-transformers
pip install rank-bm25 underthesea ragas
pip install pypdf python-docx unstructured
```

```text
# requirements.txt
langchain>=0.2.0
langchain-openai>=0.1.0
langchain-chroma>=0.1.0
chromadb>=0.5.0
sentence-transformers>=3.0.0
rank-bm25>=0.2.2
underthesea>=6.8.0
ragas>=0.1.0
pypdf>=4.0.0
python-docx>=1.0.0
```

Project Structure
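The layout below is reconstructed from the file paths used throughout this lesson (only those files are listed; `src/__init__.py` is added because `pipeline.py` uses relative imports, which require `src/` to be a package):

```text
rag-qa-system/
├── data/                  # source documents (PDF, DOCX, TXT, MD)
├── src/
│   ├── __init__.py
│   ├── document_loader.py
│   ├── chunker.py
│   ├── retriever.py
│   ├── generator.py
│   └── pipeline.py
├── evaluation/
│   ├── evaluate.py
│   └── eval_data.json
├── main.py
└── requirements.txt
```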
Checkpoint
Have you understood the project structure and the requirements?
💻 Document Processing
Document Loader
```python
# src/document_loader.py

from langchain_community.document_loaders import (
    PyPDFLoader,
    Docx2txtLoader,
    TextLoader,
)
from pathlib import Path
from typing import List
from langchain.schema import Document


class DocumentLoader:
    """Load documents from various formats."""

    LOADER_MAP = {
        ".pdf": PyPDFLoader,
        ".docx": Docx2txtLoader,
        ".txt": TextLoader,
        ".md": TextLoader,
    }

    def load_file(self, file_path: str) -> List[Document]:
        """Load a single file."""
        ext = Path(file_path).suffix.lower()

        if ext not in self.LOADER_MAP:
            raise ValueError(f"Unsupported format: {ext}")

        loader = self.LOADER_MAP[ext](file_path)
        docs = loader.load()

        # Add metadata
        for doc in docs:
            doc.metadata["source"] = file_path
            doc.metadata["file_type"] = ext
            doc.metadata["file_name"] = Path(file_path).name

        print(f"  Loaded {len(docs)} pages from {Path(file_path).name}")
        return docs

    def load_directory(self, dir_path: str) -> List[Document]:
        """Load all supported files from a directory."""
        all_docs = []
        dir_path = Path(dir_path)

        for ext in self.LOADER_MAP:
            for file in dir_path.glob(f"**/*{ext}"):
                try:
                    docs = self.load_file(str(file))
                    all_docs.extend(docs)
                except Exception as e:
                    print(f"  ⚠️ Error loading {file}: {e}")

        print(f"\nTotal: {len(all_docs)} documents loaded")
        return all_docs
```

Document Chunker
```python
# src/chunker.py

from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List
from langchain.schema import Document
from underthesea import sent_tokenize


class DocumentChunker:
    """Smart document chunking with metadata."""

    def __init__(self, chunk_size=500, chunk_overlap=50):
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ". ", " ", ""],
            length_function=len,
        )

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """Chunk documents with enriched metadata."""
        chunks = self.splitter.split_documents(documents)

        # Enrich metadata
        for i, chunk in enumerate(chunks):
            chunk.metadata["chunk_id"] = i
            chunk.metadata["chunk_size"] = len(chunk.page_content)

            # Extract the first sentence as a short summary
            sentences = sent_tokenize(chunk.page_content)
            if sentences:
                chunk.metadata["first_sentence"] = sentences[0][:100]

        print(f"Created {len(chunks)} chunks from {len(documents)} documents")
        print(f"Avg chunk size: {sum(len(c.page_content) for c in chunks) / len(chunks):.0f} chars")
        return chunks
```

Checkpoint
Have you implemented DocumentLoader and DocumentChunker?
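To build intuition for the `chunk_size` / `chunk_overlap` parameters, here is a toy character-based splitter. It is a deliberately simplified stand-in, not the actual `RecursiveCharacterTextSplitter` algorithm (which also respects separators):

```python
def simple_split(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Toy fixed-window splitter: each chunk starts chunk_size - chunk_overlap
    characters after the previous one, so consecutive chunks share an overlap."""
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

text = "a" * 120  # pretend this is a 120-character document
chunks = simple_split(text, chunk_size=50, chunk_overlap=10)
print([len(c) for c in chunks])  # → [50, 50, 40]
```

The overlap means a sentence cut at a chunk boundary still appears whole in the neighboring chunk, which is exactly why the real splitter above uses `chunk_overlap=50`.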
💻 Retrieval System
Hybrid Retriever
```python
# src/retriever.py

from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder
from underthesea import word_tokenize
from typing import List, Dict
import numpy as np


class HybridRetriever:
    """Hybrid search with BM25 + Semantic + Reranking."""

    def __init__(self, chunks, collection_name="qa_system"):
        self.chunks = chunks
        self.contents = [c.page_content for c in chunks]

        # BM25 index over Vietnamese word-tokenized chunks
        tokenized = [word_tokenize(c, format="text").split() for c in self.contents]
        self.bm25 = BM25Okapi(tokenized)

        # Vector store
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            collection_name=collection_name,
        )

        # Cross-encoder reranker
        self.reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

        print(f"Retriever initialized with {len(chunks)} chunks")

    def search(self, query: str, top_k: int = 5, initial_k: int = 20) -> List[Dict]:
        """Full retrieval pipeline."""

        # Stage 1: BM25
        tokenized_query = word_tokenize(query, format="text").split()
        bm25_scores = self.bm25.get_scores(tokenized_query)
        bm25_top = np.argsort(bm25_scores)[-initial_k:][::-1]
        bm25_results = [(int(i), self.contents[i]) for i in bm25_top if bm25_scores[i] > 0]

        # Stage 2: Semantic search
        semantic_results = self.vectorstore.similarity_search_with_score(query, k=initial_k)

        # Stage 3: Merge with Reciprocal Rank Fusion (RRF)
        all_contents = {}
        rrf_scores = {}
        k = 60  # RRF smoothing constant

        for rank, (idx, content) in enumerate(bm25_results):
            doc_hash = hash(content)
            all_contents[doc_hash] = {"content": content, "index": idx}
            rrf_scores[doc_hash] = rrf_scores.get(doc_hash, 0) + 1 / (k + rank + 1)

        for rank, (doc, score) in enumerate(semantic_results):
            doc_hash = hash(doc.page_content)
            all_contents[doc_hash] = {
                "content": doc.page_content,
                "metadata": doc.metadata,
            }
            rrf_scores[doc_hash] = rrf_scores.get(doc_hash, 0) + 1 / (k + rank + 1)

        # Stage 4: Rerank top candidates with the cross-encoder
        sorted_hashes = sorted(rrf_scores, key=rrf_scores.get, reverse=True)[:initial_k]
        candidates = [all_contents[h]["content"] for h in sorted_hashes]

        if not candidates:
            return []

        pairs = [(query, c) for c in candidates]
        rerank_scores = self.reranker.predict(pairs)

        results = []
        for content, score in sorted(
            zip(candidates, rerank_scores), key=lambda x: x[1], reverse=True
        )[:top_k]:
            results.append({"content": content, "rerank_score": float(score)})
        return results
```

Checkpoint
Have you implemented HybridRetriever with BM25 + semantic search + reranking?
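The RRF merge in Stage 3 is easier to see in isolation. A minimal, self-contained sketch with made-up document IDs (not tied to the retriever above): each document's score is the sum of `1 / (k + rank + 1)` over every ranking it appears in, so documents ranked highly by both BM25 and semantic search rise to the top.

```python
def rrf_merge(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Reciprocal Rank Fusion: score(d) = sum over rankings of 1 / (k + rank + 1)."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1 / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)

bm25_ranking = ["doc_a", "doc_b", "doc_c"]      # lexical ranking
semantic_ranking = ["doc_c", "doc_a", "doc_d"]  # embedding ranking
print(rrf_merge([bm25_ranking, semantic_ranking]))
# → ['doc_a', 'doc_c', 'doc_b', 'doc_d']
```

`doc_a` (ranks 0 and 1) edges out `doc_c` (ranks 2 and 0), while `doc_b` and `doc_d`, each appearing in only one list, fall behind — this is the "consensus" effect that makes hybrid search robust.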
💻 Answer Generation
Generator with Citations
```python
# src/generator.py

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import List, Dict


class AnswerGenerator:
    """Generate answers with citations."""

    def __init__(self, model="gpt-4o-mini"):
        self.llm = ChatOpenAI(model=model, temperature=0)

        self.prompt = ChatPromptTemplate.from_template(
            """Bạn là trợ lý AI chuyên trả lời câu hỏi dựa trên tài liệu.

NGUYÊN TẮC:
- Chỉ trả lời dựa trên context được cung cấp
- Nếu không tìm thấy câu trả lời, nói rõ
- Trích dẫn nguồn khi có thể
- Trả lời bằng tiếng Việt

CONTEXT:
{context}

CÂU HỎI: {question}

TRẢ LỜI:"""
        )

        self.chain = self.prompt | self.llm | StrOutputParser()

    def generate(self, question: str, contexts: List[Dict]) -> Dict:
        """Generate answer from retrieved contexts."""
        # Format numbered context blocks
        context_text = "\n\n---\n\n".join(
            f"[Nguồn {i + 1}]: {ctx['content']}"
            for i, ctx in enumerate(contexts)
        )

        # Generate
        answer = self.chain.invoke({
            "question": question,
            "context": context_text,
        })

        return {
            "answer": answer,
            "sources": [ctx.get("content", "")[:100] for ctx in contexts],
            "num_sources": len(contexts),
        }
```

Checkpoint
Have you implemented AnswerGenerator with a Vietnamese prompt and citations?
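The citation mechanics come down to one formatting step: each retrieved chunk is numbered as `[Nguồn i]` so the model can refer back to it. Extracted as a standalone function for clarity (the sample context strings reuse this lesson's eval data):

```python
def format_context(contexts: list[dict]) -> str:
    """Number each retrieved chunk so the LLM can cite it as [Nguồn i]."""
    return "\n\n---\n\n".join(
        f"[Nguồn {i + 1}]: {ctx['content']}" for i, ctx in enumerate(contexts)
    )

contexts = [
    {"content": "Mức lương tối thiểu vùng 1 là 4.680.000 đồng/tháng."},
    {"content": "Người lao động được nghỉ 12 ngày phép mỗi năm."},
]
print(format_context(contexts))
```

The `---` separators keep the chunks visually distinct inside the prompt, which helps the model attribute facts to the right source number.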
💻 Complete Pipeline
Main Pipeline
```python
# src/pipeline.py

from .document_loader import DocumentLoader
from .chunker import DocumentChunker
from .retriever import HybridRetriever
from .generator import AnswerGenerator


class DocumentQAPipeline:
    """End-to-end Document Q&A Pipeline."""

    def __init__(self, data_dir: str, chunk_size: int = 500):
        print("🚀 Initializing Document Q&A Pipeline...")

        # Step 1: Load documents
        print("\n📄 Loading documents...")
        loader = DocumentLoader()
        documents = loader.load_directory(data_dir)

        # Step 2: Chunk documents
        print("\n✂️ Chunking documents...")
        chunker = DocumentChunker(chunk_size=chunk_size)
        self.chunks = chunker.chunk_documents(documents)

        # Step 3: Build retriever
        print("\n🔍 Building retriever...")
        self.retriever = HybridRetriever(self.chunks)

        # Step 4: Initialize generator
        print("\n🤖 Initializing generator...")
        self.generator = AnswerGenerator()

        print("\n✅ Pipeline ready!")

    def query(self, question: str, top_k: int = 5) -> dict:
        """Answer a question."""
        # Retrieve
        contexts = self.retriever.search(question, top_k=top_k)

        # Generate
        result = self.generator.generate(question, contexts)
        result["contexts"] = contexts

        return result

    def interactive(self):
        """Interactive Q&A mode."""
        print("\n💬 Interactive Q&A Mode (type 'quit' to exit)")
        print("=" * 50)

        while True:
            question = input("\n❓ Câu hỏi: ").strip()
            if question.lower() in ["quit", "exit", "q"]:
                print("Goodbye! 👋")
                break

            result = self.query(question)
            print(f"\n📝 Trả lời:\n{result['answer']}")
            print(f"\n📚 Sources: {result['num_sources']} documents used")
```

Main Entry Point
```python
# main.py

from src.pipeline import DocumentQAPipeline


def main():
    # Initialize pipeline
    pipeline = DocumentQAPipeline(data_dir="./data")

    # Single query
    result = pipeline.query("Mức lương tối thiểu vùng 1 là bao nhiêu?")
    print(f"\nAnswer: {result['answer']}")

    # Interactive mode
    pipeline.interactive()


if __name__ == "__main__":
    main()
```

Checkpoint
Have you connected all the components into an end-to-end pipeline?
📊 Evaluation
RAGAS Evaluation
```python
# evaluation/evaluate.py

from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
)
from datasets import Dataset
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
import json


def evaluate_pipeline(pipeline, eval_file="evaluation/eval_data.json"):
    """Evaluate the RAG pipeline with RAGAS."""

    # Load eval data
    with open(eval_file) as f:
        eval_data = json.load(f)

    # Get predictions
    questions = []
    answers = []
    contexts = []
    ground_truths = []

    for item in eval_data:
        result = pipeline.query(item["question"])

        questions.append(item["question"])
        answers.append(result["answer"])
        contexts.append([c["content"] for c in result["contexts"]])
        ground_truths.append(item["ground_truth"])

    # Create dataset
    dataset = Dataset.from_dict({
        "question": questions,
        "answer": answers,
        "contexts": contexts,
        "ground_truth": ground_truths,
    })

    # Evaluate
    result = evaluate(
        dataset,
        metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
        llm=ChatOpenAI(model="gpt-4o-mini"),
        embeddings=OpenAIEmbeddings(),
    )

    print("\n📊 RAGAS Evaluation Results:")
    print(f"  Faithfulness:      {result['faithfulness']:.3f}")
    print(f"  Answer Relevancy:  {result['answer_relevancy']:.3f}")
    print(f"  Context Precision: {result['context_precision']:.3f}")
    print(f"  Context Recall:    {result['context_recall']:.3f}")

    # Per-question detail: flag low-faithfulness answers
    df = result.to_pandas()
    low_quality = df[df["faithfulness"] < 0.7]
    if len(low_quality) > 0:
        print(f"\n⚠️ {len(low_quality)} questions with low faithfulness:")
        for _, row in low_quality.iterrows():
            print(f"  Q: {row['question']}")

    return result
```

Evaluation Data Format
```json
[
  {
    "question": "Mức lương tối thiểu vùng 1 là bao nhiêu?",
    "ground_truth": "Mức lương tối thiểu vùng 1 là 4.680.000 đồng/tháng theo Nghị định 38/2022."
  },
  {
    "question": "Thời gian nghỉ phép năm là bao nhiêu ngày?",
    "ground_truth": "Người lao động được nghỉ 12 ngày phép mỗi năm."
  }
]
```

Checkpoint
Have you implemented RAGAS evaluation for the pipeline?
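RAGAS metrics are LLM-judged, so there is no simple closed-form formula behind them. Purely as a mental model, here is a crude keyword-overlap stand-in for the *idea* of context recall ("did the retrieved contexts cover the ground truth?") — this is NOT the RAGAS algorithm, only an illustration:

```python
def toy_context_recall(ground_truth: str, contexts: list[str]) -> float:
    """Crude illustration: fraction of ground-truth words found in the retrieved
    contexts. RAGAS uses an LLM judge over claims instead of word overlap."""
    words = set(ground_truth.lower().split())
    if not words:
        return 0.0
    merged = " ".join(contexts).lower()
    return sum(w in merged for w in words) / len(words)

print(toy_context_recall(
    "nghỉ 12 ngày phép mỗi năm",
    ["Người lao động được nghỉ 12 ngày phép mỗi năm."],
))  # → 1.0
```

The real metrics replace word overlap with claim-level LLM judgments, which is why running the evaluation requires an `llm=` and `embeddings=` argument as shown above.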
🎯 Summary
Grading Rubric
| Criterion | Excellent (9-10) | Good (7-8) | Pass (5-6) |
|---|---|---|---|
| Document Processing | Multi-format, metadata enrichment | Chunking + basic metadata | Basic loading |
| Retrieval | Hybrid + Reranking | Semantic + BM25 | Semantic only |
| Generation | Citations, Vietnamese, guardrails | Prompt engineering | Basic generation |
| Evaluation | RAGAS + custom + A/B testing | RAGAS evaluation | Basic testing |
| Code Quality | Clean, modular, documented | Well structured | Working code |
Key Takeaways
- End-to-End Pipeline — Document → Chunks → Index → Retrieve → Generate → Evaluate
- Hybrid Search — BM25 + Semantic + Reranking for the best results
- Vietnamese Support — underthesea tokenizer, Vietnamese prompts
- Quality Metrics — RAGAS provides standardized evaluation framework
- Production Ready — Modular architecture, error handling, monitoring
Self-Check Questions
- Describe the data flow from raw document to final answer in your pipeline.
- Why is hybrid search + reranking better than semantic search alone?
- How can you evaluate a RAG system systematically?
- When should you use a large vs. a small chunk_size?
🎉 Congratulations! You have completed the entire GenAI RAG course!
You have built a complete Document Q&A System, from document processing through evaluation. Keep extending the project with more advanced features!
