用 LangGraph 构建可中断 AI Agent:状态图与人机协作实战

什么是 LangGraph?为什么需要它
随着 AI Agent 应用复杂度的提升,单纯的"链式调用"(Chain)已经无法满足生产场景需求。当 Agent 需要循环推理、条件分支、人工介入、状态持久化时,传统 LangChain 的线性管道就显得力不从心。
LangGraph 是 LangChain 团队在 2024 年推出的图式 Agent 编排框架,核心思想是将 Agent 的执行过程建模为一张有向图(Directed Graph),每个节点是一个处理步骤,边代表状态流转路径,整个 Agent 的运行状态被显式管理。
可中断(Interruptible):在任意节点暂停执行,等待外部输入
可恢复(Resumable):从断点续传,无需重跑前置步骤
状态持久化:借助 Checkpointer 将状态存入数据库
人机协作(Human-in-the-loop):在关键决策点插入人工审核
这些特性使 LangGraph 成为构建生产级 AI Agent 的首选框架,尤其适合审批流、客服系统、代码生成等需要可控性的场景。
核心概念:状态图的三要素
理解 LangGraph 必须先掌握三个核心概念:State(状态)、Node(节点)、Edge(边)。
1. State —— 贯穿整个 Graph 的共享数据
State 是一个 TypedDict,定义了 Agent 在整个运行生命周期中需要携带的数据。所有节点共享同一个 State,并可以对其进行局部更新。
from typing import TypedDict, Annotated, List from langgraph.graph import add_messages class AgentState(TypedDict): messages: Annotated[List, add_messages] # 消息历史,自动追加 user_input: str approval_status: str # pending / approved / rejected final_result: str
Annotated[List, add_messages] 是 LangGraph 提供的 reducer,表示每次更新 messages 时是追加而非覆盖。你也可以自定义 reducer 函数来控制状态合并逻辑。
2. Node —— 图中的处理单元
节点是普通的 Python 函数,接收当前 State,返回需要更新的字段(字典形式):
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini")
def analyze_request(state: AgentState) -> dict:
"""分析用户请求,决定是否需要人工审批"""
response = llm.invoke([
{"role": "system", "content": "你是一个智能审批助手,判断请求风险等级"},
{"role": "user", "content": state["user_input"]}
])
return {
"messages": [response],
"approval_status": "pending"
}
def execute_action(state: AgentState) -> dict:
"""执行已审批的操作"""
result = f"操作已执行:{state['user_input']}"
return {"final_result": result, "approval_status": "done"}3. Edge —— 控制流转路径
边决定执行完一个节点后跳转到哪里。LangGraph 支持三种边:
普通边:固定跳转
graph.add_edge("node_a", "node_b")条件边:根据状态动态路由
graph.add_conditional_edges(...)START / END:图的入口和出口节点
实战:构建可中断的智能审批 Agent
下面我们构建一个完整的智能审批 Agent:用户提交操作请求 → AI 分析风险 → 高风险请求暂停等待人工确认 → 审批通过后继续执行。
安装依赖
pip install langgraph langchain-openai # 如果使用持久化,还需要: pip install langgraph-checkpoint-sqlite
完整代码实现
from typing import TypedDict, Annotated, List, Literal
from langgraph.graph import StateGraph, START, END, add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
# --- 1. 定义状态 ---
class ApprovalState(TypedDict):
messages: Annotated[List, add_messages]
user_input: str
risk_level: str # low / high
approval_status: str # pending / approved / rejected / done
final_result: str
# --- 2. 定义节点 ---
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def risk_analyzer(state: ApprovalState) -> dict:
"""AI 分析请求风险等级"""
prompt = f"""分析以下操作请求的风险等级,只回答 'low' 或 'high':
请求内容:{state['user_input']}
判断标准:涉及资金、数据删除、权限变更等为 high,普通查询为 low"""
response = llm.invoke(prompt)
risk = "high" if "high" in response.content.lower() else "low"
return {
"messages": [AIMessage(content=f"风险分析结果:{risk}")],
"risk_level": risk,
"approval_status": "pending"
}
def wait_for_approval(state: ApprovalState) -> dict:
"""等待人工审批节点(实际会被 interrupt 暂停)"""
return {"messages": [AIMessage(content="等待人工审批中...")]}
def execute_approved(state: ApprovalState) -> dict:
"""执行已审批操作"""
return {
"final_result": f"✅ 操作已执行:{state['user_input']}",
"approval_status": "done"
}
def reject_action(state: ApprovalState) -> dict:
"""拒绝操作"""
return {
"final_result": "❌ 操作已拒绝",
"approval_status": "rejected"
}
# --- 3. 路由函数 ---
def route_by_risk(state: ApprovalState) -> Literal["wait_for_approval", "execute_approved"]:
if state["risk_level"] == "high":
return "wait_for_approval"
return "execute_approved"
def route_by_approval(state: ApprovalState) -> Literal["execute_approved", "reject_action"]:
# 这个函数在 interrupt 恢复后执行,读取人工更新的状态
if state["approval_status"] == "approved":
return "execute_approved"
return "reject_action"
# --- 4. 构建图 ---
builder = StateGraph(ApprovalState)
builder.add_node("risk_analyzer", risk_analyzer)
builder.add_node("wait_for_approval", wait_for_approval)
builder.add_node("execute_approved", execute_approved)
builder.add_node("reject_action", reject_action)
builder.add_edge(START, "risk_analyzer")
builder.add_conditional_edges("risk_analyzer", route_by_risk)
builder.add_conditional_edges("wait_for_approval", route_by_approval)
builder.add_edge("execute_approved", END)
builder.add_edge("reject_action", END)
# --- 5. 编译图(开启 interrupt)---
checkpointer = MemorySaver()
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["wait_for_approval"] # 在此节点前暂停
)运行与中断恢复演示
下面演示如何运行 Graph,处理中断,并根据人工决策恢复执行:
import uuid
# 每个对话需要唯一的 thread_id
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
# --- 第一次运行:提交高风险请求 ---
initial_state = {
"user_input": "删除生产数据库中所有过期用户数据",
"messages": [],
"risk_level": "",
"approval_status": "",
"final_result": ""
}
print("=== 开始执行 ===")
for event in graph.stream(initial_state, config, stream_mode="values"):
if event.get("risk_level"):
print(f"风险等级: {event['risk_level']}")
if event.get("approval_status") == "pending":
print("⏸️ Agent 已暂停,等待人工审批")
# 检查当前状态
snapshot = graph.get_state(config)
print(f"当前节点: {snapshot.next}") # ('wait_for_approval',)
# --- 人工审批:更新状态 ---
print("\n=== 人工审批:批准操作 ===")
graph.update_state(
config,
{"approval_status": "approved"},
as_node="wait_for_approval"
)
# --- 恢复执行 ---
for event in graph.stream(None, config, stream_mode="values"):
if event.get("final_result"):
print(f"执行结果: {event['final_result']}")
# 输出:
# === 开始执行 ===
# 风险等级: high
# ⏸️ Agent 已暂停,等待人工审批
# === 人工审批:批准操作 ===
# 执行结果: ✅ 操作已执行:删除生产数据库中所有过期用户数据关键 API 说明:
graph.stream(None, config):传入None表示从断点恢复,不重置状态graph.update_state():在恢复前手动修改状态,模拟人工审批结果graph.get_state(config):获取当前快照,.next显示下一个待执行节点
持久化与生产部署
生产环境中不能使用 MemorySaver(进程重启后状态丢失),需要接入持久化存储:
# SQLite 持久化(适合单机部署)
from langgraph.checkpoint.sqlite import SqliteSaver
with SqliteSaver.from_conn_string("./checkpoints.db") as checkpointer:
graph = builder.compile(checkpointer=checkpointer, interrupt_before=["wait_for_approval"])
# PostgreSQL 持久化(适合生产集群)
# pip install langgraph-checkpoint-postgres
from langgraph.checkpoint.postgres import PostgresSaver
DB_URI = "postgresql://user:password@localhost:5432/langgraph"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
graph = builder.compile(checkpointer=checkpointer)使用 LangGraph Platform(原 LangGraph Cloud)可以直接部署 Agent 服务,提供 REST API 接口、任务队列、状态可视化等生产功能,适合团队级 Agent 应用。
最佳实践建议
将
thread_id与业务实体(如工单 ID)绑定,便于查询历史状态对外暴露审批接口时,通过
graph.update_state()接收 Webhook 回调使用
interrupt_after替代interrupt_before可在节点执行后再暂停,更适合"先执行再确认"场景为每个节点添加超时和重试逻辑,防止 Agent 永久挂起
通过 LangSmith 接入可观测性,实时追踪 Agent 状态图执行路径
与其他框架的横向对比
选择 Agent 框架时,理解各框架的定位有助于做出合理决策:
LangGraph vs AutoGen:LangGraph 更强调显式状态控制和可中断性,AutoGen 更适合多 Agent 对话协作场景
LangGraph vs CrewAI:CrewAI 抽象层次更高,上手更快,但定制化能力不如 LangGraph
LangGraph vs Dify:Dify 是面向非开发者的无代码平台,LangGraph 面向需要完全代码控制的工程师
LangGraph vs 自研状态机:LangGraph 已封装了 checkpointing、streaming、并发等复杂逻辑,重复造轮子的成本极高
总体而言,LangGraph 是目前代码侧构建生产级有状态 Agent 的最佳选择,值得每个 AI 工程师深入掌握。
总结
本文从 LangGraph 的核心概念出发,完整演示了如何构建一个支持中断恢复的智能审批 Agent:
通过 TypedDict State 显式管理 Agent 共享数据
通过 条件边 实现动态路由,根据风险等级决定执行路径
通过 interrupt_before + Checkpointer 实现人机协作审批流
通过 update_state + stream(None) 实现断点恢复
LangGraph 的图式编排模型让复杂 Agent 的状态流转变得可视化、可调试、可审计。随着 AI Agent 应用走向生产,掌握 LangGraph 已成为 AI 工程师的必备技能之一。建议结合官方文档和 LangSmith 进行进一步实践。
发布评论
热门评论区: