LangGraph实战:打造人在环(Human-in-the-Loop)智能调研Agent
LangGraph实战:打造人在环(Human-in-the-Loop)智能调研Agent
在AI自动化浪潮中,纯AI驱动的工作流往往难以应对复杂场景——比如行业深度调研、投资尽调、合规报告等需要精准决策、人工校验的场景。此时,「人在环(Human-in-the-Loop)」机制就成为了关键:让AI负责重复性的搜索、精读、内容生成,让人专注于审核、决策和优化建议,实现「AI效率+人工可靠性」的双重优势。
本文将基于LangGraph,手把手教你打造一个「人在环」的智能调研Agent,解决之前遇到的依赖导入、状态持久化等坑,最终实现「AI自动调研→人工审核→反馈迭代→最终报告」的全流程自动化。
核心亮点:
-
✅ 完整人在环逻辑:人工审核+反馈迭代+多轮优化
-
✅ 状态持久化:支持中断恢复,避免重复执行任务
-
✅ 多工具协同:Serper搜索+自定义网页精读,获取深度信息
一、前置知识与环境准备
1.1 依赖安装命令
执行以下命令,安装所有核心依赖(已适配最新版本,避坑首选):
# 升级LangGraph到最新版(内置MemorySaver)
pip install -U langgraph
# 核心依赖:大模型、工具、环境配置等
pip install langchain langchain-openai langchain-community python-dotenv serpapi requests beautifulsoup4
1.2 环境变量配置
创建.env文件,配置DeepSeek大模型和Serper搜索的API密钥(需自行注册获取):
# .env文件内容
DEEPSEEK_API_KEY="你的DeepSeek API Key"
DEEPSEEK_BASE_URL="https://api.deepseek.com"
SERPER_API_KEY="你的Serper API Key"
备注:DeepSeek用于逻辑决策和内容生成,Serper用于获取互联网实时信息,两者缺一不可。
二、核心代码实现(完整可运行)
以下代码整合了所有修正细节(重点解决MemorySaver导入、网页精读工具替代等问题),按模块解析,可直接复制运行。
2.1 导入核心模块
from typing import TypedDict, Annotated, List, Dict, Literal
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools import Tool
from langchain_community.utilities import GoogleSerperAPIWrapper
from langchain_core.prompts import PromptTemplate
from dotenv import load_dotenv
import os
import json
import operator
import requests
from bs4 import BeautifulSoup
# 加载环境变量(自动读取.env文件)
load_dotenv()
2.2 初始化核心组件
包含大模型、搜索工具、自定义网页精读工具(替代废弃的RequestsTool):
# 1. 初始化DeepSeek大模型(逻辑稳定优先,温度调低)
llm = ChatOpenAI(
model="deepseek-chat",
api_key=os.getenv("DEEPSEEK_API_KEY"),
base_url=os.getenv("DEEPSEEK_BASE_URL"),
temperature=0.1,
max_tokens=4096
)
# 2. 初始化Serper搜索工具(获取实时行业信息)
serper_search = GoogleSerperAPIWrapper(serper_api_key=os.getenv("SERPER_API_KEY"))
search_tool = Tool(
name="serper_search",
description="用于搜索互联网实时信息,输入具体子问题,输出搜索结果(标题、链接、摘要)",
func=serper_search.run
)
# 3. 自定义网页精读工具
def custom_read_webpage(url: str) -> str:
try:
# 模拟浏览器访问,避免被网页拦截
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"}
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status() # 抛出HTTP错误(如404、500)
# 解析网页,提取正文(去除无关标签)
soup = BeautifulSoup(response.text, "html.parser")
for tag in soup(["script", "style", "nav", "footer", "header"]):
tag.decompose()
# 截断过长内容,避免超出大模型上下文限制
text = soup.get_text(separator="
", strip=True)
return text[:8000] if len(text) > 8000 else text
except Exception as e:
return f"网页精读失败:{str(e)}(URL:{url})"
# 封装为LangChain标准工具,供大模型调用
read_tool = Tool(
name="read_webpage",
description="用于精读权威网页链接(政府官网、行业协会等),输入URL,输出正文纯文本",
func=custom_read_webpage
)
2.3 定义工作流状态(State)
状态是LangGraph的核心,用于存储工作流中所有关键信息,重点新增「人在环」相关字段:
class ResearchState(TypedDict):
main_question: str # 用户原始调研问题
sub_tasks: List[str] # 拆分后的子任务列表(如市场规模、竞争格局)
current_task_idx: int # 当前正在执行的子任务索引
completed_tasks: Dict[str, str] # 已完成的子任务及结果
tool_temp_result: str # 工具调用临时结果(搜索/精读内容)
report_draft: str # 生成的报告初稿
final_report: str # 人工审核通过后的最终报告
messages: Annotated[List[AIMessage], operator.add] # 消息历史(用于调试)
# 人在环核心字段(关键新增)
human_feedback: Literal["approve", "revise", "abort"] | None # 人工反馈:通过/修改/终止
revision_suggestion: str # 人工给出的具体修改建议
2.4 定义工作流节点(含人在环核心节点)
共9个节点,覆盖「任务拆解→搜索→精读→报告生成→人工审核→反馈优化」全流程,重点讲解人在环相关节点:
# 节点1:任务拆解(将复杂问题拆分为可执行子任务)
def split_tasks(state: ResearchState) -> ResearchState:
prompt = PromptTemplate(
template="""你是资深行业分析师,将用户复杂调研问题拆分为3-5个具体子任务,按逻辑顺序排列。
要求:子任务具体可量化,输出JSON列表(仅JSON,无其他内容),示例:["子任务1","子任务2"]
用户问题:{main_question}""",
input_variables=["main_question"]
)
response = llm.invoke(prompt.format(main_question=state["main_question"]))
# 异常处理:避免JSON解析失败
try:
sub_tasks = json.loads(response.content)
except json.JSONDecodeError:
sub_tasks = [
f"{state['main_question']} - 市场规模与核心数据",
f"{state['main_question']} - 竞争格局与头部玩家",
f"{state['main_question']} - 发展趋势与行业挑战"
]
return {
"sub_tasks": sub_tasks,
"current_task_idx": 0, # 从第一个子任务开始执行
"messages": [AIMessage(content=f"已拆分子任务:{sub_tasks}")]
}
# 节点2:执行搜索(根据当前子任务,调用Serper获取信息)
def search(state: ResearchState) -> ResearchState:
current_task = state["sub_tasks"][state["current_task_idx"]]
search_result = search_tool.invoke(current_task)
formatted_result = f"子任务「{current_task}」搜索结果:
{search_result}"
return {
"tool_temp_result": formatted_result,
"messages": [AIMessage(content=f"完成搜索:{formatted_result[:200]}...")]
}
# 节点3:执行精读(从搜索结果中选权威链接,获取详细信息)
def read(state: ResearchState) -> ResearchState:
search_result = state["tool_temp_result"]
# 让大模型筛选权威链接(优先政府、行业协会、头部企业官网)
select_prompt = PromptTemplate(
template="从搜索结果中选1-2个最权威的URL,仅输出URL,每行一个:{search_result}",
input_variables=["search_result"]
)
urls_response = llm.invoke(select_prompt.format(search_result=search_result))
urls = [u.strip() for u in urls_response.content.split("
") if u.startswith("https")]
# 精读每个链接,整合内容
read_content = []
for url in urls[:2]: # 最多精读2个,避免内容过长
content = read_tool.invoke(url)
read_content.append(f"精读链接 {url}:
{content[:1000]}...")
return {
"tool_temp_result": "
".join(read_content),
"messages": [AIMessage(content=f"完成精读:共处理{len(urls)}个链接")]
}
# 节点4:生成子任务结果(整合搜索/精读信息,生成结构化结果)
def generate_task_result(state: ResearchState) -> ResearchState:
current_task = state["sub_tasks"][state["current_task_idx"]]
tool_info = state["tool_temp_result"]
result_prompt = PromptTemplate(
template="基于参考信息,生成子任务详细结果,分点论述、标注数据来源。
子任务:{current_task}
参考信息:{tool_info}",
input_variables=["current_task", "tool_info"]
)
task_result = llm.invoke(result_prompt.format(current_task=current_task, tool_info=tool_info))
# 更新已完成任务字典
completed_tasks = state["completed_tasks"].copy()
completed_tasks[current_task] = task_result.content
return {
"completed_tasks": completed_tasks,
"tool_temp_result": "", # 重置临时结果,准备下一个子任务
"messages": [AIMessage(content=f"完成子任务:{current_task}")]
}
# 节点5:切换下一个子任务(更新索引,进入下一轮循环)
def next_task(state: ResearchState) -> ResearchState:
return {
"current_task_idx": state["current_task_idx"] + 1,
"messages": [AIMessage(content=f"切换至子任务索引:{state['current_task_idx'] + 1}")]
}
# 节点6:生成报告初稿(整合所有子任务结果,生成Markdown报告)
def generate_report(state: ResearchState) -> ResearchState:
main_question = state["main_question"]
completed_tasks = state["completed_tasks"]
# 构建报告框架
report_content = f"# 「{main_question}」深度调研报告
"
for task, result in completed_tasks.items():
report_content += f"## {task}
{result}
"
# 让大模型补充总结部分
summary_response = llm.invoke(report_content + "
## 总结
请补充综合总结:")
final_draft = report_content + summary_response.content
return {
"report_draft": final_draft,
"messages": [AIMessage(content="报告初稿生成完成,请人工审核")]
}
# 节点7:人工审核(人在环核心节点,暂停工作流,等待人工反馈)
def human_review(state: ResearchState) -> ResearchState:
# 打印报告初稿,提示人工介入
print("
" + "="*50 + " [人工审核节点] " + "="*50)
print("报告初稿:
", state["report_draft"])
print("="*110)
return {"messages": [AIMessage(content="工作流已暂停,等待人工反馈...")]}
# 节点8:处理人工反馈(根据反馈,决定工作流下一步走向)
def handle_human_feedback(state: ResearchState) -> ResearchState:
feedback = state["human_feedback"]
suggestion = state["revision_suggestion"]
if feedback == "revise":
# 人工要求修改:携带建议,进入优化节点
return {"messages": [AIMessage(content=f"收到修改建议:{suggestion},开始优化报告")]}
elif feedback == "approve":
# 人工审核通过:将初稿设为最终报告
return {
"final_report": state["report_draft"],
"messages": [AIMessage(content="人工审核通过,报告已定稿")]
}
else: # feedback == "abort"
# 人工终止任务:清空状态,结束工作流
return {
"final_report": "任务已被人工终止",
"messages": [AIMessage(content="任务已终止,未生成最终报告")]
}
# 节点9:报告优化(根据人工建议,优化报告初稿)
def optimize_report(state: ResearchState) -> ResearchState:
draft = state["report_draft"]
suggestion = state["revision_suggestion"]
optimize_prompt = PromptTemplate(
template="""根据人工修改建议,优化报告初稿,要求:
1. 严格按照建议修改,补充缺失内容、修正逻辑;
2. 优化Markdown格式,保持专业性;
3. 直接输出优化后的完整报告,无需额外说明。
修改建议:{suggestion}
报告初稿:{draft}""",
input_variables=["suggestion", "draft"]
)
optimized = llm.invoke(optimize_prompt.format(suggestion=suggestion, draft=draft))
return {
"report_draft": optimized.content, # 覆盖初稿,供再次审核
"messages": [AIMessage(content="报告优化完成,请再次人工审核")]
}
2.5 定义路由函数(控制工作流走向)
路由函数用于判断“下一步该执行哪个节点”,实现工作流的动态分支和循环:
# 路由1:判断子任务是否全部完成
def check_task_complete(state: ResearchState) -> str:
current_idx = state["current_task_idx"]
total_tasks = len(state["sub_tasks"])
# 若当前是最后一个子任务 → 生成报告;否则 → 执行下一个子任务
return "generate_report" if current_idx >= total_tasks - 1 else "next_task"
# 路由2:根据人工反馈,决定下一步行动
def decide_after_review(state: ResearchState) -> str:
feedback = state["human_feedback"]
if feedback == "revise":
return "optimize_report" # 修改 → 优化报告
elif feedback == "approve" or feedback == "abort":
return "end" # 通过/终止 → 结束工作流
2.6 构建并编译工作流
将节点和路由函数连接,形成完整工作流,启用状态持久化:
# 1. 初始化StateGraph,指定状态类型
workflow = StateGraph(ResearchState)
# 2. 注册所有节点(必须先注册,才能连接边)
workflow.add_node("split_tasks", split_tasks)
workflow.add_node("search", search)
workflow.add_node("read", read)
workflow.add_node("generate_task_result", generate_task_result)
workflow.add_node("next_task", next_task)
workflow.add_node("generate_report", generate_report)
workflow.add_node("human_review", human_review)
workflow.add_node("handle_human_feedback", handle_human_feedback)
workflow.add_node("optimize_report", optimize_report)
# 3. 设置工作流入口点(从任务拆解开始)
workflow.set_entry_point("split_tasks")
# 4. 连接边:子任务执行循环(拆解→搜索→精读→生成结果→判断是否完成)
workflow.add_edge("split_tasks", "search")
workflow.add_edge("search", "read")
workflow.add_edge("read", "generate_task_result")
# 条件边:生成子任务结果后,判断是否全部完成
workflow.add_conditional_edges(
"generate_task_result",
check_task_complete,
{"next_task": "next_task", "generate_report": "generate_report"}
)
workflow.add_edge("next_task", "search") # 下一个子任务 → 重新搜索
# 5. 连接边:人在环流程(生成报告→人工审核→反馈处理→动态分支)
workflow.add_edge("generate_report", "human_review")
workflow.add_edge("human_review", "handle_human_feedback")
# 条件边:根据人工反馈,决定优化还是结束
workflow.add_conditional_edges(
"handle_human_feedback",
decide_after_review,
{"optimize_report": "optimize_report", "end": END}
)
workflow.add_edge("optimize_report", "human_review") # 优化后 → 再次人工审核
# 6. 初始化检查点(状态持久化,支持中断恢复)
memory = MemorySaver()
# 7. 编译工作流(启用检查点,确保状态不丢失)
app = workflow.compile(checkpointer=memory)
2.7 运行工作流(含人工交互)
编写运行逻辑,实现“AI自动执行→人工反馈→继续执行”的交互流程:
if __name__ == "__main__":
# 唯一任务ID(用于关联检查点,实现中断恢复)
thread_id = "research_task_001"
# 工作流初始状态
initial_state = {
"main_question": "2025-2030年中国人工智能大模型行业发展深度调研",
"sub_tasks": [],
"current_task_idx": 0,
"completed_tasks": {},
"tool_temp_result": "",
"report_draft": "",
"final_report": "",
"messages": [],
"human_feedback": None,
"revision_suggestion": ""
}
# 第一步:AI自动执行调研流程(直到人工审核节点)
print("开始执行深度调研工作流...")
result = app.invoke(
initial_state,
config={"configurable": {"thread_id": thread_id}} # 关联任务ID,保存状态
)
# 第二步:人工输入反馈(核心交互环节)
print("
请输入人工反馈:")
feedback = input("输入 'approve'(通过)/ 'revise'(修改)/ 'abort'(终止):").strip().lower()
suggestion = ""
if feedback == "revise":
suggestion = input("请输入具体修改建议:").strip()
# 第三步:根据人工反馈,继续执行工作流
print("
继续执行工作流...")
result = app.invoke(
{"human_feedback": feedback, "revision_suggestion": suggestion},
config={"configurable": {"thread_id": thread_id}} # 复用任务ID,恢复状态
)
# 输出最终结果,并保存报告
print("
" + "="*100)
print("工作流执行完成,最终结果:")
print(result["final_report"])
print("="*100)
# 保存最终报告(若任务未终止)
if result["final_report"] != "任务已被人工终止":
with open("final_ai_research_report.md", "w", encoding="utf-8") as f:
f.write(result["final_report"])
print("
最终报告已保存至:final_ai_research_report.md")
三、运行流程演示(手把手操作)
运行代码后,整个工作流分为3个关键阶段,全程无需手动干预AI执行,只需在人工审核节点输入反馈即可:
3.1 阶段1:AI自动调研(无需人工干预)
代码运行后,AI会自动执行:
任务拆解 → 搜索(子任务1)→ 精读 → 生成子任务1结果 → 切换子任务2 → 搜索 → 精读 → 生成子任务2结果 → … → 生成报告初稿
3.2 阶段2:人工审核(核心交互)
AI生成报告初稿后,工作流暂停,打印报告内容,并提示输入反馈:
================================================== [人工审核节点] ==================================================
报告初稿:
# 「2025-2030年中国人工智能大模型行业发展深度调研」深度调研报告
...(报告内容省略)
=====================================================================================================================
请输入人工反馈:
输入 'approve'(通过)/ 'revise'(修改)/ 'abort'(终止):revise
请输入具体修改建议:补充2024年Q4市场规模数据,优化竞争格局分析的头部企业排序
3.3 阶段3:反馈迭代与最终报告
输入反馈后,工作流继续执行:
人工反馈处理 → 报告优化 → 再次进入人工审核节点 → 输入approve(通过)→ 生成最终报告 → 保存报告到本地
四、核心价值与扩展方向
4.1 核心价值
本文打造的人在环智能调研Agent,相比纯AI调研,优势在于:
-
可靠性更高:人工审核避免AI生成错误信息、遗漏关键内容;
-
灵活性更强:支持多轮反馈迭代,精准匹配人工需求;
-
效率更高:AI承担搜索、精读、排版等重复性工作,人专注决策;
-
可追溯:状态持久化,支持中断恢复,便于后续复盘和修改。
4.2 扩展方向(按需优化)
基于本文框架,可轻松扩展为更贴合业务的Agent:
-
多模态扩展:添加图表生成工具,为报告自动生成市场规模折线图、竞争格局柱状图;
-
多人工审核:新增权限控制,支持多人协同审核(如分析师审核→主管审核);
-
异步执行:结合消息队列(如RabbitMQ),实现工作流异步执行,人工反馈通过Web界面提交;
-
记忆增强:集成FAISS向量数据库,存储历史报告,支持报告内容检索和复用。
五、总结
LangGraph的核心是「状态+节点+边」,通过简单的节点定义和路由控制,就能实现复杂的人在环工作流。本文从实际踩坑出发,修正了LangGraph最新版本的依赖问题,完整实现了从AI自动调研到人工审核、反馈迭代的全流程,最终生成结构化报告。
该框架不仅适用于行业调研,还可迁移到投资尽调、政策分析、内容创作等需要「AI辅助+人工决策」的场景。只要掌握节点、状态和路由的核心逻辑,就能快速打造符合自身需求的人在环智能Agent。







