fix: 修复多轮 Agent 调用中 token 用量累加问题 (alibaba#4488)#4518
Open
YuqiGuo105 wants to merge 1 commit intoalibaba:mainfrom
Open
fix: 修复多轮 Agent 调用中 token 用量累加问题 (alibaba#4488)#4518YuqiGuo105 wants to merge 1 commit intoalibaba:mainfrom
YuqiGuo105 wants to merge 1 commit intoalibaba:mainfrom
Conversation
## 根因分析
在多轮 Agent 工具调用场景中,用户观察到前端显示的 token 用量(约 4 万)
远低于 DashScope 实际计费(约 20 万)。原因是三个 Bug 协同导致:
1. **NodeExecutor**: 流式传输中 DashScope 的 streamUsage=true 最后一个
chunk 是 usage-only chunk(result==null,仅携带 usage 元数据),但原代码
的 filter() 在 doOnNext() 之前执行,导致这个 chunk 被直接丢弃,usage 丢失。
2. **AgentLlmNode**: 流式分支返回 Map.of("messages", msg),没有
_TOKEN_USAGE_ key,下游 GraphRunnerContext.findTokenUsageInDeltaState()
无法找到 usage 数据。
3. **GraphRunnerContext**: 每轮调用发现 _TOKEN_USAGE_ 时直接
this.tokenUsage = newUsage 覆盖旧值,多轮调用只保留最后一轮的 token 数。
## 修复内容
1. **NodeExecutor**: 在 filter() 之前添加 doOnNext() 捕获所有 chunk 的 usage
到 latestUsageRef;在 concatWith 完成时将捕获的 usage 应用到 StreamingOutput。
2. **AgentLlmNode**: 流式分支改用 HashMap,加入 _TOKEN_USAGE_ key
(EmptyUsage 占位符),确保下游能触发 usage 收集逻辑。
3. **GraphRunnerContext**: findTokenUsageInDeltaState() 改为调用
accumulateTokenUsage(),使用 DefaultUsage 累加各轮的 prompt/completion/total。
## 测试
新增 TokenUsageAccumulationTest(2 个端到端测试):
- testStreamingUsageOnlyChunkCaptured: 验证 usage-only chunk 被正确捕获
- testUsageAccumulationAcrossNodes: 验证多节点执行时 usage 正确累加
## 修复后的正确数据流(详细版)
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ 第 1 轮 LLM 调用 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ 1. DashScope API 返回流式响应 (streamUsage=true) │ │
│ │ chunk1: {content: "我需要调用", result: Output, usage: null} │ │
│ │ chunk2: {content: "search工具", result: Output, usage: null} │ │
│ │ chunk3: {content: null, result: null, usage: {5000,2000,7000}} │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ 2. NodeExecutor 处理流 【Fix 1】 │ │
│ │ │ │
│ │ AtomicReference<Usage> latestUsageRef = new AtomicReference<>(); │ │
│ │ │ │
│ │ .doOnNext(response -> { // ← 先执行,捕获所有 │ │
│ │ if (response.getMetadata().getUsage() != null) { │ │
│ │ latestUsageRef.set(response.getMetadata().getUsage()); │ │
│ │ } │ │
│ │ }) │ │
│ │ .filter(response -> response.getResult() != null) // ← 后过滤 │ │
│ │ │ │
│ │ chunk1: doOnNext(usage=null) → filter(✅通过) → 输出 │ │
│ │ chunk2: doOnNext(usage=null) → filter(✅通过) → 输出 │ │
│ │ chunk3: doOnNext(usage={5000,2000,7000}) → filter(❌丢弃) │ │
│ │ ↑ 但 usage 已保存到 latestUsageRef! │ │
│ │ │ │
│ │ 流完成时 concatWith: │ │
│ │ completionOutput.setUsage(latestUsageRef.get()) │ │
│ │ // usage = {prompt:5000, completion:2000, total:7000} │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ 3. AgentLlmNode 构建返回值 【Fix 2】 │ │
│ │ │ │
│ │ // 原代码: return Map.of("messages", msg); ← 不可变,无法加 key │ │
│ │ │ │
│ │ // 修复后: │ │
│ │ Map<String, Object> result = new HashMap<>(); │ │
│ │ result.put("messages", assistantMessage); │ │
│ │ result.put(outputKey, assistantMessage); // 如果有 outputKey │ │
│ │ result.put("_TOKEN_USAGE_", usage != null ? usage : EmptyUsage); │ │
│ │ │ │
│ │ 返回: {"messages": msg, "_TOKEN_USAGE_": {5000,2000,7000}} │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌──────────────────────────────────────────────────────────────────────┐ │
│ │ 4. GraphRunnerContext 累加 usage 【Fix 3】 │ │
│ │ │ │
│ │ findTokenUsageInDeltaState(deltaState): │ │
│ │ 遍历 keys: ["messages", "_TOKEN_USAGE_"] │ │
│ │ 发现 "_TOKEN_USAGE_" → 调用 accumulateTokenUsage() │ │
│ │ │ │
│ │ accumulateTokenUsage(incoming = {5000,2000,7000}): │ │
│ │ existing = this.tokenUsage // 第 1 轮时为 null │ │
│ │ accPrompt = 0 + 5000 = 5000 │ │
│ │ accCompletion = 0 + 2000 = 2000 │ │
│ │ accTotal = 0 + 7000 = 7000 │ │
│ │ this.tokenUsage = new DefaultUsage(5000, 2000, 7000) │ │
│ └──────────────────────────────────────────────────────────────────────┘ │
│ │
│ Context 状态: tokenUsage = {prompt:5000, completion:2000, total:7000} │
└─────────────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────────────┐
│ 第 2 轮 LLM 调用 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ DashScope 返回: usage = {prompt:8000, completion:3000, total:11000} │
│ ↓ │
│ NodeExecutor: latestUsageRef 捕获 {8000,3000,11000} │
│ ↓ │
│ AgentLlmNode: 返回 {"messages": msg, "_TOKEN_USAGE_": {8000,3000,11000}} │
│ ↓ │
│ GraphRunnerContext.accumulateTokenUsage(): │
│ existing = {5000, 2000, 7000} │
│ incoming = {8000, 3000, 11000} │
│ ───────────────────────────────── │
│ accPrompt = 5000 + 8000 = 13000 │
│ accCompletion = 2000 + 3000 = 5000 │
│ accTotal = 7000 + 11000 = 18000 │
│ this.tokenUsage = new DefaultUsage(13000, 5000, 18000) │
│ │
│ Context 状态: tokenUsage = {prompt:13000, completion:5000, total:18000} │
└─────────────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────────────┐
│ 第 3 轮 LLM 调用 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ DashScope 返回: usage = {prompt:6000, completion:2500, total:8500} │
│ ↓ │
│ NodeExecutor: latestUsageRef 捕获 {6000,2500,8500} │
│ ↓ │
│ AgentLlmNode: 返回 {"messages": msg, "_TOKEN_USAGE_": {6000,2500,8500}} │
│ ↓ │
│ GraphRunnerContext.accumulateTokenUsage(): │
│ existing = {13000, 5000, 18000} │
│ incoming = {6000, 2500, 8500} │
│ ───────────────────────────────── │
│ accPrompt = 13000 + 6000 = 19000 │
│ accCompletion = 5000 + 2500 = 7500 │
│ accTotal = 18000 + 8500 = 26500 │
│ this.tokenUsage = new DefaultUsage(19000, 7500, 26500) │
│ │
│ Context 状态: tokenUsage = {prompt:19000, completion:7500, total:26500} │
└─────────────────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────────────────┐
│ Agent 执行完成 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ GraphRunnerContext.getTokenUsage(): │
│ return {prompt: 19000, completion: 7500, total: 26500} │
│ │
│ ┌────────────────────────────────────────────────────────────────────┐ │
│ │ 最终对比 │ │
│ ├────────────────────────────────────────────────────────────────────┤ │
│ │ 指标 │ 修复前(Bug) │ 修复后 │ 实际 │ │
│ ├────────────────────────────────────────────────────────────────────┤ │
│ │ prompt_tokens │ 0 或 6000 │ 19,000 │ 19,000 │ │
│ │ completion_tokens │ 0 或 2500 │ 7,500 │ 7,500 │ │
│ │ total_tokens │ 0 或 8500 │ 26,500 │ 26,500 │ │
│ ├────────────────────────────────────────────────────────────────────┤ │
│ │ 前端显示 │ ~0 或 ~8500 │ 26,500 ✅ │ │ │
│ │ DashScope 计费 │ 26,500 │ 26,500 ✅ │ │ │
│ │ 差距 │ 3~∞ 倍 │ 0 │ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
```
Closes alibaba#4488
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
根因分析
在多轮 Agent 工具调用场景中,用户观察到前端显示的 token 用量(约 4 万)
远低于 DashScope 实际计费(约 20 万)。原因是三个 Bug 协同导致:
NodeExecutor: 流式传输中 DashScope 的 streamUsage=true 最后一个 chunk 是 usage-only chunk(result==null,仅携带 usage 元数据),但原代码 的 filter() 在 doOnNext() 之前执行,导致这个 chunk 被直接丢弃,usage 丢失。
AgentLlmNode: 流式分支返回 Map.of("messages", msg),没有 TOKEN_USAGE key,下游 GraphRunnerContext.findTokenUsageInDeltaState() 无法找到 usage 数据。
GraphRunnerContext: 每轮调用发现 TOKEN_USAGE 时直接 this.tokenUsage = newUsage 覆盖旧值,多轮调用只保留最后一轮的 token 数。
修复内容
NodeExecutor: 在 filter() 之前添加 doOnNext() 捕获所有 chunk 的 usage 到 latestUsageRef;在 concatWith 完成时将捕获的 usage 应用到 StreamingOutput。
AgentLlmNode: 流式分支改用 HashMap,加入 TOKEN_USAGE key (EmptyUsage 占位符),确保下游能触发 usage 收集逻辑。
GraphRunnerContext: findTokenUsageInDeltaState() 改为调用 accumulateTokenUsage(),使用 DefaultUsage 累加各轮的 prompt/completion/total。
测试
新增 TokenUsageAccumulationTest(2 个端到端测试):
修复后的正确数据流(详细版)
Closes #4488
Describe what this PR does / why we need it
Does this pull request fix one issue?
Describe how you did it
Describe how to verify it
Special notes for reviews