【LangGraph】 源码剖析(三):PregelLoop 深度拆解——BSP 循环的源码级实现
【LangGraph】 源码剖析三PregelLoop 深度拆解——BSP 循环的源码级实现写在前面前两篇我们拆完了 LangGraph 的架构全貌和 Channel/Reducer 状态系统。今天我们进入 LangGraph 最核心、最精巧、也最容易被误解的组件——PregelLoop。它是整个框架的发动机负责把 StateGraph 的定义变成真正的执行流。很多人以为 LangGraph 的执行就是按顺序跑节点但实际上PregelLoop 实现了一个完整的BSPBulk Synchronous Parallel循环——每个 Superstep 都经历 Plan → Execute → Update → Checkpoint 四个阶段通过版本追踪机制决定哪些节点需要重新执行通过条件边实现路由和循环。理解了 PregelLoop你就理解了 LangGraph 为什么能做到并行执行、断点续跑和条件路由。 文章目录 一、PregelLoop 是什么一张图看懂全貌 二、四阶段生命周期Plan → Execute → Update → Checkpoint 三、版本追踪LangGraph 最精巧的设计⚡ 四、PregelRunner并行执行引擎 五、条件边路由循环是怎么实现的 六、ReAct Agent 完整走读6 个 Superstep 的旅程 七、系列预告 一、PregelLoop 是什么一张图看懂全貌1.1 PregelLoop 的定位PregelLoop 是 LangGraph 运行时的核心循环定义在langgraph/pregel/_loop.py中。它不是一个简单的 while 循环而是一个精心设计的状态机管理着从加载 Checkpoint到输出最终结果的完整生命周期。当你调用graph.invoke()或graph.stream()时控制流最终都会进入 PregelLoop。它的核心逻辑可以概括为一句话不断执行 Superstep直到没有新任务产生。1.2 PregelLoop 的入口# pregel/main.pyclassPregel(PregelProtocol):definvoke(self,input,configNone,*,stream_modevalues,...):# 1. 编译配置# 2. 创建 PregelLoop# 3. 运行循环asyncforchunkinself.stream(input,config,stream_modestream_mode):passreturnchunk# 返回最终结果defstream(self,input,configNone,*,stream_modevalues,...):# 委托给 PregelLooploopPregelLoop(...)yieldfromloop# 逐步 yield 中间结果PregelLoop 实现了__iter__/__next__协议每次next()调用执行一个 Superstepyield 该步骤的输出。这使得stream()可以自然地逐步返回结果而invoke()只需消费完所有步骤后返回最后一个。 二、四阶段生命周期Plan → Execute → Update → Checkpoint每个 Superstep 都严格遵循四个阶段这是 BSP 模型的核心约束2.1 Phase 1: Plan — 决定谁该执行prepare_next_tasks()是整个循环的大脑它通过比较 Channel 版本号来决定哪些节点需要重新执行。具体逻辑是遍历所有节点检查每个节点的触发 Channeltrigger channels如果某个 Channel 的当前版本 该节点上次见过的版本则该节点需要重新执行将需要执行的节点封装为 Task 对象加入任务列表# pregel/_algo.py (简化版)defprepare_next_tasks(channels,checkpoint,nodes):tasks[]fornode_name,node_configinnodes.items():fortrigger_chinnode_config[trigger_channels]:current_vercheckpoint[channel_versions].get(trigger_ch)seen_vercheckpoint[versions_seen].get(node_name,{}).get(trigger_ch)ifcurrent_ver!seen_ver:# 版本不同 → 需要执行tasks.append(Task(namenode_name,...))breakreturntasks这个设计的精妙之处在于它天然支持条件边。条件边函数的输出决定了写入哪些 Channel只有被写入的 Channel 版本才会递增从而只有下游节点会被触发。2.2 Phase 2: Execute — 并行执行所有就绪任务PregelRunner.tick()负责并行执行所有就绪的 Task。关键点并行执行多个 Task 通过 asyncio 或线程池并发运行读写隔离Task 执行时只能读取当前 Channel 的值不能看到其他 Task 的写入写入缓冲每个 Task 的写入先缓存在 Task 对象的writes列表中等 Phase 3 统一应用# pregel/_runner.py (简化版)classPregelRunner:asyncdeftick(self,tasks,channels,...):# 并行执行所有任务resultsawaitasyncio.gather(*[self._run_task(task,channels)fortaskintasks])# 每个任务的写入已缓存在 task.writes 中returnresults2.3 Phase 3: Update — 原子写入 Channelapply_writes()是状态一致性的守护者。它按确定性顺序task path 排序将所有 Task 的写入应用到 Channel按 task path 排序确保确定性将写入按 Channel 分组调用每个 Channel 的update()方法触发 Reducer 合并更新channel_versions被写入的 Channel 版本递增更新versions_seen记录每个节点见过的版本返回被更新的 Channel 集合供下一个 Superstep 的 Plan 阶段使用# pregel/_algo.py (简化版)defapply_writes(channels,tasks,checkpoint):updated_channelsset()# 按 task path 排序确保确定性fortaskinsorted(tasks,keylambdat:t.path):forchannel_name,valueintask.writes:channels[channel_name].update([value])checkpoint[channel_versions][channel_name]1updated_channels.add(channel_name)# 更新该节点见过的版本checkpoint[versions_seen][task.name]{ch:checkpoint[channel_versions][ch]forchinupdated_channels}returnupdated_channels2.4 Phase 4: Checkpoint — 保存快照 决定是否继续create_checkpoint()保存完整快照包括 Channel 值、版本信息和待执行任务。如果 Phase 3 更新了 Channel有新版本则回到 Phase 1 继续循环否则循环结束。 三、版本追踪LangGraph 最精巧的设计版本追踪是 PregelLoop 最精巧的设计也是理解 LangGraph 执行模型的关键。3.1 两个版本映射Checkpoint 维护两个版本映射映射含义示例channel_versions每个 Channel 的当前版本号{messages: v3, question: v1}versions_seen每个节点上次见过的各 Channel 版本{agent: {messages: v2}, tools: {messages: v1}}3.2 版本递增规则每次 Channel 被update()成功写入版本号递增LastValueChannel每次写入递增BinaryOperatorAggregateChannel每次非空写入递增TopicChannel每次有新值发布递增3.3 版本比较决定执行prepare_next_tasks的核心判断ifchannel_versions[ch]versions_seen[node_name].get(ch,-1):# Channel 已更新但节点还没见过 → 需要执行这个设计解决了三个关键问题① 避免重复执行节点已见过最新版本不会因无关更新重新执行。例如如果只有messagesChannel 被更新那么只依赖questionChannel 的节点不会重新执行。② 条件边路由路由函数的输出决定写入哪些 Channel只有被写入的 Channel 版本递增。例如条件边should_continue返回tools时只有messagesChannel 被写入版本递增tools节点被触发返回END时没有 Channel 被写入循环结束。③ 循环终止当没有 Channel 版本递增时没有节点需要执行循环自然结束。这比手动设置最大迭代次数更优雅——循环终止是状态驱动的而非计数器驱动的。⚡ 四、PregelRunner并行执行引擎PregelRunner 是 PregelLoop 的肌肉负责在 Phase 2 并行执行所有就绪的 Task。4.1 执行模型PregelRunner 支持两种执行模式模式说明适用场景asyncio使用asyncio.gather并发执行默认模式节点函数是 asyncthread pool使用线程池并行执行节点函数是 sync或需要 CPU 并行4.2 读写隔离PregelRunner 的关键约束是读写隔离Task 执行时只能读取当前 Channel 的值不能看到同一步其他 Task 的写入。这是 BSP 模型的核心保证——所有写入在同步屏障后统一应用。这意味着如果 Node A 和 Node B 在同一个 Superstep 中并行执行Node A 的写入对 Node B 不可见反之亦然。它们只能看到上一个 Superstep 结束时的 Channel 值。4.3 流式输出PregelRunner 还负责流式输出的拦截。当stream_modemessages时它通过StreamMessagesHandler拦截 LLM 的 token 级输出实现实时流式推送# 流式模式拦截 LLM tokenifstream_modemessages:handlerStreamMessagesHandler()# LLM 每生成一个 tokenhandler 就 yield 一次asyncfortokeninllm.astream(prompt,callbacks[handler]):yieldtoken 五、条件边路由循环是怎么实现的LangGraph 的条件边conditional_edge不是在节点执行后立即路由而是在apply_writes 阶段评估。这是理解循环的关键。5.1 条件边的执行时机# 添加条件边graph.add_conditional_edges(agent,should_continue,{tools:tools,END:END,})当agent节点执行完毕后它的写入被缓存在 Task 的writes列表中。在apply_writes阶段系统会调用should_continue(state)函数传入当前 State根据返回值如tools或END决定写入哪些 Channel只有被写入的 Channel 版本递增触发下游节点5.2 循环 条件边 版本追踪ReAct Agent 的循环agent → tools → agent → … → END就是通过条件边 版本追踪实现的agent执行后如果 LLM 返回了tool_call条件边路由到toolstools执行后写入messagesChannel版本递增prepare_next_tasks发现messages版本更新agent需要重新执行agent再次执行如果 LLM 不再返回tool_call条件边路由到END没有新 Channel 被写入循环结束 六、ReAct Agent 完整走读6 个 Superstep 的旅程让我们用一个完整的 ReAct Agent 示例走读 PregelLoop 的每一步Step 0: 用户输入用户调用graph.invoke({messages: [HumanMessage(北京天气如何)]})。PregelLoop 初始化 Channel将用户消息写入messagesChannel版本设为 v1。Step 1: agent 节点首次执行Plan:messagesv1 agent的 null →agent需要执行Execute: LLM 调用返回AIMessage(tool_callget_weather(北京))Update:messagesChannel 更新版本递增到 v2条件边:should_continue检测到tool_call→ 路由到toolsStep 2: tools 节点执行Plan:messagesv2 tools的 null →tools需要执行agent已见过 v2 → 不需要Execute: 调用get_weather(北京)返回ToolMessage(晴天 25°C)Update:messagesChannel 更新版本递增到 v3Step 3: agent 节点再次执行Plan:messagesv3 agent的 v2 →agent需要重新执行Execute: LLM 基于工具结果生成最终回答AIMessage(北京今天晴天25°C)Update:messagesChannel 更新版本递增到 v4条件边:should_continue检测到无tool_call→ 路由到ENDStep 4: 循环结束Plan: 条件边路由到 END没有更多节点需要执行循环结束返回最终 State关键观察agent 执行了 2 次Step 1 和 Step 3因为messagesChannel 被更新了 2 次tools 执行了 1 次Step 2只在有tool_call时触发条件边在 apply_writes 阶段评估不是在节点执行后立即路由每一步都是完整的 BSP 周期Plan → Execute → Update → Checkpoint如果中途断电从 Checkpoint 恢复后版本号机制确保只重跑未完成的步骤 七、系列预告第四篇也是最后一篇我们将拆解 LangGraph 的Checkpoint 与人机交互系统核心问题Checkpoint 的完整数据结构是什么样的MemorySaver / SqliteSaver / PostgresSaver 的区别和选型interrupt_before / interrupt_after 是怎么实现的Human-in-the-Loop 的断点续跑和时间旅行机制Command(goto…) 跳转到任意历史节点的原理关注我不要错过最终篇 总结速查卡PregelLoop 核心概念概念一句话解释PregelLoopBSP 循环的状态机不断执行 Superstep 直到无新任务Superstep一次完整的 Plan → Execute → Update → Checkpoint 周期prepare_next_tasks通过版本比较决定哪些节点需要重新执行PregelRunner.tick并行执行所有就绪 Task读写隔离apply_writes原子写入 Channel更新版本号确定性排序channel_versions每个 Channel 的当前版本号写入时递增versions_seen每个节点上次见过的各 Channel 版本四阶段速查阶段核心函数做了什么Planprepare_next_tasks版本比较 → 决定谁该执行ExecutePregelRunner.tick并行执行 → 缓存写入Updateapply_writes原子写入 → 版本递增Checkpointcreate_checkpoint保存快照 → 决定是否继续一句话总结PregelLoop 是 LangGraph 的发动机——每个 Superstep 经历 Plan版本比较决定谁执行→ Execute并行执行读写隔离→ Update原子写入版本递增→ Checkpoint保存快照决定继续四阶段。版本追踪机制是整个设计最精巧的部分channel_versions 记录世界状态versions_seen 记录节点认知两者的差异驱动循环前进。条件边不是跳转指令而是写入哪些 Channel的决策——写入触发版本递增版本递增触发节点执行这就是 LangGraph 循环的本质。参考链接LangGraph GitHub 仓库LangGraph Pregel 运行时文档DeepWiki: Pregel Execution EngineZEngineer: LangGraph Deep Analysis