从"LangChain Agent 不够用了"出发 —— 理解 StateGraph、条件边、循环、多 Agent 协作和 Memory
你已经学会了 LangChain LCEL(线性管道 A|B|C),但当你想构建"真正自主的 Agent"时:
| 痛点 | 具体表现 | 根因 |
|---|---|---|
| 流程硬编码 | 拆解→搜索→总结→报告的步骤写死,无法动态调整 | 线性管道无分支 |
| 没有状态管理 | 中间结果在函数内部,函数结束就丢了,无法追踪 Agent 干了什么 | 无显式 State |
| 条件逻辑脆弱 | if len(report) < 100: retry 硬编码检查,无法适应所有场景 |
逻辑散落各处 |
| 循环难实现 | "搜索→不满意→改关键词再搜→满意→继续" 需要手写 while + 退出条件 | 无原生循环机制 |
| 多角色无法建模 | Research / Write / Review 三个 Agent 怎么交接?手写函数调用顺序 | 无调度框架 |
| LangChain LCEL | LangGraph | |
|---|---|---|
| 结构 | 线性管道 A|B|C | 有向图(节点+边) |
| 流程控制 | 固定顺序 | 动态路由(条件边) |
| 循环 | 不支持 | 天然支持 |
| 状态 | 隐式(管道内传递) | 显式(State TypedDict) |
| 可观测 | 只能看最终输出 | 每个节点的输入输出都可追踪 |
| 多角色 | 需要手动编排 | 通过 State 天然共享 |
产品从 A 站 → B 站 → C 站,固定路线,不能回头。适合标准化、重复性任务。
经理(Supervisor)分配任务,员工(Agent)干活,干完交回经理,不满意打回重做(循环!)。
State 是整个图的数据载体。所有节点都能读写它。定义为一个 TypedDict:
from typing import Annotated, TypedDict
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # 追加模式:新消息不会覆盖旧消息
search_results: str # 覆盖模式:新值直接替换旧值
final_answer: str
节点 A 返回 {messages: [msg1]},节点 B 返回 {messages: [msg2]}
→ State.messages = [msg1, msg2](追加,不覆盖)
节点 A 返回 {search_results: "结果A"},节点 B 返回 {search_results: "结果B"}
→ State.search_results = "结果B"(后写的覆盖先写的)
每个 Node 是一个 Python 函数,接收 State,返回要更新的字段:
def search_node(state: AgentState) -> dict:
# 1. 读取 State
query = state["messages"][-1].content
# 2. 做处理...
result = search_knowledge_base(query)
# 3. 返回要更新的字段(不直接修改 State!)
return {
"search_results": result,
"messages": [AIMessage(content=result)],
}
Node 不直接修改 State——返回一个 dict,框架负责合并
返回值只包含要更新的字段——不需要返回整个 State
一个 Node 只做一件事——搜索的只搜索,总结的只总结
A 做完后必然去 B。就像工厂的固定传送带。
builder.add_edge("search", "answer") # 搜索完 → 必然去回答
builder.add_edge("answer", END) # 回答完 → 结束
A 做完后,根据 State 内容动态决定下一步去向。就像分拣机,检查标签后分流。
def router(state: AgentState) -> str:
# 看 State 决定去哪——这就是"动态决策"
if state.get("search_satisfied"):
return "answer" # 满意 → 去回答
else:
return "search" # 不满意 → 重新搜索(循环!)
builder.add_conditional_edges(
"search", router,
{"answer": "answer", "search": "search"}
)
一次 invoke() 内,State 在节点间传递
例:搜索结果 → 总结 → 报告
都在一次调用内完成
State TypedDict
跨多次 invoke(),状态持久化
例:第1轮研究"AI趋势"→ 第2轮追问"详细说说第2点"→ Agent 记得上一轮结果
MemorySaver / SqliteSaver
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# 每次调用传入 thread_id(标识一次对话)
config = {"configurable": {"thread_id": "conversation_1"}}
# 第1轮
graph.invoke({"messages": [HumanMessage(content="研究AI趋势")]}, config)
# 第2轮(同一个 thread_id,Agent 记得上一轮的所有中间状态!)
graph.invoke({"messages": [HumanMessage(content="详细说说第2点")]}, config)
thread_id = 不同会议的编号——"项目A的第3次讨论"
这是 LangGraph 的 "Hello World"——最简单的线性工作流。
class Demo1State(TypedDict):
messages: Annotated[list, add_messages]
search_results: str
builder = StateGraph(Demo1State)
builder.add_node("search", search_node)
builder.add_node("answer", answer_node)
builder.add_edge(START, "search")
builder.add_edge("search", "answer")
builder.add_edge("answer", END)
graph = builder.compile()
def ask_question(query):
results = search(query) # 步骤1
answer = llm.invoke( # 步骤2
f"基于{results}回答{query}")
return answer
# 痛点:
# - 中间状态不可见
# - 加一步要改整个函数
# - 没有错误处理
graph = build_demo1()
result = graph.invoke({
"messages": [HumanMessage(content=q)],
"search_results": "",
})
# 优势:
# - graph.stream() 可看每一步
# - 加节点只需 add_node + add_edge
# - 框架提供错误边界
真实的研究任务需要多步骤协作——每步的输出是下一步的输入。
每个节点的输出被下一个节点读取——State 就是"交接单"。
| Demo 1 | Demo 2 | |
|---|---|---|
| 节点数 | 2 | 4 |
| 复杂度 | 一问一答 | 拆解 → 搜索 → 总结 → 报告 |
| State 字段 | 2 | 5(每步一个产出) |
| LLM 调用次数 | 1 次(仅回答) | 4 次(拆解/总结/报告 + 搜索逻辑) |
| LLM 角色 | 仅回答 | 拆解、总结、报告——LLM 作为"处理器" |
Supervisor 模式 + Reviewer 循环 + Memory —— 这是 LangChain 做不到的。
并行执行:研究、写作、审核同时进行
→ 但写作需要研究结果、审核需要写作结果 → 不能并行!
Supervisor 模式:按依赖顺序调度
→ 研究完成 → 写作 → 审核
没有 Reviewer:Writer 写什么就是什么,质量不可控
→ 可能输出格式混乱、遗漏关键信息
有 Reviewer:独立的质量关卡
→ 通过/打回 + 具体修改意见 → 质量闭环
revision_count 追踪修改次数
达到上限(如 3 次)→ 强制 FINISH
→ 即使不完全满意也结束,避免死循环
第1轮:研究 AI 趋势 → 搜索 → 写报告
第2轮:追问"详细说说第2点"
→ Agent 记得上一轮的所有中间状态
不需要重新搜索!
# LangGraph 的 Node 里用 LangChain 的组件
def researcher_node(state):
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
llm = ChatOpenAI(...) # LangChain 组件
prompt = ChatPromptTemplate.from_messages([...]) # LangChain 组件
chain = prompt | llm | StrOutputParser() # LangChain LCEL
return {"research_findings": chain.invoke(state)}
# LangGraph 负责"什么时候调用谁"(流程控制)
# LangChain 负责"怎么调用"(组件实现)
class State(TypedDict):
revision_count: int
def supervisor(state):
if state["revision_count"] > 3:
return "FINISH" # 强制结束
def safe_node(state):
try:
result = risky_op(state)
return {"output": result}
except Exception as e:
return {"messages": [AIMessage(
content=f"节点失败: {e}")]}
def search_with_timeout(state):
signal.alarm(30) # 30 秒超时
try:
result = search(state)
signal.alarm(0)
return result
except TimeoutError:
return {"search_results": "超时"}
| 类型 | 场景 |
|---|---|
MemorySaver | 开发/测试,重启丢失 |
SqliteSaver | 单机部署,持久化 |
PostgresSaver | 生产环境,多实例 |
逐步观察每个节点的输入输出——这是手写 pipeline 做不到的:
for step in graph.stream(initial_state):
for node_name, output in step.items():
print(f"[{node_name}] {output}")
# 输出示例:
# [search] {'search_results': 'LangGraph 是...'}
# [answer] {'messages': [AIMessage('LangGraph 是一个构建...')]}
| 资源 | 说明 |
|---|---|
langgraph_guide.md | 完整教学指南(11 章) |
langgraph_demo.py | 3 个可运行 Demo(python langgraph_demo.py) |
006-langchain-learning | 前置知识:LangChain LCEL、Tool Calling |