mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-19 04:14:46 +08:00
feat: add gateway module with FastAPI server (#5)
* chore: add .claude/ to .gitignore Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * feat: add gateway module with FastAPI server - Add new gateway module with FastAPI app for API routing - Add gateway and serve commands to Makefile - Add fastapi, httpx, uvicorn, sse-starlette dependencies - Fix model config retrieval in lead_agent (support both model_name and model) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
4
backend/src/gateway/__init__.py
Normal file
4
backend/src/gateway/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from .app import app, create_app
|
||||
from .config import GatewayConfig, get_gateway_config
|
||||
|
||||
__all__ = ["app", "create_app", "GatewayConfig", "get_gateway_config"]
|
||||
64
backend/src/gateway/app.py
Normal file
64
backend/src/gateway/app.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import logging
|
||||
from collections.abc import AsyncGenerator
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
from src.gateway.config import get_gateway_config
|
||||
from src.gateway.routers import models, proxy
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
"""Application lifespan handler."""
|
||||
config = get_gateway_config()
|
||||
logger.info(f"Starting API Gateway on {config.host}:{config.port}")
|
||||
logger.info(f"Proxying to LangGraph server at {config.langgraph_url}")
|
||||
yield
|
||||
logger.info("Shutting down API Gateway")
|
||||
|
||||
|
||||
def create_app() -> FastAPI:
|
||||
"""Create and configure the FastAPI application.
|
||||
|
||||
Returns:
|
||||
Configured FastAPI application instance.
|
||||
"""
|
||||
config = get_gateway_config()
|
||||
|
||||
app = FastAPI(
|
||||
title="DeerFlow API Gateway",
|
||||
description="API Gateway for DeerFlow - proxies to LangGraph Server and provides custom endpoints",
|
||||
version="0.1.0",
|
||||
lifespan=lifespan,
|
||||
)
|
||||
|
||||
# Add CORS middleware
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=config.cors_origins,
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# Include routers
|
||||
# Models API is mounted at /api/models
|
||||
app.include_router(models.router)
|
||||
|
||||
# Proxy router handles all LangGraph paths (must be last due to catch-all)
|
||||
app.include_router(proxy.router)
|
||||
|
||||
@app.get("/health")
|
||||
async def health_check() -> dict:
|
||||
"""Health check endpoint."""
|
||||
return {"status": "healthy", "service": "deer-flow-gateway"}
|
||||
|
||||
return app
|
||||
|
||||
|
||||
# Create app instance for uvicorn
|
||||
app = create_app()
|
||||
33
backend/src/gateway/config.py
Normal file
33
backend/src/gateway/config.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import os
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class GatewayConfig(BaseModel):
|
||||
"""Configuration for the API Gateway."""
|
||||
|
||||
host: str = Field(default="0.0.0.0", description="Host to bind the gateway server")
|
||||
port: int = Field(default=8000, description="Port to bind the gateway server")
|
||||
langgraph_url: str = Field(default="http://localhost:2024", description="URL of the LangGraph server to proxy requests to")
|
||||
cors_origins: list[str] = Field(default_factory=lambda: ["http://localhost:3000"], description="Allowed CORS origins")
|
||||
proxy_timeout: float = Field(default=300.0, description="Timeout for proxy requests in seconds")
|
||||
stream_timeout: float = Field(default=600.0, description="Timeout for streaming requests in seconds")
|
||||
|
||||
|
||||
_gateway_config: GatewayConfig | None = None
|
||||
|
||||
|
||||
def get_gateway_config() -> GatewayConfig:
|
||||
"""Get gateway config, loading from environment if available."""
|
||||
global _gateway_config
|
||||
if _gateway_config is None:
|
||||
cors_origins_str = os.getenv("CORS_ORIGINS", "http://localhost:3000")
|
||||
_gateway_config = GatewayConfig(
|
||||
host=os.getenv("GATEWAY_HOST", "0.0.0.0"),
|
||||
port=int(os.getenv("GATEWAY_PORT", "8000")),
|
||||
langgraph_url=os.getenv("LANGGRAPH_URL", "http://localhost:2024"),
|
||||
cors_origins=cors_origins_str.split(","),
|
||||
proxy_timeout=float(os.getenv("PROXY_TIMEOUT", "300")),
|
||||
stream_timeout=float(os.getenv("STREAM_TIMEOUT", "600")),
|
||||
)
|
||||
return _gateway_config
|
||||
3
backend/src/gateway/routers/__init__.py
Normal file
3
backend/src/gateway/routers/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from . import models, proxy
|
||||
|
||||
__all__ = ["models", "proxy"]
|
||||
67
backend/src/gateway/routers/models.py
Normal file
67
backend/src/gateway/routers/models.py
Normal file
@@ -0,0 +1,67 @@
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from src.config import get_app_config
|
||||
|
||||
router = APIRouter(prefix="/api", tags=["models"])
|
||||
|
||||
|
||||
class ModelResponse(BaseModel):
|
||||
"""Response model for model information."""
|
||||
|
||||
name: str = Field(..., description="Unique identifier for the model")
|
||||
display_name: str | None = Field(None, description="Human-readable name")
|
||||
description: str | None = Field(None, description="Model description")
|
||||
supports_thinking: bool = Field(default=False, description="Whether model supports thinking mode")
|
||||
|
||||
|
||||
class ModelsListResponse(BaseModel):
|
||||
"""Response model for listing all models."""
|
||||
|
||||
models: list[ModelResponse]
|
||||
|
||||
|
||||
@router.get("/models", response_model=ModelsListResponse)
|
||||
async def list_models() -> ModelsListResponse:
|
||||
"""List all available models from configuration.
|
||||
|
||||
Returns model information suitable for frontend display,
|
||||
excluding sensitive fields like API keys and internal configuration.
|
||||
"""
|
||||
config = get_app_config()
|
||||
models = [
|
||||
ModelResponse(
|
||||
name=model.name,
|
||||
display_name=model.display_name,
|
||||
description=model.description,
|
||||
supports_thinking=model.supports_thinking,
|
||||
)
|
||||
for model in config.models
|
||||
]
|
||||
return ModelsListResponse(models=models)
|
||||
|
||||
|
||||
@router.get("/models/{model_name}", response_model=ModelResponse)
|
||||
async def get_model(model_name: str) -> ModelResponse:
|
||||
"""Get a specific model by name.
|
||||
|
||||
Args:
|
||||
model_name: The unique name of the model to retrieve.
|
||||
|
||||
Returns:
|
||||
Model information if found.
|
||||
|
||||
Raises:
|
||||
HTTPException: 404 if model not found.
|
||||
"""
|
||||
config = get_app_config()
|
||||
model = config.get_model_config(model_name)
|
||||
if model is None:
|
||||
raise HTTPException(status_code=404, detail=f"Model '{model_name}' not found")
|
||||
|
||||
return ModelResponse(
|
||||
name=model.name,
|
||||
display_name=model.display_name,
|
||||
description=model.description,
|
||||
supports_thinking=model.supports_thinking,
|
||||
)
|
||||
141
backend/src/gateway/routers/proxy.py
Normal file
141
backend/src/gateway/routers/proxy.py
Normal file
@@ -0,0 +1,141 @@
|
||||
import logging
|
||||
from collections.abc import AsyncGenerator
|
||||
|
||||
import httpx
|
||||
from fastapi import APIRouter, Request, Response
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
from src.gateway.config import get_gateway_config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(tags=["proxy"])
|
||||
|
||||
# Hop-by-hop headers that should not be forwarded
|
||||
EXCLUDED_HEADERS = {
|
||||
"host",
|
||||
"connection",
|
||||
"keep-alive",
|
||||
"proxy-authenticate",
|
||||
"proxy-authorization",
|
||||
"te",
|
||||
"trailers",
|
||||
"transfer-encoding",
|
||||
"upgrade",
|
||||
"content-length",
|
||||
}
|
||||
|
||||
|
||||
async def stream_response(client: httpx.AsyncClient, method: str, url: str, headers: dict, body: bytes | None, timeout: float) -> AsyncGenerator[bytes, None]:
|
||||
"""Stream response from the upstream server.
|
||||
|
||||
Args:
|
||||
client: The httpx async client.
|
||||
method: HTTP method.
|
||||
url: Target URL.
|
||||
headers: Request headers.
|
||||
body: Request body.
|
||||
timeout: Request timeout.
|
||||
|
||||
Yields:
|
||||
Response chunks.
|
||||
"""
|
||||
async with client.stream(
|
||||
method=method,
|
||||
url=url,
|
||||
headers=headers,
|
||||
content=body,
|
||||
timeout=timeout,
|
||||
) as response:
|
||||
async for chunk in response.aiter_bytes():
|
||||
yield chunk
|
||||
|
||||
|
||||
async def proxy_request(request: Request, path: str) -> Response | StreamingResponse:
|
||||
"""Proxy a request to the LangGraph server.
|
||||
|
||||
Args:
|
||||
request: The incoming FastAPI request.
|
||||
path: The path to proxy to.
|
||||
|
||||
Returns:
|
||||
Response or StreamingResponse depending on content type.
|
||||
"""
|
||||
config = get_gateway_config()
|
||||
target_url = f"{config.langgraph_url}/{path}"
|
||||
|
||||
# Preserve query parameters
|
||||
if request.url.query:
|
||||
target_url = f"{target_url}?{request.url.query}"
|
||||
|
||||
# Prepare headers (exclude hop-by-hop headers)
|
||||
headers = {key: value for key, value in request.headers.items() if key.lower() not in EXCLUDED_HEADERS}
|
||||
|
||||
# Read request body for non-GET requests
|
||||
body = None
|
||||
if request.method not in ("GET", "HEAD"):
|
||||
body = await request.body()
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
try:
|
||||
# First, make a non-streaming request to check content type
|
||||
response = await client.request(
|
||||
method=request.method,
|
||||
url=target_url,
|
||||
headers=headers,
|
||||
content=body,
|
||||
timeout=config.proxy_timeout,
|
||||
)
|
||||
|
||||
content_type = response.headers.get("content-type", "")
|
||||
|
||||
# Check if response is SSE (Server-Sent Events)
|
||||
if "text/event-stream" in content_type:
|
||||
# For SSE, we need to re-request with streaming
|
||||
return StreamingResponse(
|
||||
stream_response(client, request.method, target_url, headers, body, config.stream_timeout),
|
||||
media_type="text/event-stream",
|
||||
headers={
|
||||
"Cache-Control": "no-cache",
|
||||
"Connection": "keep-alive",
|
||||
"X-Accel-Buffering": "no",
|
||||
},
|
||||
)
|
||||
|
||||
# Prepare response headers
|
||||
response_headers = dict(response.headers)
|
||||
for header in ["transfer-encoding", "connection", "keep-alive"]:
|
||||
response_headers.pop(header, None)
|
||||
|
||||
return Response(
|
||||
content=response.content,
|
||||
status_code=response.status_code,
|
||||
headers=response_headers,
|
||||
)
|
||||
|
||||
except httpx.TimeoutException:
|
||||
logger.error(f"Proxy request to {target_url} timed out")
|
||||
return Response(
|
||||
content='{"error": "Proxy request timed out"}',
|
||||
status_code=504,
|
||||
media_type="application/json",
|
||||
)
|
||||
except httpx.RequestError as e:
|
||||
logger.error(f"Proxy request to {target_url} failed: {e}")
|
||||
return Response(
|
||||
content='{"error": "Proxy request failed"}',
|
||||
status_code=502,
|
||||
media_type="application/json",
|
||||
)
|
||||
|
||||
|
||||
@router.api_route(
|
||||
"/{path:path}",
|
||||
methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
|
||||
)
|
||||
async def proxy_langgraph(request: Request, path: str) -> Response:
|
||||
"""Proxy all requests to LangGraph server.
|
||||
|
||||
This catch-all route forwards requests to the LangGraph server.
|
||||
"""
|
||||
return await proxy_request(request, path)
|
||||
Reference in New Issue
Block a user