feat: add Claude Code OAuth and Codex CLI as LLM providers (#1166)

* feat: add Claude Code OAuth and Codex CLI providers

Port of bytedance/deer-flow#1136 from @solanian's feat/cli-oauth-providers branch.

Carries the feature forward on top of current main without the original CLA-blocked commit metadata, while preserving attribution in the commit message for review.

* fix: harden CLI credential loading

Align Codex auth loading with the current ~/.codex/auth.json shape, make Docker credential mounts directory-based to avoid broken file binds on hosts without exported credential files, and add focused loader tests.

* refactor: tighten codex auth typing

Replace the temporary Any return type in CodexChatModel._load_codex_auth with the concrete CodexCliCredential type after the credential loader was stabilized.

* fix: load Claude Code OAuth from Keychain

Match Claude Code's macOS storage strategy more closely by checking the Keychain-backed credentials store before falling back to ~/.claude/.credentials.json. Keep explicit file overrides and add focused tests for the Keychain path.

* fix: require explicit Claude OAuth handoff

* style: format thread hooks reasoning request

* docs: document CLI-backed auth providers

* fix: address provider review feedback

* fix: harden provider edge cases

* Fix deferred tools, Codex message normalization, and local sandbox paths

* chore: narrow PR scope to OAuth providers

* chore: remove unrelated frontend changes

* chore: reapply OAuth branch frontend scope cleanup

* fix: preserve upload guards with reasoning effort wiring

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
Purricane
2026-03-22 07:39:50 -07:00
committed by GitHub
parent e119dc74ae
commit 835ba041f8
12 changed files with 1546 additions and 0 deletions

View File

@@ -36,8 +36,35 @@ models:
- OpenAI (`langchain_openai:ChatOpenAI`)
- Anthropic (`langchain_anthropic:ChatAnthropic`)
- DeepSeek (`langchain_deepseek:ChatDeepSeek`)
- Claude Code OAuth (`deerflow.models.claude_provider:ClaudeChatModel`)
- Codex CLI (`deerflow.models.openai_codex_provider:CodexChatModel`)
- Any LangChain-compatible provider
CLI-backed provider examples:
```yaml
models:
- name: gpt-5.4
display_name: GPT-5.4 (Codex CLI)
use: deerflow.models.openai_codex_provider:CodexChatModel
model: gpt-5.4
supports_thinking: true
supports_reasoning_effort: true
- name: claude-sonnet-4.6
display_name: Claude Sonnet 4.6 (Claude Code OAuth)
use: deerflow.models.claude_provider:ClaudeChatModel
model: claude-sonnet-4-6
max_tokens: 4096
supports_thinking: true
```
**Auth behavior for CLI-backed providers**:
- `CodexChatModel` loads Codex CLI auth from `~/.codex/auth.json`
- The Codex Responses endpoint currently rejects `max_tokens` and `max_output_tokens`, so `CodexChatModel` does not expose a request-level token cap
- `ClaudeChatModel` accepts `CLAUDE_CODE_OAUTH_TOKEN`, `ANTHROPIC_AUTH_TOKEN`, `CLAUDE_CODE_OAUTH_TOKEN_FILE_DESCRIPTOR`, `CLAUDE_CODE_CREDENTIALS_PATH`, or plaintext `~/.claude/.credentials.json`
- On macOS, DeerFlow does not probe Keychain automatically. Use `scripts/export_claude_code_oauth.py` to export Claude Code auth explicitly when needed
To use OpenAI's `/v1/responses` endpoint with LangChain, keep using `langchain_openai:ChatOpenAI` and set:
```yaml

View File

@@ -0,0 +1,262 @@
"""Custom Claude provider with OAuth Bearer auth, prompt caching, and smart thinking.
Supports two authentication modes:
1. Standard API key (x-api-key header) — default ChatAnthropic behavior
2. Claude Code OAuth token (Authorization: Bearer header)
- Detected by sk-ant-oat prefix
- Requires anthropic-beta: oauth-2025-04-20,claude-code-20250219
Auto-loads credentials from explicit runtime handoff:
- $ANTHROPIC_API_KEY environment variable
- $CLAUDE_CODE_OAUTH_TOKEN or $ANTHROPIC_AUTH_TOKEN
- $CLAUDE_CODE_OAUTH_TOKEN_FILE_DESCRIPTOR
- $CLAUDE_CODE_CREDENTIALS_PATH
- ~/.claude/.credentials.json
"""
import logging
import time
from typing import Any
import anthropic
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import BaseMessage
logger = logging.getLogger(__name__)
MAX_RETRIES = 3
THINKING_BUDGET_RATIO = 0.8
class ClaudeChatModel(ChatAnthropic):
    """ChatAnthropic with OAuth Bearer auth, prompt caching, and smart thinking.

    Config example:
        - name: claude-sonnet-4.6
          use: deerflow.models.claude_provider:ClaudeChatModel
          model: claude-sonnet-4-6
          max_tokens: 16384
          enable_prompt_caching: true
    """

    # Custom fields
    enable_prompt_caching: bool = True  # inject ephemeral cache_control into requests
    prompt_cache_size: int = 3  # number of trailing messages that receive cache_control
    auto_thinking_budget: bool = True  # auto-fill thinking.budget_tokens when unset
    retry_max_attempts: int = MAX_RETRIES  # total attempts (first call + retries)
    _is_oauth: bool = False  # set in model_post_init when the key is an OAuth token
    _oauth_access_token: str = ""  # raw OAuth token used to patch SDK clients

    model_config = {"arbitrary_types_allowed": True}

    def _validate_retry_config(self) -> None:
        # Reject a retry budget that would never attempt the call at all.
        if self.retry_max_attempts < 1:
            raise ValueError("retry_max_attempts must be >= 1")

    def model_post_init(self, __context: Any) -> None:
        """Auto-load credentials and configure OAuth if needed."""
        from pydantic import SecretStr

        from deerflow.models.credential_loader import (
            OAUTH_ANTHROPIC_BETAS,
            is_oauth_token,
            load_claude_code_credential,
        )

        self._validate_retry_config()
        # Extract actual key value (SecretStr.str() returns '**********')
        current_key = ""
        if self.anthropic_api_key:
            if hasattr(self.anthropic_api_key, "get_secret_value"):
                current_key = self.anthropic_api_key.get_secret_value()
            else:
                current_key = str(self.anthropic_api_key)
        # Try the explicit Claude Code OAuth handoff sources if no valid key.
        if not current_key or current_key in ("your-anthropic-api-key",):
            cred = load_claude_code_credential()
            if cred:
                current_key = cred.access_token
                logger.info(f"Using Claude Code CLI credential (source: {cred.source})")
            else:
                logger.warning("No Anthropic API key or explicit Claude Code OAuth credential found.")
        # Detect OAuth token and configure Bearer auth
        if is_oauth_token(current_key):
            self._is_oauth = True
            self._oauth_access_token = current_key
            # Set the token as api_key temporarily (will be swapped to auth_token on client)
            self.anthropic_api_key = SecretStr(current_key)
            # Add required beta headers for OAuth
            self.default_headers = {
                **(self.default_headers or {}),
                "anthropic-beta": OAUTH_ANTHROPIC_BETAS,
            }
            # OAuth tokens have a limit of 4 cache_control blocks — disable prompt caching
            self.enable_prompt_caching = False
            logger.info("OAuth token detected — will use Authorization: Bearer header")
        else:
            if current_key:
                self.anthropic_api_key = SecretStr(current_key)
            # Ensure api_key is SecretStr
            if isinstance(self.anthropic_api_key, str):
                self.anthropic_api_key = SecretStr(self.anthropic_api_key)
        super().model_post_init(__context)
        # Patch clients immediately after creation for OAuth Bearer auth.
        # This must happen after super() because clients are lazily created.
        if self._is_oauth:
            self._patch_client_oauth(self._client)
            self._patch_client_oauth(self._async_client)

    def _patch_client_oauth(self, client: Any) -> None:
        """Swap api_key → auth_token on an Anthropic SDK client for OAuth Bearer auth."""
        # Setting auth_token makes the SDK send Authorization: Bearer instead of x-api-key.
        if hasattr(client, "api_key") and hasattr(client, "auth_token"):
            client.api_key = None
            client.auth_token = self._oauth_access_token

    def _get_request_payload(
        self,
        input_: Any,
        *,
        stop: list[str] | None = None,
        **kwargs: Any,
    ) -> dict:
        """Override to inject prompt caching and thinking budget."""
        payload = super()._get_request_payload(input_, stop=stop, **kwargs)
        if self.enable_prompt_caching:
            self._apply_prompt_caching(payload)
        if self.auto_thinking_budget:
            self._apply_thinking_budget(payload)
        return payload

    def _apply_prompt_caching(self, payload: dict) -> None:
        """Apply ephemeral cache_control to system and recent messages.

        Mutates *payload* in place.
        """
        # NOTE(review): Anthropic caps cache_control breakpoints per request;
        # marking system + recent messages + last tool may exceed that cap —
        # confirm against current API limits.
        # Cache system messages
        system = payload.get("system")
        if system and isinstance(system, list):
            for block in system:
                if isinstance(block, dict) and block.get("type") == "text":
                    block["cache_control"] = {"type": "ephemeral"}
        elif system and isinstance(system, str):
            # Promote a bare string system prompt to a cacheable block list.
            payload["system"] = [
                {
                    "type": "text",
                    "text": system,
                    "cache_control": {"type": "ephemeral"},
                }
            ]
        # Cache recent messages
        messages = payload.get("messages", [])
        cache_start = max(0, len(messages) - self.prompt_cache_size)
        for i in range(cache_start, len(messages)):
            msg = messages[i]
            if not isinstance(msg, dict):
                continue
            content = msg.get("content")
            if isinstance(content, list):
                for block in content:
                    if isinstance(block, dict):
                        block["cache_control"] = {"type": "ephemeral"}
            elif isinstance(content, str) and content:
                msg["content"] = [
                    {
                        "type": "text",
                        "text": content,
                        "cache_control": {"type": "ephemeral"},
                    }
                ]
        # Cache the last tool definition
        tools = payload.get("tools", [])
        if tools and isinstance(tools[-1], dict):
            tools[-1]["cache_control"] = {"type": "ephemeral"}

    def _apply_thinking_budget(self, payload: dict) -> None:
        """Auto-allocate thinking budget (80% of max_tokens).

        Only fills budget_tokens when thinking is enabled and no explicit
        budget was provided; mutates *payload* in place.
        """
        thinking = payload.get("thinking")
        if not thinking or not isinstance(thinking, dict):
            return
        if thinking.get("type") != "enabled":
            return
        if thinking.get("budget_tokens"):
            return
        max_tokens = payload.get("max_tokens", 8192)
        thinking["budget_tokens"] = int(max_tokens * THINKING_BUDGET_RATIO)

    def _generate(self, messages: list[BaseMessage], stop: list[str] | None = None, **kwargs: Any) -> Any:
        """Override with OAuth patching and retry logic."""
        # NOTE(review): run_manager arrives via **kwargs and is forwarded to
        # super()._generate — confirm signature compatibility across versions.
        if self._is_oauth:
            # Re-patch in case the SDK client was recreated since init.
            self._patch_client_oauth(self._client)
        last_error = None
        for attempt in range(1, self.retry_max_attempts + 1):
            try:
                return super()._generate(messages, stop=stop, **kwargs)
            except anthropic.RateLimitError as e:
                last_error = e
                if attempt >= self.retry_max_attempts:
                    raise
                wait_ms = self._calc_backoff_ms(attempt, e)
                logger.warning(f"Rate limited, retrying attempt {attempt}/{self.retry_max_attempts} after {wait_ms}ms")
                time.sleep(wait_ms / 1000)
            except anthropic.InternalServerError as e:
                last_error = e
                if attempt >= self.retry_max_attempts:
                    raise
                wait_ms = self._calc_backoff_ms(attempt, e)
                logger.warning(f"Server error, retrying attempt {attempt}/{self.retry_max_attempts} after {wait_ms}ms")
                time.sleep(wait_ms / 1000)
        # Defensive: unreachable while retry_max_attempts >= 1 (validated above).
        raise last_error

    async def _agenerate(self, messages: list[BaseMessage], stop: list[str] | None = None, **kwargs: Any) -> Any:
        """Async override with OAuth patching and retry logic."""
        import asyncio

        if self._is_oauth:
            self._patch_client_oauth(self._async_client)
        last_error = None
        for attempt in range(1, self.retry_max_attempts + 1):
            try:
                return await super()._agenerate(messages, stop=stop, **kwargs)
            except anthropic.RateLimitError as e:
                last_error = e
                if attempt >= self.retry_max_attempts:
                    raise
                wait_ms = self._calc_backoff_ms(attempt, e)
                logger.warning(f"Rate limited, retrying attempt {attempt}/{self.retry_max_attempts} after {wait_ms}ms")
                await asyncio.sleep(wait_ms / 1000)
            except anthropic.InternalServerError as e:
                last_error = e
                if attempt >= self.retry_max_attempts:
                    raise
                wait_ms = self._calc_backoff_ms(attempt, e)
                logger.warning(f"Server error, retrying attempt {attempt}/{self.retry_max_attempts} after {wait_ms}ms")
                await asyncio.sleep(wait_ms / 1000)
        # Defensive: unreachable while retry_max_attempts >= 1 (validated above).
        raise last_error

    @staticmethod
    def _calc_backoff_ms(attempt: int, error: Exception) -> int:
        """Exponential backoff with a fixed 20% buffer.

        Returns the wait in milliseconds: 2s/4s/8s… doubled per attempt plus
        20%, overridden by an integer Retry-After header when the server
        supplies one.
        """
        backoff_ms = 2000 * (1 << (attempt - 1))
        jitter_ms = int(backoff_ms * 0.2)
        total_ms = backoff_ms + jitter_ms
        # Honor server guidance (seconds form only; HTTP-date values are
        # rejected by int() and fall back to the computed backoff).
        if hasattr(error, "response") and error.response is not None:
            retry_after = error.response.headers.get("Retry-After")
            if retry_after:
                try:
                    total_ms = int(retry_after) * 1000
                except (ValueError, TypeError):
                    pass
        return total_ms

View File

@@ -0,0 +1,212 @@
"""Auto-load credentials from Claude Code CLI and Codex CLI.
Implements two credential strategies:
1. Claude Code OAuth token from explicit env vars or an exported credentials file
- Uses Authorization: Bearer header (NOT x-api-key)
- Requires anthropic-beta: oauth-2025-04-20,claude-code-20250219
- Supports $CLAUDE_CODE_OAUTH_TOKEN, $CLAUDE_CODE_OAUTH_TOKEN_FILE_DESCRIPTOR, and $ANTHROPIC_AUTH_TOKEN
- Override path with $CLAUDE_CODE_CREDENTIALS_PATH
2. Codex CLI token from ~/.codex/auth.json
- Uses chatgpt.com/backend-api/codex/responses endpoint
- Supports both legacy top-level tokens and current nested tokens shape
- Override path with $CODEX_AUTH_PATH
"""
import json
import logging
import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# Required beta headers for Claude Code OAuth tokens
OAUTH_ANTHROPIC_BETAS = "oauth-2025-04-20,claude-code-20250219,interleaved-thinking-2025-05-14"
def is_oauth_token(token: str) -> bool:
    """Check if a token is a Claude Code OAuth token (not a standard API key)."""
    if not isinstance(token, str):
        return False
    return "sk-ant-oat" in token
@dataclass
class ClaudeCodeCredential:
    """Claude Code CLI OAuth credential."""

    access_token: str
    refresh_token: str = ""
    expires_at: int = 0  # epoch milliseconds; 0 means "no expiry recorded"
    source: str = ""  # which handoff mechanism supplied the token

    @property
    def is_expired(self) -> bool:
        """True when the token expires within one minute (or already has)."""
        expiry_ms = self.expires_at
        if expiry_ms <= 0:
            return False
        now_ms = time.time() * 1000
        return now_ms > expiry_ms - 60_000  # 1 min buffer
@dataclass
class CodexCliCredential:
    """Codex CLI credential."""

    access_token: str
    account_id: str = ""  # ChatGPT account id sent via the ChatGPT-Account-ID header
    source: str = ""  # which loader produced this credential
def _resolve_credential_path(env_var: str, default_relative_path: str) -> Path:
configured_path = os.getenv(env_var)
if configured_path:
return Path(configured_path).expanduser()
return Path.home() / default_relative_path
def _load_json_file(path: Path, label: str) -> dict[str, Any] | None:
if not path.exists():
logger.debug(f"{label} not found: {path}")
return None
if path.is_dir():
logger.warning(f"{label} path is a directory, expected a file: {path}")
return None
try:
return json.loads(path.read_text())
except (json.JSONDecodeError, OSError) as e:
logger.warning(f"Failed to read {label}: {e}")
return None
def _read_secret_from_file_descriptor(env_var: str) -> str | None:
fd_value = os.getenv(env_var)
if not fd_value:
return None
try:
fd = int(fd_value)
except ValueError:
logger.warning(f"{env_var} must be an integer file descriptor, got: {fd_value}")
return None
try:
secret = Path(f"/dev/fd/{fd}").read_text().strip()
except OSError as e:
logger.warning(f"Failed to read {env_var}: {e}")
return None
return secret or None
def _credential_from_direct_token(access_token: str, source: str) -> ClaudeCodeCredential | None:
    """Wrap a raw token string in a credential, rejecting blank values."""
    token = access_token.strip()
    return ClaudeCodeCredential(access_token=token, source=source) if token else None
def _iter_claude_code_credential_paths() -> list[Path]:
paths: list[Path] = []
override_path = os.getenv("CLAUDE_CODE_CREDENTIALS_PATH")
if override_path:
paths.append(Path(override_path).expanduser())
default_path = Path.home() / ".claude/.credentials.json"
if not paths or paths[-1] != default_path:
paths.append(default_path)
return paths
def _extract_claude_code_credential(data: dict[str, Any], source: str) -> ClaudeCodeCredential | None:
    """Build a credential from a parsed credentials file.

    Returns None when the container has no access token or the token has
    already expired (both cases are logged).
    """
    oauth = data.get("claudeAiOauth", {})
    token = oauth.get("accessToken", "")
    if not token:
        logger.debug("Claude Code credentials container exists but no accessToken found")
        return None
    credential = ClaudeCodeCredential(
        access_token=token,
        refresh_token=oauth.get("refreshToken", ""),
        expires_at=oauth.get("expiresAt", 0),
        source=source,
    )
    if not credential.is_expired:
        return credential
    logger.warning("Claude Code OAuth token is expired. Run 'claude' to refresh.")
    return None
def load_claude_code_credential() -> ClaudeCodeCredential | None:
    """Load OAuth credential from explicit Claude Code handoff sources.

    Lookup order:
    1. $CLAUDE_CODE_OAUTH_TOKEN or $ANTHROPIC_AUTH_TOKEN
    2. $CLAUDE_CODE_OAUTH_TOKEN_FILE_DESCRIPTOR
    3. $CLAUDE_CODE_CREDENTIALS_PATH
    4. ~/.claude/.credentials.json

    Exported credentials files contain:
        {
          "claudeAiOauth": {
            "accessToken": "sk-ant-oat01-...",
            "refreshToken": "sk-ant-ort01-...",
            "expiresAt": 1773430695128,
            "scopes": ["user:inference", ...],
            ...
          }
        }
    """
    # Direct handoff sources, probed lazily so the file descriptor is only
    # consumed when the environment variables did not yield a token.
    direct_sources = (
        (
            "claude-cli-env",
            "Loaded Claude Code OAuth credential from environment",
            lambda: os.getenv("CLAUDE_CODE_OAUTH_TOKEN") or os.getenv("ANTHROPIC_AUTH_TOKEN"),
        ),
        (
            "claude-cli-fd",
            "Loaded Claude Code OAuth credential from file descriptor",
            lambda: _read_secret_from_file_descriptor("CLAUDE_CODE_OAUTH_TOKEN_FILE_DESCRIPTOR"),
        ),
    )
    for source, message, fetch in direct_sources:
        raw_token = fetch()
        if not raw_token:
            continue
        cred = _credential_from_direct_token(raw_token, source)
        if cred:
            logger.info(message)
            return cred
    # Fall back to credential files (override path first, then default).
    override_raw = os.getenv("CLAUDE_CODE_CREDENTIALS_PATH")
    override_path = Path(override_raw).expanduser() if override_raw else None
    for candidate in _iter_claude_code_credential_paths():
        data = _load_json_file(candidate, "Claude Code credentials")
        if data is None:
            continue
        cred = _extract_claude_code_credential(data, "claude-cli-file")
        if not cred:
            continue
        if override_path is not None and candidate == override_path:
            source_label = "override path"
        else:
            source_label = "plaintext file"
        logger.info(f"Loaded Claude Code OAuth credential from {source_label} (expires_at={cred.expires_at})")
        return cred
    return None
def load_codex_cli_credential() -> CodexCliCredential | None:
    """Load credential from Codex CLI (~/.codex/auth.json).

    Accepts both the legacy top-level token fields and the current nested
    "tokens" object; returns None when no usable token is present.
    """
    auth_path = _resolve_credential_path("CODEX_AUTH_PATH", ".codex/auth.json")
    data = _load_json_file(auth_path, "Codex CLI credentials")
    if data is None:
        return None
    nested = data.get("tokens", {})
    if not isinstance(nested, dict):
        nested = {}
    token = data.get("access_token") or data.get("token") or nested.get("access_token", "")
    if not token:
        logger.debug("Codex CLI credentials file exists but no token found")
        return None
    logger.info("Loaded Codex CLI credential")
    return CodexCliCredential(
        access_token=token,
        account_id=data.get("account_id") or nested.get("account_id", ""),
        source="codex-cli",
    )

View File

@@ -61,6 +61,22 @@ def create_chat_model(name: str | None = None, thinking_enabled: bool = False, *
if not model_config.supports_reasoning_effort and "reasoning_effort" in kwargs:
del kwargs["reasoning_effort"]
# For Codex Responses API models: map thinking mode to reasoning_effort
from deerflow.models.openai_codex_provider import CodexChatModel
if issubclass(model_class, CodexChatModel):
# The ChatGPT Codex endpoint currently rejects max_tokens/max_output_tokens.
model_settings_from_config.pop("max_tokens", None)
# Use explicit reasoning_effort from frontend if provided (low/medium/high)
explicit_effort = kwargs.pop("reasoning_effort", None)
if not thinking_enabled:
model_settings_from_config["reasoning_effort"] = "none"
elif explicit_effort and explicit_effort in ("low", "medium", "high", "xhigh"):
model_settings_from_config["reasoning_effort"] = explicit_effort
elif "reasoning_effort" not in model_settings_from_config:
model_settings_from_config["reasoning_effort"] = "medium"
model_instance = model_class(**kwargs, **model_settings_from_config)
if is_tracing_enabled():

View File

@@ -0,0 +1,396 @@
"""Custom OpenAI Codex provider using ChatGPT Codex Responses API.
Uses Codex CLI OAuth tokens with chatgpt.com/backend-api/codex/responses endpoint.
This is the same endpoint that the Codex CLI uses internally.
Supports:
- Auto-load credentials from ~/.codex/auth.json
- Responses API format (not Chat Completions)
- Tool calling
- Streaming (required by the endpoint)
- Retry with exponential backoff
"""
import json
import logging
import time
from typing import Any
import httpx
from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage, ToolMessage
from langchain_core.outputs import ChatGeneration, ChatResult
from deerflow.models.credential_loader import CodexCliCredential, load_codex_cli_credential
logger = logging.getLogger(__name__)
CODEX_BASE_URL = "https://chatgpt.com/backend-api/codex"
MAX_RETRIES = 3
class CodexChatModel(BaseChatModel):
    """LangChain chat model using ChatGPT Codex Responses API.

    Config example:
        - name: gpt-5.4
          use: deerflow.models.openai_codex_provider:CodexChatModel
          model: gpt-5.4
          reasoning_effort: medium
    """

    # Model name sent to the Codex endpoint.
    model: str = "gpt-5.4"
    # Reasoning effort forwarded to the Responses API ("none" disables it).
    reasoning_effort: str = "medium"
    # Total attempts (first call + retries) for retryable HTTP errors.
    retry_max_attempts: int = MAX_RETRIES
    # Populated from Codex CLI auth during model_post_init.
    _access_token: str = ""
    _account_id: str = ""

    model_config = {"arbitrary_types_allowed": True}

    @property
    def _llm_type(self) -> str:
        # LangChain identifier for this chat model implementation.
        return "codex-responses"

    def _validate_retry_config(self) -> None:
        # Reject a retry budget that would never attempt the call at all.
        if self.retry_max_attempts < 1:
            raise ValueError("retry_max_attempts must be >= 1")

    def model_post_init(self, __context: Any) -> None:
        """Auto-load Codex CLI credentials.

        Raises ValueError when no credential is available, so a misconfigured
        provider fails at construction rather than at first request.
        """
        self._validate_retry_config()
        cred = self._load_codex_auth()
        if cred:
            self._access_token = cred.access_token
            self._account_id = cred.account_id
            logger.info(f"Using Codex CLI credential (account: {self._account_id[:8]}...)")
        else:
            raise ValueError("Codex CLI credential not found. Expected ~/.codex/auth.json or CODEX_AUTH_PATH.")
        super().model_post_init(__context)

    def _load_codex_auth(self) -> CodexCliCredential | None:
        """Load access_token and account_id from Codex CLI auth."""
        # Indirection point: tests monkeypatch this to inject fake credentials.
        return load_codex_cli_credential()

    @classmethod
    def _normalize_content(cls, content: Any) -> str:
        """Flatten LangChain content blocks into plain text for Codex."""
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            # Recurse into each block; drop empty parts when joining.
            parts = [cls._normalize_content(item) for item in content]
            return "\n".join(part for part in parts if part)
        if isinstance(content, dict):
            # Prefer well-known text-bearing keys before recursing/serializing.
            for key in ("text", "output"):
                value = content.get(key)
                if isinstance(value, str):
                    return value
            nested_content = content.get("content")
            if nested_content is not None:
                return cls._normalize_content(nested_content)
            try:
                return json.dumps(content, ensure_ascii=False)
            except TypeError:
                return str(content)
        # Fallback for scalars and unknown objects.
        try:
            return json.dumps(content, ensure_ascii=False)
        except TypeError:
            return str(content)

    def _convert_messages(self, messages: list[BaseMessage]) -> tuple[str, list[dict]]:
        """Convert LangChain messages to Responses API format.

        Returns (instructions, input_items): all SystemMessages are merged
        into the instructions string; other messages become input items.
        """
        instructions_parts: list[str] = []
        input_items = []
        for msg in messages:
            if isinstance(msg, SystemMessage):
                content = self._normalize_content(msg.content)
                if content:
                    instructions_parts.append(content)
            elif isinstance(msg, HumanMessage):
                content = self._normalize_content(msg.content)
                input_items.append({"role": "user", "content": content})
            elif isinstance(msg, AIMessage):
                if msg.content:
                    content = self._normalize_content(msg.content)
                    input_items.append({"role": "assistant", "content": content})
                # Replay prior tool calls as Responses-API function_call items.
                if msg.tool_calls:
                    for tc in msg.tool_calls:
                        input_items.append(
                            {
                                "type": "function_call",
                                "name": tc["name"],
                                "arguments": json.dumps(tc["args"]) if isinstance(tc["args"], dict) else tc["args"],
                                "call_id": tc["id"],
                            }
                        )
            elif isinstance(msg, ToolMessage):
                input_items.append(
                    {
                        "type": "function_call_output",
                        "call_id": msg.tool_call_id,
                        "output": self._normalize_content(msg.content),
                    }
                )
        instructions = "\n\n".join(instructions_parts) or "You are a helpful assistant."
        return instructions, input_items

    def _convert_tools(self, tools: list[dict]) -> list[dict]:
        """Convert LangChain tool format to Responses API format."""
        responses_tools = []
        for tool in tools:
            if tool.get("type") == "function" and "function" in tool:
                # OpenAI Chat Completions shape: unwrap the nested "function".
                fn = tool["function"]
                responses_tools.append(
                    {
                        "type": "function",
                        "name": fn["name"],
                        "description": fn.get("description", ""),
                        "parameters": fn.get("parameters", {}),
                    }
                )
            elif "name" in tool:
                # Already flat: copy the relevant fields.
                responses_tools.append(
                    {
                        "type": "function",
                        "name": tool["name"],
                        "description": tool.get("description", ""),
                        "parameters": tool.get("parameters", {}),
                    }
                )
        return responses_tools

    def _call_codex_api(self, messages: list[BaseMessage], tools: list[dict] | None = None) -> dict:
        """Call the Codex Responses API and return the completed response.

        Retries 429/500/529 with exponential backoff; any other HTTP status
        or exception is raised immediately.
        """
        instructions, input_items = self._convert_messages(messages)
        payload = {
            "model": self.model,
            "instructions": instructions,
            "input": input_items,
            "store": False,
            # The endpoint only supports streaming responses.
            "stream": True,
            "reasoning": {"effort": self.reasoning_effort, "summary": "detailed"} if self.reasoning_effort != "none" else {"effort": "none"},
        }
        if tools:
            payload["tools"] = self._convert_tools(tools)
        headers = {
            "Authorization": f"Bearer {self._access_token}",
            "ChatGPT-Account-ID": self._account_id,
            "Content-Type": "application/json",
            "Accept": "text/event-stream",
            # NOTE(review): mimics the Codex CLI client identifier — confirm
            # the endpoint still accepts this originator value.
            "originator": "codex_cli_rs",
        }
        last_error = None
        for attempt in range(1, self.retry_max_attempts + 1):
            try:
                return self._stream_response(headers, payload)
            except httpx.HTTPStatusError as e:
                last_error = e
                if e.response.status_code in (429, 500, 529):
                    if attempt >= self.retry_max_attempts:
                        raise
                    wait_ms = 2000 * (1 << (attempt - 1))
                    logger.warning(f"Codex API error {e.response.status_code}, retrying {attempt}/{self.retry_max_attempts} after {wait_ms}ms")
                    time.sleep(wait_ms / 1000)
                else:
                    raise
            except Exception:
                # Non-HTTP failures (timeouts, protocol errors) are not retried.
                raise
        # Defensive: unreachable while retry_max_attempts >= 1 (validated above).
        raise last_error

    def _stream_response(self, headers: dict, payload: dict) -> dict:
        """Stream SSE from Codex API and collect the final response."""
        completed_response = None
        with httpx.Client(timeout=300) as client:
            with client.stream("POST", f"{CODEX_BASE_URL}/responses", headers=headers, json=payload) as resp:
                resp.raise_for_status()
                # Only the terminal "response.completed" event carries the
                # full response; intermediate deltas are discarded.
                for line in resp.iter_lines():
                    data = self._parse_sse_data_line(line)
                    if data and data.get("type") == "response.completed":
                        completed_response = data["response"]
        if not completed_response:
            raise RuntimeError("Codex API stream ended without response.completed event")
        return completed_response

    @staticmethod
    def _parse_sse_data_line(line: str) -> dict[str, Any] | None:
        """Parse a data line from the SSE stream, skipping terminal markers."""
        if not line.startswith("data:"):
            return None
        raw_data = line[5:].strip()
        if not raw_data or raw_data == "[DONE]":
            return None
        try:
            data = json.loads(raw_data)
        except json.JSONDecodeError:
            logger.debug(f"Skipping non-JSON Codex SSE frame: {raw_data}")
            return None
        return data if isinstance(data, dict) else None

    def _parse_tool_call_arguments(self, output_item: dict[str, Any]) -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
        """Parse function-call arguments, surfacing malformed payloads safely.

        Returns (parsed_args, invalid_tool_call): exactly one of the pair is
        non-None.
        """
        raw_arguments = output_item.get("arguments", "{}")
        if isinstance(raw_arguments, dict):
            return raw_arguments, None
        # Treat None/"" as an empty argument object.
        normalized_arguments = raw_arguments or "{}"
        try:
            parsed_arguments = json.loads(normalized_arguments)
        except (TypeError, json.JSONDecodeError) as exc:
            return None, {
                "type": "invalid_tool_call",
                "name": output_item.get("name"),
                "args": str(raw_arguments),
                "id": output_item.get("call_id"),
                "error": f"Failed to parse tool arguments: {exc}",
            }
        if not isinstance(parsed_arguments, dict):
            return None, {
                "type": "invalid_tool_call",
                "name": output_item.get("name"),
                "args": str(raw_arguments),
                "id": output_item.get("call_id"),
                "error": "Tool arguments must decode to a JSON object.",
            }
        return parsed_arguments, None

    def _parse_response(self, response: dict) -> ChatResult:
        """Parse Codex Responses API response into LangChain ChatResult."""
        content = ""
        tool_calls = []
        invalid_tool_calls = []
        reasoning_content = ""
        for output_item in response.get("output", []):
            if output_item.get("type") == "reasoning":
                # Extract reasoning summary text
                for summary_item in output_item.get("summary", []):
                    if isinstance(summary_item, dict) and summary_item.get("type") == "summary_text":
                        reasoning_content += summary_item.get("text", "")
                    elif isinstance(summary_item, str):
                        reasoning_content += summary_item
            elif output_item.get("type") == "message":
                for part in output_item.get("content", []):
                    if part.get("type") == "output_text":
                        content += part.get("text", "")
            elif output_item.get("type") == "function_call":
                parsed_arguments, invalid_tool_call = self._parse_tool_call_arguments(output_item)
                if invalid_tool_call:
                    invalid_tool_calls.append(invalid_tool_call)
                    continue
                tool_calls.append(
                    {
                        "name": output_item["name"],
                        "args": parsed_arguments or {},
                        "id": output_item.get("call_id", ""),
                        "type": "tool_call",
                    }
                )
        usage = response.get("usage", {})
        additional_kwargs = {}
        if reasoning_content:
            additional_kwargs["reasoning_content"] = reasoning_content
        message = AIMessage(
            content=content,
            tool_calls=tool_calls if tool_calls else [],
            invalid_tool_calls=invalid_tool_calls,
            additional_kwargs=additional_kwargs,
            response_metadata={
                "model": response.get("model", self.model),
                "usage": usage,
            },
        )
        return ChatResult(
            generations=[ChatGeneration(message=message)],
            llm_output={
                "token_usage": {
                    "prompt_tokens": usage.get("input_tokens", 0),
                    "completion_tokens": usage.get("output_tokens", 0),
                    "total_tokens": usage.get("total_tokens", 0),
                },
                "model_name": response.get("model", self.model),
            },
        )

    def _generate(
        self,
        messages: list[BaseMessage],
        stop: list[str] | None = None,
        run_manager: CallbackManagerForLLMRun | None = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Generate a response using Codex Responses API."""
        # Tools arrive via bind_tools() → RunnableBinding kwargs.
        tools = kwargs.get("tools", None)
        response = self._call_codex_api(messages, tools=tools)
        return self._parse_response(response)

    def bind_tools(self, tools: list, **kwargs: Any) -> Any:
        """Bind tools for function calling.

        Accepts BaseTool instances and dict tool specs; converts everything
        to the flat Responses API tool shape.
        """
        from langchain_core.runnables import RunnableBinding
        from langchain_core.tools import BaseTool
        from langchain_core.utils.function_calling import convert_to_openai_function

        formatted_tools = []
        for tool in tools:
            if isinstance(tool, BaseTool):
                try:
                    fn = convert_to_openai_function(tool)
                    formatted_tools.append(
                        {
                            "type": "function",
                            "name": fn["name"],
                            "description": fn.get("description", ""),
                            "parameters": fn.get("parameters", {}),
                        }
                    )
                except Exception:
                    # Best-effort fallback: expose the tool without a schema.
                    formatted_tools.append(
                        {
                            "type": "function",
                            "name": tool.name,
                            "description": tool.description,
                            "parameters": {"type": "object", "properties": {}},
                        }
                    )
            elif isinstance(tool, dict):
                if "function" in tool:
                    fn = tool["function"]
                    formatted_tools.append(
                        {
                            "type": "function",
                            "name": fn["name"],
                            "description": fn.get("description", ""),
                            "parameters": fn.get("parameters", {}),
                        }
                    )
                else:
                    formatted_tools.append(tool)
        return RunnableBinding(bound=self, kwargs={"tools": formatted_tools}, **kwargs)

View File

@@ -0,0 +1,151 @@
from __future__ import annotations
import json
import pytest
from langchain_core.messages import HumanMessage, SystemMessage
from deerflow.models.claude_provider import ClaudeChatModel
from deerflow.models.credential_loader import CodexCliCredential
from deerflow.models.openai_codex_provider import CodexChatModel
def test_codex_provider_rejects_non_positive_retry_attempts():
    # A retry budget below one attempt must be rejected at construction time.
    with pytest.raises(ValueError, match="retry_max_attempts must be >= 1"):
        CodexChatModel(retry_max_attempts=0)
def test_codex_provider_requires_credentials(monkeypatch):
    # With no Codex CLI auth available, construction must fail loudly
    # instead of deferring the error to the first request.
    monkeypatch.setattr(CodexChatModel, "_load_codex_auth", lambda self: None)
    with pytest.raises(ValueError, match="Codex CLI credential not found"):
        CodexChatModel()
def test_codex_provider_concatenates_multiple_system_messages(monkeypatch):
    # Multiple SystemMessages are merged (blank-line separated) into the
    # single Responses API `instructions` string; user turns become items.
    monkeypatch.setattr(
        CodexChatModel,
        "_load_codex_auth",
        lambda self: CodexCliCredential(access_token="token", account_id="acct"),
    )
    model = CodexChatModel()
    instructions, input_items = model._convert_messages(
        [
            SystemMessage(content="First system prompt."),
            SystemMessage(content="Second system prompt."),
            HumanMessage(content="Hello"),
        ]
    )
    assert instructions == "First system prompt.\n\nSecond system prompt."
    assert input_items == [{"role": "user", "content": "Hello"}]
def test_codex_provider_flattens_structured_text_blocks(monkeypatch):
    # Structured content blocks are flattened to plain text, and a default
    # instruction is supplied when no SystemMessage is present.
    monkeypatch.setattr(
        CodexChatModel,
        "_load_codex_auth",
        lambda self: CodexCliCredential(access_token="token", account_id="acct"),
    )
    model = CodexChatModel()
    instructions, input_items = model._convert_messages(
        [
            HumanMessage(content=[{"type": "text", "text": "Hello from blocks"}]),
        ]
    )
    assert instructions == "You are a helpful assistant."
    assert input_items == [{"role": "user", "content": "Hello from blocks"}]
def test_claude_provider_rejects_non_positive_retry_attempts():
    # The Claude provider shares the same retry-budget validation.
    with pytest.raises(ValueError, match="retry_max_attempts must be >= 1"):
        ClaudeChatModel(model="claude-sonnet-4-6", retry_max_attempts=0)
def test_codex_provider_skips_terminal_sse_markers(monkeypatch):
    # "[DONE]" sentinels and non-data SSE lines must both be ignored.
    monkeypatch.setattr(
        CodexChatModel,
        "_load_codex_auth",
        lambda self: CodexCliCredential(access_token="token", account_id="acct"),
    )
    model = CodexChatModel()
    assert model._parse_sse_data_line("data: [DONE]") is None
    assert model._parse_sse_data_line("event: response.completed") is None
def test_codex_provider_skips_non_json_sse_frames(monkeypatch):
    """An SSE data frame whose payload is not valid JSON is silently dropped."""
    def _stub_auth(self):
        return CodexCliCredential(access_token="token", account_id="acct")

    monkeypatch.setattr(CodexChatModel, "_load_codex_auth", _stub_auth)
    assert CodexChatModel()._parse_sse_data_line("data: not-json") is None
def test_codex_provider_marks_invalid_tool_call_arguments(monkeypatch):
    """Unparseable tool arguments surface as invalid_tool_calls, never tool_calls."""
    def _stub_auth(self):
        return CodexCliCredential(access_token="token", account_id="acct")

    monkeypatch.setattr(CodexChatModel, "_load_codex_auth", _stub_auth)
    response_payload = {
        "model": "gpt-5.4",
        "output": [
            {
                "type": "function_call",
                "name": "bash",
                "arguments": "{invalid",
                "call_id": "tc-1",
            }
        ],
        "usage": {},
    }
    message = CodexChatModel()._parse_response(response_payload).generations[0].message
    assert message.tool_calls == []
    assert len(message.invalid_tool_calls) == 1
    bad_call = message.invalid_tool_calls[0]
    assert bad_call["type"] == "invalid_tool_call"
    assert bad_call["name"] == "bash"
    # The raw, unparsed argument string is preserved for debugging.
    assert bad_call["args"] == "{invalid"
    assert bad_call["id"] == "tc-1"
    assert "Failed to parse tool arguments" in bad_call["error"]
def test_codex_provider_parses_valid_tool_arguments(monkeypatch):
    """Well-formed JSON tool arguments become structured tool_calls."""
    def _stub_auth(self):
        return CodexCliCredential(access_token="token", account_id="acct")

    monkeypatch.setattr(CodexChatModel, "_load_codex_auth", _stub_auth)
    response_payload = {
        "model": "gpt-5.4",
        "output": [
            {
                "type": "function_call",
                "name": "bash",
                "arguments": json.dumps({"cmd": "pwd"}),
                "call_id": "tc-1",
            }
        ],
        "usage": {},
    }
    result = CodexChatModel()._parse_response(response_payload)
    expected_call = {
        "name": "bash",
        "args": {"cmd": "pwd"},
        "id": "tc-1",
        "type": "tool_call",
    }
    assert result.generations[0].message.tool_calls == [expected_call]

View File

@@ -0,0 +1,156 @@
import json
import os
from deerflow.models.credential_loader import (
load_claude_code_credential,
load_codex_cli_credential,
)
def _clear_claude_code_env(monkeypatch) -> None:
    """Remove every environment variable the Claude Code credential loader consults."""
    claude_env_vars = (
        "CLAUDE_CODE_OAUTH_TOKEN",
        "ANTHROPIC_AUTH_TOKEN",
        "CLAUDE_CODE_OAUTH_TOKEN_FILE_DESCRIPTOR",
        "CLAUDE_CODE_CREDENTIALS_PATH",
    )
    for name in claude_env_vars:
        monkeypatch.delenv(name, raising=False)
def test_load_claude_code_credential_from_direct_env(monkeypatch):
    """CLAUDE_CODE_OAUTH_TOKEN is used directly, with surrounding whitespace stripped."""
    _clear_claude_code_env(monkeypatch)
    monkeypatch.setenv("CLAUDE_CODE_OAUTH_TOKEN", " sk-ant-oat01-env ")
    cred = load_claude_code_credential()
    assert cred is not None
    assert (cred.access_token, cred.refresh_token, cred.source) == (
        "sk-ant-oat01-env",
        "",
        "claude-cli-env",
    )
def test_load_claude_code_credential_from_anthropic_auth_env(monkeypatch):
    """ANTHROPIC_AUTH_TOKEN serves as an alternative env-based token source."""
    _clear_claude_code_env(monkeypatch)
    monkeypatch.setenv("ANTHROPIC_AUTH_TOKEN", "sk-ant-oat01-anthropic-auth")
    cred = load_claude_code_credential()
    assert cred is not None
    assert (cred.access_token, cred.source) == (
        "sk-ant-oat01-anthropic-auth",
        "claude-cli-env",
    )
def test_load_claude_code_credential_from_file_descriptor(monkeypatch):
    """The loader reads a token from the fd named in CLAUDE_CODE_OAUTH_TOKEN_FILE_DESCRIPTOR.

    Fix: the original closed ``write_fd`` inside the outer ``try`` body, so a
    failure in ``os.write`` would leak the write end of the pipe. The write is
    now wrapped in its own try/finally so both descriptors are always released.
    """
    _clear_claude_code_env(monkeypatch)
    read_fd, write_fd = os.pipe()
    try:
        try:
            os.write(write_fd, b"sk-ant-oat01-fd")
        finally:
            # Close the write end as soon as the payload is in the pipe so the
            # loader sees EOF, and never leak it even if os.write fails.
            os.close(write_fd)
        monkeypatch.setenv("CLAUDE_CODE_OAUTH_TOKEN_FILE_DESCRIPTOR", str(read_fd))
        cred = load_claude_code_credential()
    finally:
        os.close(read_fd)
    assert cred is not None
    assert cred.access_token == "sk-ant-oat01-fd"
    assert cred.refresh_token == ""
    assert cred.source == "claude-cli-fd"
def test_load_claude_code_credential_from_override_path(tmp_path, monkeypatch):
    """An explicit CLAUDE_CODE_CREDENTIALS_PATH file override is honored."""
    _clear_claude_code_env(monkeypatch)
    oauth_payload = {
        "claudeAiOauth": {
            "accessToken": "sk-ant-oat01-test",
            "refreshToken": "sk-ant-ort01-test",
            "expiresAt": 4_102_444_800_000,
        }
    }
    override_path = tmp_path / "claude-credentials.json"
    override_path.write_text(json.dumps(oauth_payload))
    monkeypatch.setenv("CLAUDE_CODE_CREDENTIALS_PATH", str(override_path))
    cred = load_claude_code_credential()
    assert cred is not None
    assert (cred.access_token, cred.refresh_token, cred.source) == (
        "sk-ant-oat01-test",
        "sk-ant-ort01-test",
        "claude-cli-file",
    )
def test_load_claude_code_credential_ignores_directory_path(tmp_path, monkeypatch):
    """A directory at the override path (e.g. a Docker mount) yields no credential."""
    _clear_claude_code_env(monkeypatch)
    directory_override = tmp_path / "claude-creds-dir"
    directory_override.mkdir()
    monkeypatch.setenv("CLAUDE_CODE_CREDENTIALS_PATH", str(directory_override))
    assert load_claude_code_credential() is None
def test_load_claude_code_credential_falls_back_to_default_file_when_override_is_invalid(
    tmp_path, monkeypatch
):
    """An unusable override path falls through to ~/.claude/.credentials.json."""
    _clear_claude_code_env(monkeypatch)
    # Point HOME at tmp_path so the default credentials file lives under it.
    monkeypatch.setenv("HOME", str(tmp_path))
    bad_override = tmp_path / "claude-creds-dir"
    bad_override.mkdir()
    monkeypatch.setenv("CLAUDE_CODE_CREDENTIALS_PATH", str(bad_override))
    oauth_payload = {
        "claudeAiOauth": {
            "accessToken": "sk-ant-oat01-default",
            "refreshToken": "sk-ant-ort01-default",
            "expiresAt": 4_102_444_800_000,
        }
    }
    default_path = tmp_path / ".claude" / ".credentials.json"
    default_path.parent.mkdir()
    default_path.write_text(json.dumps(oauth_payload))
    cred = load_claude_code_credential()
    assert cred is not None
    assert (cred.access_token, cred.refresh_token, cred.source) == (
        "sk-ant-oat01-default",
        "sk-ant-ort01-default",
        "claude-cli-file",
    )
def test_load_codex_cli_credential_supports_nested_tokens_shape(tmp_path, monkeypatch):
    """The current auth.json shape nests credentials under a "tokens" object."""
    nested_payload = {
        "tokens": {
            "access_token": "codex-access-token",
            "account_id": "acct_123",
        }
    }
    auth_path = tmp_path / "auth.json"
    auth_path.write_text(json.dumps(nested_payload))
    monkeypatch.setenv("CODEX_AUTH_PATH", str(auth_path))
    cred = load_codex_cli_credential()
    assert cred is not None
    assert (cred.access_token, cred.account_id, cred.source) == (
        "codex-access-token",
        "acct_123",
        "codex-cli",
    )
def test_load_codex_cli_credential_supports_legacy_top_level_shape(tmp_path, monkeypatch):
    """Legacy auth.json files kept access_token at the top level with no account id."""
    auth_path = tmp_path / "auth.json"
    legacy_payload = {"access_token": "legacy-access-token"}
    auth_path.write_text(json.dumps(legacy_payload))
    monkeypatch.setenv("CODEX_AUTH_PATH", str(auth_path))
    cred = load_codex_cli_credential()
    assert cred is not None
    assert cred.access_token == "legacy-access-token"
    assert cred.account_id == ""

View File

@@ -9,6 +9,7 @@ from deerflow.config.app_config import AppConfig
from deerflow.config.model_config import ModelConfig
from deerflow.config.sandbox_config import SandboxConfig
from deerflow.models import factory as factory_module
from deerflow.models import openai_codex_provider as codex_provider_module
# ---------------------------------------------------------------------------
# Helpers
@@ -30,6 +31,7 @@ def _make_model(
supports_reasoning_effort: bool = False,
when_thinking_enabled: dict | None = None,
thinking: dict | None = None,
max_tokens: int | None = None,
) -> ModelConfig:
return ModelConfig(
name=name,
@@ -37,6 +39,7 @@ def _make_model(
description=None,
use=use,
model=name,
max_tokens=max_tokens,
supports_thinking=supports_thinking,
supports_reasoning_effort=supports_reasoning_effort,
when_thinking_enabled=when_thinking_enabled,
@@ -500,6 +503,96 @@ def test_openai_compatible_provider_multiple_models(monkeypatch):
assert captured.get("model") == "MiniMax-M2.5-highspeed"
# ---------------------------------------------------------------------------
# Codex provider reasoning_effort mapping
# ---------------------------------------------------------------------------
class FakeCodexChatModel(FakeChatModel):
    """Codex-flavored test double; inherits all behavior from FakeChatModel."""
def test_codex_provider_disables_reasoning_when_thinking_disabled(monkeypatch):
    """thinking_enabled=False maps to reasoning_effort="none" for Codex models."""
    codex_model = _make_model(
        "codex",
        use="deerflow.models.openai_codex_provider:CodexChatModel",
        supports_thinking=True,
        supports_reasoning_effort=True,
    )
    cfg = _make_app_config([codex_model])
    _patch_factory(monkeypatch, cfg, model_class=FakeCodexChatModel)
    monkeypatch.setattr(codex_provider_module, "CodexChatModel", FakeCodexChatModel)
    FakeChatModel.captured_kwargs = {}
    factory_module.create_chat_model(name="codex", thinking_enabled=False)
    assert FakeChatModel.captured_kwargs.get("reasoning_effort") == "none"
def test_codex_provider_preserves_explicit_reasoning_effort(monkeypatch):
    """An explicitly requested reasoning_effort is passed through unchanged."""
    codex_model = _make_model(
        "codex",
        use="deerflow.models.openai_codex_provider:CodexChatModel",
        supports_thinking=True,
        supports_reasoning_effort=True,
    )
    cfg = _make_app_config([codex_model])
    _patch_factory(monkeypatch, cfg, model_class=FakeCodexChatModel)
    monkeypatch.setattr(codex_provider_module, "CodexChatModel", FakeCodexChatModel)
    FakeChatModel.captured_kwargs = {}
    factory_module.create_chat_model(
        name="codex", thinking_enabled=True, reasoning_effort="high"
    )
    assert FakeChatModel.captured_kwargs.get("reasoning_effort") == "high"
def test_codex_provider_defaults_reasoning_effort_to_medium(monkeypatch):
    """With thinking enabled and no explicit effort, "medium" is the default."""
    codex_model = _make_model(
        "codex",
        use="deerflow.models.openai_codex_provider:CodexChatModel",
        supports_thinking=True,
        supports_reasoning_effort=True,
    )
    cfg = _make_app_config([codex_model])
    _patch_factory(monkeypatch, cfg, model_class=FakeCodexChatModel)
    monkeypatch.setattr(codex_provider_module, "CodexChatModel", FakeCodexChatModel)
    FakeChatModel.captured_kwargs = {}
    factory_module.create_chat_model(name="codex", thinking_enabled=True)
    assert FakeChatModel.captured_kwargs.get("reasoning_effort") == "medium"
def test_codex_provider_strips_unsupported_max_tokens(monkeypatch):
    """A configured max_tokens is dropped before reaching the Codex provider."""
    codex_model = _make_model(
        "codex",
        use="deerflow.models.openai_codex_provider:CodexChatModel",
        supports_thinking=True,
        supports_reasoning_effort=True,
        max_tokens=4096,
    )
    cfg = _make_app_config([codex_model])
    _patch_factory(monkeypatch, cfg, model_class=FakeCodexChatModel)
    monkeypatch.setattr(codex_provider_module, "CodexChatModel", FakeCodexChatModel)
    FakeChatModel.captured_kwargs = {}
    factory_module.create_chat_model(name="codex", thinking_enabled=True)
    assert "max_tokens" not in FakeChatModel.captured_kwargs
def test_openai_responses_api_settings_are_passed_to_chatopenai(monkeypatch):
model = ModelConfig(
name="gpt-5-responses",