Agent Framework 定义流程节点以及节点的流式输出
今天我们开启Agent Framework Workflow系列当我们在构建 AI Agent 或多步骤自动化系统时工作通常不是一步完成的。而是一个任务可能会被拆分成多个独立步骤每个步骤单独处理的过程。在 Agent Framework 的 workflow 中执行器Executor是工作流里的基本处理单元。它接收输入执行一段具体逻辑然后输出结果给下一个执行器。比如一个任务可以通过工作流编排进行串联在执行过程中依次完成输入预处理、模型推理以及结果后处理等阶段。如何创建执行器我们定义两个 executor。第一个是UppercaseExecutor负责把输入字符串转换成大写internal sealed class UppercaseExecutor() : Executorstring, string(UppercaseExecutor) { public override ValueTaskstring HandleAsync( string message, IWorkflowContext context, CancellationToken cancellationToken default) ValueTask.FromResult(message.ToUpperInvariant()); }它继承自Executor这表示该执行器接收一个string类型输入并返回一个string类型结果。构造函数中的UppercaseExecutor是执行器 ID在后续流式事件输出中可以用来识别是哪一个执行器完成了处理。第二个执行器是ReverseTextExecutor负责反转字符串internal sealed class ReverseTextExecutor() : Executorstring, string(ReverseTextExecutor) { public override ValueTaskstring HandleAsync( string message, IWorkflowContext context, CancellationToken cancellationToken default) { return ValueTask.FromResult(string.Concat(message.Reverse())); } }如果输入是HELLO, WORLD!那么它会返回!DLROW ,OLLEH如何构建工作流当我们有了执行器之后就可以使用WorkflowBuilder将它们连接起来。示例中的核心代码如下UppercaseExecutor uppercase new(); ReverseTextExecutor reverse new(); WorkflowBuilder builder new(uppercase); builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); var workflow builder.Build();这里主要做了几件事完成了工作流的构建首先创建 UppercaseExecutor 和 ReverseTextExecutor 两个执行器实例分别保存为 uppercase 和 reverse然后以 uppercase 作为工作流起点在 uppercase 和 reverse 之间建立执行顺序使 UppercaseExecutor 的输出作为 ReverseTextExecutor 的输入最后指定 reverse 的输出作为整个 workflow 的最终结果并构建出 workflow 实例。其中builder.AddEdge(uppercase, reverse)表示上游执行器uppercase的输出会传递给下游执行器reverse。而.WithOutputFrom(reverse)表示reverse执行器的结果会作为工作流的输出。工作流的流式输出在Agent Framework Workflow中工作流不是等待全部执行完之后才返回而是使用的是流式输出我们接下来看一下流式工作流的原理await using StreamingRun run await InProcessExecution.RunStreamingAsync(workflow, input: Hello, World!); awaitforeach (WorkflowEvent evt in run.WatchStreamAsync()) { if (evt is ExecutorCompletedEvent executorCompleted) { Console.WriteLine(${executorCompleted.ExecutorId}: {executorCompleted.Data}); } elseif (evt is WorkflowErrorEvent workflowError) { Console.ForegroundColor ConsoleColor.Red; Console.Error.WriteLine(workflowError.Exception?.ToString() ?? Unknown workflow error occurred.); Console.ResetColor(); } elseif (evt is ExecutorFailedEvent executorFailed) { Console.ForegroundColor ConsoleColor.Red; Console.Error.WriteLine($Executor {executorFailed.ExecutorId} failed with {(executorFailed.Data null ? unknown error : $exception {executorFailed.Data})}.); Console.ResetColor(); } }这里的使用了RunStreamingAsync。它不会只返回最终结果而是返回一个StreamingRun对象。通过这个对象可以监听工作流执行过程中的事件流。接下来使用await foreach (WorkflowEvent evt in run.WatchStreamAsync())逐个读取事件。这里是最最核心的地方。监听执行器完成事件当某个执行器完成处理后会产生ExecutorCompletedEvent。示例代码中这样处理if (evt is ExecutorCompletedEvent executorCompleted) { Console.WriteLine(${executorCompleted.ExecutorId}: {executorCompleted.Data}); }这意味着每当一个 executor 完成时程序都会打印执行器 ID当前执行器输出的数据运行后可以看到如下输出我们不需要等到整个工作流全部结束才能知道中间发生了什么。每一步完成后都可以立即拿到对应事件。处理工作流错误除了正常完成事件示例中还处理了两类错误事件。第一类是WorkflowErrorEventelse if (evt is WorkflowErrorEvent workflowError) { Console.ForegroundColor ConsoleColor.Red; Console.Error.WriteLine( workflowError.Exception?.ToString() ?? Unknown workflow error occurred.); Console.ResetColor(); }这类事件表示整个工作流层面发生了错误。第二类是ExecutorFailedEventelse if (evt is ExecutorFailedEvent executorFailed) { Console.ForegroundColor ConsoleColor.Red; Console.Error.WriteLine( $Executor {executorFailed.ExecutorId} failed with ${(executorFailed.Data null ? unknown error : $exception {executorFailed.Data})}.); Console.ResetColor(); }这类事件表示某个具体 executor 执行失败。相比WorkflowErrorEventExecutorFailedEvent更具体因为它包含了失败的 executor ID。对于复杂工作流来说这对于排查问题非常有帮助。小结这一节我们介绍了Agent Framework Workflow的一些基础感念如何定义执行器Executor如何使用 WorkflowBuilder 构建工作流如何使用 RunStreamingAsync 启动流式工作流如何使用 WatchStreamAsync 监听工作流事件如何区分处理执行器完成事件、执行器失败事件和工作流错误事件源代码地址https://github.com/bingbing-gui/dotnet-agent-playbook/tree/master/src/ai-agent/Agent-Framework/31-WorkFlow-Executor