Human-in-the-loop
下面提供 LangGraph 中 Human-in-the-loop 的完整可运行示例,包含两种常见模式:
- 节点内主动中断(推荐,清晰)
- 条件边 + 外部控制(适用于更复杂的审批流程)
我会先给出一个最标准的实现方式,并详细解释每一步。
一、环境准备
pip install langgraph langchain langchain-openai
不需要实际调用 OpenAI 也可以演示,这里会用模拟函数代替 LLM 调用。
二、核心机制说明
LangGraph 实现 Human-in-the-loop 依赖两个特性:
- Checkpointer:在执行过程中持久化状态。
interrupt():在节点函数内调用,暂停图执行,等待用户提供输入。- 用户提供输入后,从断点恢复,
interrupt()会返回用户输入的值。
- 用户提供输入后,从断点恢复,
流程:
执行 → 遇到 interrupt() → 保存状态并抛出 GraphInterrupt 异常
↓
用户调用 .invoke(Command(resume=value)) 传入继续值
↓
从中断节点恢复执行,interrupt() 返回值 = 用户输入
三、完整代码示例:审核工作流
场景
一个内容生成 Agent,生成草稿后必须经过人工审核,审核通过才能发布,否则重新生成。
from typing import TypedDict, List
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command
# ---------- 状态定义 ----------
class AgentState(TypedDict):
draft: str # 生成的草稿内容
feedback: str # 人工反馈(如果有)
approved: bool # 是否审核通过
iteration: int # 当前重新生成的次数
# ---------- 节点函数 ----------
def generate_draft(state: AgentState):
"""模拟 LLM 生成文章草稿(根据反馈迭代改进)"""
iteration = state.get("iteration", 0)
feedback = state.get("feedback", "")
if iteration == 0:
draft = "人工智能正在改变世界。"
else:
draft = f"(根据反馈「{feedback}」修改)人工智能深刻改变世界,并带来新机遇。"
return {"draft": draft, "iteration": iteration + 1}
def human_review(state: AgentState):
"""人工审核节点:中断执行,等待用户输入"""
print(f"\n--- 待审核草稿 ---\n{state['draft']}\n------------------")
# 中断并等待用户输入
# 这里可以返回任意结构的数据,比如 {"approved": bool, "feedback": str}
user_input = interrupt(
prompt="请输入审核结果 (yes/no) 以及反馈意见,用 | 分隔,例如:yes|很棒"
)
# 解析用户输入
parts = user_input.split("|")
approved = parts[0].strip().lower() == "yes"
feedback = parts[1] if len(parts) > 1 else ""
return {"approved": approved, "feedback": feedback}
def should_publish_or_retry(state: AgentState):
"""条件边:根据审核结果决定下一步"""
if state["approved"]:
return "publish"
else:
# 限制重试次数,防止无限循环
if state["iteration"] >= 3:
return "publish" # 强制发布或进入其他处理
return "generate_draft"
def publish(state: AgentState):
"""发布节点"""
print(f"\n✅ 最终发布内容:{state['draft']}")
return {}
# ---------- 构建图 ----------
builder = StateGraph(AgentState)
builder.add_node("generate_draft", generate_draft)
builder.add_node("human_review", human_review)
builder.add_node("publish", publish)
builder.add_edge(START, "generate_draft")
builder.add_edge("generate_draft", "human_review")
builder.add_conditional_edges("human_review", should_publish_or_retry)
builder.add_edge("publish", END)
# 必须设置 checkpointer 以支持中断和恢复
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
# ---------- 执行与交互 ----------
# 每个执行实例需要唯一的 thread_id
config = {"configurable": {"thread_id": "user-123"}}
# 第一次调用:图会执行到 human_review 并中断
print("=== 第一次执行 ===")
for event in graph.stream({}, config=config, stream_mode="values"):
if "draft" in event:
print(f"当前草稿: {event['draft']}")
if "approved" in event:
print(f"审核结果: {event['approved']}")
# 此时程序暂停,等待用户输入。实际使用时,可以从命令行/Web表单获取输入。
# 模拟用户输入:
user_response = "no|内容太简单,请增加深度分析"
print("\n=== 恢复执行:提供用户输入 ===")
# 使用 Command(resume=...) 恢复
for event in graph.stream(
Command(resume=user_response),
config=config,
stream_mode="values"
):
if "draft" in event:
print(f"新草稿: {event['draft']}")
if "approved" in event:
print(f"最终审核: {event['approved']}")
# 如果第二次审核通过,则会自动发布;否则再次中断,可继续交互。
运行结果示例
=== 第一次执行 ===
当前草稿: 人工智能正在改变世界。
--- 待审核草稿 ---
人工智能正在改变世界。
------------------
=== 恢复执行:提供用户输入 ===
当前草稿: (根据反馈「内容太简单,请增加深度分析」修改)人工智能深刻改变世界,并带来新机遇。
--- 待审核草稿 ---
(根据反馈「内容太简单,请增加深度分析」修改)人工智能深刻改变世界,并带来新机遇。
------------------
(再次中断,等待审核...)
实际运行中,第二次恢复时可以继续输入
yes|...,图就会走到publish并结束。
四、另一种模式:使用 Command 与条件边手动控制
如果你希望中断点完全由外部逻辑决定(例如等待一个 Webhook),可以不使用 interrupt(),而是在图执行到某个节点后返回一个特殊标记,然后由调用方判断是否继续。
# 伪代码示意
def wait_for_approval(state):
return {"status": "waiting"} # 不调用 interrupt
# 编译时传 checkpointer
# 外部循环:
while True:
result = graph.invoke(..., config)
if result["status"] == "waiting":
user_input = get_user_input_from_ui()
graph.update_state(config, {"feedback": user_input, "status": "approved"})
continue
break
但这种模式不如 interrupt() 优雅,因为需要手动管理状态更新和循环。
五、关键要点总结
| 组件 | 作用 |
|---|---|
MemorySaver | 保存每一步的状态,支持恢复 |
interrupt() | 在节点内暂停,返回用户输入 |
Command(resume) | 恢复执行时携带用户数据 |
thread_id | 标识不同的会话,不同对话的状态相互隔离 |
注意事项
interrupt()只能在图编译时 启用了 checkpointer 的情况下工作。- 一个线程(thread_id)中只能有一个活跃的中断点。在恢复之前再次调用
stream/invoke会报错。 - 可以通过
graph.get_state(config)查看当前中断位置。
六、实际集成到 Web 应用(FastAPI 示例思路)
from fastapi import FastAPI
from langgraph.types import Command
app = FastAPI()
graph = ... # 编译后的图
@app.post("/start")
def start_session():
config = {"configurable": {"thread_id": "session-123"}}
# 启动执行,直到第一次中断
graph.invoke({}, config)
state = graph.get_state(config)
return {"interrupt_info": state.tasks[0].interrupts}
@app.post("/resume")
def resume_session(user_input: str):
config = {"configurable": {"thread_id": "session-123"}}
final_state = graph.invoke(Command(resume=user_input), config)
return final_state
现在你已经有了一个完整的 Human-in-the-loop 代码模板,可以直接复制运行,并按照自己的业务逻辑扩展。