
Capstone Project: Document Q&A System

Build a complete, production-ready RAG system

🎯 Lesson Objectives

Apply everything you have learned in this course to build a complete Document Q&A System at production-level quality.

After this lesson, you will be able to:

✅ Build an end-to-end RAG pipeline
✅ Process documents with chunking strategies
✅ Combine hybrid search with reranking
✅ Evaluate answer quality with RAGAS

🔍 Project Overview

System Requirements

Build a Vietnamese Document Q&A System that can:

  • Load and process multiple document types (PDF, DOCX, TXT)
  • Chunk documents with metadata enrichment
  • Run hybrid search (BM25 + semantic)
  • Rerank results with a Cross-Encoder
  • Generate answers with citations
  • Evaluate quality with RAGAS
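
These capabilities compose into a single data flow: load → chunk → retrieve → rerank → generate. The stub sketch below makes the composition concrete before each real component is built; every function here is a hypothetical placeholder, not the actual implementation.

```python
# Stub sketch of the pipeline data flow (all names are placeholders).
def load(paths):
    return [f"contents of {p}" for p in paths]   # load raw documents

def chunk(docs):
    return [d[:50] for d in docs]                # trivial stand-in "chunking"

def hybrid_search(query, chunks):
    return chunks[:3]                            # pretend BM25 + semantic merge

def rerank(query, candidates):
    return candidates[:1]                        # pretend cross-encoder rerank

def generate(query, contexts):
    return f"Answer based on: {contexts[0]}"     # pretend LLM call

docs = load(["policy.pdf"])
top = rerank("demo", hybrid_search("demo", chunk(docs)))
print(generate("demo", top))
```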

Project Setup

Bash
# Create project
mkdir rag-qa-system && cd rag-qa-system

# Install dependencies
pip install langchain langchain-openai langchain-chroma
pip install chromadb sentence-transformers
pip install rank-bm25 underthesea ragas
pip install pypdf python-docx unstructured
Text
# requirements.txt
langchain>=0.2.0
langchain-openai>=0.1.0
langchain-chroma>=0.1.0
chromadb>=0.5.0
sentence-transformers>=3.0.0
rank-bm25>=0.2.2
underthesea>=6.8.0
ragas>=0.1.0
pypdf>=4.0.0
python-docx>=1.0.0

Project Structure

📁 rag-qa-system/
  📂 data/
    📂 policies/
    📂 manuals/
  📂 src/
    🐍 __init__.py
    🐍 document_loader.py
    🐍 chunker.py
    🐍 retriever.py
    🐍 generator.py
    🐍 pipeline.py
  📂 evaluation/
    📄 eval_data.json
    🐍 evaluate.py
  📄 requirements.txt
  🐍 main.py

Checkpoint

Do you understand the project structure and the requirements?

💻 Document Processing

Document Loader

Python
# src/document_loader.py

from pathlib import Path
from typing import List

from langchain.schema import Document
from langchain_community.document_loaders import (
    PyPDFLoader,
    Docx2txtLoader,
    TextLoader,
)


class DocumentLoader:
    """Load documents from various formats."""

    LOADER_MAP = {
        ".pdf": PyPDFLoader,
        ".docx": Docx2txtLoader,
        ".txt": TextLoader,
        ".md": TextLoader,
    }

    def load_file(self, file_path: str) -> List[Document]:
        """Load a single file."""
        ext = Path(file_path).suffix.lower()

        if ext not in self.LOADER_MAP:
            raise ValueError(f"Unsupported format: {ext}")

        loader = self.LOADER_MAP[ext](file_path)
        docs = loader.load()

        # Add metadata
        for doc in docs:
            doc.metadata["source"] = file_path
            doc.metadata["file_type"] = ext
            doc.metadata["file_name"] = Path(file_path).name

        print(f"  Loaded {len(docs)} pages from {Path(file_path).name}")
        return docs

    def load_directory(self, dir_path: str) -> List[Document]:
        """Load all supported files from a directory."""
        all_docs = []
        root = Path(dir_path)

        for ext in self.LOADER_MAP:
            for file in root.glob(f"**/*{ext}"):
                try:
                    all_docs.extend(self.load_file(str(file)))
                except Exception as e:
                    print(f"  ⚠️ Error loading {file}: {e}")

        print(f"\nTotal: {len(all_docs)} documents loaded")
        return all_docs

Document Chunker

Python
# src/chunker.py

from typing import List

from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from underthesea import sent_tokenize


class DocumentChunker:
    """Smart document chunking with metadata."""

    def __init__(self, chunk_size=500, chunk_overlap=50):
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ". ", " ", ""],
            length_function=len,
        )

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """Chunk documents with enriched metadata."""
        chunks = self.splitter.split_documents(documents)

        # Enrich metadata
        for i, chunk in enumerate(chunks):
            chunk.metadata["chunk_id"] = i
            chunk.metadata["chunk_size"] = len(chunk.page_content)

            # Extract the first sentence as a short summary
            sentences = sent_tokenize(chunk.page_content)
            if sentences:
                chunk.metadata["first_sentence"] = sentences[0][:100]

        print(f"Created {len(chunks)} chunks from {len(documents)} documents")
        if chunks:  # guard against division by zero on empty input
            avg = sum(len(c.page_content) for c in chunks) / len(chunks)
            print(f"Avg chunk size: {avg:.0f} chars")
        return chunks
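
To see what chunk_overlap buys you before wiring in RecursiveCharacterTextSplitter, here is a simplified character-level chunker (a stand-in for illustration, not the real splitter): consecutive chunks share `chunk_overlap` characters, so text cut at a boundary still appears whole in at least one chunk.

```python
# Simplified character-level chunking to illustrate chunk_overlap.
def simple_chunk(text, chunk_size=500, chunk_overlap=50):
    step = chunk_size - chunk_overlap          # each chunk starts `step` chars later
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]

text = "x" * 1200
chunks = simple_chunk(text)
print([len(c) for c in chunks])  # [500, 500, 300]
```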

Checkpoint

Have you implemented DocumentLoader and DocumentChunker?

💻 Retrieval System

Hybrid Retriever

Python
# src/retriever.py

from typing import Dict, List

import numpy as np
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder
from underthesea import word_tokenize


class HybridRetriever:
    """Hybrid search with BM25 + semantic retrieval + reranking."""

    def __init__(self, chunks, collection_name="qa_system"):
        self.chunks = chunks
        self.contents = [c.page_content for c in chunks]

        # BM25 over Vietnamese word-segmented text
        tokenized = [word_tokenize(c, format="text").split() for c in self.contents]
        self.bm25 = BM25Okapi(tokenized)

        # Vector store
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            collection_name=collection_name,
        )

        # Cross-encoder reranker
        self.reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

        print(f"Retriever initialized with {len(chunks)} chunks")

    def search(self, query: str, top_k: int = 5, initial_k: int = 20) -> List[Dict]:
        """Full retrieval pipeline."""

        # Stage 1: BM25
        tokenized_query = word_tokenize(query, format="text").split()
        bm25_scores = self.bm25.get_scores(tokenized_query)
        bm25_top = np.argsort(bm25_scores)[-initial_k:][::-1]
        bm25_results = [(int(i), self.contents[i]) for i in bm25_top if bm25_scores[i] > 0]

        # Stage 2: Semantic search
        semantic_results = self.vectorstore.similarity_search_with_score(query, k=initial_k)

        # Stage 3: Merge with Reciprocal Rank Fusion (RRF)
        all_contents = {}
        rrf_scores = {}
        k = 60

        for rank, (idx, content) in enumerate(bm25_results):
            doc_hash = hash(content)
            all_contents[doc_hash] = {"content": content, "index": idx}
            rrf_scores[doc_hash] = rrf_scores.get(doc_hash, 0) + 1 / (k + rank + 1)

        for rank, (doc, score) in enumerate(semantic_results):
            doc_hash = hash(doc.page_content)
            all_contents[doc_hash] = {
                "content": doc.page_content,
                "metadata": doc.metadata,
            }
            rrf_scores[doc_hash] = rrf_scores.get(doc_hash, 0) + 1 / (k + rank + 1)

        # Stage 4: Rerank top candidates with the cross-encoder
        sorted_hashes = sorted(rrf_scores, key=rrf_scores.get, reverse=True)[:initial_k]
        candidates = [all_contents[h]["content"] for h in sorted_hashes]

        if not candidates:
            return []

        pairs = [(query, c) for c in candidates]
        rerank_scores = self.reranker.predict(pairs)

        ranked = sorted(zip(candidates, rerank_scores), key=lambda x: x[1], reverse=True)
        return [
            {"content": content, "rerank_score": float(score)}
            for content, score in ranked[:top_k]
        ]
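
Stage 3 above uses Reciprocal Rank Fusion: each ranked list contributes 1 / (k + rank + 1) to a document's score, so an item ranked highly by either retriever rises to the top. Stripped of the retrieval machinery, the merge looks like this (toy document IDs):

```python
# Reciprocal Rank Fusion over any number of ranked lists.
def rrf_merge(ranked_lists, k=60):
    scores = {}
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking):
            scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)
    # Highest fused score first
    return sorted(scores, key=scores.get, reverse=True)

bm25_ranking = ["doc_a", "doc_b", "doc_c"]       # lexical order
semantic_ranking = ["doc_b", "doc_d", "doc_a"]   # embedding order
print(rrf_merge([bm25_ranking, semantic_ranking]))
# ['doc_b', 'doc_a', 'doc_d', 'doc_c']
```

doc_b wins because it appears near the top of both lists, even though neither retriever ranked it first; that is exactly the behavior hybrid search relies on.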

Checkpoint

Have you implemented HybridRetriever with BM25 + Semantic + Reranking?

💻 Answer Generation

Generator with Citations

Python
# src/generator.py

from typing import Dict, List

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI


class AnswerGenerator:
    """Generate answers with citations."""

    def __init__(self, model="gpt-4o-mini"):
        self.llm = ChatOpenAI(model=model, temperature=0)

        self.prompt = ChatPromptTemplate.from_template(
            """Bạn là trợ lý AI chuyên trả lời câu hỏi dựa trên tài liệu.

NGUYÊN TẮC:
- Chỉ trả lời dựa trên context được cung cấp
- Nếu không tìm thấy câu trả lời, nói rõ
- Trích dẫn nguồn khi có thể
- Trả lời bằng tiếng Việt

CONTEXT:
{context}

CÂU HỎI: {question}

TRẢ LỜI:"""
        )

        self.chain = self.prompt | self.llm | StrOutputParser()

    def generate(self, question: str, contexts: List[Dict]) -> Dict:
        """Generate an answer from retrieved contexts."""
        # Format numbered sources so the model can cite them
        context_text = "\n\n---\n\n".join(
            f"[Nguồn {i+1}]: {ctx['content']}" for i, ctx in enumerate(contexts)
        )

        answer = self.chain.invoke({
            "question": question,
            "context": context_text,
        })

        return {
            "answer": answer,
            "sources": [ctx.get("content", "")[:100] for ctx in contexts],
            "num_sources": len(contexts),
        }
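
The context formatting inside generate() can be exercised without an LLM call. This standalone sketch reproduces the same numbered-source layout that the prompt template receives:

```python
# Reproduce the [Nguồn N] numbering that generate() feeds to the LLM,
# so the model can cite sources by number in its answer.
contexts = [
    {"content": "Mức lương tối thiểu vùng 1 là 4.680.000 đồng/tháng."},
    {"content": "Người lao động được nghỉ 12 ngày phép mỗi năm."},
]
context_text = "\n\n---\n\n".join(
    f"[Nguồn {i+1}]: {ctx['content']}" for i, ctx in enumerate(contexts)
)
print(context_text)
```

Each source is labeled and separated by a `---` divider, which keeps source boundaries unambiguous for the model.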

Checkpoint

Have you implemented AnswerGenerator with a Vietnamese prompt and citations?

💻 Complete Pipeline

Main Pipeline

Python
# src/pipeline.py

from .chunker import DocumentChunker
from .document_loader import DocumentLoader
from .generator import AnswerGenerator
from .retriever import HybridRetriever


class DocumentQAPipeline:
    """End-to-end Document Q&A pipeline."""

    def __init__(self, data_dir: str, chunk_size: int = 500):
        print("🚀 Initializing Document Q&A Pipeline...")

        # Step 1: Load documents
        print("\n📄 Loading documents...")
        loader = DocumentLoader()
        documents = loader.load_directory(data_dir)

        # Step 2: Chunk documents
        print("\n✂️ Chunking documents...")
        chunker = DocumentChunker(chunk_size=chunk_size)
        self.chunks = chunker.chunk_documents(documents)

        # Step 3: Build retriever
        print("\n🔍 Building retriever...")
        self.retriever = HybridRetriever(self.chunks)

        # Step 4: Initialize generator
        print("\n🤖 Initializing generator...")
        self.generator = AnswerGenerator()

        print("\n✅ Pipeline ready!")

    def query(self, question: str, top_k: int = 5) -> dict:
        """Answer a question."""
        contexts = self.retriever.search(question, top_k=top_k)
        result = self.generator.generate(question, contexts)
        result["contexts"] = contexts
        return result

    def interactive(self):
        """Interactive Q&A mode."""
        print("\n💬 Interactive Q&A Mode (type 'quit' to exit)")
        print("=" * 50)

        while True:
            question = input("\n❓ Câu hỏi: ").strip()
            if question.lower() in ["quit", "exit", "q"]:
                print("Goodbye! 👋")
                break

            result = self.query(question)
            print(f"\n📝 Trả lời:\n{result['answer']}")
            print(f"\n📚 Sources: {result['num_sources']} documents used")

Main Entry Point

Python
# main.py

from src.pipeline import DocumentQAPipeline


def main():
    # Initialize pipeline
    pipeline = DocumentQAPipeline(data_dir="./data")

    # Single query
    result = pipeline.query("Mức lương tối thiểu vùng 1 là bao nhiêu?")
    print(f"\nAnswer: {result['answer']}")

    # Interactive mode
    pipeline.interactive()


if __name__ == "__main__":
    main()

Checkpoint

Have you connected all the components into an end-to-end pipeline?

📊 Evaluation

RAGAS Evaluation

Python
# evaluation/evaluate.py

import json

from datasets import Dataset
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
)


def evaluate_pipeline(pipeline, eval_file="evaluation/eval_data.json"):
    """Evaluate the RAG pipeline with RAGAS."""

    # Load eval data (UTF-8 for Vietnamese text)
    with open(eval_file, encoding="utf-8") as f:
        eval_data = json.load(f)

    # Collect predictions
    questions, answers, contexts, ground_truths = [], [], [], []

    for item in eval_data:
        result = pipeline.query(item["question"])
        questions.append(item["question"])
        answers.append(result["answer"])
        contexts.append([c["content"] for c in result["contexts"]])
        ground_truths.append(item["ground_truth"])

    # Build the RAGAS dataset
    dataset = Dataset.from_dict({
        "question": questions,
        "answer": answers,
        "contexts": contexts,
        "ground_truth": ground_truths,
    })

    # Evaluate
    result = evaluate(
        dataset,
        metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
        llm=ChatOpenAI(model="gpt-4o-mini"),
        embeddings=OpenAIEmbeddings(),
    )

    print("\n📊 RAGAS Evaluation Results:")
    print(f"  Faithfulness:      {result['faithfulness']:.3f}")
    print(f"  Answer Relevancy:  {result['answer_relevancy']:.3f}")
    print(f"  Context Precision: {result['context_precision']:.3f}")
    print(f"  Context Recall:    {result['context_recall']:.3f}")

    # Per-question detail: flag low-faithfulness answers
    df = result.to_pandas()
    low_quality = df[df["faithfulness"] < 0.7]
    if len(low_quality) > 0:
        print(f"\n⚠️ {len(low_quality)} questions with low faithfulness:")
        for _, row in low_quality.iterrows():
            print(f"  Q: {row['question']}")

    return result

Evaluation Data Format

JSON
[
  {
    "question": "Mức lương tối thiểu vùng 1 là bao nhiêu?",
    "ground_truth": "Mức lương tối thiểu vùng 1 là 4.680.000 đồng/tháng theo Nghị định 38/2022."
  },
  {
    "question": "Thời gian nghỉ phép năm là bao nhiêu ngày?",
    "ground_truth": "Người lao động được nghỉ 12 ngày phép mỗi năm."
  }
]
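
RAGAS runs cost LLM tokens, so it pays to sanity-check the eval file first. The helper below (a hypothetical utility, not part of RAGAS) verifies that every item has non-empty "question" and "ground_truth" strings:

```python
import json

# Validate eval-data items before running RAGAS: each item needs
# non-empty "question" and "ground_truth" string fields.
def validate_eval_data(raw_json):
    items = json.loads(raw_json)
    for i, item in enumerate(items):
        for key in ("question", "ground_truth"):
            value = item.get(key)
            if not isinstance(value, str) or not value.strip():
                raise ValueError(f"Item {i}: missing or empty '{key}'")
    return items

raw = '[{"question": "Q?", "ground_truth": "A."}]'
print(len(validate_eval_data(raw)))  # 1
```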

Checkpoint

Have you implemented RAGAS evaluation for the pipeline?

🎯 Summary

Grading Rubric

| Criterion | Excellent (9-10) | Good (7-8) | Pass (5-6) |
|---|---|---|---|
| Document Processing | Multi-format, metadata enrichment | Chunking + basic metadata | Basic loading |
| Retrieval | Hybrid + reranking | Semantic + BM25 | Semantic only |
| Generation | Citations, Vietnamese, guardrails | Prompt engineering | Basic generation |
| Evaluation | RAGAS + custom + A/B testing | RAGAS evaluation | Basic testing |
| Code Quality | Clean, modular, documented | Well structured | Working code |

Key Takeaways

  1. End-to-End Pipeline — Document → Chunks → Index → Retrieve → Generate → Evaluate
  2. Hybrid Search — BM25 + Semantic + Reranking gives the best results
  3. Vietnamese Support — underthesea tokenizer, Vietnamese prompts
  4. Quality Metrics — RAGAS provides standardized evaluation framework
  5. Production Ready — Modular architecture, error handling, monitoring

Self-Check Questions

  1. Describe the data flow from raw document to final answer in your pipeline.
  2. Why is hybrid search + reranking better than semantic search alone?
  3. How can you evaluate a RAG system systematically?
  4. When should you use a large chunk_size versus a small one?

🎉 Congratulations! You have completed the entire GenAI RAG course!

You have built a complete Document Q&A System, from document processing through evaluation. Keep extending the project with more advanced features!