
State Management

Checkpointing, persistence, and state management in LangGraph agents

🎯 Lesson Objectives

Agent workflows need to track state across steps: what has been done, what failed, and what to do next. LangGraph handles this with a typed state schema, reducers that control how updates are merged, and checkpointers that persist state between steps.

After this lesson, you will understand:

✅ LangGraph State design patterns
✅ Checkpointing and persistence
✅ State-based routing
✅ Error recovery from checkpoints

📐 LangGraph State Basics

Defining State

python.py
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, END
import operator

class AgentState(TypedDict):
    # Messages accumulate (append)
    messages: Annotated[list, operator.add]

    # Current step
    current_step: str

    # Collected data
    search_results: list

    # Error tracking
    error_count: int
    last_error: str

    # Final output
    final_answer: str

State Reducers

python.py
# Default: last value wins
class SimpleState(TypedDict):
    count: int  # set(5) then set(3) → 3

# With operator.add: values accumulate
class AccumulatingState(TypedDict):
    messages: Annotated[list, operator.add]  # [a] + [b] → [a, b]

# Custom reducer
def merge_results(existing, new):
    """Merge search results, removing duplicates."""
    seen = set()
    merged = []
    for item in (existing or []) + (new or []):
        key = item.get("id") or item.get("content", "")
        if key not in seen:
            seen.add(key)
            merged.append(item)
    return merged

class SearchState(TypedDict):
    results: Annotated[list, merge_results]
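
The reducer rules above can be tried out without building a graph. The sketch below is only an illustration: `apply_update` is a hypothetical helper, not a LangGraph API, and it imitates in simplified form how a node's partial update is folded into the current state key by key.

```python
import operator

def merge_results(existing, new):
    """Merge search results, dropping items whose id/content was already seen."""
    seen = set()
    merged = []
    for item in (existing or []) + (new or []):
        key = item.get("id") or item.get("content", "")
        if key not in seen:
            seen.add(key)
            merged.append(item)
    return merged

# Hypothetical helper (not part of LangGraph): folds a partial update
# into the current state, using a reducer when one is registered.
def apply_update(state, update, reducers):
    new_state = dict(state)
    for key, value in update.items():
        reducer = reducers.get(key)
        if reducer is None or key not in state:
            new_state[key] = value  # default: last value wins
        else:
            new_state[key] = reducer(state[key], value)
    return new_state

reducers = {"messages": operator.add, "results": merge_results}
before = {"count": 5, "messages": ["a"], "results": [{"id": 1}]}
update = {"count": 3, "messages": ["b"], "results": [{"id": 1}, {"id": 2}]}

after = apply_update(before, update, reducers)
print(after["count"])     # 3 (replaced)
print(after["messages"])  # ['a', 'b'] (concatenated)
print(after["results"])   # [{'id': 1}, {'id': 2}] (deduplicated)
```

Keys without a reducer are overwritten, while keys with a reducer receive `reducer(old, new)`. This is why a node should return only the new messages rather than the whole list.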

Checkpoint

Do you understand how to define State with TypedDict and reducers?

💻 State-Based Workflows

Multi-Step Agent

python.py
import operator
from typing import Annotated, TypedDict

from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END

class ResearchState(TypedDict):
    messages: Annotated[list, operator.add]
    query: str
    search_results: list
    analysis: str
    report: str
    step: str

llm = ChatOpenAI(model="gpt-4o-mini")

def search_step(state):
    """Step 1: Search for information."""
    query = state["query"]
    # Simulate search
    results = [
        {"title": "Result 1", "content": "..."},
        {"title": "Result 2", "content": "..."},
    ]
    return {
        "search_results": results,
        "step": "analyze",
        "messages": [{"role": "system", "content": f"Found {len(results)} results"}]
    }

def analyze_step(state):
    """Step 2: Analyze results."""
    results = state["search_results"]

    analysis = llm.invoke(
        f"Analyze these search results:\n{results}"
    ).content

    return {
        "analysis": analysis,
        "step": "report",
        "messages": [{"role": "system", "content": "Analysis complete"}]
    }

def report_step(state):
    """Step 3: Generate report."""
    analysis = state["analysis"]
    query = state["query"]

    report = llm.invoke(
        f"Write a report about '{query}' based on:\n{analysis}"
    ).content

    return {
        "report": report,
        "step": "done",
        "messages": [{"role": "system", "content": "Report generated"}]
    }

def router(state):
    """Route to next step."""
    step = state.get("step", "search")
    if step == "search":
        return "search"
    elif step == "analyze":
        return "analyze"
    elif step == "report":
        return "report"
    else:
        return END

# Build graph
graph = StateGraph(ResearchState)
graph.add_node("search", search_step)
graph.add_node("analyze", analyze_step)
graph.add_node("report", report_step)

# The router picks the entry node from state["step"],
# so a run can start at any step
graph.set_conditional_entry_point(router, {
    "search": "search",
    "analyze": "analyze",
    "report": "report",
    END: END
})

graph.add_edge("search", "analyze")
graph.add_edge("analyze", "report")
graph.add_edge("report", END)

app = graph.compile()
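
To see what the state-based entry point buys you, here is a framework-free sketch of the same control flow. It makes several assumptions: the LLM calls are replaced by string formatting, `END` is a plain string stand-in, and `run` is a hypothetical mini-runner, not how LangGraph executes a graph internally.

```python
END = "__end__"  # stand-in for langgraph.graph.END in this sketch

def search_step(state):
    return {"search_results": ["r1", "r2"], "step": "analyze"}

def analyze_step(state):
    count = len(state["search_results"])
    return {"analysis": f"{count} results analyzed", "step": "report"}

def report_step(state):
    return {"report": f"Report on {state['query']}: {state['analysis']}", "step": "done"}

NODES = {"search": search_step, "analyze": analyze_step, "report": report_step}
EDGES = {"search": "analyze", "analyze": "report", "report": END}

def router(state):
    """Pick the entry node from state['step']; unknown values end the run."""
    step = state.get("step", "search")
    return step if step in NODES else END

def run(state):
    """Hypothetical mini-runner: choose the entry node, then follow fixed edges."""
    state = dict(state)
    visited = []
    node = router(state)
    while node != END:
        state.update(NODES[node](state))  # plain last-value-wins merge
        visited.append(node)
        node = EDGES[node]
    return state, visited

# A fresh run starts at "search"
fresh, fresh_path = run({"query": "AI trends"})
print(fresh_path)  # ['search', 'analyze', 'report']

# A run whose state already says "analyze" skips the search node
resumed, resumed_path = run(
    {"query": "AI trends", "step": "analyze", "search_results": ["cached"]}
)
print(resumed_path)  # ['analyze', 'report']
```

Because the entry node is derived from `step`, saved state with existing `search_results` can re-enter the workflow at the analysis stage without repeating the search.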

Checkpoint

Do you understand how to build multi-step workflows with state-based routing?

📐 Checkpointing

In-Memory Checkpointer

python.py
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()

app = graph.compile(checkpointer=checkpointer)

# Run with thread_id
config = {"configurable": {"thread_id": "research_001"}}

result = app.invoke(
    {"query": "AI trends 2025", "step": "search", "messages": []},
    config=config
)

# Check state at any point
state = app.get_state(config)
print(f"Current step: {state.values.get('step')}")
print(f"Messages: {len(state.values.get('messages', []))}")

SQLite Persistence

python.py
import sqlite3

from langgraph.checkpoint.sqlite import SqliteSaver

# Persistent storage (needs the langgraph-checkpoint-sqlite package).
# In recent releases SqliteSaver.from_conn_string() is a context manager,
# so the saver is built from an explicit connection here.
db_path = "./agent_checkpoints.db"
conn = sqlite3.connect(db_path, check_same_thread=False)
checkpointer = SqliteSaver(conn)

app = graph.compile(checkpointer=checkpointer)

# State persists across restarts!
config = {"configurable": {"thread_id": "research_001"}}

# Session 1: Start research
result = app.invoke(
    {"query": "AI trends", "step": "search", "messages": []},
    config=config
)

# --- App restarts ---

# Session 2: Resume from checkpoint
state = app.get_state(config)
print(f"Resumed at step: {state.values.get('step')}")

# Continue from where we left off.
# state.next lists the nodes still pending; it is empty once the run has finished.
if state.next:
    result = app.invoke(None, config=config)

Checkpoint History

python.py
# View all checkpoints for a thread
config = {"configurable": {"thread_id": "research_001"}}

for state in app.get_state_history(config):
    print(f"Step: {state.values.get('step')}")
    print(f"Messages: {len(state.values.get('messages', []))}")
    print(f"Checkpoint: {state.config}")
    print("---")
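
As a mental model for what a checkpointer stores, the toy class below keeps one list of state snapshots per thread ID. It is a simplified sketch under stated assumptions: `ToyCheckpointer` and its methods are hypothetical names, and real LangGraph checkpointers also record metadata, pending nodes, and checkpoint IDs.

```python
import copy
from collections import defaultdict

class ToyCheckpointer:
    """Hypothetical in-memory store: one list of state snapshots per thread_id."""

    def __init__(self):
        self._threads = defaultdict(list)

    def put(self, thread_id, state):
        # Deep-copy so later mutations of `state` cannot alter saved history
        self._threads[thread_id].append(copy.deepcopy(state))

    def latest(self, thread_id):
        history = self._threads[thread_id]
        return history[-1] if history else None

    def history(self, thread_id):
        # Newest snapshot first
        return list(reversed(self._threads[thread_id]))

saver = ToyCheckpointer()
state = {"step": "search", "messages": []}

# Save one snapshot after each simulated step
for next_step in ("analyze", "report", "done"):
    state["messages"].append(f"moved to {next_step}")
    state["step"] = next_step
    saver.put("research_001", state)

print(saver.latest("research_001")["step"])                # done
print([s["step"] for s in saver.history("research_001")])  # ['done', 'report', 'analyze']
```

Each thread ID has its own independent history, which is why the `thread_id` in `config` decides which conversation or job a call resumes.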

Checkpoint

Do you understand how to use checkpointing to save and restore state?

🤖 Human-in-the-Loop

Interrupt for Approval

python.py
import operator
from typing import Annotated, TypedDict

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END

class OrderState(TypedDict):
    messages: Annotated[list, operator.add]
    order_details: dict
    approved: bool
    step: str

def prepare_order(state):
    """Prepare order details."""
    return {
        "order_details": {
            "product": "MacBook Air M3",
            "price": 25990000,
            "shipping": "Express"
        },
        "step": "review"
    }

def execute_order(state):
    """Execute the approved order."""
    if not state.get("approved"):
        return {"step": "cancelled", "messages": [{"role": "system", "content": "Order cancelled"}]}

    # Process order
    return {
        "step": "done",
        "messages": [{"role": "system", "content": "Order placed!"}]
    }

graph = StateGraph(OrderState)
graph.add_node("prepare", prepare_order)
graph.add_node("execute", execute_order)

graph.set_entry_point("prepare")
graph.add_edge("prepare", "execute")
graph.add_edge("execute", END)

# Compile with interrupt BEFORE execute (interrupts require a checkpointer)
app = graph.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["execute"]  # Pause here for human approval
)

config = {"configurable": {"thread_id": "order_001"}}

# Runs until interrupt
result = app.invoke(
    {"messages": [], "step": "prepare", "approved": False},
    config=config
)

# Human reviews order
state = app.get_state(config)
order = state.values.get("order_details")
print(f"Order: {order}")

# Human approves → update state and continue
app.update_state(config, {"approved": True})
result = app.invoke(None, config=config)  # Continue from the interrupt

Checkpoint

Do you understand how to implement human-in-the-loop with interrupt_before?

📐 State Design Patterns

Task Tracking State

python.py
class TaskState(TypedDict):
    messages: Annotated[list, operator.add]
    tasks: list        # List of sub-tasks
    completed: list    # Completed task IDs
    current_task: str  # Current task being worked on
    results: dict      # Task results keyed by task ID
    status: str        # Overall status

def task_router(state):
    """Route to next incomplete task."""
    completed = set(state.get("completed", []))
    tasks = state.get("tasks", [])

    for task in tasks:
        if task["id"] not in completed:
            return "execute_task"

    return "summarize"
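
The router above only returns a node name, so the node that executes a task still has to find out which task is next. A minimal sketch of that pairing follows; `pick_next_task` is a hypothetical helper introduced for illustration, and the sample task IDs are made up.

```python
def task_router(state):
    """Return the next node name: keep executing tasks, or summarize."""
    completed = set(state.get("completed", []))
    for task in state.get("tasks", []):
        if task["id"] not in completed:
            return "execute_task"
    return "summarize"

def pick_next_task(state):
    """Hypothetical helper: id of the first task not yet completed, else None."""
    completed = set(state.get("completed", []))
    for task in state.get("tasks", []):
        if task["id"] not in completed:
            return task["id"]
    return None

in_progress = {"tasks": [{"id": "t1"}, {"id": "t2"}], "completed": ["t1"]}
all_done = {"tasks": [{"id": "t1"}, {"id": "t2"}], "completed": ["t1", "t2"]}

print(task_router(in_progress), pick_next_task(in_progress))  # execute_task t2
print(task_router(all_done), pick_next_task(all_done))        # summarize None
```

Because both functions derive their answer from `tasks` and `completed`, the workflow can be resumed from any checkpoint and still continue with the correct task.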

Error Recovery Pattern

python.py
class RobustState(TypedDict):
    messages: Annotated[list, operator.add]
    retries: dict  # task_id -> retry count
    max_retries: int
    failed_tasks: list

def handle_task_error(state, task_id, error):
    """Handle error with retry tracking."""
    # Copy the dict so the current state is not mutated in place
    retries = dict(state.get("retries", {}))
    count = retries.get(task_id, 0) + 1
    retries[task_id] = count

    if count >= state.get("max_retries", 3):
        # Retry budget exhausted: record the task as failed
        return {
            "retries": retries,
            "failed_tasks": state.get("failed_tasks", []) + [task_id],
            "messages": [{"role": "system", "content": f"Task {task_id} failed after {count} attempts: {error}"}]
        }

    return {
        "retries": retries,
        "messages": [{"role": "system", "content": f"Retrying task {task_id} (attempt {count})"}]
    }
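
The retry bookkeeping can be exercised without a graph. The sketch below uses a non-mutating variant of `handle_task_error` with plain-string messages, and `apply_update` is a hypothetical stand-in for LangGraph's merge step (messages accumulate, every other key is replaced).

```python
import operator

def handle_task_error(state, task_id, error):
    """Return a state update that counts the failure and flags exhausted tasks."""
    retries = dict(state.get("retries", {}))  # copy: never mutate state in place
    count = retries.get(task_id, 0) + 1
    retries[task_id] = count

    update = {"retries": retries}
    if count >= state.get("max_retries", 3):
        update["failed_tasks"] = state.get("failed_tasks", []) + [task_id]
        update["messages"] = [f"Task {task_id} failed after {count} attempts: {error}"]
    else:
        update["messages"] = [f"Retrying task {task_id} (attempt {count})"]
    return update

def apply_update(state, update):
    """Hypothetical merge: messages use operator.add, other keys are replaced."""
    merged = {**state, **update}
    merged["messages"] = operator.add(
        state.get("messages", []), update.get("messages", [])
    )
    return merged

state = {"messages": [], "retries": {}, "max_retries": 3, "failed_tasks": []}

# Simulate the same task failing three times in a row
for _ in range(3):
    state = apply_update(state, handle_task_error(state, "t1", "timeout"))

print(state["retries"])       # {'t1': 3}
print(state["failed_tasks"])  # ['t1']
print(state["messages"][-1])  # Task t1 failed after 3 attempts: timeout
```

A router can then inspect `failed_tasks` to decide whether to skip the task, ask a human, or end the run.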

Checkpoint

Do you understand the state design patterns (task tracking, error recovery)?

🎯 Summary

📝 Quiz

  1. What does Annotated[list, operator.add] mean?

    • New values are appended to the list instead of replacing it
    • The list is sorted
    • The list holds at most 10 items
    • The list contains only numbers
  2. What is a checkpointer used for?

    • Saving and restoring state, so interrupted workflows can resume
    • Backing up the database
    • Logging errors
    • Monitoring performance
  3. How does interrupt_before work?

    • It pauses execution before the specified node and waits for human input
    • It skips that node
    • It deletes that node
    • It runs that node twice

Key Takeaways

  1. TypedDict State: a clear schema for agent state
  2. Reducers: control how state updates are merged (replace vs. accumulate)
  3. Checkpointing: save progress and resume after a failure
  4. Human-in-the-loop: interrupt for approval before critical actions
  5. Persistence: SqliteSaver lets state survive restarts

Self-Check Questions

  1. What does Annotated[list, operator.add] mean in a TypedDict state?
  2. How does checkpointing help an agent workflow when it is interrupted?
  3. How does interrupt_before work in the human-in-the-loop pattern?
  4. Why does state need persistence (SqliteSaver) in production?

🎉 Well done! You have completed the State Management lesson!

Next: Planning & Self-Reflection, where the agent makes its own plans and corrects its own mistakes.

