/ LangGraph  AI Agent  LangChain  Python  状态图  人机协作  工作流  大模型 

用 LangGraph 构建可中断 AI Agent:状态图与人机协作实战


封面

什么是 LangGraph?为什么需要它

随着 AI Agent 应用复杂度的提升,单纯的"链式调用"(Chain)已经无法满足生产场景需求。当 Agent 需要循环推理、条件分支、人工介入、状态持久化时,传统 LangChain 的线性管道就显得力不从心。

LangGraph 是 LangChain 团队在 2024 年推出的图式 Agent 编排框架,核心思想是将 Agent 的执行过程建模为一张有向图(Directed Graph),每个节点是一个处理步骤,边代表状态流转路径,整个 Agent 的运行状态被显式管理。

  • 可中断(Interruptible):在任意节点暂停执行,等待外部输入

  • 可恢复(Resumable):从断点续传,无需重跑前置步骤

  • 状态持久化:借助 Checkpointer 将状态存入数据库

  • 人机协作(Human-in-the-loop):在关键决策点插入人工审核

这些特性使 LangGraph 成为构建生产级 AI Agent 的首选框架,尤其适合审批流、客服系统、代码生成等需要可控性的场景。

核心概念:状态图的三要素

理解 LangGraph 必须先掌握三个核心概念:State(状态)Node(节点)Edge(边)

1. State —— 贯穿整个 Graph 的共享数据

State 是一个 TypedDict,定义了 Agent 在整个运行生命周期中需要携带的数据。所有节点共享同一个 State,并可以对其进行局部更新。

from typing import TypedDict, Annotated, List
from langgraph.graph import add_messages

class AgentState(TypedDict):
    messages: Annotated[List, add_messages]  # 消息历史,自动追加
    user_input: str
    approval_status: str  # pending / approved / rejected
    final_result: str

Annotated[List, add_messages] 是 LangGraph 提供的 reducer,表示每次更新 messages 时是追加而非覆盖。你也可以自定义 reducer 函数来控制状态合并逻辑。

2. Node —— 图中的处理单元

节点是普通的 Python 函数,接收当前 State,返回需要更新的字段(字典形式):

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini")

def analyze_request(state: AgentState) -> dict:
    """分析用户请求,决定是否需要人工审批"""
    response = llm.invoke([
        {"role": "system", "content": "你是一个智能审批助手,判断请求风险等级"},
        {"role": "user", "content": state["user_input"]}
    ])
    return {
        "messages": [response],
        "approval_status": "pending"
    }

def execute_action(state: AgentState) -> dict:
    """执行已审批的操作"""
    result = f"操作已执行:{state['user_input']}"
    return {"final_result": result, "approval_status": "done"}

3. Edge —— 控制流转路径

边决定执行完一个节点后跳转到哪里。LangGraph 支持三种边:

  • 普通边:固定跳转 graph.add_edge("node_a", "node_b")

  • 条件边:根据状态动态路由 graph.add_conditional_edges(...)

  • START / END:图的入口和出口节点

实战:构建可中断的智能审批 Agent

下面我们构建一个完整的智能审批 Agent:用户提交操作请求 → AI 分析风险 → 高风险请求暂停等待人工确认 → 审批通过后继续执行。

安装依赖

pip install langgraph langchain-openai
# 如果使用持久化,还需要:
pip install langgraph-checkpoint-sqlite

完整代码实现

from typing import TypedDict, Annotated, List, Literal
from langgraph.graph import StateGraph, START, END, add_messages
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage

# --- 1. 定义状态 ---
class ApprovalState(TypedDict):
    messages: Annotated[List, add_messages]
    user_input: str
    risk_level: str   # low / high
    approval_status: str  # pending / approved / rejected / done
    final_result: str

# --- 2. 定义节点 ---
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

def risk_analyzer(state: ApprovalState) -> dict:
    """AI 分析请求风险等级"""
    prompt = f"""分析以下操作请求的风险等级,只回答 'low' 或 'high':
请求内容:{state['user_input']}
判断标准:涉及资金、数据删除、权限变更等为 high,普通查询为 low"""
    
    response = llm.invoke(prompt)
    risk = "high" if "high" in response.content.lower() else "low"
    return {
        "messages": [AIMessage(content=f"风险分析结果:{risk}")],
        "risk_level": risk,
        "approval_status": "pending"
    }

def wait_for_approval(state: ApprovalState) -> dict:
    """等待人工审批节点(实际会被 interrupt 暂停)"""
    return {"messages": [AIMessage(content="等待人工审批中...")]}

def execute_approved(state: ApprovalState) -> dict:
    """执行已审批操作"""
    return {
        "final_result": f"✅ 操作已执行:{state['user_input']}",
        "approval_status": "done"
    }

def reject_action(state: ApprovalState) -> dict:
    """拒绝操作"""
    return {
        "final_result": "❌ 操作已拒绝",
        "approval_status": "rejected"
    }

# --- 3. 路由函数 ---
def route_by_risk(state: ApprovalState) -> Literal["wait_for_approval", "execute_approved"]:
    if state["risk_level"] == "high":
        return "wait_for_approval"
    return "execute_approved"

def route_by_approval(state: ApprovalState) -> Literal["execute_approved", "reject_action"]:
    # 这个函数在 interrupt 恢复后执行,读取人工更新的状态
    if state["approval_status"] == "approved":
        return "execute_approved"
    return "reject_action"

# --- 4. 构建图 ---
builder = StateGraph(ApprovalState)

builder.add_node("risk_analyzer", risk_analyzer)
builder.add_node("wait_for_approval", wait_for_approval)
builder.add_node("execute_approved", execute_approved)
builder.add_node("reject_action", reject_action)

builder.add_edge(START, "risk_analyzer")
builder.add_conditional_edges("risk_analyzer", route_by_risk)
builder.add_conditional_edges("wait_for_approval", route_by_approval)
builder.add_edge("execute_approved", END)
builder.add_edge("reject_action", END)

# --- 5. 编译图(开启 interrupt)---
checkpointer = MemorySaver()
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["wait_for_approval"]  # 在此节点前暂停
)

运行与中断恢复演示

下面演示如何运行 Graph,处理中断,并根据人工决策恢复执行:

import uuid

# 每个对话需要唯一的 thread_id
config = {"configurable": {"thread_id": str(uuid.uuid4())}}

# --- 第一次运行:提交高风险请求 ---
initial_state = {
    "user_input": "删除生产数据库中所有过期用户数据",
    "messages": [],
    "risk_level": "",
    "approval_status": "",
    "final_result": ""
}

print("=== 开始执行 ===")
for event in graph.stream(initial_state, config, stream_mode="values"):
    if event.get("risk_level"):
        print(f"风险等级: {event['risk_level']}")
    if event.get("approval_status") == "pending":
        print("⏸️  Agent 已暂停,等待人工审批")

# 检查当前状态
snapshot = graph.get_state(config)
print(f"当前节点: {snapshot.next}")  # ('wait_for_approval',)

# --- 人工审批:更新状态 ---
print("\n=== 人工审批:批准操作 ===")
graph.update_state(
    config,
    {"approval_status": "approved"},
    as_node="wait_for_approval"
)

# --- 恢复执行 ---
for event in graph.stream(None, config, stream_mode="values"):
    if event.get("final_result"):
        print(f"执行结果: {event['final_result']}")

# 输出:
# === 开始执行 ===
# 风险等级: high
# ⏸️  Agent 已暂停,等待人工审批
# === 人工审批:批准操作 ===
# 执行结果: ✅ 操作已执行:删除生产数据库中所有过期用户数据

关键 API 说明:

  • graph.stream(None, config):传入 None 表示从断点恢复,不重置状态

  • graph.update_state():在恢复前手动修改状态,模拟人工审批结果

  • graph.get_state(config):获取当前快照,.next 显示下一个待执行节点

持久化与生产部署

生产环境中不能使用 MemorySaver(进程重启后状态丢失),需要接入持久化存储:

# SQLite 持久化(适合单机部署)
from langgraph.checkpoint.sqlite import SqliteSaver
with SqliteSaver.from_conn_string("./checkpoints.db") as checkpointer:
    graph = builder.compile(checkpointer=checkpointer, interrupt_before=["wait_for_approval"])

# PostgreSQL 持久化(适合生产集群)
# pip install langgraph-checkpoint-postgres
from langgraph.checkpoint.postgres import PostgresSaver
DB_URI = "postgresql://user:password@localhost:5432/langgraph"
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)

使用 LangGraph Platform(原 LangGraph Cloud)可以直接部署 Agent 服务,提供 REST API 接口、任务队列、状态可视化等生产功能,适合团队级 Agent 应用。

最佳实践建议

  • thread_id 与业务实体(如工单 ID)绑定,便于查询历史状态

  • 对外暴露审批接口时,通过 graph.update_state() 接收 Webhook 回调

  • 使用 interrupt_after 替代 interrupt_before 可在节点执行后再暂停,更适合"先执行再确认"场景

  • 为每个节点添加超时和重试逻辑,防止 Agent 永久挂起

  • 通过 LangSmith 接入可观测性,实时追踪 Agent 状态图执行路径

与其他框架的横向对比

选择 Agent 框架时,理解各框架的定位有助于做出合理决策:

  • LangGraph vs AutoGen:LangGraph 更强调显式状态控制和可中断性,AutoGen 更适合多 Agent 对话协作场景

  • LangGraph vs CrewAI:CrewAI 抽象层次更高,上手更快,但定制化能力不如 LangGraph

  • LangGraph vs Dify:Dify 是面向非开发者的无代码平台,LangGraph 面向需要完全代码控制的工程师

  • LangGraph vs 自研状态机:LangGraph 已封装了 checkpointing、streaming、并发等复杂逻辑,重复造轮子的成本极高

总体而言,LangGraph 是目前代码侧构建生产级有状态 Agent 的最佳选择,值得每个 AI 工程师深入掌握。

总结

本文从 LangGraph 的核心概念出发,完整演示了如何构建一个支持中断恢复的智能审批 Agent:

  • 通过 TypedDict State 显式管理 Agent 共享数据

  • 通过 条件边 实现动态路由,根据风险等级决定执行路径

  • 通过 interrupt_before + Checkpointer 实现人机协作审批流

  • 通过 update_state + stream(None) 实现断点恢复

LangGraph 的图式编排模型让复杂 Agent 的状态流转变得可视化、可调试、可审计。随着 AI Agent 应用走向生产,掌握 LangGraph 已成为 AI 工程师的必备技能之一。建议结合官方文档和 LangSmith 进行进一步实践。

发布评论

热门评论区: