🎯 Learning Objectives
Agent workflows need to track state across steps: what was done, what failed, and what to do next. LangGraph handles this with typed state schemas, reducers, and checkpointers.
After this lesson, you will understand:
✅ LangGraph State design patterns
✅ Checkpointing and persistence
✅ State-based routing
✅ Error recovery from checkpoints
📐 LangGraph State Basics
Defining State
    from typing import TypedDict, Annotated, Literal
    from langgraph.graph import StateGraph, END
    import operator

    class AgentState(TypedDict):
        # Messages accumulate (append)
        messages: Annotated[list, operator.add]

        # Current step
        current_step: str

        # Collected data
        search_results: list

        # Error tracking
        error_count: int
        last_error: str

        # Final output
        final_answer: str
State Reducers
    # Default: last value wins
    class SimpleState(TypedDict):
        count: int  # writing 5, then 3, leaves 3

    # With operator.add: values accumulate
    class AccumulatingState(TypedDict):
        messages: Annotated[list, operator.add]  # [a] + [b] → [a, b]

    # Custom reducer
    def merge_results(existing, new):
        """Merge search results, removing duplicates."""
        seen = set()
        merged = []
        for item in (existing or []) + (new or []):
            # Deduplicate by "id" when present, otherwise by content
            key = item.get("id") or item.get("content", "")
            if key not in seen:
                seen.add(key)
                merged.append(item)
        return merged

    class SearchState(TypedDict):
        results: Annotated[list, merge_results]
Checkpoint
Do you understand how to define State with TypedDict and reducers?
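To check the reducer rules without installing LangGraph, the sketch below imitates them in plain Python. `DemoState` and `apply_update` are hypothetical names made up for this illustration; the helper only mimics the "replace vs. accumulate" behaviour described above and is not LangGraph's own implementation.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class DemoState(TypedDict):
    count: int                               # no reducer: last value wins
    messages: Annotated[list, operator.add]  # reducer: lists are concatenated


def apply_update(schema, state: dict, update: dict) -> dict:
    """Hypothetical helper: merge `update` into `state` using the schema's reducers."""
    hints = get_type_hints(schema, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated[...] types expose their extra arguments as __metadata__
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata and key in merged:
            merged[key] = metadata[0](merged[key], value)  # call the reducer
        else:
            merged[key] = value  # plain replace
    return merged


state = {"count": 5, "messages": ["a"]}
state = apply_update(DemoState, state, {"count": 3, "messages": ["b"]})
print(state)  # {'count': 3, 'messages': ['a', 'b']}
```

The `count` field is overwritten, while `messages` grows because its annotation carries `operator.add` as the reducer.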
💻 State-Based Workflows
Multi-Step Agent
    import operator
    from typing import TypedDict, Annotated

    from langchain_openai import ChatOpenAI
    from langgraph.graph import StateGraph, END

    class ResearchState(TypedDict):
        messages: Annotated[list, operator.add]
        query: str
        search_results: list
        analysis: str
        report: str
        step: str

    llm = ChatOpenAI(model="gpt-4o-mini")

    def search_step(state):
        """Step 1: Search for information."""
        query = state["query"]
        # Simulate search (a real implementation would send `query` to a search tool)
        results = [
            {"title": "Result 1", "content": "..."},
            {"title": "Result 2", "content": "..."},
        ]
        return {
            "search_results": results,
            "step": "analyze",
            "messages": [{"role": "system", "content": f"Found {len(results)} results"}]
        }

    def analyze_step(state):
        """Step 2: Analyze results."""
        results = state["search_results"]

        analysis = llm.invoke(
            f"Analyze these search results:\n{results}"
        ).content

        return {
            "analysis": analysis,
            "step": "report",
            "messages": [{"role": "system", "content": "Analysis complete"}]
        }

    def report_step(state):
        """Step 3: Generate report."""
        analysis = state["analysis"]
        query = state["query"]

        report = llm.invoke(
            f"Write a report about '{query}' based on:\n{analysis}"
        ).content

        return {
            "report": report,
            "step": "done",
            "messages": [{"role": "system", "content": "Report generated"}]
        }

    def router(state):
        """Route to next step."""
        step = state.get("step", "search")
        if step == "search":
            return "search"
        elif step == "analyze":
            return "analyze"
        elif step == "report":
            return "report"
        else:
            return END

    # Build graph
    graph = StateGraph(ResearchState)
    graph.add_node("search", search_step)
    graph.add_node("analyze", analyze_step)
    graph.add_node("report", report_step)

    # The entry point depends on state["step"], so a run can start mid-pipeline
    graph.set_conditional_entry_point(router, {
        "search": "search",
        "analyze": "analyze",
        "report": "report",
        END: END
    })

    graph.add_edge("search", "analyze")
    graph.add_edge("analyze", "report")
    graph.add_edge("report", END)

    app = graph.compile()
Checkpoint
Do you understand how to build multi-step workflows with state-based routing?
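The routing decision itself is ordinary Python, so it can be exercised without building a graph. The sketch below is a compact equivalent of the `router` function from the multi-step agent, using a set lookup instead of the if/elif chain. The `END` string here is a local stand-in so the snippet runs without LangGraph installed; in the real graph you would import `END` from `langgraph.graph`.

```python
END = "__end__"  # local stand-in (assumption) for langgraph.graph.END

KNOWN_STEPS = {"search", "analyze", "report"}


def router(state: dict) -> str:
    """Return the name of the node to run next, or END for any other step."""
    step = state.get("step", "search")  # a missing step starts at "search"
    return step if step in KNOWN_STEPS else END


print(router({}))                   # search
print(router({"step": "analyze"}))  # analyze
print(router({"step": "done"}))     # __end__
```

Because the entry point is chosen from `state["step"]`, a state whose step is already `"analyze"` skips the search node entirely.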
📐 Checkpointing
In-Memory Checkpointer
    from langgraph.checkpoint.memory import MemorySaver

    checkpointer = MemorySaver()

    app = graph.compile(checkpointer=checkpointer)

    # Run with thread_id
    config = {"configurable": {"thread_id": "research_001"}}

    result = app.invoke(
        {"query": "AI trends 2025", "step": "search", "messages": []},
        config=config
    )

    # Check state at any point
    state = app.get_state(config)
    print(f"Current step: {state.values.get('step')}")
    print(f"Messages: {len(state.values.get('messages', []))}")
SQLite Persistence
    import sqlite3
    from langgraph.checkpoint.sqlite import SqliteSaver

    # Persistent storage
    db_path = "./agent_checkpoints.db"
    # In recent releases SqliteSaver.from_conn_string() is a context manager,
    # so this example builds the saver from an explicit connection instead.
    conn = sqlite3.connect(db_path, check_same_thread=False)
    checkpointer = SqliteSaver(conn)

    app = graph.compile(checkpointer=checkpointer)

    # State persists across restarts!
    config = {"configurable": {"thread_id": "research_001"}}

    # Session 1: Start research
    result = app.invoke(
        {"query": "AI trends", "step": "search", "messages": []},
        config=config
    )

    # --- App restarts ---

    # Session 2: Resume from checkpoint
    state = app.get_state(config)
    print(f"Resumed at step: {state.values.get('step')}")

    # Continue from where we left off (passing None resumes from the saved state)
    if state.values.get("step") != "done":
        result = app.invoke(None, config=config)
Checkpoint History
    # View all checkpoints for a thread
    config = {"configurable": {"thread_id": "research_001"}}

    for state in app.get_state_history(config):
        print(f"Step: {state.values.get('step')}")
        print(f"Messages: {len(state.values.get('messages', []))}")
        print(f"Checkpoint: {state.config}")
        print("---")
Checkpoint
Do you understand how to use checkpointing to save and restore state?
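To make the idea behind a checkpointer concrete, here is a toy version in plain Python: one list of state snapshots per `thread_id`. `ToyCheckpointer` and its methods are hypothetical names for this illustration only; the real `MemorySaver` and `SqliteSaver` store more metadata and have a different interface.

```python
import copy
from collections import defaultdict


class ToyCheckpointer:
    """Illustrative only: keeps a list of state snapshots per thread_id."""

    def __init__(self):
        self._history = defaultdict(list)

    def save(self, thread_id: str, state: dict) -> None:
        # Deep-copy so later mutations cannot alter a stored snapshot
        self._history[thread_id].append(copy.deepcopy(state))

    def latest(self, thread_id: str):
        snapshots = self._history.get(thread_id)
        return copy.deepcopy(snapshots[-1]) if snapshots else None

    def history(self, thread_id: str) -> list:
        # Newest snapshot first (a choice made for this sketch)
        return list(reversed(self._history.get(thread_id, [])))


saver = ToyCheckpointer()
state = {"step": "search", "messages": []}
for next_step in ("analyze", "report", "done"):
    # Each finished step produces a new state, which is saved as a snapshot
    state = {**state, "step": next_step,
             "messages": state["messages"] + [f"moved to {next_step}"]}
    saver.save("research_001", state)

resumed = saver.latest("research_001")
steps = [snapshot["step"] for snapshot in saver.history("research_001")]
print(resumed["step"])  # done
print(steps)            # ['done', 'report', 'analyze']
```

Keying snapshots by thread is what lets two conversations share one checkpointer without seeing each other's state.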
🤖 Human-in-the-Loop
Interrupt for Approval
    import operator
    from typing import TypedDict, Annotated

    from langgraph.checkpoint.memory import MemorySaver
    from langgraph.graph import StateGraph, END

    class OrderState(TypedDict):
        messages: Annotated[list, operator.add]
        order_details: dict
        approved: bool
        step: str

    def prepare_order(state):
        """Prepare order details."""
        return {
            "order_details": {
                "product": "MacBook Air M3",
                "price": 25990000,
                "shipping": "Express"
            },
            "step": "review"
        }

    def execute_order(state):
        """Execute the approved order."""
        if not state.get("approved"):
            return {"step": "cancelled", "messages": [{"role": "system", "content": "Order cancelled"}]}

        # Process order
        return {
            "step": "done",
            "messages": [{"role": "system", "content": "Order placed!"}]
        }

    graph = StateGraph(OrderState)
    graph.add_node("prepare", prepare_order)
    graph.add_node("execute", execute_order)

    graph.set_entry_point("prepare")
    graph.add_edge("prepare", "execute")
    graph.add_edge("execute", END)

    # Compile with interrupt BEFORE execute (interrupts require a checkpointer)
    app = graph.compile(
        checkpointer=MemorySaver(),
        interrupt_before=["execute"]  # Pause here for human approval
    )

    config = {"configurable": {"thread_id": "order_001"}}

    # Runs until interrupt
    result = app.invoke(
        {"messages": [], "step": "prepare", "approved": False},
        config=config
    )

    # Human reviews order
    state = app.get_state(config)
    order = state.values.get("order_details")
    print(f"Order: {order}")

    # Human approves → update state and continue
    app.update_state(config, {"approved": True})
    result = app.invoke(None, config=config)  # Continue
Checkpoint
Do you understand how to implement human-in-the-loop with interrupt_before?
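The pause, update, resume cycle can also be traced with a toy runner in plain Python. Everything below is a simplified illustration: `run` and `NODES` are hypothetical, the node functions are trimmed-down versions of the ones above, and the runner is not how LangGraph implements interrupts.

```python
def prepare_order(state):
    return {"order_details": {"product": "MacBook Air M3"}, "step": "review"}


def execute_order(state):
    return {"step": "done" if state.get("approved") else "cancelled"}


NODES = [("prepare", prepare_order), ("execute", execute_order)]


def run(state, start=0, interrupt_before=()):
    """Toy runner: returns (state, index of the next node to run)."""
    for i in range(start, len(NODES)):
        name, node = NODES[i]
        # Pause before a guarded node, unless we are resuming exactly at it
        if name in interrupt_before and i != start:
            return state, i
        state = {**state, **node(state)}
    return state, len(NODES)


# First call stops before "execute" so a human can review the order
state, next_index = run({"approved": False}, interrupt_before=("execute",))
print(state["step"], next_index)  # review 1

# Human approves, then the run resumes from the saved position
state = {**state, "approved": True}
state, next_index = run(state, start=next_index, interrupt_before=("execute",))
print(state["step"], next_index)  # done 2
```

If the approval update is skipped before resuming, `execute_order` sees `approved` as `False` and the step ends as `"cancelled"`.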
📐 State Design Patterns
Task Tracking State
    class TaskState(TypedDict):
        messages: Annotated[list, operator.add]
        tasks: list        # List of sub-tasks
        completed: list    # Completed task IDs
        current_task: str  # Current task being worked on
        results: dict      # Task results keyed by task ID
        status: str        # Overall status

    def task_router(state):
        """Route to next incomplete task."""
        completed = set(state.get("completed", []))
        tasks = state.get("tasks", [])

        for task in tasks:
            if task["id"] not in completed:
                return "execute_task"

        return "summarize"
Error Recovery Pattern
    class RobustState(TypedDict):
        messages: Annotated[list, operator.add]
        retries: dict  # task_id -> retry count
        max_retries: int
        failed_tasks: list

    def handle_task_error(state, task_id, error):
        """Handle error with retry tracking."""
        # Copy the dict so the incoming state is not mutated in place
        retries = dict(state.get("retries", {}))
        count = retries.get(task_id, 0) + 1
        retries[task_id] = count

        if count >= state.get("max_retries", 3):
            return {
                "retries": retries,
                "failed_tasks": state.get("failed_tasks", []) + [task_id],
                "messages": [{"role": "system", "content": f"Task {task_id} failed after {count} attempts"}]
            }

        return {
            "retries": retries,
            "messages": [{"role": "system", "content": f"Retrying task {task_id} (attempt {count})"}]
        }
Checkpoint
Do you understand the state design patterns (task tracking, error recovery)?
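To see when a task moves from "retrying" to "failed", the sketch below drives a condensed copy of `handle_task_error` (messages omitted) through three consecutive errors. The task name `"fetch"` and the `TimeoutError` are made-up inputs for the walkthrough, and the state merge is done by hand rather than by a graph.

```python
def handle_task_error(state, task_id, error):
    """Condensed version of the handler above: track retries, mark failures."""
    retries = dict(state.get("retries", {}))  # copy, do not mutate the input
    count = retries.get(task_id, 0) + 1
    retries[task_id] = count
    update = {"retries": retries}
    if count >= state.get("max_retries", 3):
        update["failed_tasks"] = state.get("failed_tasks", []) + [task_id]
    return update


state = {"retries": {}, "max_retries": 3, "failed_tasks": []}
for attempt in range(1, 4):
    update = handle_task_error(state, "fetch", TimeoutError("timed out"))
    state = {**state, **update}  # manual merge: last value wins
    print(attempt, state["retries"], state["failed_tasks"])
# 1 {'fetch': 1} []
# 2 {'fetch': 2} []
# 3 {'fetch': 3} ['fetch']
```

A router can then check `failed_tasks` to decide whether to skip the task or stop the workflow.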
🎯 Summary
📝 Quiz
- What does Annotated[list, operator.add] mean?
  - New values are appended to the list instead of replacing it
  - The list is sorted
  - The list holds at most 10 items
  - The list contains only numbers
- What is a checkpointer used for?
  - Saving and restoring state, so interrupted workflows can resume
  - Backing up the database
  - Logging errors
  - Monitoring performance
- How does interrupt_before work?
  - It pauses execution before the specified node and waits for human input
  - It skips that node
  - It deletes that node
  - It runs that node twice
Key Takeaways
- TypedDict State: a clear schema for agent state
- Reducers: control how state updates are applied (replace vs. accumulate)
- Checkpointing: save progress and resume after failure
- Human-in-the-loop: interrupt for approval on critical actions
- Persistence: SqliteSaver lets state survive restarts
Self-Check Questions
- What does Annotated[list, operator.add] mean in a TypedDict state?
- How does checkpointing help an agent workflow when it is interrupted?
- How does interrupt_before work in the human-in-the-loop pattern?
- Why does state need persistence (SqliteSaver) in production?
🎉 Well done! You have completed the State Management lesson!
Next: explore Planning & Self-Reflection, where the agent makes its own plans and corrects its own mistakes!
