feat: track token usage per conversation turn (#1218)

* feat: track token usage per conversation turn

Add token usage tracking to the streaming API so consumers can monitor
cost per turn without additional API calls.

Changes:

1. _serialize_message now includes usage_metadata for AI messages in
   values events, exposing input_tokens/output_tokens/total_tokens
   from LangChain's native metadata.

2. stream() accumulates token usage across all AI messages in a turn
   and emits the cumulative totals in the end event:
   {usage: {input_tokens: N, output_tokens: N, total_tokens: N}}

3. Each messages-tuple AI event with text content now includes a
   per-message usage_metadata field for granular tracking.

This enables the frontend to display token consumption per turn,
support cost-aware UX, and let users monitor API spending.

10 tests added covering serialization passthrough and cumulative
aggregation logic.

Co-Authored-By: OpenClaw <noreply@openclaw.ai>

* fix: address Copilot review - use Mapping access for usage_metadata

- Replace getattr(usage, 'input_tokens', 0) with usage.get('input_tokens', 0)
  since LangChain usage_metadata is a dict, not an object
- Remove unused 'import pytest' (fixes Ruff F401)
- Add proper stream() integration tests for cumulative usage in end event
  and per-message usage_metadata in messages-tuple events

---------

Co-authored-by: Exploreunive <Exploreunive@users.noreply.github.com>
Co-authored-by: OpenClaw <noreply@openclaw.ai>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
haoliangxu
2026-03-21 10:29:52 +08:00
committed by GitHub
parent e69dc2961f
commit 06cba217c3
2 changed files with 327 additions and 6 deletions

View File

@@ -235,6 +235,8 @@ class DeerFlowClient:
d: dict[str, Any] = {"type": "ai", "content": msg.content, "id": getattr(msg, "id", None)}
if msg.tool_calls:
d["tool_calls"] = [{"name": tc["name"], "args": tc["args"], "id": tc.get("id")} for tc in msg.tool_calls]
if getattr(msg, "usage_metadata", None):
d["usage_metadata"] = msg.usage_metadata
return d
if isinstance(msg, ToolMessage):
return {
@@ -296,9 +298,10 @@ class DeerFlowClient:
StreamEvent with one of:
- type="values" data={"title": str|None, "messages": [...], "artifacts": [...]}
- type="messages-tuple" data={"type": "ai", "content": str, "id": str}
- type="messages-tuple" data={"type": "ai", "content": str, "id": str, "usage_metadata": {...}}
- type="messages-tuple" data={"type": "ai", "content": "", "id": str, "tool_calls": [...]}
- type="messages-tuple" data={"type": "tool", "content": str, "name": str, "tool_call_id": str, "id": str}
- type="end" data={}
- type="end" data={"usage": {"input_tokens": int, "output_tokens": int, "total_tokens": int}}
"""
if thread_id is None:
thread_id = str(uuid.uuid4())
@@ -310,6 +313,7 @@ class DeerFlowClient:
context = {"thread_id": thread_id}
seen_ids: set[str] = set()
cumulative_usage: dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
for chunk in self._agent.stream(state, config=config, context=context, stream_mode="values"):
messages = chunk.get("messages", [])
@@ -322,6 +326,13 @@ class DeerFlowClient:
seen_ids.add(msg_id)
if isinstance(msg, AIMessage):
# Track token usage from AI messages
usage = getattr(msg, "usage_metadata", None)
if usage:
cumulative_usage["input_tokens"] += usage.get("input_tokens", 0) or 0
cumulative_usage["output_tokens"] += usage.get("output_tokens", 0) or 0
cumulative_usage["total_tokens"] += usage.get("total_tokens", 0) or 0
if msg.tool_calls:
yield StreamEvent(
type="messages-tuple",
@@ -335,10 +346,14 @@ class DeerFlowClient:
text = self._extract_text(msg.content)
if text:
yield StreamEvent(
type="messages-tuple",
data={"type": "ai", "content": text, "id": msg_id},
)
event_data: dict[str, Any] = {"type": "ai", "content": text, "id": msg_id}
if usage:
event_data["usage_metadata"] = {
"input_tokens": usage.get("input_tokens", 0) or 0,
"output_tokens": usage.get("output_tokens", 0) or 0,
"total_tokens": usage.get("total_tokens", 0) or 0,
}
yield StreamEvent(type="messages-tuple", data=event_data)
elif isinstance(msg, ToolMessage):
yield StreamEvent(
@@ -362,7 +377,7 @@ class DeerFlowClient:
},
)
yield StreamEvent(type="end", data={})
yield StreamEvent(type="end", data={"usage": cumulative_usage})
def chat(self, message: str, *, thread_id: str | None = None, **kwargs) -> str:
"""Send a message and return the final text response.