目录概述什么是 Special APICommand状态更新 流程控制的统一接口Context Schema运行时上下文注入Send动态分支与 Map-Reduce 模式总结对比1. 概述什么是 Special APILangGraph 在基础概念State、Node、Edge、Graph之上提供了三个特殊 API用于解决标准图模式难以处理的场景API解决的核心问题引入版本Command节点返回值中同时更新状态 控制流程去向1.0.xcontext_schema向节点传递不属于 State 的运行时依赖模型名、DB 连接、API Key1.0.xSend动态并行分支运行时根据 State 内容创建 N 个并行节点实例1.0.x这三个 API 让 LangGraph 从「固定的有向图」升级为「运行时自适应的动态图」。2. Command状态更新 流程控制的统一接口2.1 传统模式 vs Command 模式传统模式中节点的返回值只做一件事更新 State。流程控制完全由 Edge决定。# 传统节点只更新 State def my_node(state) - dict: return {result: some_value} ​ # 流程控制由边负责 graph.add_conditional_edges(my_node, routing_fn, {...})Command 模式中节点既更新 State又指定下一步去哪个节点from langgraph.types import Command ​ def my_node(state) - Command[AgentState]: return Command( update{messages: [(system, 处理完成)]}, # 状态更新 gotonext_node # 流程控制 )2.2 Command 的核心能力参数类型说明updatedict要合并到 State 中的字段同普通节点返回gotostr下一步要去哪个节点可以是 END关键洞察Command 把「更新状态」和「控制流程」两个职责打包成一个返回值使得中心路由节点可以在一处完成所有决策。2.3 完整示例多 Agent 路由from langgraph.types import Command from langgraph.graph import StateGraph, START, END ​ class AgentState(TypedDict): messages: Annotated[list, lambda x, y: x y] current_agent: str task_completed: bool ​ # 核心路由节点根据消息内容动态路由 def decision_agent(state: AgentState) - Command[AgentState]: # 情况1任务已完成 → 终止流程 if state[task_completed]: return Command( update{messages: [(system, 所有任务处理完成)]}, gotoEND ) ​ last_message state[messages][-1] last_msg_content last_message[1] ​ # 情况2数学任务 → 路由到数学代理 if 数学 in last_msg_content: return Command( update{ messages: [(system, 路由到数学代理)], current_agent: math_agent }, gotomath_agent ) # 情况3翻译任务 → 路由到翻译代理 elif 翻译 in last_msg_content: return Command( update{ messages: [(system, 路由到翻译代理)], current_agent: translation_agent }, gototranslation_agent ) # 情况4未识别 → 标记完成并结束 else: return Command( update{messages: [(system, 任务完成)], task_completed: True}, gotoEND )2.4 Command 的两种跳转方式方式一无条件跳转固定 goto业务节点执行完固定跳转到路由节点def math_agent(state: AgentState) - Command[AgentState]: result 2 2 4 return Command( update{ messages: [(assistant, f数学计算结果: {result})], task_completed: True }, gotodecision_agent # 做完固定回到路由节点 )方式二条件跳转结合条件判断路由节点根据 State 内容动态选择 goto 目标如上文的 decision_agent。2.5 执行流程START │ ▼ decision_agent ──(数学)──→ math_agent │ │ │ ▼ │ ←───(task_completed)─── decision_agent → END │ │──(翻译)──→ translation_agent │ │ │ ▼ │ ←───(task_completed)─── decision_agent → END │ └──(其他)──→ END2.6 测试输出【测试1数学任务】 START → decision_agent → math_agent(224) → decision_agent → END 最终: current_agentdecision_agent, task_completedTrue ​ 【测试2翻译任务】 START → decision_agent → translation_agent(Hello-你好) → decision_agent → END 最终: current_agentdecision_agent, task_completedTrue ​ 【测试3未识别任务】 START → decision_agent → (标记完成) → END 最终: current_agentuser, task_completedTrue2.7 Command 使用场景中心化路由一个路由节点负责所有分支决策工作流引擎每个步骤执行完后指定下一步防循环设计用task_completed标志配合 Command(gotoEND) 防止死循环多 Agent 编排Supervisor Agent 用 Command 动态分派任务3. Context Schema运行时上下文注入3.1 为什么需要 Context Schema标准 LangGraph 节点只能访问 State 中的数据。但有些信息不适合放在 State 中API Key、数据库连接字符串 → 敏感信息不应随 State 流转模型名称、配置参数 → 属于运行环境而非业务状态日志记录器、监控客户端 → 属于依赖注入Context Schema 解决了这个问题它允许在 invoke() 时传入额外的上下文对象节点通过runtime.context访问。3.2 三步使用法第一步定义上下文结构使用dataclass定义上下文类型from dataclasses import dataclass ​ dataclass class ContextSchema: model_name: str db_connection: str api_key: str第二步在构建图时指定 context_schemabuilder StateGraph(AgentState, context_schemaContextSchema)第三步节点函数接收 runtime 参数节点函数增加第二个参数runtime: Runtime[ContextSchema]from langgraph.runtime import Runtime ​ def process_message(state: AgentState, runtime: Runtime[ContextSchema]) - dict: # 通过 runtime.context 访问上下文 model_name runtime.context.model_name db_connection runtime.context.db_connection api_key runtime.context.api_key ​ print(f使用模型: {model_name}) response f使用 {model_name} 处理了您的请求已连接到 {db_connection} return {messages: [AIMessage(contentresponse)], response: response}第四步在 invoke() 时传递 contextcontext ContextSchema( model_namegpt-4-turbo, db_connectionpostgresql://user:***localhost:5432/orders_db, api_keysk-abc...3456 ) ​ result graph.invoke(initial_state, contextcontext)3.3 Context 与 State 的对比维度State状态Context上下文变更节点函数可以修改节点只能读不能修改传递在节点间自动流转通过invoke(context...)传入生命周期随图执行过程变化整次调用固定不变敏感信息可能被序列化/持久化仅运行时存在适合存放业务数据、用户输入配置、凭证、依赖3.4 完整执行流程invoke(initial_state, contextContextSchema) │ ▼ ┌─────────────────────────────────┐ │ process_message(state, runtime) │ │ ├── 读取 context.model_name │ │ ├── 读取 context.db_connection │ │ ├── 读取 context.api_key │ │ └── 返回更新状态 │ └──────────┬──────────────────────┘ ▼ ┌──────────────────────────────────┐ │ generate_response(state, runtime) │ │ ├── 读取 context.model_name │ │ └── 返回最终响应 │ └──────────┬───────────────────────┘ ▼ END3.5 测试输出初始状态: {messages: [HumanMessage(请帮我查询最新的订单信息)], response: } ​ 上下文信息: model_name: gpt-4-turbo db_connection: postgresql://user:***localhost:5432/orders_db api_key: sk-ab*** ​ 执行节点: process_message 用户消息: 请帮我查询最新的订单信息 使用的模型: gpt-4-turbo 数据库连接: postgresql://user:***localhost:5432/orders_db API密钥前缀: sk-ab*** ​ 执行节点: generate_response 使用模型 gpt-4-turbo 生成最终响应 ​ 最终响应: 使用 gpt-4-turbo 处理了您的请求已连接到 postgresql://user:***localhost:5432/orders_db ​ 这是使用 gpt-4-turbo 生成的完整响应。3.6 实用场景多环境切换dev/staging/prod 使用不同 DB 连接A/B 测试不同模型通过 context 传入不同 model_name依赖注入传入日志器、缓存客户端、监控工具多租户系统每个租户传入不同的 API Key 和配置4. Send动态分支与 Map-Reduce 模式4.1 为什么需要 Send普通条件边只能选择一个分支执行。但有些场景需要并行执行多个分支用户提问后同时搜索多个知识库给 N 个客户同时发邮件对列表中每个元素做相同的处理Send对象解决了这个问题它允许条件边返回多个 Send 对象每个 Send 指向同一个或不同节点并传入不同的 State 切片。4.2 Send 的工作机制条件边函数返回 List[Send] → 每个 Send 创建一个独立的节点实例并行执行from langgraph.types import Send ​ def map_subjects_to_jokes(state: AtguiguState) - List[Send]: subjects state[subjects] # [猫, 狗, 程序员] return [ Send(make_joke, {subject: subject}) # 每个主题创建一个 Send for subject in subjects ]每个Send(node, arg)包含node目标节点名称arg传给该节点实例的 State 数据可以是 State 的子集4.3 完整 Map-Reduce 示例from langgraph.types import Send from typing import Annotated, List ​ class AtguiguState(TypedDict): subjects: List[str] jokes: Annotated[List[str], lambda x, y: x y] # 自动合并 ​ # Map 阶段生成主题列表 def generate_subjects(state: AtguiguState) - dict: subjects [猫, 狗, 程序员] return {subjects: subjects} ​ # Map 阶段并行处理每个主题 def make_joke(state: AtguiguState) - dict: subject state.get(subject, 未知) jokes_map { 猫: 为什么猫不喜欢在线购物因为它们更喜欢实体店, 狗: 为什么狗不喜欢计算机因为它们害怕被鼠标咬, 程序员: 为什么程序员喜欢洗衣服因为他们在寻找bugs, } joke jokes_map.get(subject, 这是一个神秘笑话。) return {jokes: [joke]} ​ # 路由函数将主题列表展开为 Send 列表 def map_subjects_to_jokes(state: AtguiguState) - List[Send]: return [Send(make_joke, {subject: s}) for s in state[subjects]] ​ # 构建图 builder StateGraph(AtguiguState) builder.add_node(generate_subjects, generate_subjects) builder.add_node(make_joke, make_joke) ​ builder.add_edge(START, generate_subjects) ​ # 条件边返回 Send 列表 → 动态并行 builder.add_conditional_edges( generate_subjects, map_subjects_to_jokes # 返回 List[Send] ) ​ builder.add_edge(make_joke, END) graph builder.compile()4.4 执行流程START │ ▼ generate_subjects → subjects [猫, 狗, 程序员] │ ▼ map_subjects_to_jokes → 返回 3 个 Send 对象 │ ├── Send(make_joke, {subject: 猫}) ──→ make_joke(猫) ├── Send(make_joke, {subject: 狗}) ──→ make_joke(狗) ← 并行执行 └── Send(make_joke, {subject: 程序员}) ──→ make_joke(程序员) │ ▼ jokes 自动合并Reducer │ ▼ END4.5 测试输出生成主题列表: [猫, 狗, 程序员] 映射主题到joke任务: [猫, 狗, 程序员] 生成Send对象列表: [Send(make_joke, {subject: 猫}), Send(make_joke, {subject: 狗}), Send(make_joke, {subject: 程序员})] ​ 执行节点: make_joke处理主题: 猫 执行节点: make_joke处理主题: 狗 ← 注意并行执行 执行节点: make_joke处理主题: 程序员 ​ 最终结果: { subjects: [猫, 狗, 程序员], jokes: [ 为什么猫不喜欢在线购物因为它们更喜欢实体店, 为什么狗不喜欢计算机因为它们害怕被鼠标咬, 为什么程序员喜欢洗衣服因为他们在寻找bugs ] }4.6 Send 的关键注意点要点说明Reducer 必须支持合并因为多个 Send 实例返回的结果需要合并State 中对应的字段必须使用合适的 Reducer如operator.add、add_messages或自定义合并函数Send 的 arg 是 State 切片传给每个实例的 State 只包含该实例需要的数据不需要传整个 State动态数量Send 的数量在运行时才确定由条件边函数的返回值决定并行执行多个 Send 实例是并行执行的除非设置了最大并发限制节点间独立每个 Send 实例之间的 State 不共享各自独立处理4.7 Send vs 普通条件边对比维度普通条件边Send 条件边返回值类型str目标节点名List[Send]多个目标并行能力否走一个分支是可同时走 N 个分支动态数量固定映射运行时根据 State 内容决定数量适用场景if/else 分支决策批量处理、Map-Reduce4.8 实用场景批量数据处理数据列表 → 并行处理每个元素 → 合并结果多知识库检索查询 → 同时检索多个知识库 → 汇总结果多模型投票问题 → 同时询问多个模型 → 投票选出最佳答案并行工具调用Agent 决定同时调用多个工具 → 收集结果5. 总结对比5.1 三大 Special API 一览API核心功能类比解决什么问题Command返回值中同时包含updategoto函数返回 (结果, 下一步)中心化路由节点需要同时更新状态和指定流程context_schema运行时向节点注入非 State 依赖依赖注入 / 配置注入不希望放在 State 中的环境信息、凭证、配置Send条件边返回多个并行目标Map-Reduce 的 Map 阶段运行时动态创建 N 个并行执行实例5.2 建议的学习路线基础图模式 (State Node Edge) │ ▼ Command → 中心化路由 / 工作流引擎 (状态流程合一) │ ▼ context_schema → 生产部署 / 多环境 / 依赖注入 (运行时上下文) │ ▼ Send → 批量处理 / 并行检索 / Map-Reduce (动态并行分支)5.3 三种模式可以组合使用# Command context_schema Send 可以共存于同一个图中 class MyState(TypedDict): messages: Annotated[list, add_messages] tasks: List[str] ​ dataclass class MyContext: api_key: str model: str ​ # 节点用 Command 控制流程 def router(state: MyState, runtime: Runtime[MyContext]) - Command[MyState]: model runtime.context.model # 从上下文读取 if state[tasks]: # 用 Send 并行处理 return Command( update{messages: [(system, f使用 {model} 处理)]}, goto[Send(worker, {task: t}) for t in state[tasks]] ) return Command(gotoEND)