Java 程序员第 23阶段:多 Agent 角色协同,实现复杂业务任务拆分执行
在单 Agent 模式下智能体独立完成从理解到执行的全流程。然而面对复杂业务场景——如贷款审批、订单履约、风控审核——单一 Agent 往往面临能力边界清晰、任务处理串行化、难以并行协作等问题。多 Agent 角色协同通过将不同职责封装为独立角色让它们分工协作、消息互通、结果汇总从而实现复杂任务的并行拆分与高效执行。一、为什么需要多 Agent 协同1.1 单 Agent 的局限性-能力单一一个大模型同时处理规划、查询、计算、审核容易产生指令干扰-串行瓶颈所有步骤顺序执行I/O 密集型操作如外部 API 调用严重拖慢整体耗时-维护困难将所有逻辑塞入一个 Prompt策略调整互相耦合难以单独优化某个环节-可扩展性差新增业务环节需要修改核心 Prompt容易引发回归风险1.2 多 Agent 协同的核心价值价值点说明**并行执行**独立子任务由不同 Agent 并发处理总耗时接近最长子任务而非累加**职责单一**每个 Agent 只需专注自身领域Prompt 更稳定效果更可控**灵活编排**调度者根据任务类型动态决定 Agent 组合流程可配置可扩展**容错隔离**单个 Agent 失败不影响其他任务结果兜底策略更清晰**团队协作**模拟人类团队中的专项负责 交接配合模式逻辑更接近真实业务流程二、多 Agent 协作的核心角色在多 Agent 体系中通常需要以下几种核心角色2.1 调度者Dispatcher / Orchestrator调度者是系统的中枢负责- 接收用户原始请求- 分析任务结构进行子任务拆分- 将子任务分配给对应的执行 Agent- 监控子任务执行状态- 收集结果并触发汇总流程调度者本身通常不执行具体业务逻辑而是管理者角色类似项目经理。2.2 执行者Executor执行者是任务的实际执行者每个执行者专注于特定类型的子任务- 调用外部 API 获取数据- 执行本地业务规则计算- 完成文件读写、数据库操作等工具调用执行者之间相互独立不直接通信状态通过调度者或共享存储传递。2.3 专家Specialist专家提供垂直领域的深度能力如-风控专家专精欺诈检测、信用评分-法律专家合同条款解读、合规检查-技术专家代码生成、SQL 优化建议专家可被多个执行者在需要时调用类似团队中的顾问角色。2.4 汇总者Aggregator汇总者负责- 收集各执行者返回的子任务结果- 按业务逻辑合并、排序、补全- 生成面向用户的统一输出- 处理结果冲突或缺失的兜底逻辑三、角色间的通信机制多 Agent 协作的核心难题之一是角色间如何通信。常见模式有以下几种3.1 直接通信Point-to-Point两个 Agent 之间直接建立通信通道一对一传递消息和数据。__INLINE_AgentA → AgentB: 发送任务指令AgentB → AgentA: 返回执行结果____INLINE_适用场景两个 Agent 之间存在强依赖需要实时同步反馈。3.2 共享内存Shared Memory / Context所有 Agent 读写同一份共享上下文Context或记忆Memory存储。____INLINE_java// Java 实现ConcurrentHashMap 作为共享内存MapString, Object sharedContext new ConcurrentHashMap();// 各 Agent 将结果写入共享上下文sharedContext.put(creditScore, creditResult);sharedContext.put(incomeVerified, incomeResult);// 汇总者从共享上下文读取所有结果Object credit sharedContext.get(creditScore);____INLINE_适用场景结果之间相对独立通过共享存储中转减少耦合。3.3 消息队列Message Queue使用 Kafka、RabbitMQ 等消息中间件Agent 之间通过异步消息进行通信。____INLINE_java// 生产者执行者发送结果消息kafkaTemplate.send(agent-results, taskId, resultJson);// 消费者汇总者订阅结果消息KafkaListener(topics agent-results)public void onResultReceived(ResultMessage msg) {collectedResults.put(msg.getTaskId(), msg.getResult());}____INLINE_适用场景高并发、分布式部署、需要事件溯源的复杂流程。3.4 事件驱动Event-Driven通过发布/订阅模式一个 Agent 的行为自动触发其他 Agent 的响应。____INLINE_java// Spring 事件驱动实现// 执行者发布事件applicationEventPublisher.publishEvent(new TaskCompletedEvent(this, taskResult));// 汇总者监听事件EventListenerpublic void onTaskCompleted(TaskCompletedEvent event) {results.add(event.getResult());if (results.size() expectedCount) {aggregator.mergeAndOutput(results);}}____INLINE_适用场景需要松耦合、多个 Agent 对同一事件有不同响应的场景。四、任务拆分与汇总的工程实践4.1 任务拆分策略任务拆分是将复杂请求变为可并行执行的子任务。常用策略按业务环节拆分将流程分为预处理 → 核心计算 → 后处理每个环节独立 Agent 负责。按数据维度拆分将数据分片如按用户、按地区每个分片由独立 Agent 并行处理最后合并。按能力类型拆分如将查询计算审核拆分为不同 Agent每个 Agent 调用不同的工具集。____INLINE_java// Java 实现任务拆分public ListSubTask split(MainTask mainTask) {ListSubTask subTasks new ArrayList();subTasks.add(new SubTask(credit_check, mainTask.getUserId(), CREDIT_CHECK_PROMPT));subTasks.add(new SubTask(income_verify, mainTask.getUserId(), INCOME_VERIFY_PROMPT));subTasks.add(new SubTask(risk_evaluate, mainTask.getUserId(), RISK_EVALUATE_PROMPT));return subTasks;}____INLINE_4.2 并行执行与结果收集使用 Java 的 __CompletableFuture__INLINE_ 实现并行执行____INLINE_java// 并行执行所有子任务CompletableFuture[] futures subTasks.stream().map(task - CompletableFuture.supplyAsync(() - executor.run(task), taskExecutor)).toArray(CompletableFuture[]::new);// 等待所有任务完成CompletableFuture.allOf(futures).join();// 收集结果ListTaskResult results Arrays.stream(futures).map(CompletableFuture::join).collect(Collectors.toList());____INLINE_其中 __taskExecutor是自定义的ThreadPoolTaskExecutor__INLINE_可根据 Agent 类型配置不同线程池____INLINE_javaBeanpublic TaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(50);executor.setQueueCapacity(200);executor.setThreadNamePrefix(agent-executor-);executor.initialize();return executor;}____INLINE_4.3 结果汇总与兜底处理汇总者需要处理以下情况-全部成功按预设顺序合并结果-部分失败记录失败任务进行重试或使用兜底默认值-结果冲突如两个 Agent 对同一指标给出不同结论按优先级或置信度裁决____INLINE_javapublic AggregationResult merge(ListTaskResult results) {// 按依赖顺序排序results.sort(Comparator.comparing(TaskResult::getPriority));MapString, Object merged new HashMap();for (TaskResult result : results) {if (result.isSuccess()) {merged.put(result.getKey(), result.getValue());} else {// 失败时使用兜底值merged.put(result.getKey(), getFallback(result));}}return new AggregationResult(merged, evaluateConfidence(merged));}____INLINE_4.4 超时与重试策略每个子任务应设置独立的超时时间防止个别 Agent 卡顿拖垮整个流程____INLINE_javaCompletableFutureTaskResult future CompletableFuture.supplyAsync(() - executor.run(task)).orTimeout(30, TimeUnit.SECONDS) // 30秒超时.exceptionally(ex - {log.warn(Task {} failed: {}, task.getId(), ex.getMessage());return TaskResult.failure(task.getId(), ex.getMessage());});____INLINE_重试策略使用 Spring Retry 或自定义指数退避____INLINE_javaRetryable(maxAttempts 3, backoff Backoff(delay 1000, multiplier 2))public TaskResult runWithRetry(SubTask task) {return executor.run(task);}____INLINE_五、实战案例贷款审批多 Agent 协同5.1 业务场景用户提交贷款申请后端需要自动完成以下校验1.征信查询调三方征信接口获取用户信用分2.收入核验核实用户工资流水与纳税记录3.风控评估综合信用分、收入、负债比计算风险等级4.额度计算根据风险等级与收入确定贷款额度5.报告生成汇总所有结果生成审批建议5.2 Agent 角色配置Agent职责输入输出Dispatcher任务调度与汇总贷款申请审批结果CreditAgent征信查询用户ID信用分 逾期记录IncomeAgent收入核验用户ID 工资流水月均收入 纳税等级RiskAgent风控评估信用分 收入风险等级 建议额度ReportAgent报告生成各Agent结果完整审批报告5.3 核心代码实现____INLINE_javaServicepublic class LoanApproveService {Autowired private DispatcherAgent dispatcher;Autowired private CreditAgent creditAgent;Autowired private IncomeAgent incomeAgent;Autowired private RiskAgent riskAgent;Autowired private ReportAgent reportAgent;public ApprovalResult approve(LoanApplication app) {// ① 任务拆分ListSubTask tasks dispatcher.split(app);// ② 并行执行征信 收入可并行MapString, TaskResult results new ConcurrentHashMap();CountDownLatch latch new CountDownLatch(2);CompletableFuture.runAsync(() - {results.put(credit, creditAgent.check(app.getUserId()));latch.countDown();});CompletableFuture.runAsync(() - {results.put(income, incomeAgent.verify(app));latch.countDown();});latch.await(30, TimeUnit.SECONDS);// ③ 风控评估依赖前两步结果TaskResult riskResult riskAgent.evaluate((CreditResult) results.get(credit),(IncomeResult) results.get(income));// ④ 汇总输出return dispatcher.aggregate(results, riskResult);}}____INLINE_5.4 执行效果指标串行执行多 Agent 并行总耗时~4.5s各步骤累加~1.8s最长路径吞吐量1x~2.5x失败影响任一步骤失败全流程失败单 Agent 失败可局部重试六、架构设计的关键考量6.1 任务粒度控制子任务并非越细越好。过细的拆分带来- Agent 间通信开销增加- 调度复杂度上升- 上下文碎片化风险建议将任务拆分到一个 Agent 能独立完成一个完整子目标的粒度。6.2 依赖关系管理部分子任务之间存在依赖如风控依赖征信结果。调度者需要维护一张有向无环图DAG按层级推进执行____INLINE_Level 0: 无依赖任务可并行Level 1: 依赖 L0 的任务Level 2: 依赖 L1 的任务...____INLINE_6.3 状态一致性多 Agent 并行执行时需确保- 共享状态读写的线程安全__ConcurrentHashMap__INLINE_ 或分布式锁- 事件发布的事务性避免事件发出但本地事务回滚- 结果幂等性重试不会产生重复副作用6.4 可观测性每个 Agent 的输入输出、耗时、成功率都应纳入监控____INLINE_java// Agent 执行埋点Around(execution(com.example.agent.Agent.run(..)))public Object monitorAgent(ProceedingJoinPoint pjp) {String agentName pjp.getTarget().getClass().getSimpleName();long start System.currentTimeMillis();try {Object result pjp.proceed();metrics.record(agentName, success, System.currentTimeMillis() - start);return result;} catch (Exception e) {metrics.record(agentName, failure, System.currentTimeMillis() - start);throw e;}}____INLINE_七、总结多 Agent 角色协同是复杂业务任务拆分执行的核心架构模式。它通过调度者统一规划、执行者并行处理、专家提供深度能力、汇总者整合输出将原本串行的大任务分解为可独立、可并行、可组合的子任务。在 Java 后端实现中Spring 生态提供了丰富的基础设施__CompletableFuture支持异步并行、ApplicationEventPublisher支持事件驱动、Retryable支持重试策略、ThreadPoolTaskExecutor 支持资源隔离。结合合理的任务拆分策略与结果汇总机制可以构建出高效、可靠、可观测的多 Agent 协同系统。核心原则只有一条让每个 Agent 专注于一件事让调度者掌控全局节奏。作者洛水石作者洛水石