mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-17 19:44:45 +08:00
* fix(checkpointer): return InMemorySaver instead of None when not configured (#1016) * fix(checkpointer): also fix get_checkpointer() to return InMemorySaver Make all three checkpointer functions consistent: - make_checkpointer() (async) → InMemorySaver - checkpointer_context() (sync) → InMemorySaver - get_checkpointer() (sync singleton) → InMemorySaver This ensures DeerFlowClient always has a valid checkpointer. * fix: address CI failure and Copilot review feedback - Fix import order in test_checkpointer_none_fix.py (I001 ruff error) - Fix type annotation: _checkpointer should be Checkpointer | None - Update docstring: change "None if not configured" to "InMemorySaver if not configured" - Ensure app config is loaded before checking checkpointer config to prevent incorrect InMemorySaver fallback --------- Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
@@ -10,7 +10,7 @@ Usage (e.g. FastAPI lifespan)::
|
||||
from src.agents.checkpointer.async_provider import make_checkpointer
|
||||
|
||||
async with make_checkpointer() as checkpointer:
|
||||
app.state.checkpointer = checkpointer # None if not configured
|
||||
app.state.checkpointer = checkpointer # InMemorySaver if not configured
|
||||
|
||||
For sync usage see :mod:`src.agents.checkpointer.provider`.
|
||||
"""
|
||||
@@ -87,20 +87,22 @@ async def _async_checkpointer(config) -> AsyncIterator[Checkpointer]:
|
||||
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def make_checkpointer() -> AsyncIterator[Checkpointer | None]:
|
||||
async def make_checkpointer() -> AsyncIterator[Checkpointer]:
|
||||
"""Async context manager that yields a checkpointer for the caller's lifetime.
|
||||
Resources are opened on enter and closed on exit — no global state::
|
||||
|
||||
async with make_checkpointer() as checkpointer:
|
||||
app.state.checkpointer = checkpointer
|
||||
|
||||
Yields ``None`` when no checkpointer is configured in *config.yaml*.
|
||||
Yields an ``InMemorySaver`` when no checkpointer is configured in *config.yaml*.
|
||||
"""
|
||||
|
||||
config = get_app_config()
|
||||
|
||||
if config.checkpointer is None:
|
||||
yield None
|
||||
from langgraph.checkpoint.memory import InMemorySaver
|
||||
|
||||
yield InMemorySaver()
|
||||
return
|
||||
|
||||
async with _async_checkpointer(config.checkpointer) as saver:
|
||||
|
||||
@@ -107,14 +107,14 @@ def _sync_checkpointer_cm(config: CheckpointerConfig) -> Iterator[Checkpointer]:
|
||||
# Sync singleton
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_checkpointer: Checkpointer = None
|
||||
_checkpointer: Checkpointer | None = None
|
||||
_checkpointer_ctx = None # open context manager keeping the connection alive
|
||||
|
||||
|
||||
def get_checkpointer() -> Checkpointer | None:
|
||||
def get_checkpointer() -> Checkpointer:
|
||||
"""Return the global sync checkpointer singleton, creating it on first call.
|
||||
|
||||
Returns ``None`` when no checkpointer is configured in *config.yaml*.
|
||||
Returns an ``InMemorySaver`` when no checkpointer is configured in *config.yaml*.
|
||||
|
||||
Raises:
|
||||
ImportError: If the required package for the configured backend is not installed.
|
||||
@@ -125,11 +125,29 @@ def get_checkpointer() -> Checkpointer | None:
|
||||
if _checkpointer is not None:
|
||||
return _checkpointer
|
||||
|
||||
# Ensure app config is loaded before checking checkpointer config
|
||||
# This prevents returning InMemorySaver when config.yaml actually has a checkpointer section
|
||||
# but hasn't been loaded yet
|
||||
from src.config.app_config import _app_config
|
||||
from src.config.checkpointer_config import get_checkpointer_config
|
||||
|
||||
if _app_config is None:
|
||||
# Only load config if it hasn't been initialized yet
|
||||
# In tests, config may be set directly via set_checkpointer_config()
|
||||
try:
|
||||
get_app_config()
|
||||
except FileNotFoundError:
|
||||
# In test environments without config.yaml, this is expected
|
||||
# Tests will set config directly via set_checkpointer_config()
|
||||
pass
|
||||
|
||||
config = get_checkpointer_config()
|
||||
if config is None:
|
||||
return None
|
||||
from langgraph.checkpoint.memory import InMemorySaver
|
||||
|
||||
logger.info("Checkpointer: using InMemorySaver (in-process, not persistent)")
|
||||
_checkpointer = InMemorySaver()
|
||||
return _checkpointer
|
||||
|
||||
_checkpointer_ctx = _sync_checkpointer_cm(config)
|
||||
_checkpointer = _checkpointer_ctx.__enter__()
|
||||
@@ -159,7 +177,7 @@ def reset_checkpointer() -> None:
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def checkpointer_context() -> Iterator[Checkpointer | None]:
|
||||
def checkpointer_context() -> Iterator[Checkpointer]:
|
||||
"""Sync context manager that yields a checkpointer and cleans up on exit.
|
||||
|
||||
Unlike :func:`get_checkpointer`, this does **not** cache the instance —
|
||||
@@ -168,11 +186,15 @@ def checkpointer_context() -> Iterator[Checkpointer | None]:
|
||||
|
||||
with checkpointer_context() as cp:
|
||||
graph.invoke(input, config={"configurable": {"thread_id": "1"}})
|
||||
|
||||
Yields an ``InMemorySaver`` when no checkpointer is configured in *config.yaml*.
|
||||
"""
|
||||
|
||||
config = get_app_config()
|
||||
if config.checkpointer is None:
|
||||
yield None
|
||||
from langgraph.checkpoint.memory import InMemorySaver
|
||||
|
||||
yield InMemorySaver()
|
||||
return
|
||||
|
||||
with _sync_checkpointer_cm(config.checkpointer) as saver:
|
||||
|
||||
Reference in New Issue
Block a user