MinAI - Về trang chủ
Lý thuyết
6/1335 phút
Đang tải...

LCEL Patterns nâng cao

LangChain Expression Language patterns cho text processing pipelines

0

🎯 Mục tiêu bài học

TB5 min

LCEL (LangChain Expression Language) cho phép build complex text processing pipelines một cách declarative và composable.

Sau bài này, bạn sẽ:

✅ Hiểu LCEL basics và pipe operator pattern ✅ Implement parallel processing với RunnableParallel ✅ Sử dụng branching logic và dynamic routing ✅ Build batch processing và streaming pipelines

1

🔍 LCEL Basics Recap

TB5 min
Diagram
Đang vẽ diagram...
python.py
1from langchain_openai import ChatOpenAI
2from langchain_core.prompts import ChatPromptTemplate
3from langchain_core.output_parsers import StrOutputParser
4
5# Basic chain
6chain = (
7 ChatPromptTemplate.from_messages([
8 ("system", "Ban la text processor."),
9 ("human", "{input}")
10 ])
11 | ChatOpenAI(model="gpt-4o-mini")
12 | StrOutputParser()
13)
14
15result = chain.invoke({"input": "Tom tat AI"})

Checkpoint

Bạn đã nắm vững LCEL basics với pipe operator chưa?

2

⚡ Parallel Processing

TB5 min

RunnableParallel

python.py
1from langchain_core.runnables import RunnableParallel, RunnablePassthrough
2
3# Process text in parallel
4analysis = RunnableParallel(
5 summary=summary_chain,
6 sentiment=sentiment_chain,
7 keywords=keywords_chain,
8 translation=translate_chain
9)
10
11# All chains run simultaneously
12results = analysis.invoke({"text": "Your long article text here..."})
13print(results["summary"])
14print(results["sentiment"])
15print(results["keywords"])

Parallel with Passthrough

python.py
1# Keep original input alongside results
2pipeline = RunnableParallel(
3 original=RunnablePassthrough(),
4 summary=summary_chain,
5 word_count=lambda x: len(x["text"].split())
6)
7
8result = pipeline.invoke({"text": "Long text..."})
9# result["original"] = {"text": "Long text..."}
10# result["summary"] = "Summary..."
11# result["word_count"] = 42

Checkpoint

Bạn đã hiểu cách sử dụng RunnableParallel để xử lý song song chưa?

3

📐 Branching Logic

TB5 min

RunnableBranch

python.py
1from langchain_core.runnables import RunnableBranch
2
3# Route based on input
4text_router = RunnableBranch(
5 (lambda x: x["task"] == "summarize", summary_chain),
6 (lambda x: x["task"] == "translate", translate_chain),
7 (lambda x: x["task"] == "sentiment", sentiment_chain),
8 default_chain # fallback
9)
10
11result = text_router.invoke({
12 "task": "summarize",
13 "text": "Long article text..."
14})

Dynamic Routing

python.py
1from langchain_core.runnables import RunnableLambda
2
3def route_by_length(input_data):
4 text = input_data["text"]
5 if len(text) < 500:
6 return "stuff"
7 elif len(text) < 5000:
8 return "map_reduce"
9 else:
10 return "refine"
11
12def select_chain(method):
13 chains = {
14 "stuff": stuff_chain,
15 "map_reduce": map_reduce_chain,
16 "refine": refine_chain
17 }
18 return chains[method]
19
20# Dynamic chain selection
21pipeline = (
22 RunnablePassthrough.assign(
23 method=RunnableLambda(route_by_length)
24 )
25 | RunnableLambda(lambda x: select_chain(x["method"]).invoke(x))
26)

Checkpoint

Bạn đã hiểu cách sử dụng branching logic và dynamic routing chưa?

4

📐 Sequential Chains

TB5 min

Multi-step Processing

python.py
1from langchain_core.output_parsers import StrOutputParser
2
3llm = ChatOpenAI(model="gpt-4o-mini")
4
5# Step 1: Extract key points
6extract = ChatPromptTemplate.from_messages([
7 ("system", "Extract 5 key points tu text sau."),
8 ("human", "{text}")
9]) | llm | StrOutputParser()
10
11# Step 2: Rewrite concisely
12rewrite = ChatPromptTemplate.from_messages([
13 ("system", "Rewrite cac key points thanh doan van ngon gon."),
14 ("human", "{key_points}")
15]) | llm | StrOutputParser()
16
17# Step 3: Translate
18translate = ChatPromptTemplate.from_messages([
19 ("system", "Dich sang tieng Anh, giu nguyen y chinh."),
20 ("human", "{rewritten}")
21]) | llm | StrOutputParser()
22
23# Full pipeline
24pipeline = (
25 RunnablePassthrough.assign(key_points=extract)
26 | RunnablePassthrough.assign(rewritten=rewrite)
27 | RunnablePassthrough.assign(translated=translate)
28)
29
30result = pipeline.invoke({"text": "Ban van ban dai..."})

Checkpoint

Bạn đã hiểu cách xây dựng sequential chains với nhiều steps chưa?

5

⚡ Batch Processing

TB5 min
python.py
1# Process multiple texts at once
2texts = [
3 {"text": "Article 1..."},
4 {"text": "Article 2..."},
5 {"text": "Article 3..."},
6]
7
8# Batch invoke - toi uu hon loop
9results = chain.batch(texts, config={"max_concurrency": 5})
10
11# Async batch
12import asyncio
13
14async def process_batch():
15 results = await chain.abatch(texts)
16 return results
17
18results = asyncio.run(process_batch())

Checkpoint

Bạn đã hiểu cách sử dụng batch processing trong LCEL chưa?

6

⚡ Streaming Patterns

TB5 min
python.py
1# Stream final output
2for chunk in chain.stream({"text": "Long document..."}):
3 print(chunk, end="", flush=True)
4
5# Stream with events
6async for event in chain.astream_events(
7 {"text": "Article..."},
8 version="v2"
9):
10 if event["event"] == "on_chat_model_stream":
11 print(event["data"]["chunk"].content, end="")

Checkpoint

Bạn đã hiểu cách implement streaming patterns trong LCEL chưa?

7

🛠️ Error Handling in LCEL

TB5 min
python.py
1from langchain_core.runnables import RunnableLambda
2
3def safe_parse(result):
4 try:
5 return {"status": "success", "data": result}
6 except Exception as e:
7 return {"status": "error", "error": str(e)}
8
9# Chain with error handling
10safe_chain = chain | RunnableLambda(safe_parse)
11
12# With retry
13retry_chain = chain.with_retry(
14 stop_after_attempt=3,
15 wait_exponential_jitter=True
16)

Checkpoint

Bạn đã hiểu cách xử lý lỗi và retry trong LCEL pipeline chưa?

8

💻 Real-world Pipeline

TB5 min
python.py
1# Complete text processing pipeline
2from langchain_core.runnables import RunnableParallel
3
4# Input: raw article
5# Output: processed content pack
6content_pipeline = (
7 # Step 1: Clean and extract
8 RunnablePassthrough.assign(
9 cleaned=clean_chain
10 )
11 # Step 2: Parallel analysis
12 | RunnableParallel(
13 summary=summary_chain,
14 sentiment=sentiment_chain,
15 keywords=keyword_chain,
16 category=category_chain,
17 original=RunnablePassthrough()
18 )
19 # Step 3: Generate outputs
20 | RunnableParallel(
21 report=report_chain,
22 social_posts=social_chain,
23 email_digest=email_chain
24 )
25)

Checkpoint

Bạn đã hiểu cách kết hợp các LCEL patterns để build real-world pipeline chưa?

9

🎯 Tổng kết

TB5 min
Hands-on Exercise
  1. Build parallel analysis pipeline (summary + sentiment + keywords)
  2. Implement dynamic routing dựa trên text length
  3. Tạo batch processing cho nhiều documents
  4. Build content repurposing pipeline với LCEL

Câu hỏi tự kiểm tra

  1. RunnableParallel trong LCEL được sử dụng trong tình huống nào và mang lại lợi ích gì?
  2. Dynamic routing cho phép pipeline xử lý các loại đầu vào khác nhau như thế nào?
  3. Sự khác biệt giữa sequential pipeline và parallel pipeline trong LCEL là gì?
  4. Làm thế nào để xử lý lỗi (error handling) trong một LCEL pipeline phức tạp?

🎉 Tuyệt vời! Bạn đã hoàn thành bài học LCEL Patterns nâng cao!

Tiếp theo: Hãy học cách trích xuất dữ liệu có cấu trúc với Structured Output và Parsing!


🚀 Bài tiếp theo

Structured Output và Parsing →