🎯 Mục tiêu bài học
LCEL (LangChain Expression Language) cho phép build complex text processing pipelines một cách declarative và composable.
Sau bài này, bạn sẽ:
✅ Hiểu LCEL basics và pipe operator pattern ✅ Implement parallel processing với RunnableParallel ✅ Sử dụng branching logic và dynamic routing ✅ Build batch processing và streaming pipelines
🔍 LCEL Basics Recap
1from langchain_openai import ChatOpenAI2from langchain_core.prompts import ChatPromptTemplate3from langchain_core.output_parsers import StrOutputParser45# Basic chain6chain = (7 ChatPromptTemplate.from_messages([8 ("system", "Ban la text processor."),9 ("human", "{input}")10 ])11 | ChatOpenAI(model="gpt-4o-mini")12 | StrOutputParser()13)1415result = chain.invoke({"input": "Tom tat AI"})Checkpoint
Bạn đã nắm vững LCEL basics với pipe operator chưa?
⚡ Parallel Processing
RunnableParallel
1from langchain_core.runnables import RunnableParallel, RunnablePassthrough23# Process text in parallel4analysis = RunnableParallel(5 summary=summary_chain,6 sentiment=sentiment_chain,7 keywords=keywords_chain,8 translation=translate_chain9)1011# All chains run simultaneously12results = analysis.invoke({"text": "Your long article text here..."})13print(results["summary"])14print(results["sentiment"])15print(results["keywords"])Parallel with Passthrough
1# Keep original input alongside results2pipeline = RunnableParallel(3 original=RunnablePassthrough(),4 summary=summary_chain,5 word_count=lambda x: len(x["text"].split())6)78result = pipeline.invoke({"text": "Long text..."})9# result["original"] = {"text": "Long text..."}10# result["summary"] = "Summary..."11# result["word_count"] = 42Checkpoint
Bạn đã hiểu cách sử dụng RunnableParallel để xử lý song song chưa?
📐 Branching Logic
RunnableBranch
1from langchain_core.runnables import RunnableBranch23# Route based on input4text_router = RunnableBranch(5 (lambda x: x["task"] == "summarize", summary_chain),6 (lambda x: x["task"] == "translate", translate_chain),7 (lambda x: x["task"] == "sentiment", sentiment_chain),8 default_chain # fallback9)1011result = text_router.invoke({12 "task": "summarize",13 "text": "Long article text..."14})Dynamic Routing
1from langchain_core.runnables import RunnableLambda23def route_by_length(input_data):4 text = input_data["text"]5 if len(text) < 500:6 return "stuff"7 elif len(text) < 5000:8 return "map_reduce"9 else:10 return "refine"1112def select_chain(method):13 chains = {14 "stuff": stuff_chain,15 "map_reduce": map_reduce_chain,16 "refine": refine_chain17 }18 return chains[method]1920# Dynamic chain selection21pipeline = (22 RunnablePassthrough.assign(23 method=RunnableLambda(route_by_length)24 )25 | RunnableLambda(lambda x: select_chain(x["method"]).invoke(x))26)Checkpoint
Bạn đã hiểu cách sử dụng branching logic và dynamic routing chưa?
📐 Sequential Chains
Multi-step Processing
1from langchain_core.output_parsers import StrOutputParser23llm = ChatOpenAI(model="gpt-4o-mini")45# Step 1: Extract key points6extract = ChatPromptTemplate.from_messages([7 ("system", "Extract 5 key points tu text sau."),8 ("human", "{text}")9]) | llm | StrOutputParser()1011# Step 2: Rewrite concisely12rewrite = ChatPromptTemplate.from_messages([13 ("system", "Rewrite cac key points thanh doan van ngon gon."),14 ("human", "{key_points}")15]) | llm | StrOutputParser()1617# Step 3: Translate18translate = ChatPromptTemplate.from_messages([19 ("system", "Dich sang tieng Anh, giu nguyen y chinh."),20 ("human", "{rewritten}")21]) | llm | StrOutputParser()2223# Full pipeline24pipeline = (25 RunnablePassthrough.assign(key_points=extract)26 | RunnablePassthrough.assign(rewritten=rewrite)27 | RunnablePassthrough.assign(translated=translate)28)2930result = pipeline.invoke({"text": "Ban van ban dai..."})Checkpoint
Bạn đã hiểu cách xây dựng sequential chains với nhiều steps chưa?
⚡ Batch Processing
1# Process multiple texts at once2texts = [3 {"text": "Article 1..."},4 {"text": "Article 2..."},5 {"text": "Article 3..."},6]78# Batch invoke - toi uu hon loop9results = chain.batch(texts, config={"max_concurrency": 5})1011# Async batch12import asyncio1314async def process_batch():15 results = await chain.abatch(texts)16 return results1718results = asyncio.run(process_batch())Checkpoint
Bạn đã hiểu cách sử dụng batch processing trong LCEL chưa?
⚡ Streaming Patterns
1# Stream final output2for chunk in chain.stream({"text": "Long document..."}):3 print(chunk, end="", flush=True)45# Stream with events6async for event in chain.astream_events(7 {"text": "Article..."},8 version="v2"9):10 if event["event"] == "on_chat_model_stream":11 print(event["data"]["chunk"].content, end="")Checkpoint
Bạn đã hiểu cách implement streaming patterns trong LCEL chưa?
🛠️ Error Handling in LCEL
1from langchain_core.runnables import RunnableLambda23def safe_parse(result):4 try:5 return {"status": "success", "data": result}6 except Exception as e:7 return {"status": "error", "error": str(e)}89# Chain with error handling10safe_chain = chain | RunnableLambda(safe_parse)1112# With retry13retry_chain = chain.with_retry(14 stop_after_attempt=3,15 wait_exponential_jitter=True16)Checkpoint
Bạn đã hiểu cách xử lý lỗi và retry trong LCEL pipeline chưa?
💻 Real-world Pipeline
1# Complete text processing pipeline2from langchain_core.runnables import RunnableParallel34# Input: raw article5# Output: processed content pack6content_pipeline = (7 # Step 1: Clean and extract8 RunnablePassthrough.assign(9 cleaned=clean_chain10 )11 # Step 2: Parallel analysis12 | RunnableParallel(13 summary=summary_chain,14 sentiment=sentiment_chain,15 keywords=keyword_chain,16 category=category_chain,17 original=RunnablePassthrough()18 )19 # Step 3: Generate outputs20 | RunnableParallel(21 report=report_chain,22 social_posts=social_chain,23 email_digest=email_chain24 )25)Checkpoint
Bạn đã hiểu cách kết hợp các LCEL patterns để build real-world pipeline chưa?
🎯 Tổng kết
- Build parallel analysis pipeline (summary + sentiment + keywords)
- Implement dynamic routing dựa trên text length
- Tạo batch processing cho nhiều documents
- Build content repurposing pipeline với LCEL
Câu hỏi tự kiểm tra
- RunnableParallel trong LCEL được sử dụng trong tình huống nào và mang lại lợi ích gì?
- Dynamic routing cho phép pipeline xử lý các loại đầu vào khác nhau như thế nào?
- Sự khác biệt giữa sequential pipeline và parallel pipeline trong LCEL là gì?
- Làm thế nào để xử lý lỗi (error handling) trong một LCEL pipeline phức tạp?
🎉 Tuyệt vời! Bạn đã hoàn thành bài học LCEL Patterns nâng cao!
Tiếp theo: Hãy học cách trích xuất dữ liệu có cấu trúc với Structured Output và Parsing!
