mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-03 22:32:12 +08:00
516 lines
19 KiB
Python
516 lines
19 KiB
Python
"""DeerFlow Sandbox Provisioner Service.
|
|
|
|
Dynamically creates and manages per-sandbox Pods in Kubernetes.
|
|
Each ``sandbox_id`` gets its own Pod + NodePort Service. The backend
|
|
accesses sandboxes directly via ``{NODE_HOST}:{NodePort}``.
|
|
|
|
The provisioner connects to the host machine's Kubernetes cluster via a
|
|
mounted kubeconfig (``~/.kube/config``). Sandbox Pods run on the host
|
|
K8s and are accessed by the backend via ``{NODE_HOST}:{NodePort}``.
|
|
|
|
Endpoints:
|
|
POST /api/sandboxes — Create a sandbox Pod + Service
|
|
DELETE /api/sandboxes/{sandbox_id} — Destroy a sandbox Pod + Service
|
|
GET /api/sandboxes/{sandbox_id} — Get sandbox status & URL
|
|
GET /api/sandboxes — List all sandboxes
|
|
GET /health — Provisioner health check
|
|
|
|
Architecture (docker-compose-dev):
|
|
┌────────────┐ HTTP ┌─────────────┐ K8s API ┌──────────────┐
|
|
│ remote │ ─────▸ │ provisioner │ ────────▸ │ host K8s │
|
|
│ _backend │ │ :8002 │ │ API server │
|
|
└────────────┘ └─────────────┘ └──────┬───────┘
|
|
│ creates
|
|
┌─────────────┐ ┌──────▼───────┐
|
|
│ backend │ ────────▸ │ sandbox │
|
|
│ │ direct │ Pod(s) │
|
|
└─────────────┘ NodePort └──────────────┘
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import time
|
|
from contextlib import asynccontextmanager
|
|
|
|
import urllib3
|
|
from fastapi import FastAPI, HTTPException
|
|
from kubernetes import client as k8s_client
|
|
from kubernetes import config as k8s_config
|
|
from kubernetes.client.rest import ApiException
|
|
from pydantic import BaseModel
|
|
|
|
# Suppress only the InsecureRequestWarning from urllib3
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
|
|
)
|
|
|
|
# ── Configuration (all tuneable via environment variables) ───────────────
|
|
|
|
K8S_NAMESPACE = os.environ.get("K8S_NAMESPACE", "deer-flow")
|
|
SANDBOX_IMAGE = os.environ.get(
|
|
"SANDBOX_IMAGE",
|
|
"enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest",
|
|
)
|
|
SKILLS_HOST_PATH = os.environ.get("SKILLS_HOST_PATH", "/skills")
|
|
THREADS_HOST_PATH = os.environ.get("THREADS_HOST_PATH", "/.deer-flow/threads")
|
|
|
|
# Path to the kubeconfig *inside* the provisioner container.
|
|
# Typically the host's ~/.kube/config is mounted here.
|
|
KUBECONFIG_PATH = os.environ.get("KUBECONFIG_PATH", "/root/.kube/config")
|
|
|
|
# The hostname / IP that the *backend container* uses to reach NodePort
|
|
# services on the host Kubernetes node. On Docker Desktop for macOS this
|
|
# is ``host.docker.internal``; on Linux it may be the host's LAN IP.
|
|
NODE_HOST = os.environ.get("NODE_HOST", "host.docker.internal")
|
|
|
|
# ── K8s client setup ────────────────────────────────────────────────────
|
|
|
|
core_v1: k8s_client.CoreV1Api | None = None
|
|
|
|
|
|
def _init_k8s_client() -> k8s_client.CoreV1Api:
|
|
"""Load kubeconfig from the mounted host config and return a CoreV1Api.
|
|
|
|
Tries the mounted kubeconfig first, then falls back to in-cluster
|
|
config (useful if the provisioner itself runs inside K8s).
|
|
"""
|
|
if os.path.exists(KUBECONFIG_PATH):
|
|
if os.path.isdir(KUBECONFIG_PATH):
|
|
raise RuntimeError(
|
|
f"KUBECONFIG_PATH points to a directory, expected a file: {KUBECONFIG_PATH}"
|
|
)
|
|
try:
|
|
k8s_config.load_kube_config(config_file=KUBECONFIG_PATH)
|
|
logger.info(f"Loaded kubeconfig from {KUBECONFIG_PATH}")
|
|
except Exception as exc:
|
|
raise RuntimeError(
|
|
f"Failed to load kubeconfig from {KUBECONFIG_PATH}: {exc}"
|
|
) from exc
|
|
else:
|
|
logger.warning(
|
|
f"Kubeconfig not found at {KUBECONFIG_PATH}; trying in-cluster config"
|
|
)
|
|
try:
|
|
k8s_config.load_incluster_config()
|
|
except Exception as exc:
|
|
raise RuntimeError(
|
|
"Failed to initialize Kubernetes client. "
|
|
f"No kubeconfig at {KUBECONFIG_PATH}, and in-cluster config is unavailable: {exc}"
|
|
) from exc
|
|
|
|
# When connecting from inside Docker to the host's K8s API, the
|
|
# kubeconfig may reference ``localhost`` or ``127.0.0.1``. We
|
|
# optionally rewrite the server address so it reaches the host.
|
|
k8s_api_server = os.environ.get("K8S_API_SERVER")
|
|
if k8s_api_server:
|
|
configuration = k8s_client.Configuration.get_default_copy()
|
|
configuration.host = k8s_api_server
|
|
# Self-signed certs are common for local clusters
|
|
configuration.verify_ssl = False
|
|
api_client = k8s_client.ApiClient(configuration)
|
|
return k8s_client.CoreV1Api(api_client)
|
|
|
|
return k8s_client.CoreV1Api()
|
|
|
|
|
|
def _wait_for_kubeconfig(timeout: int = 30) -> None:
|
|
"""Wait for kubeconfig file if configured, then continue with fallback support."""
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
if os.path.exists(KUBECONFIG_PATH):
|
|
if os.path.isfile(KUBECONFIG_PATH):
|
|
logger.info(f"Found kubeconfig file at {KUBECONFIG_PATH}")
|
|
return
|
|
if os.path.isdir(KUBECONFIG_PATH):
|
|
raise RuntimeError(
|
|
"Kubeconfig path is a directory. "
|
|
f"Please mount a kubeconfig file at {KUBECONFIG_PATH}."
|
|
)
|
|
raise RuntimeError(
|
|
f"Kubeconfig path exists but is not a regular file: {KUBECONFIG_PATH}"
|
|
)
|
|
logger.info(f"Waiting for kubeconfig at {KUBECONFIG_PATH} …")
|
|
time.sleep(2)
|
|
logger.warning(
|
|
f"Kubeconfig not found at {KUBECONFIG_PATH} after {timeout}s; "
|
|
"will attempt in-cluster Kubernetes config"
|
|
)
|
|
|
|
|
|
def _ensure_namespace() -> None:
|
|
"""Create the K8s namespace if it does not yet exist."""
|
|
try:
|
|
core_v1.read_namespace(K8S_NAMESPACE)
|
|
logger.info(f"Namespace '{K8S_NAMESPACE}' already exists")
|
|
except ApiException as exc:
|
|
if exc.status == 404:
|
|
ns = k8s_client.V1Namespace(
|
|
metadata=k8s_client.V1ObjectMeta(
|
|
name=K8S_NAMESPACE,
|
|
labels={
|
|
"app.kubernetes.io/name": "deer-flow",
|
|
"app.kubernetes.io/component": "sandbox",
|
|
},
|
|
)
|
|
)
|
|
core_v1.create_namespace(ns)
|
|
logger.info(f"Created namespace '{K8S_NAMESPACE}'")
|
|
else:
|
|
raise
|
|
|
|
|
|
# ── FastAPI lifespan ─────────────────────────────────────────────────────
|
|
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(_app: FastAPI):
|
|
global core_v1
|
|
_wait_for_kubeconfig()
|
|
core_v1 = _init_k8s_client()
|
|
_ensure_namespace()
|
|
logger.info("Provisioner is ready (using host Kubernetes)")
|
|
yield
|
|
|
|
|
|
app = FastAPI(title="DeerFlow Sandbox Provisioner", lifespan=lifespan)
|
|
|
|
|
|
# ── Request / Response models ───────────────────────────────────────────
|
|
|
|
|
|
class CreateSandboxRequest(BaseModel):
|
|
sandbox_id: str
|
|
thread_id: str
|
|
|
|
|
|
class SandboxResponse(BaseModel):
|
|
sandbox_id: str
|
|
sandbox_url: str # Direct access URL, e.g. http://host.docker.internal:{NodePort}
|
|
status: str
|
|
|
|
|
|
# ── K8s resource helpers ─────────────────────────────────────────────────
|
|
|
|
|
|
def _pod_name(sandbox_id: str) -> str:
|
|
return f"sandbox-{sandbox_id}"
|
|
|
|
|
|
def _svc_name(sandbox_id: str) -> str:
|
|
return f"sandbox-{sandbox_id}-svc"
|
|
|
|
|
|
def _sandbox_url(node_port: int) -> str:
|
|
"""Build the sandbox URL using the configured NODE_HOST."""
|
|
return f"http://{NODE_HOST}:{node_port}"
|
|
|
|
|
|
def _build_pod(sandbox_id: str, thread_id: str) -> k8s_client.V1Pod:
|
|
"""Construct a Pod manifest for a single sandbox."""
|
|
return k8s_client.V1Pod(
|
|
metadata=k8s_client.V1ObjectMeta(
|
|
name=_pod_name(sandbox_id),
|
|
namespace=K8S_NAMESPACE,
|
|
labels={
|
|
"app": "deer-flow-sandbox",
|
|
"sandbox-id": sandbox_id,
|
|
"app.kubernetes.io/name": "deer-flow",
|
|
"app.kubernetes.io/component": "sandbox",
|
|
},
|
|
),
|
|
spec=k8s_client.V1PodSpec(
|
|
containers=[
|
|
k8s_client.V1Container(
|
|
name="sandbox",
|
|
image=SANDBOX_IMAGE,
|
|
image_pull_policy="IfNotPresent",
|
|
ports=[
|
|
k8s_client.V1ContainerPort(
|
|
name="http",
|
|
container_port=8080,
|
|
protocol="TCP",
|
|
)
|
|
],
|
|
readiness_probe=k8s_client.V1Probe(
|
|
http_get=k8s_client.V1HTTPGetAction(
|
|
path="/v1/sandbox",
|
|
port=8080,
|
|
),
|
|
initial_delay_seconds=5,
|
|
period_seconds=5,
|
|
timeout_seconds=3,
|
|
failure_threshold=3,
|
|
),
|
|
liveness_probe=k8s_client.V1Probe(
|
|
http_get=k8s_client.V1HTTPGetAction(
|
|
path="/v1/sandbox",
|
|
port=8080,
|
|
),
|
|
initial_delay_seconds=10,
|
|
period_seconds=10,
|
|
timeout_seconds=3,
|
|
failure_threshold=3,
|
|
),
|
|
resources=k8s_client.V1ResourceRequirements(
|
|
requests={
|
|
"cpu": "100m",
|
|
"memory": "256Mi",
|
|
"ephemeral-storage": "500Mi",
|
|
},
|
|
limits={
|
|
"cpu": "1000m",
|
|
"memory": "1Gi",
|
|
"ephemeral-storage": "500Mi",
|
|
},
|
|
),
|
|
volume_mounts=[
|
|
k8s_client.V1VolumeMount(
|
|
name="skills",
|
|
mount_path="/mnt/skills",
|
|
read_only=True,
|
|
),
|
|
k8s_client.V1VolumeMount(
|
|
name="user-data",
|
|
mount_path="/mnt/user-data",
|
|
read_only=False,
|
|
),
|
|
],
|
|
security_context=k8s_client.V1SecurityContext(
|
|
privileged=False,
|
|
allow_privilege_escalation=True,
|
|
),
|
|
)
|
|
],
|
|
volumes=[
|
|
k8s_client.V1Volume(
|
|
name="skills",
|
|
host_path=k8s_client.V1HostPathVolumeSource(
|
|
path=SKILLS_HOST_PATH,
|
|
type="Directory",
|
|
),
|
|
),
|
|
k8s_client.V1Volume(
|
|
name="user-data",
|
|
host_path=k8s_client.V1HostPathVolumeSource(
|
|
path=f"{THREADS_HOST_PATH}/{thread_id}/user-data",
|
|
type="DirectoryOrCreate",
|
|
),
|
|
),
|
|
],
|
|
restart_policy="Always",
|
|
),
|
|
)
|
|
|
|
|
|
def _build_service(sandbox_id: str) -> k8s_client.V1Service:
|
|
"""Construct a NodePort Service manifest (port auto-allocated by K8s)."""
|
|
return k8s_client.V1Service(
|
|
metadata=k8s_client.V1ObjectMeta(
|
|
name=_svc_name(sandbox_id),
|
|
namespace=K8S_NAMESPACE,
|
|
labels={
|
|
"app": "deer-flow-sandbox",
|
|
"sandbox-id": sandbox_id,
|
|
"app.kubernetes.io/name": "deer-flow",
|
|
"app.kubernetes.io/component": "sandbox",
|
|
},
|
|
),
|
|
spec=k8s_client.V1ServiceSpec(
|
|
type="NodePort",
|
|
ports=[
|
|
k8s_client.V1ServicePort(
|
|
name="http",
|
|
port=8080,
|
|
target_port=8080,
|
|
protocol="TCP",
|
|
# nodePort omitted → K8s auto-allocates from the range
|
|
)
|
|
],
|
|
selector={
|
|
"sandbox-id": sandbox_id,
|
|
},
|
|
),
|
|
)
|
|
|
|
|
|
def _get_node_port(sandbox_id: str) -> int | None:
|
|
"""Read the K8s-allocated NodePort from the Service."""
|
|
try:
|
|
svc = core_v1.read_namespaced_service(_svc_name(sandbox_id), K8S_NAMESPACE)
|
|
for port in svc.spec.ports or []:
|
|
if port.name == "http":
|
|
return port.node_port
|
|
except ApiException:
|
|
pass
|
|
return None
|
|
|
|
|
|
def _get_pod_phase(sandbox_id: str) -> str:
|
|
"""Return the Pod phase (Pending / Running / Succeeded / Failed / Unknown)."""
|
|
try:
|
|
pod = core_v1.read_namespaced_pod(_pod_name(sandbox_id), K8S_NAMESPACE)
|
|
return pod.status.phase or "Unknown"
|
|
except ApiException:
|
|
return "NotFound"
|
|
|
|
|
|
# ── API endpoints ────────────────────────────────────────────────────────
|
|
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
"""Provisioner health check."""
|
|
return {"status": "ok"}
|
|
|
|
|
|
@app.post("/api/sandboxes", response_model=SandboxResponse)
|
|
async def create_sandbox(req: CreateSandboxRequest):
|
|
"""Create a sandbox Pod + NodePort Service for *sandbox_id*.
|
|
|
|
If the sandbox already exists, returns the existing information
|
|
(idempotent).
|
|
"""
|
|
sandbox_id = req.sandbox_id
|
|
thread_id = req.thread_id
|
|
|
|
logger.info(
|
|
f"Received request to create sandbox '{sandbox_id}' for thread '{thread_id}'"
|
|
)
|
|
|
|
# ── Fast path: sandbox already exists ────────────────────────────
|
|
existing_port = _get_node_port(sandbox_id)
|
|
if existing_port:
|
|
return SandboxResponse(
|
|
sandbox_id=sandbox_id,
|
|
sandbox_url=_sandbox_url(existing_port),
|
|
status=_get_pod_phase(sandbox_id),
|
|
)
|
|
|
|
# ── Create Pod ───────────────────────────────────────────────────
|
|
try:
|
|
core_v1.create_namespaced_pod(K8S_NAMESPACE, _build_pod(sandbox_id, thread_id))
|
|
logger.info(f"Created Pod {_pod_name(sandbox_id)}")
|
|
except ApiException as exc:
|
|
if exc.status != 409: # 409 = AlreadyExists
|
|
raise HTTPException(
|
|
status_code=500, detail=f"Pod creation failed: {exc.reason}"
|
|
)
|
|
|
|
# ── Create Service ───────────────────────────────────────────────
|
|
try:
|
|
core_v1.create_namespaced_service(K8S_NAMESPACE, _build_service(sandbox_id))
|
|
logger.info(f"Created Service {_svc_name(sandbox_id)}")
|
|
except ApiException as exc:
|
|
if exc.status != 409:
|
|
# Roll back the Pod on failure
|
|
try:
|
|
core_v1.delete_namespaced_pod(_pod_name(sandbox_id), K8S_NAMESPACE)
|
|
except ApiException:
|
|
pass
|
|
raise HTTPException(
|
|
status_code=500, detail=f"Service creation failed: {exc.reason}"
|
|
)
|
|
|
|
# ── Read the auto-allocated NodePort ─────────────────────────────
|
|
node_port: int | None = None
|
|
for _ in range(20):
|
|
node_port = _get_node_port(sandbox_id)
|
|
if node_port:
|
|
break
|
|
time.sleep(0.5)
|
|
|
|
if not node_port:
|
|
raise HTTPException(
|
|
status_code=500, detail="NodePort was not allocated in time"
|
|
)
|
|
|
|
return SandboxResponse(
|
|
sandbox_id=sandbox_id,
|
|
sandbox_url=_sandbox_url(node_port),
|
|
status=_get_pod_phase(sandbox_id),
|
|
)
|
|
|
|
|
|
@app.delete("/api/sandboxes/{sandbox_id}")
|
|
async def destroy_sandbox(sandbox_id: str):
|
|
"""Destroy a sandbox Pod + Service."""
|
|
errors: list[str] = []
|
|
|
|
# Delete Service
|
|
try:
|
|
core_v1.delete_namespaced_service(_svc_name(sandbox_id), K8S_NAMESPACE)
|
|
logger.info(f"Deleted Service {_svc_name(sandbox_id)}")
|
|
except ApiException as exc:
|
|
if exc.status != 404:
|
|
errors.append(f"service: {exc.reason}")
|
|
|
|
# Delete Pod
|
|
try:
|
|
core_v1.delete_namespaced_pod(_pod_name(sandbox_id), K8S_NAMESPACE)
|
|
logger.info(f"Deleted Pod {_pod_name(sandbox_id)}")
|
|
except ApiException as exc:
|
|
if exc.status != 404:
|
|
errors.append(f"pod: {exc.reason}")
|
|
|
|
if errors:
|
|
raise HTTPException(
|
|
status_code=500, detail=f"Partial cleanup: {', '.join(errors)}"
|
|
)
|
|
|
|
return {"ok": True, "sandbox_id": sandbox_id}
|
|
|
|
|
|
@app.get("/api/sandboxes/{sandbox_id}", response_model=SandboxResponse)
|
|
async def get_sandbox(sandbox_id: str):
|
|
"""Return current status and URL for a sandbox."""
|
|
node_port = _get_node_port(sandbox_id)
|
|
if not node_port:
|
|
raise HTTPException(status_code=404, detail=f"Sandbox '{sandbox_id}' not found")
|
|
|
|
return SandboxResponse(
|
|
sandbox_id=sandbox_id,
|
|
sandbox_url=_sandbox_url(node_port),
|
|
status=_get_pod_phase(sandbox_id),
|
|
)
|
|
|
|
|
|
@app.get("/api/sandboxes")
|
|
async def list_sandboxes():
|
|
"""List every sandbox currently managed in the namespace."""
|
|
try:
|
|
services = core_v1.list_namespaced_service(
|
|
K8S_NAMESPACE,
|
|
label_selector="app=deer-flow-sandbox",
|
|
)
|
|
except ApiException as exc:
|
|
raise HTTPException(
|
|
status_code=500, detail=f"Failed to list services: {exc.reason}"
|
|
)
|
|
|
|
sandboxes: list[SandboxResponse] = []
|
|
for svc in services.items:
|
|
sid = (svc.metadata.labels or {}).get("sandbox-id")
|
|
if not sid:
|
|
continue
|
|
node_port = None
|
|
for port in svc.spec.ports or []:
|
|
if port.name == "http":
|
|
node_port = port.node_port
|
|
break
|
|
if node_port:
|
|
sandboxes.append(
|
|
SandboxResponse(
|
|
sandbox_id=sid,
|
|
sandbox_url=_sandbox_url(node_port),
|
|
status=_get_pod_phase(sid),
|
|
)
|
|
)
|
|
|
|
return {"sandboxes": sandboxes, "count": len(sandboxes)}
|