feat: add claude-to-deerflow skill for DeerFlow API integration (#1024)

* feat: add claude-to-deerflow skill for DeerFlow API integration

Add a new skill that enables Claude Code to interact with the DeerFlow
AI agent platform via its HTTP API, including chat streaming and status
checking capabilities.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: telegram channel polling and graceful shutdown when running in a background thread

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
DanielWalnut committed 2026-03-08 22:06:24 +08:00 (committed by GitHub)
commit 8871fca5cb, parent 3721c82ba8
5 changed files with 597 additions and 3 deletions

README.md

@@ -46,6 +46,7 @@ DeerFlow has newly integrated the intelligent search and crawling toolset indepe
 - [From Deep Research to Super Agent Harness](#from-deep-research-to-super-agent-harness)
 - [Core Features](#core-features)
 - [Skills \& Tools](#skills--tools)
+- [Claude Code Integration](#claude-code-integration)
 - [Sub-Agents](#sub-agents)
 - [Sandbox \& File System](#sandbox--file-system)
 - [Context Engineering](#context-engineering)
@@ -311,6 +312,35 @@ Tools follow the same philosophy. DeerFlow comes with a core toolset — web sea
└── your-custom-skill/SKILL.md ← yours
```
#### Claude Code Integration
The `claude-to-deerflow` skill lets you interact with a running DeerFlow instance directly from [Claude Code](https://docs.anthropic.com/en/docs/claude-code). Send research tasks, check status, manage threads — all without leaving the terminal.
**Install the skill**:
```bash
npx skills add https://github.com/bytedance/deer-flow --skill claude-to-deerflow
```
Then make sure DeerFlow is running (default at `http://localhost:2026`) and use the `/claude-to-deerflow` command in Claude Code.
**What you can do**:
- Send messages to DeerFlow and get streaming responses
- Choose execution modes: flash (fast), standard, pro (planning), ultra (sub-agents)
- Check DeerFlow health, list models/skills/agents
- Manage threads and conversation history
- Upload files for analysis
**Environment variables** (optional, for custom endpoints):
```bash
DEERFLOW_URL=http://localhost:2026 # Unified proxy base URL
DEERFLOW_GATEWAY_URL=http://localhost:2026 # Gateway API
DEERFLOW_LANGGRAPH_URL=http://localhost:2026/api/langgraph # LangGraph API
```
See [`skills/public/claude-to-deerflow/SKILL.md`](skills/public/claude-to-deerflow/SKILL.md) for the full API reference.
### Sub-Agents
Complex tasks rarely fit in a single pass. DeerFlow decomposes them.


@@ -79,10 +79,10 @@ class TelegramChannel(Channel):
     async def stop(self) -> None:
         self._running = False
         self.bus.unsubscribe_outbound(self._on_outbound)
-        if self._application and self._tg_loop:
+        if self._tg_loop and self._tg_loop.is_running():
             self._tg_loop.call_soon_threadsafe(self._tg_loop.stop)
         if self._thread:
-            self._thread.join(timeout=5)
+            self._thread.join(timeout=10)
         self._thread = None
         self._application = None
         logger.info("Telegram channel stopped")
@@ -151,10 +151,25 @@ class TelegramChannel(Channel):
         self._tg_loop = asyncio.new_event_loop()
         asyncio.set_event_loop(self._tg_loop)
         try:
-            self._tg_loop.run_until_complete(self._application.run_polling(close_loop=False))
+            # Cannot use run_polling() because it calls add_signal_handler(),
+            # which only works in the main thread. Instead, manually
+            # initialize the application and start the updater.
+            self._tg_loop.run_until_complete(self._application.initialize())
+            self._tg_loop.run_until_complete(self._application.start())
+            self._tg_loop.run_until_complete(self._application.updater.start_polling())
+            self._tg_loop.run_forever()
         except Exception:
             if self._running:
                 logger.exception("Telegram polling error")
+        finally:
+            # Graceful shutdown
+            try:
+                if self._application.updater.running:
+                    self._tg_loop.run_until_complete(self._application.updater.stop())
+                self._tg_loop.run_until_complete(self._application.stop())
+                self._tg_loop.run_until_complete(self._application.shutdown())
+            except Exception:
+                logger.exception("Error during Telegram shutdown")
 
     def _check_user(self, user_id: int) -> bool:
         if not self._allowed_users:

skills/public/claude-to-deerflow/SKILL.md

@@ -0,0 +1,217 @@
---
name: claude-to-deerflow
description: "Interact with DeerFlow AI agent platform via its HTTP API. Use this skill when the user wants to send messages or questions to DeerFlow for research/analysis, start a DeerFlow conversation thread, check DeerFlow status or health, list available models/skills/agents in DeerFlow, manage DeerFlow memory, upload files to DeerFlow threads, or delegate complex research tasks to DeerFlow. Also use when the user mentions deerflow, deer flow, or wants to run a deep research task that DeerFlow can handle."
---
# DeerFlow Skill
Communicate with a running DeerFlow instance via its HTTP API. DeerFlow is an AI agent platform
built on LangGraph that orchestrates sub-agents for research, code execution, web browsing, and more.
## Architecture
DeerFlow exposes two API surfaces behind an Nginx reverse proxy:
| Service | Direct Port | Via Proxy | Purpose |
|----------------|-------------|----------------------------------|----------------------------------|
| Gateway API | 8001 | `$DEERFLOW_GATEWAY_URL` | REST endpoints (models, skills, memory, uploads) |
| LangGraph API | 2024 | `$DEERFLOW_LANGGRAPH_URL` | Agent threads, runs, streaming |
## Environment Variables
All URLs are configurable via environment variables. **Read these env vars before making any request.**
| Variable | Default | Description |
|-------------------------|------------------------------------------|------------------------------------|
| `DEERFLOW_URL` | `http://localhost:2026` | Unified proxy base URL |
| `DEERFLOW_GATEWAY_URL` | `${DEERFLOW_URL}` | Gateway API base (models, skills, memory, uploads) |
| `DEERFLOW_LANGGRAPH_URL`| `${DEERFLOW_URL}/api/langgraph` | LangGraph API base (threads, runs) |
When making curl calls, always resolve the URL like this:
```bash
# Resolve base URLs from env (do this FIRST before any API call)
DEERFLOW_URL="${DEERFLOW_URL:-http://localhost:2026}"
DEERFLOW_GATEWAY_URL="${DEERFLOW_GATEWAY_URL:-$DEERFLOW_URL}"
DEERFLOW_LANGGRAPH_URL="${DEERFLOW_LANGGRAPH_URL:-$DEERFLOW_URL/api/langgraph}"
```
## Available Operations
### 1. Health Check
Verify DeerFlow is running:
```bash
curl -s "$DEERFLOW_GATEWAY_URL/health"
```
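For scripting, capture the HTTP status code instead of the body, the same way the bundled `status.sh` helper does (a minimal sketch):
```bash
# Exit non-zero unless the health endpoint answers with a non-error status (mirrors scripts/status.sh)
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" "$DEERFLOW_GATEWAY_URL/health" 2>/dev/null || echo "000")
if [ "$HTTP_CODE" = "000" ] || [ "$HTTP_CODE" -ge 400 ]; then
  echo "DeerFlow is not reachable at $DEERFLOW_GATEWAY_URL (HTTP $HTTP_CODE)" >&2
  exit 1
fi
echo "OK — DeerFlow is running (HTTP $HTTP_CODE)"
```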
### 2. Send a Message (Streaming)
This is the primary operation. It creates a thread and streams the agent's response.
**Step 1: Create a thread**
```bash
curl -s -X POST "$DEERFLOW_LANGGRAPH_URL/threads" \
-H "Content-Type: application/json" \
-d '{}'
```
Response: `{"thread_id": "<uuid>", ...}`
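When driving this from a shell script, the thread ID can be pulled out of the response and reused for every later call; this mirrors what `scripts/chat.sh` does (a sketch, assuming `python3` is available):
```bash
# Create a thread and keep its ID for the follow-up calls below
THREAD_RESP=$(curl -s -X POST "$DEERFLOW_LANGGRAPH_URL/threads" \
  -H "Content-Type: application/json" -d '{}')
THREAD_ID=$(echo "$THREAD_RESP" | python3 -c "import sys, json; print(json.load(sys.stdin)['thread_id'])")
echo "Thread: $THREAD_ID"
```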
**Step 2: Stream a run**
```bash
curl -s -N -X POST "$DEERFLOW_LANGGRAPH_URL/threads/<thread_id>/runs/stream" \
-H "Content-Type: application/json" \
-d '{
"assistant_id": "lead_agent",
"input": {
"messages": [
{
"type": "human",
"content": [{"type": "text", "text": "YOUR MESSAGE HERE"}]
}
]
},
"stream_mode": ["values", "messages-tuple"],
"stream_subgraphs": true,
"config": {
"recursion_limit": 1000
},
"context": {
"thinking_enabled": true,
"is_plan_mode": true,
"subagent_enabled": true,
"thread_id": "<thread_id>"
}
}'
```
The response is an SSE stream. Each event has the format:
```
event: <event_type>
data: <json_data>
```
Key event types:
- `metadata` — run metadata including `run_id`
- `values` — full state snapshot with `messages` array
- `messages-tuple` — incremental message updates (AI text chunks, tool calls, tool results)
- `end` — stream is complete
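An illustrative (not verbatim) excerpt of such a stream, trimmed to the events this skill cares about; exact payloads vary by run:
```
event: metadata
data: {"run_id": "..."}

event: values
data: {"messages": [{"type": "human", "content": [{"type": "text", "text": "YOUR MESSAGE HERE"}]}, {"type": "ai", "content": "final answer text"}]}

event: end
```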
**Context modes** (set via `context`):
- Flash mode: `thinking_enabled: false, is_plan_mode: false, subagent_enabled: false`
- Standard mode: `thinking_enabled: true, is_plan_mode: false, subagent_enabled: false`
- Pro mode: `thinking_enabled: true, is_plan_mode: true, subagent_enabled: false`
- Ultra mode: `thinking_enabled: true, is_plan_mode: true, subagent_enabled: true`
### 3. Continue a Conversation
To send follow-up messages, reuse the same `thread_id` from step 2 and POST another run
with the new message.
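For example, a follow-up run reuses the thread from step 1 and differs from step 2 only in the message text (a sketch; the question is a placeholder):
```bash
curl -s -N -X POST "$DEERFLOW_LANGGRAPH_URL/threads/<thread_id>/runs/stream" \
  -H "Content-Type: application/json" \
  -d '{
    "assistant_id": "lead_agent",
    "input": {"messages": [{"type": "human", "content": [{"type": "text", "text": "Can you expand on the second point?"}]}]},
    "stream_mode": ["values", "messages-tuple"],
    "stream_subgraphs": true,
    "config": {"recursion_limit": 1000},
    "context": {"thinking_enabled": true, "is_plan_mode": true, "subagent_enabled": false, "thread_id": "<thread_id>"}
  }'
```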
### 4. List Models
```bash
curl -s "$DEERFLOW_GATEWAY_URL/api/models"
```
Returns: `{"models": [{"name": "...", "provider": "...", ...}, ...]}`
### 5. List Skills
```bash
curl -s "$DEERFLOW_GATEWAY_URL/api/skills"
```
Returns: `{"skills": [{"name": "...", "enabled": true, ...}, ...]}`
### 6. Enable/Disable a Skill
```bash
curl -s -X PUT "$DEERFLOW_GATEWAY_URL/api/skills/<skill_name>" \
-H "Content-Type: application/json" \
-d '{"enabled": true}'
```
### 7. List Agents
```bash
curl -s "$DEERFLOW_GATEWAY_URL/api/agents"
```
Returns: `{"agents": [{"name": "...", ...}, ...]}`
### 8. Get Memory
```bash
curl -s "$DEERFLOW_GATEWAY_URL/api/memory"
```
Returns user context, facts, and conversation history summaries.
### 9. Upload Files to a Thread
```bash
curl -s -X POST "$DEERFLOW_GATEWAY_URL/api/threads/<thread_id>/uploads" \
-F "files=@/path/to/file.pdf"
```
Supports PDF, PPTX, XLSX, DOCX — automatically converts to Markdown.
### 10. List Uploaded Files
```bash
curl -s "$DEERFLOW_GATEWAY_URL/api/threads/<thread_id>/uploads/list"
```
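Putting operations 9 and 10 together, a typical flow is to upload a document and then confirm it is attached to the thread (a sketch; `report.pdf` is a placeholder):
```bash
# Upload a file to an existing thread, then list what the thread has
curl -s -X POST "$DEERFLOW_GATEWAY_URL/api/threads/$THREAD_ID/uploads" -F "files=@report.pdf"
curl -s "$DEERFLOW_GATEWAY_URL/api/threads/$THREAD_ID/uploads/list"
```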
### 11. Get Thread History
```bash
curl -s "$DEERFLOW_LANGGRAPH_URL/threads/<thread_id>/history"
```
### 12. List Threads
```bash
curl -s -X POST "$DEERFLOW_LANGGRAPH_URL/threads/search" \
-H "Content-Type: application/json" \
-d '{"limit": 20, "sort_by": "updated_at", "sort_order": "desc"}'
```
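To turn that JSON into a quick one-line-per-thread summary, pipe it through python3 the way `scripts/status.sh` does (a sketch):
```bash
curl -s -X POST "$DEERFLOW_LANGGRAPH_URL/threads/search" \
  -H "Content-Type: application/json" \
  -d '{"limit": 20, "sort_by": "updated_at", "sort_order": "desc"}' \
  | python3 -c "
import json, sys
for t in json.load(sys.stdin):
    print(t.get('thread_id', '?'), t.get('updated_at', '?'))
"
```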
## Usage Script
For sending messages and collecting the full response, use the helper script:
```bash
bash /path/to/skills/claude-to-deerflow/scripts/chat.sh "Your question here"
```
See `scripts/chat.sh` for the implementation. The script:
1. Checks health
2. Creates a thread
3. Streams the run and collects the final AI response
4. Prints the result
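For example (the questions are placeholders; arguments and modes follow the script's own usage notes):
```bash
# New conversation (pro mode is the script's default)
bash /path/to/skills/claude-to-deerflow/scripts/chat.sh "Compare LangGraph and plain LangChain for multi-agent research"

# Follow up on the same thread (the script prints the thread ID on stderr)
bash /path/to/skills/claude-to-deerflow/scripts/chat.sh "Focus on streaming support" <thread_id>

# Quick question in flash mode
bash /path/to/skills/claude-to-deerflow/scripts/chat.sh "Is DeerFlow healthy?" "" flash
```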
## Parsing SSE Output
The stream returns SSE events. To extract the final AI response from a `values` event (see the runnable sketch after this list):
- Look for the last `event: values` block
- Parse its `data` JSON
- The `messages` array contains all messages; the last one with `type: "ai"` is the response
- The `content` field of that message is the AI's text reply
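A minimal runnable sketch of that extraction, assuming the raw SSE output has already been saved to a file (`scripts/chat.sh` implements a fuller version that also handles `ask_clarification` and file artifacts):
```bash
# Print the final AI reply from a captured SSE stream (sse_output.txt is a placeholder path)
python3 - sse_output.txt <<'PY'
import json, sys

events, etype, data = [], None, []
with open(sys.argv[1]) as f:
    for line in f:
        line = line.rstrip("\r\n")
        if line.startswith("event:"):
            if etype and data:
                events.append((etype, "\n".join(data)))
            etype, data = line[len("event:"):].strip(), []
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())
if etype and data:
    events.append((etype, "\n".join(data)))

# The last "values" event carries the full message list; the last "ai" message is the reply.
for etype, payload in reversed(events):
    if etype != "values":
        continue
    for msg in reversed(json.loads(payload).get("messages", [])):
        if msg.get("type") == "ai" and msg.get("content"):
            content = msg["content"]
            if isinstance(content, list):
                content = "".join(b.get("text", "") for b in content if isinstance(b, dict))
            print(content)
            sys.exit(0)
    break
print("(no AI response found)", file=sys.stderr)
sys.exit(1)
PY
```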
## Error Handling
- If health check fails, DeerFlow is not running. Inform the user they need to start it.
- If the stream returns an error event, extract and display the error message.
- Common issues: port not open, services still starting up, config errors.
## Tips
- For quick questions, use flash mode (fastest, no planning).
- For research tasks, use pro or ultra mode (enables planning and sub-agents).
- You can upload files first, then reference them in your message.
- Thread IDs persist — you can return to a conversation later.

scripts/chat.sh

@@ -0,0 +1,234 @@
#!/usr/bin/env bash
# chat.sh — Send a message to DeerFlow and collect the streaming response.
#
# Usage:
# bash chat.sh "Your question here"
# bash chat.sh "Your question" <thread_id> # continue conversation
# bash chat.sh "Your question" "" pro # specify mode
# DEERFLOW_URL=http://host:2026 bash chat.sh "hi" # custom endpoint
#
# Environment variables:
# DEERFLOW_URL — Unified proxy base URL (default: http://localhost:2026)
# DEERFLOW_GATEWAY_URL — Gateway API base URL (default: $DEERFLOW_URL)
# DEERFLOW_LANGGRAPH_URL — LangGraph API base URL (default: $DEERFLOW_URL/api/langgraph)
#
# Modes: flash, standard, pro (default), ultra
set -euo pipefail
DEERFLOW_URL="${DEERFLOW_URL:-http://localhost:2026}"
GATEWAY_URL="${DEERFLOW_GATEWAY_URL:-$DEERFLOW_URL}"
LANGGRAPH_URL="${DEERFLOW_LANGGRAPH_URL:-$DEERFLOW_URL/api/langgraph}"
MESSAGE="${1:?Usage: chat.sh <message> [thread_id] [mode]}"
THREAD_ID="${2:-}"
MODE="${3:-pro}"
# --- Health check ---
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" "${GATEWAY_URL}/health" 2>/dev/null || echo "000")
if [ "$HTTP_CODE" = "000" ] || [ "$HTTP_CODE" -ge 400 ]; then
  echo "ERROR: DeerFlow is not reachable at ${GATEWAY_URL} (HTTP ${HTTP_CODE})" >&2
  echo "Make sure DeerFlow is running. Start it with: cd <deerflow-dir> && make dev" >&2
  exit 1
fi
# --- Create or reuse thread ---
if [ -z "$THREAD_ID" ]; then
THREAD_RESP=$(curl -s -X POST "${LANGGRAPH_URL}/threads" \
-H "Content-Type: application/json" \
-d '{}')
THREAD_ID=$(echo "$THREAD_RESP" | python3 -c "import sys,json; print(json.load(sys.stdin)['thread_id'])" 2>/dev/null)
if [ -z "$THREAD_ID" ]; then
echo "ERROR: Failed to create thread. Response: ${THREAD_RESP}" >&2
exit 1
fi
echo "Thread: ${THREAD_ID}" >&2
fi
# --- Build context based on mode ---
case "$MODE" in
flash)
CONTEXT='{"thinking_enabled":false,"is_plan_mode":false,"subagent_enabled":false,"thread_id":"'"$THREAD_ID"'"}'
;;
standard)
CONTEXT='{"thinking_enabled":true,"is_plan_mode":false,"subagent_enabled":false,"thread_id":"'"$THREAD_ID"'"}'
;;
pro)
CONTEXT='{"thinking_enabled":true,"is_plan_mode":true,"subagent_enabled":false,"thread_id":"'"$THREAD_ID"'"}'
;;
ultra)
CONTEXT='{"thinking_enabled":true,"is_plan_mode":true,"subagent_enabled":true,"thread_id":"'"$THREAD_ID"'"}'
;;
*)
echo "ERROR: Unknown mode '${MODE}'. Use: flash, standard, pro, ultra" >&2
exit 1
;;
esac
# --- Escape message for JSON ---
ESCAPED_MSG=$(python3 -c "import json,sys; print(json.dumps(sys.argv[1]))" "$MESSAGE")
# --- Build request body ---
BODY=$(cat <<ENDJSON
{
  "assistant_id": "lead_agent",
  "input": {
    "messages": [
      {
        "type": "human",
        "content": [{"type": "text", "text": ${ESCAPED_MSG}}]
      }
    ]
  },
  "stream_mode": ["values", "messages-tuple"],
  "stream_subgraphs": true,
  "config": {
    "recursion_limit": 1000
  },
  "context": ${CONTEXT}
}
ENDJSON
)
# --- Stream the run and extract final response ---
# We collect the full SSE output, then parse the last values event to get the AI response.
TMPFILE=$(mktemp)
trap "rm -f '$TMPFILE'" EXIT
curl -s -N -X POST "${LANGGRAPH_URL}/threads/${THREAD_ID}/runs/stream" \
  -H "Content-Type: application/json" \
  -d "$BODY" > "$TMPFILE"
# Parse the SSE output: extract the last "event: values" data block and get the final AI message
python3 - "$TMPFILE" "$GATEWAY_URL" "$THREAD_ID" << 'PYEOF'
import json
import sys
sse_file = sys.argv[1] if len(sys.argv) > 1 else None
gateway_url = sys.argv[2].rstrip("/") if len(sys.argv) > 2 else "http://localhost:2026"
thread_id = sys.argv[3] if len(sys.argv) > 3 else ""
if not sse_file:
    sys.exit(1)

with open(sse_file, "r") as f:
    raw = f.read()

# Parse SSE events
events = []
current_event = None
current_data_lines = []
for line in raw.split("\n"):
    if line.startswith("event:"):
        if current_event and current_data_lines:
            events.append((current_event, "\n".join(current_data_lines)))
        current_event = line[len("event:"):].strip()
        current_data_lines = []
    elif line.startswith("data:"):
        current_data_lines.append(line[len("data:"):].strip())
    elif line == "" and current_event:
        if current_data_lines:
            events.append((current_event, "\n".join(current_data_lines)))
        current_event = None
        current_data_lines = []

# Flush remaining
if current_event and current_data_lines:
    events.append((current_event, "\n".join(current_data_lines)))

def extract_response_text(messages):
    """Mirror manager.py _extract_response_text: handles ask_clarification interrupt + regular AI."""
    for msg in reversed(messages):
        if not isinstance(msg, dict):
            continue
        msg_type = msg.get("type")
        # ask_clarification interrupt: tool message with name ask_clarification
        if msg_type == "tool" and msg.get("name") == "ask_clarification":
            content = msg.get("content", "")
            if isinstance(content, str) and content:
                return content
        # Regular AI message
        if msg_type == "ai":
            content = msg.get("content", "")
            if isinstance(content, str) and content:
                return content
            if isinstance(content, list):
                parts = []
                for block in content:
                    if isinstance(block, dict) and block.get("type") == "text":
                        parts.append(block.get("text", ""))
                    elif isinstance(block, str):
                        parts.append(block)
                text = "".join(parts)
                if text:
                    return text
    return ""

def extract_artifacts(messages):
    """Mirror manager.py _extract_artifacts: only artifacts from the last response cycle."""
    artifacts = []
    for msg in reversed(messages):
        if not isinstance(msg, dict):
            continue
        if msg.get("type") == "human":
            break
        if msg.get("type") == "ai":
            for tc in msg.get("tool_calls", []):
                if isinstance(tc, dict) and tc.get("name") == "present_files":
                    paths = tc.get("args", {}).get("filepaths", [])
                    if isinstance(paths, list):
                        artifacts.extend(p for p in paths if isinstance(p, str))
    return artifacts

def artifact_url(virtual_path):
    # virtual_path like /mnt/user-data/outputs/file.md
    # API endpoint: {gateway}/api/threads/{thread_id}/artifacts/{path without leading slash}
    path = virtual_path.lstrip("/")
    return f"{gateway_url}/api/threads/{thread_id}/artifacts/{path}"

def format_artifact_text(artifacts):
    urls = [artifact_url(p) for p in artifacts]
    if len(urls) == 1:
        return f"Created File: {urls[0]}"
    return "Created Files:\n" + "\n".join(urls)
# Find the last "values" event with messages
result_messages = None
for event_type, data_str in reversed(events):
    if event_type != "values":
        continue
    try:
        data = json.loads(data_str)
    except json.JSONDecodeError:
        continue
    if "messages" in data:
        result_messages = data["messages"]
        break

if result_messages is not None:
    response_text = extract_response_text(result_messages)
    artifacts = extract_artifacts(result_messages)
    if artifacts:
        artifact_text = format_artifact_text(artifacts)
        response_text = (response_text + "\n\n" + artifact_text) if response_text else artifact_text
    if response_text:
        print(response_text)
    else:
        print("(No response from agent)", file=sys.stderr)
        sys.exit(1)
else:
    # Check for error events
    for event_type, data_str in events:
        if event_type == "error":
            print(f"ERROR from DeerFlow: {data_str}", file=sys.stderr)
            sys.exit(1)
    print("No AI response found in the stream.", file=sys.stderr)
    if len(raw) < 2000:
        print(f"Raw SSE output:\n{raw}", file=sys.stderr)
    sys.exit(1)
PYEOF
echo ""
echo "---"
echo "Thread ID: ${THREAD_ID}" >&2

scripts/status.sh

@@ -0,0 +1,98 @@
#!/usr/bin/env bash
# status.sh — Check DeerFlow status and list available resources.
#
# Usage:
# bash status.sh # health + summary
# bash status.sh models # list models
# bash status.sh skills # list skills
# bash status.sh agents # list agents
# bash status.sh threads # list recent threads
# bash status.sh memory # show memory
# bash status.sh thread <id> # show thread history
#
# Environment variables:
# DEERFLOW_URL — Unified proxy base URL (default: http://localhost:2026)
# DEERFLOW_GATEWAY_URL — Gateway API base URL (default: $DEERFLOW_URL)
# DEERFLOW_LANGGRAPH_URL — LangGraph API base URL (default: $DEERFLOW_URL/api/langgraph)
set -euo pipefail
DEERFLOW_URL="${DEERFLOW_URL:-http://localhost:2026}"
GATEWAY_URL="${DEERFLOW_GATEWAY_URL:-$DEERFLOW_URL}"
LANGGRAPH_URL="${DEERFLOW_LANGGRAPH_URL:-$DEERFLOW_URL/api/langgraph}"
CMD="${1:-health}"
ARG="${2:-}"
case "$CMD" in
health)
echo "Checking DeerFlow at ${GATEWAY_URL}..."
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" "${GATEWAY_URL}/health" 2>/dev/null || echo "000")
if [ "$HTTP_CODE" = "000" ]; then
echo "UNREACHABLE — DeerFlow is not running at ${GATEWAY_URL}"
exit 1
elif [ "$HTTP_CODE" -ge 400 ]; then
echo "ERROR — Health check returned HTTP ${HTTP_CODE}"
exit 1
else
echo "OK — DeerFlow is running (HTTP ${HTTP_CODE})"
fi
;;
models)
curl -s "${GATEWAY_URL}/api/models" | python3 -m json.tool
;;
skills)
curl -s "${GATEWAY_URL}/api/skills" | python3 -m json.tool
;;
agents)
curl -s "${GATEWAY_URL}/api/agents" | python3 -m json.tool
;;
threads)
curl -s -X POST "${LANGGRAPH_URL}/threads/search" \
-H "Content-Type: application/json" \
-d '{"limit": 20, "sort_by": "updated_at", "sort_order": "desc", "select": ["thread_id", "updated_at", "values"]}' \
| python3 -c "
import json, sys
threads = json.load(sys.stdin)
if not threads:
print('No threads found.')
sys.exit(0)
for t in threads:
tid = t.get('thread_id', '?')
updated = t.get('updated_at', '?')
title = (t.get('values') or {}).get('title', '(untitled)')
print(f'{tid} {updated} {title}')
"
;;
memory)
curl -s "${GATEWAY_URL}/api/memory" | python3 -m json.tool
;;
thread)
if [ -z "$ARG" ]; then
echo "Usage: status.sh thread <thread_id>" >&2
exit 1
fi
curl -s "${LANGGRAPH_URL}/threads/${ARG}/history" | python3 -c "
import json, sys
data = json.load(sys.stdin)
if isinstance(data, list):
for state in data[:5]:
values = state.get('values', {})
msgs = values.get('messages', [])
for m in msgs[-5:]:
role = m.get('type', '?')
content = m.get('content', '')
if isinstance(content, list):
content = ' '.join(p.get('text','') for p in content if isinstance(p, dict))
preview = content[:200] if content else '(empty)'
print(f'[{role}] {preview}')
print('---')
else:
print(json.dumps(data, indent=2))
"
;;
*)
echo "Unknown command: ${CMD}" >&2
echo "Usage: status.sh [health|models|skills|agents|threads|memory|thread <id>]" >&2
exit 1
;;
esac