目录一、基础概念1.状态1State特性-Reducer规约器2.节点3.边二、LangGraph的高级特性1.Send机制[send(节点,参数)]2.Command节点跳转3.checkpointer持久化3.1持久化redis的使用4.Threads 会话id5.Configuration节点内部配置6.interupt中断7.Subgraph子图8.Graphviz图可视化9.ToolNode工具节点三、对图的细粒度控制1.人机交互2.retryRetryPolicy节点重试策略3.同一用户所有消息的持久化存储4.checkpoint持久化存储到数据库5.消息的摘要和删除RemoveMessage处理6.toolNode工具调用失败处理7.toolNode工具调用失败转到新的工具节点调用7.将图state状态注入给工具函数8.将图第三方存储和配置注入给工具函数9.工具函数中更新state一、基础概念核心围绕更新图的State1.状态定义的变量类全局共享的数据容器,独立的数据库1Reducer规约器把每次节点return的消息都追加到消息列表消息存储只在当前的图中有效对当前新的值进行返回并合并之后的值类似消息记录自定义规约器operator.add add_messages MessagesState class ChatState(TypedDict): messages: Annotated[Sequence[AnyMessage], operator.add]2.节点执行业务逻辑更改状态的可执行函数3.边连接节点间的流转规则可以为函数1正常边2条件边决定走向流程函数控制return “事件”根据“事件”“节点”跳转from langchain_core.messages import AnyMessage from langchain_core.runnables import RunnableConfig from typing_extensions import TypedDict from langgraph.graph import StateGraph, START, END #定义State class State(TypedDict): messages: list[AnyMessage] extra_field: int #定义Node def process_input_node(state: State, config: RunnableConfig): print(fprocess_input_node :{state}) user_id config.get(configurable, {}).get(user_id, default_user) print(fNode process_input_node processing for user: {user_id}) return {process_input: state[extra_field]2} #定义另一个Node def another_node(state: State): # 这个节点没有 config 参数 print(fanother_node :{state}) state[extra_field] 10 return {some_other_data: done} def node_a(state: State): print(Called A) print(fnode_a :{state}) return {extra_field: state[extra_field] 2} def node_b(state: State): print(Called B) print(fnode_b :{state}) return {extra_field: state[extra_field] 1} # 路由选择 def route_tools(state: State): # 判断是否为偶数最后一位二进制位为0表示偶数 if state[extra_field] 1 0: return tj_1 else: return tj_2 graph_builder StateGraph(State) graph_builder.add_node(processor, process_input_node) graph_builder.add_node(finalizer, another_node) graph_builder.add_node(node_a, node_a) graph_builder.add_node(node_b, node_b) graph_builder.add_edge(START, processor) #条件边:事件节点 graph_builder.add_conditional_edges(processor, route_tools, {tj_1: node_b, tj_2: node_a}) graph_builder.add_edge(node_a, END) graph_builder.add_edge(node_b, finalizer) # 从业务节点连接到结束节点 graph_builder.add_edge(finalizer, END) # 编译 graph graph_builder.compile() #打印{messages: [My, name], extra_field: 1}中间的业务节点对状态并没有贡献 print(graph.invoke({messages: [My, name], extra_field: 2}))二、LangGraph的高级特性Schema数据结构reducer归约器一直叠加消息1.Send节点跳转根据条件跳转到节点[send(节点别名,参数)]Map-Reduce扇出。将一个列表拆分成多个任务并行执行并行。同时启动多个节点或同一节点的多个实例。必须放在add_conditional_edges的返回结果中。import operator from typing import Annotated from typing import TypedDict from langgraph.graph import StateGraph from langgraph.types import Send from langgraph.graph import END, START class OverallState(TypedDict): # 属性 subjects: list[str] # 保存消息 jokes: Annotated[list[str], operator.add] # 条件节点 def continue_to_jokes(state: OverallState): # [ # Send(generate_joke, {subject: cats}), # Send(generate_joke, {subject: dogs}) # ] # send(节点别名, {临时参数: cats}) return [Send(generate_joke, {linshi_v: s}) for s in state[subjects]] def generate_joke(state: OverallState): v state[linshi_v] res fJoke about {v} return {jokes: [res]} workflows StateGraph(OverallState) workflows.add_node(generate_joke,generate_joke) # 条件边 workflows.add_conditional_edges(START, continue_to_jokes) workflows.add_edge(generate_joke, END) graph workflows.compile() # {subjects: [cats, dogs]}是因为state里面定义了这个属性字段 result graph.invoke({subjects: [cats, dogs]}) # 最终返回的结果也是定义的状态的内容 # {subjects: [cats, dogs], # jokes: [Joke about cats, Joke about dogs]} print(result)2.Command节点跳转动态路由。在节点内部决定下一步去哪、更新什么状态。串行/跳转。决定下一个执行的单一节点。直接更新全局状态update字段。直接放在节点函数的return中。**Command(resumexxx) 专门用来恢复interrupt()暂停的流程 **graphCommand.PARENT, # 指定跳转的父图 gotogoto, # 跳转到子图相应节点 resume # 给被暂停的interrupt节点传递数据 update{foo: value}, # 更新值的内容,value是占位符config1.state graph.get_state(config)到底拿到了什么当你执行这行代码时返回的state对象通常是StateSnapshot类型包含几个核心字段state.values: 当前全局状态字典即你的foo、user_feedback等数据。state.next:最关键的字段。它是一个元组Tuple记录了接下来要执行的节点名称。state.tasks: 当前正在处理或被中断的任务列表。state.config: 当前状态对应的配置包含checkpoint_id等。3.checkpointer持久化保存图中state的状态state更新一次保存一次编译的时候传入启用invoke调用的时候必须结合 config使用父图设置了检查点子图一样具有检查点参数config:配置metadata:与此检查点相关的元数据values:此时状态的通道next:图中下一个要执行的节点名称元组tasks:包含有关要执行的下个任务的RregelTask对象的元组3.1持久化在redis中的使用4.Threads_id 和session_idsession_id:LangChain 核心生态全组件通用,用户 / 客户端级「长期会话标识」thread_id:LangGraph 专属单次图执行级图的State的持久化典型业务场景客服机器人的工作流单个用户session_idcustomer_789发起多次咨询每次咨询触发一次 LangGraph 工作流执行# 配置双层标识session_id用户级 thread_id单次图执行级 config: RunnableConfig { configurable: { session_id: user_001, # 手动指定用户001的唯一会话标识 thread_id: joke_graph_run_001 # 手动指定本次笑话生成的图执行标识 } }5.Configuration节点内部配置特殊保留参数可用于切换某个大模型与图中传入的配置不同这个手动处理接收def _call_model(state: AgentState, config: RunnableConfig): print(graph.invoke({messages: [HumanMessage(content你是谁)]}, configconfig))6.interupt中断需要结合checkpointerconfigcommand使用写了interupt就得写两次调用第一次运行到节点等待第二次传递值得到数据在图中的某个节点使用interupt运行到该节点操作暂停调用这个图的时候需要通过command的resume传递一个消息给暂停的节点让图继续运行**Command(resumexxx)专门用来恢复interrupt()暂停的流程 **7.Subgraph子图父图把子图当做一个节点用一般有共享属性字段builder.add_node(node_2, subgraph)在父图里面调用子图使用一般没有共享属性字段response Subgraph.invoke({subBar: state[ParFoo]})8.Graphviz图可视化from IPython.display import Image, display 可视化图 #方式一需要单独安装graphviz # 方法从官网下载 # 访问 Graphviz官网 https://graphviz.org/download/ # 下载Windows版本的安装包 # 运行安装程序并按照提示完成安装 # 将Graphviz的bin目录添加到系统PATH环境变量中通常是C:\Program Files\Graphviz\bin # 并且请用try块包裹下面的语句因为执行会报错但是不影响图片的生成 try: display(Image(app.get_graph().draw_png(output_file_path./可视化图1.png))) except: pass #方式二不需要额外安装软件但是访问网址mermaid.ink非常容易失败开启科学上网比较容易成功 display(Image(app.get_graph().draw_mermaid_png(output_file_path./可视化图2.png)))9.ToolNode工具节点对工具进行实际操作使用toolnodellm.bind_toolsagent当工具操作完后还需要判断是否要继续调用工具# 做实际操作的 tool_node ToolNode([multiply,add]) # 这个是让模型知道有哪些工具可以用 model_with_tools llm.bind_tools([multiply,add])from typing import TypedDict from gradio import Image from langchain_core.prompts import PromptTemplate from langchain_core.tools import tool from langgraph.constants import START, END from langgraph.graph import MessagesState, StateGraph from langgraph.prebuilt import ToolNode from langchain_openai import ChatOpenAI import os from dotenv import load_dotenv load_dotenv() def get_client(): api_key os.getenv(ali_api_key) modelqwen-plus base_urlhttps://dashscope.aliyuncs.com/compatible-mode/v1 # langchain调用访问大模型 client ChatOpenAI(api_key api_key,base_url base_url,modelmodel) return client llm get_client() tool def add(a:int,b:int): 计算两数相加 return ab tool def mul(a:int,b:int): 两数相乘 return a*b # 定义工具节点调用工具 toolNode ToolNode([add,mul],name工具) # 让大模型知道有哪些工具使用 model_with_tools llm.bind_tools([add,mul]) def call_model(state: MessagesState): messages state[messages] response model_with_tools.invoke(messages) return {messages: [response]} def use_tool(state: MessagesState): messages state[messages] last_ai_message messages[-1] print(last_ai_message,last_ai_message) if last_ai_message.tool_calls: return toolNode return END builder StateGraph(MessagesState) builder.add_node(call_model, call_model) builder.add_node(toolNode, toolNode) builder.add_edge(START, call_model) # 条件边,判断是否需要一直调用工具 builder.add_conditional_edges(call_model, use_tool) # 如果调用了工具把调用工具后的结果再给大模型判断是否还需要调用工具 builder.add_edge(toolNode, call_model) graphbuilder.compile() res graph.invoke({messages: [{role: user, content: 102x2?}]}) # print(res) # from IPython.display import Image, display # try: # display(Image(graph.get_graph().draw_png(output_file_path./toolNode.png))) # except: # pass9.1 ToolNode工具调用失败后的处理工具可能失败的原因无效的参数外部api不可用、工具内部逻辑问题等如果不处理这些错误程序直接报错处理了会根据代码里面的内容模型返回友好提示通过handle_tool_errors True实现# 当输入长沙工具调用就失败了 tool def get_weather(location: str): 获取当前天气. print(location, location) if location SH: raise ValueError(输入查询必须是专有名词) elif location 上海: return 气温23度有雾. else: # 为什么不用return因为工具需要返回正确的答案 raise ValueError(无效输入.) # tool_node ToolNode([get_weather]) tool_node ToolNode([get_weather],handle_tool_errors True)9.2.toolNode工具调用失败转而使用新的工具节点当第一个工具调用失败后转而使用新的工具节点要求第一个工具节点里面设定规则比如调用错误后在toolmessage里面设置错误第二个工具匹配到这个错误开始使用工具10.retryRetryPolicy节点重试策略当节点里面执行异常后进行重试可以自定义重试的异常也可以所有异常都重试builder.add_node(query_database,query_database,retryRetryPolicy(retry_onsqlite3.OperationalError)) builder.add_node(model, call_model, retryRetryPolicy(max_attempts5)) def query_database(state): try: # 数据库逻辑执行出错会「自然抛出异常」 conn sqlite3.connect(test.db) conn.execute(SELECT * FROM table) return {messages: [{role: assistant, content: 查询成功}]} except Exception as e: # 这个except的触发时机框架完成所有重试3次后仍抛出异常时 # 此时重试已全部失败捕获后「无需重抛」自定义处理即可 error_info f数据库查询失败已重试{2}次原因{str(e)} # 返回失败状态让图继续执行后续节点不终止流程 return {messages: [{role: assistant, content: error_info}]} # 重试策略正常配置会完整执行 builder.add_node( query_database, query_database, retryRetryPolicy(retry_onException, max_attempts3) # 1次原始2次重试 )initial_interval第一次失败后“等几秒”再试。0.5 → 等 0.5 秒。backoff_factor每多失败一次等待时间乘以这个数。2.0 → 第二次 1 s第三次 2 s第四次 4 s …max_interval“最长能等多久”的上限防止无限翻倍。128 → 不管失败多少次最多等 128 s。max_attempts总共尝试次数含第一次。3 → 第一次 最多再重试 2 次。jitter是否加随机扰动避免所有节点同时重试造成“ thundering herd ”。True → 在计算出的等待时间上随机 ± 一点。retry_on只有这些异常才重试其余异常直接抛给用户。默认只认Exception的子集网络超时、5xx 等11.消息的摘要和删除RemoveMessage处理摘要发生在“把消息送进 LLM 之前”这一步。删除消息是根据删除消息列表里面对应id实现的维度RunnableWithMessageHistoryLangGraph 摘要模式存储内容完整消息列表逐条 Human/AI1 段摘要 最近 2 条消息Token 增长线性增加 → 易爆表几乎恒定只留最新句跨会话同session_id可续聊同thread_id可续聊信息丢失无仅保留摘要细节被丢弃实现方式链外层自动帮你拼消息图节点里手动总结 RemoveMessage适用场景短对话、快速接入长对话、Token 敏感、需断点续跑图的方法调用app.get_state(config)读取最新快照调试、手动改状态前app.update_state(config, values, *, as_nodeNone)手动写入/补丁状态删消息、注入摘要、人工干预app.stream(..., config)从快照断点继续跑断点续聊、交互式对话app.invoke(input, config)一次性跑完并返回终态脚本批量测试app.get_state_history(config)⭐遍历该线程所有历史快照回溯、审计、可视化app.wakeup(config)⭐唤醒被中断的图人机审批、异步回调app.search(*, thread_id, ...)⭐跨线程搜索状态后台运营查询12.将图state状态注入给工具函数注入状态InjectedState注入后工具函数就能正常的取值使用Annotated是 Python 3.9 自带的类型注解增强器Annotated[真正类型, 元数据1, 元数据2, ...]InjectedState 是 LangGraph 提供的一个标记常量含义 “这个参数不需要 LLM 填由框架把当前图的 state 整字典注入进来。# 存储方式 class State(AgentState): docs: List[str] tool def get_context(question: str, state: Annotated[dict, InjectedState]): 获取回答问题的相关背景. return \n\n.join(doc for doc in state[docs])13.将图第三方存储和配置注入给工具函数InjectedStore()RunnableConfig每次app.stream/app.invoke时传的config字典在图内部会被自动封装成 RunnableConfig 实例# 数据库方式 def get_context( question: str, config: RunnableConfig, # 参数1注入运行时配置 store: Annotated[BaseStore, InjectedStore()], # 参数2注入文档存储 ) - Tuple[str, List[Document]]: 获取回答问题的相关背景. # 从运行时配置中获取 user_id user_id config.get(configurable, {}).get(user_id) # 从注入的 store 中根据 user_id 搜索文档 docs [item.value[doc] for item in store.search((documents, user_id))] return \n\n.join(doc for doc in docs) # 返回拼接后的文档字符串 doc_store InMemoryStore() graph create_agent(model, tools, checkpointercheckpointer, storedoc_store)14.工具函数中更新state工具函数不是节点不能直接更新state通过工具id和return command实现# 自动注入tool_call的id tool_call_id: Annotated[str, InjectedToolCallId], return Command( update{ user_info: user_info, messages: [ ToolMessage( 成功查询用户信息, tool_call_idtool_call_id ) ], } )stream_mode的参数模式每帧推送的内容典型用途values整个状态对象的快照messages、user_info…全量调试、前端需要随时拿到完整上下文updates仅本次被改动的字段增量节省流量只拿变化messages仅消息数组里新增的那一条聊天 UI 逐字逐句渲染debug调试信息节点进出、异常等排查流程15.大量工具如何选择调用RAG思想16.Milddle ware中间件1.langchain中间件的作用控制和自定义Agent执行的每一步包括日志记录工具选择备选方案等2.预定义中间件Summarization摘要Human-in-the-loop人机交互PII detection处理个人身份信息3.自定义中间件基于装饰器的中间件before_agent / after_agent:整个智能体Agent开始/结束执行一次任务前/后仅运行一次before_model / after_model:在每次调用大模型之前都会运行 / 在每次拿到大模型的回复之后都会运行wrap_model_call- 包裹住了整个模型调用过程可以完全控制调用。wrap_tool_call- 包裹住了每次工具如查询数据库、调用API的执行过程。dynamic_prompt- 通常结合 before_model或 wrap_model_call使用在模型调用前生效。