security: add log injection attack prevention with input sanitization (#667)

* security: add log injection attack prevention with input sanitization

- Created src/utils/log_sanitizer.py to sanitize user-controlled input before logging
- Prevents log injection attacks using newlines, tabs, carriage returns, etc.
- Escapes dangerous characters: \n, \r, \t, \0, \x1b
- Provides specialized functions for different input types:
  - sanitize_log_input: general purpose sanitization
  - sanitize_thread_id: for user-provided thread IDs
  - sanitize_user_content: for user messages (more aggressive truncation)
  - sanitize_agent_name: for agent identifiers
  - sanitize_tool_name: for tool names
  - sanitize_feedback: for user interrupt feedback
  - create_safe_log_message: template-based safe message creation

- Updated src/server/app.py to sanitize all user input in logging:
  - Thread IDs from request parameter
  - Message content from user
  - Agent names and node information
  - Tool names and feedback

- Updated src/agents/tool_interceptor.py to sanitize:
  - Tool names during execution
  - User feedback during interrupt handling
  - Tool input data

- Added 29 comprehensive unit tests covering:
  - Classic newline injection attacks
  - Carriage return injection
  - Tab and null character injection
  - HTML/ANSI escape sequence injection
  - Combined multi-character attacks
  - Truncation and length limits

Fixes potential log forgery vulnerability where malicious users could inject
fake log entries via unsanitized input containing control characters.
This commit is contained in:
Willem Jiang
2025-10-27 20:57:23 +08:00
committed by GitHub
parent ccd7535072
commit b4c09aa4b1
13 changed files with 585 additions and 80 deletions

View File

@@ -6,10 +6,10 @@ from typing import List, Optional
from langgraph.prebuilt import create_react_agent
from src.agents.tool_interceptor import wrap_tools_with_interceptor
from src.config.agents import AGENT_LLM_MAP
from src.llms.llm import get_llm_by_type
from src.prompts import apply_prompt_template
from src.agents.tool_interceptor import wrap_tools_with_interceptor
logger = logging.getLogger(__name__)

View File

@@ -8,6 +8,12 @@ from typing import Any, Callable, List, Optional
from langchain_core.tools import BaseTool
from langgraph.types import interrupt
from src.utils.log_sanitizer import (
sanitize_feedback,
sanitize_log_input,
sanitize_tool_name,
)
logger = logging.getLogger(__name__)
@@ -84,27 +90,30 @@ class ToolInterceptor:
BaseTool: The wrapped tool with interrupt capability
"""
original_func = tool.func
logger.debug(f"Wrapping tool '{tool.name}' with interrupt capability")
safe_tool_name = sanitize_tool_name(tool.name)
logger.debug(f"Wrapping tool '{safe_tool_name}' with interrupt capability")
def intercepted_func(*args: Any, **kwargs: Any) -> Any:
"""Execute the tool with interrupt check."""
tool_name = tool.name
logger.debug(f"[ToolInterceptor] Executing tool: {tool_name}")
safe_tool_name_local = sanitize_tool_name(tool_name)
logger.debug(f"[ToolInterceptor] Executing tool: {safe_tool_name_local}")
# Format tool input for display
tool_input = args[0] if args else kwargs
tool_input_repr = ToolInterceptor._format_tool_input(tool_input)
logger.debug(f"[ToolInterceptor] Tool input: {tool_input_repr[:200]}")
safe_tool_input = sanitize_log_input(tool_input_repr, max_length=100)
logger.debug(f"[ToolInterceptor] Tool input: {safe_tool_input}")
should_interrupt = interceptor.should_interrupt(tool_name)
logger.debug(f"[ToolInterceptor] should_interrupt={should_interrupt} for tool '{tool_name}'")
logger.debug(f"[ToolInterceptor] should_interrupt={should_interrupt} for tool '{safe_tool_name_local}'")
if should_interrupt:
logger.info(
f"[ToolInterceptor] Interrupting before tool '{tool_name}'"
f"[ToolInterceptor] Interrupting before tool '{safe_tool_name_local}'"
)
logger.debug(
f"[ToolInterceptor] Interrupt message: About to execute tool '{tool_name}' with input: {tool_input_repr[:100]}..."
f"[ToolInterceptor] Interrupt message: About to execute tool '{safe_tool_name_local}' with input: {safe_tool_input}..."
)
# Trigger interrupt and wait for user feedback
@@ -112,41 +121,43 @@ class ToolInterceptor:
feedback = interrupt(
f"About to execute tool: '{tool_name}'\n\nInput:\n{tool_input_repr}\n\nApprove execution?"
)
logger.debug(f"[ToolInterceptor] Interrupt returned with feedback: {f'{feedback[:100]}...' if feedback and len(feedback) > 100 else feedback if feedback else 'None'}")
safe_feedback = sanitize_feedback(feedback)
logger.debug(f"[ToolInterceptor] Interrupt returned with feedback: {f'{safe_feedback[:100]}...' if safe_feedback and len(safe_feedback) > 100 else safe_feedback if safe_feedback else 'None'}")
except Exception as e:
logger.error(f"[ToolInterceptor] Error during interrupt: {str(e)}")
raise
logger.debug(f"[ToolInterceptor] Processing feedback approval for '{tool_name}'")
logger.debug(f"[ToolInterceptor] Processing feedback approval for '{safe_tool_name_local}'")
# Check if user approved
is_approved = ToolInterceptor._parse_approval(feedback)
logger.info(f"[ToolInterceptor] Tool '{tool_name}' approval decision: {is_approved}")
logger.info(f"[ToolInterceptor] Tool '{safe_tool_name_local}' approval decision: {is_approved}")
if not is_approved:
logger.warning(f"[ToolInterceptor] User rejected execution of tool '{tool_name}'")
logger.warning(f"[ToolInterceptor] User rejected execution of tool '{safe_tool_name_local}'")
return {
"error": f"Tool execution rejected by user",
"tool": tool_name,
"status": "rejected",
}
logger.info(f"[ToolInterceptor] User approved execution of tool '{tool_name}', proceeding")
logger.info(f"[ToolInterceptor] User approved execution of tool '{safe_tool_name_local}', proceeding")
# Execute the original tool
try:
logger.debug(f"[ToolInterceptor] Calling original function for tool '{tool_name}'")
logger.debug(f"[ToolInterceptor] Calling original function for tool '{safe_tool_name_local}'")
result = original_func(*args, **kwargs)
logger.info(f"[ToolInterceptor] Tool '{tool_name}' execution completed successfully")
logger.debug(f"[ToolInterceptor] Tool result length: {len(str(result))}")
logger.info(f"[ToolInterceptor] Tool '{safe_tool_name_local}' execution completed successfully")
result_len = len(str(result))
logger.debug(f"[ToolInterceptor] Tool result length: {result_len}")
return result
except Exception as e:
logger.error(f"[ToolInterceptor] Error executing tool '{tool_name}': {str(e)}")
logger.error(f"[ToolInterceptor] Error executing tool '{safe_tool_name_local}': {str(e)}")
raise
# Replace the function and update the tool
# Use object.__setattr__ to bypass Pydantic validation
logger.debug(f"Attaching intercepted function to tool '{tool.name}'")
logger.debug(f"Attaching intercepted function to tool '{safe_tool_name}'")
object.__setattr__(tool, "func", intercepted_func)
return tool