feat: support infoquest (#708)

* support infoquest

* support html checker

* support html checker

* change line break format

* change line break format

* change line break format

* change line break format

* change line break format

* change line break format

* change line break format

* change line break format

* Fix several critical issues in the codebase
- Resolve crawler panic by improving error handling
- Fix plan validation to prevent invalid configurations
- Correct InfoQuest crawler JSON conversion logic

* add test for infoquest

* add test for infoquest

* Add InfoQuest introduction to the README

* add test for infoquest

* fix readme for infoquest

* fix readme for infoquest

* resolve the conflict

* resolve the conflict

* resolve the conflict

* Fix formatting of INFOQUEST in SearchEngine enum

* Apply suggestions from code review

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Willem Jiang <143703838+willem-bd@users.noreply.github.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
infoquest-byteplus
2025-12-02 08:16:35 +08:00
committed by GitHub
parent e179fb1632
commit 7ec9e45702
22 changed files with 2103 additions and 94 deletions

View File

@@ -0,0 +1,4 @@
from .infoquest_search_api import InfoQuestAPIWrapper
from .infoquest_search_results import InfoQuestSearchResults
__all__ = ["InfoQuestAPIWrapper", "InfoQuestSearchResults"]

View File

@@ -0,0 +1,232 @@
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# SPDX-License-Identifier: MIT
"""Util that calls InfoQuest Search API.
In order to set this up, follow instructions at:
https://docs.byteplus.com/en/docs/InfoQuest/What_is_Info_Quest
"""
import json
from typing import Any, Dict, List
import aiohttp
import requests
from langchain_core.utils import get_from_dict_or_env
from pydantic import BaseModel, ConfigDict, SecretStr, model_validator
from src.config import load_yaml_config
import logging
logger = logging.getLogger(__name__)
INFOQUEST_API_URL = "https://search.infoquest.bytepluses.com"
def get_search_config():
    """Return the ``SEARCH_ENGINE`` section of ``conf.yaml``.

    Falls back to an empty dict when the section is missing.
    """
    return load_yaml_config("conf.yaml").get("SEARCH_ENGINE", {})
class InfoQuestAPIWrapper(BaseModel):
    """Wrapper for InfoQuest Search API.

    Authenticates with ``infoquest_api_key`` (or the ``INFOQUEST_API_KEY``
    environment variable) and exposes sync/async search plus a result cleaner.
    """

    # Bearer token for the InfoQuest endpoint; resolved by validate_environment.
    infoquest_api_key: SecretStr

    model_config = ConfigDict(
        extra="forbid",
    )

    @model_validator(mode="before")
    @classmethod
    def validate_environment(cls, values: Dict) -> Any:
        """Validate that api key and endpoint exists in environment."""
        logger.info("Initializing BytePlus InfoQuest Product - Search API client")
        infoquest_api_key = get_from_dict_or_env(
            values, "infoquest_api_key", "INFOQUEST_API_KEY"
        )
        values["infoquest_api_key"] = infoquest_api_key
        logger.info("BytePlus InfoQuest Product - Environment validation successful")
        return values

    def raw_results(
        self,
        query: str,
        time_range: int,
        site: str,
        output_format: str = "JSON",
    ) -> Dict:
        """Get results from the InfoQuest Search API synchronously.

        Args:
            query: Search query string.
            time_range: Keep only results from the last N days; <= 0 disables.
            site: Restrict results to this domain; "" disables.
            output_format: Response format requested from the API.

        Returns:
            The ``search_result`` payload of the API response.

        Raises:
            requests.HTTPError: On a non-2xx response.
            requests.Timeout: If the endpoint does not answer within the timeout.
        """
        if logger.isEnabledFor(logging.DEBUG):
            query_truncated = query[:50] + "..." if len(query) > 50 else query
            logger.debug(
                f"InfoQuest - Search API request initiated | "
                f"operation=search | "
                f"query_truncated={query_truncated} | "
                f"has_time_filter={time_range > 0} | "
                f"has_site_filter={bool(site)} | "
                f"request_type=sync"
            )
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.infoquest_api_key.get_secret_value()}",
        }
        params = {
            "format": output_format,
            "query": query,
        }
        if time_range > 0:
            params["time_range"] = time_range
            logger.debug(f"InfoQuest - Applying time range filter: time_range_days={time_range}")
        if site:
            params["site"] = site
            logger.debug(f"InfoQuest - Applying site filter: site={site}")
        # Explicit timeout: requests has no default and would otherwise block
        # forever on a stalled endpoint.
        response = requests.post(
            INFOQUEST_API_URL,
            headers=headers,
            json=params,
            timeout=30,
        )
        response.raise_for_status()
        response_json = response.json()
        if logger.isEnabledFor(logging.DEBUG):
            # Serialize once (the original called json.dumps twice on the
            # same payload just to build the sample).
            serialized = json.dumps(response_json)
            response_sample = serialized[:200] + ("..." if len(serialized) > 200 else "")
            logger.debug(
                f"Search API request completed successfully | "
                f"service=InfoQuest | "
                f"status=success | "
                f"response_sample={response_sample}"
            )
        return response_json["search_result"]

    async def raw_results_async(
        self,
        query: str,
        time_range: int,
        site: str,
        output_format: str = "JSON",
    ) -> Dict:
        """Get results from the InfoQuest Search API asynchronously.

        Same contract as :meth:`raw_results`, but performed with aiohttp
        (which applies its own default client timeout).

        Raises:
            Exception: When the endpoint returns a non-200 status.
        """
        if logger.isEnabledFor(logging.DEBUG):
            query_truncated = query[:50] + "..." if len(query) > 50 else query
            logger.debug(
                f"BytePlus InfoQuest - Search API async request initiated | "
                f"operation=search | "
                f"query_truncated={query_truncated} | "
                f"has_time_filter={time_range > 0} | "
                f"has_site_filter={bool(site)} | "
                f"request_type=async"
            )

        # Perform the actual HTTP POST and return the raw response text.
        async def fetch() -> str:
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.infoquest_api_key.get_secret_value()}",
            }
            params = {
                "format": output_format,
                "query": query,
            }
            if time_range > 0:
                params["time_range"] = time_range
                logger.debug(f"Applying time range filter in async request: {time_range} days")
            if site:
                params["site"] = site
                logger.debug(f"Applying site filter in async request: {site}")
            # trust_env=True honors HTTP(S)_PROXY environment settings.
            async with aiohttp.ClientSession(trust_env=True) as session:
                async with session.post(INFOQUEST_API_URL, headers=headers, json=params) as res:
                    if res.status == 200:
                        return await res.text()
                    raise Exception(f"Error {res.status}: {res.reason}")

        results_json_str = await fetch()
        if logger.isEnabledFor(logging.DEBUG):
            response_sample = results_json_str[:200] + ("..." if len(results_json_str) > 200 else "")
            logger.debug(
                f"Async search API request completed successfully | "
                f"service=InfoQuest | "
                f"status=success | "
                f"response_sample={response_sample}"
            )
        return json.loads(results_json_str)["search_result"]

    def clean_results_with_images(
        self, raw_results: List[Dict[str, Dict[str, Dict[str, Any]]]]
    ) -> List[Dict]:
        """Clean results from InfoQuest Search API.

        Flattens organic pages, top stories, and images into a single list of
        typed dicts, de-duplicating by URL across all three categories.

        Args:
            raw_results: The ``results`` list from the raw API payload; each
                entry is expected to carry ``content.results`` with optional
                ``organic``, ``top_stories``, and ``images`` sections.

        Returns:
            List of dicts with ``type`` in {"page", "news", "image_url"}.
        """
        logger.debug("Processing search results")
        seen_urls = set()
        clean_results = []
        counts = {"pages": 0, "news": 0, "images": 0}
        for content_list in raw_results:
            content = content_list["content"]
            results = content["results"]
            if results.get("organic"):
                organic_results = results["organic"]
                for result in organic_results:
                    clean_result = {
                        "type": "page",
                        "title": result["title"],
                        "url": result["url"],
                        "desc": result["desc"],
                    }
                    url = clean_result["url"]
                    if isinstance(url, str) and url and url not in seen_urls:
                        seen_urls.add(url)
                        clean_results.append(clean_result)
                        counts["pages"] += 1
            if results.get("top_stories"):
                news = results["top_stories"]
                for obj in news["items"]:
                    clean_result = {
                        "type": "news",
                        "time_frame": obj["time_frame"],
                        "title": obj["title"],
                        "url": obj["url"],
                        "source": obj["source"],
                    }
                    url = clean_result["url"]
                    if isinstance(url, str) and url and url not in seen_urls:
                        seen_urls.add(url)
                        clean_results.append(clean_result)
                        counts["news"] += 1
            if results.get("images"):
                images = results["images"]
                for image in images["items"]:
                    clean_result = {
                        "type": "image_url",
                        "image_url": image["url"],
                        "image_description": image["alt"],
                    }
                    url = clean_result["image_url"]
                    if isinstance(url, str) and url and url not in seen_urls:
                        seen_urls.add(url)
                        clean_results.append(clean_result)
                        counts["images"] += 1
        logger.debug(
            f"Results processing completed | "
            f"total_results={len(clean_results)} | "
            f"pages={counts['pages']} | "
            f"news_items={counts['news']} | "
            f"images={counts['images']} | "
            f"unique_urls={len(seen_urls)}"
        )
        return clean_results

View File

@@ -0,0 +1,236 @@
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# SPDX-License-Identifier: MIT
"""Tool for the InfoQuest search API."""
import json
import logging
from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union
from langchain_core.callbacks import (
AsyncCallbackManagerForToolRun,
CallbackManagerForToolRun,
)
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from src.tools.infoquest_search.infoquest_search_api import InfoQuestAPIWrapper
logger = logging.getLogger(__name__)
class InfoQuestInput(BaseModel):
    """Input for the InfoQuest tool."""

    # Natural-language query; the Field description is surfaced to the LLM
    # via the tool's args schema, so it is part of the runtime contract.
    query: str = Field(description="search query to look up")
class InfoQuestSearchResults(BaseTool):
    """Tool that queries the InfoQuest Search API and returns processed results with images.
    Setup:
        Install required packages and set environment variable ``INFOQUEST_API_KEY``.
        .. code-block:: bash
            pip install -U langchain-community aiohttp
            export INFOQUEST_API_KEY="your-api-key"
    Instantiate:
        .. code-block:: python
            from your_module import InfoQuestSearch
            tool = InfoQuestSearchResults(
                output_format="json",
                time_range=10,
                site="nytimes.com"
            )
    Invoke directly with args:
        .. code-block:: python
            tool.invoke({
                'query': 'who won the last french open'
            })
        .. code-block:: json
            [
                {
                    "type": "page",
                    "title": "Djokovic Claims French Open Title...",
                    "url": "https://www.nytimes.com/...",
                    "desc": "Novak Djokovic won the 2024 French Open by defeating Casper Ruud..."
                },
                {
                    "type": "news",
                    "time_frame": "2 days ago",
                    "title": "French Open Finals Recap",
                    "url": "https://www.nytimes.com/...",
                    "source": "New York Times"
                },
                {
                    "type": "image_url",
                    "image_url": {"url": "https://www.nytimes.com/.../djokovic.jpg"},
                    "image_description": "Novak Djokovic celebrating his French Open victory"
                }
            ]
    Invoke with tool call:
        .. code-block:: python
            tool.invoke({
                "args": {
                    'query': 'who won the last french open',
                },
                "type": "tool_call",
                "id": "foo",
                "name": "infoquest"
            })
        .. code-block:: python
            ToolMessage(
                content='[
                    {"type": "page", "title": "Djokovic Claims...", "url": "https://www.nytimes.com/...", "desc": "Novak Djokovic won..."},
                    {"type": "news", "time_frame": "2 days ago", "title": "French Open Finals...", "url": "https://www.nytimes.com/...", "source": "New York Times"},
                    {"type": "image_url", "image_url": {"url": "https://www.nytimes.com/.../djokovic.jpg"}, "image_description": "Novak Djokovic celebrating..."}
                ]',
                tool_call_id='1',
                name='infoquest_search_results_json',
            )
    """  # noqa: E501

    name: str = "infoquest_search_results_json"
    description: str = (
        "A search engine optimized for comprehensive, accurate, and trusted results. "
        "Useful for when you need to answer questions about current events. "
        "Input should be a search query."
    )
    args_schema: Type[BaseModel] = InfoQuestInput
    # Time range for filtering search results, in days. A positive integer
    # (e.g. 30) keeps only results from the last N days; -1 (default) disables
    # the filter. (The original file had a misplaced bare-string docstring
    # here; bare strings after fields are no-op statements, so plain comments
    # are used instead.)
    time_range: int = -1
    # Specific domain to restrict search results to (e.g. "nytimes.com").
    # An empty string (default) means no domain restriction.
    site: str = ""
    api_wrapper: InfoQuestAPIWrapper = Field(default_factory=InfoQuestAPIWrapper)  # type: ignore[arg-type]
    # Langchain contract: _run/_arun return (content, artifact) tuples.
    response_format: Literal["content_and_artifact"] = "content_and_artifact"

    def __init__(self, **kwargs: Any) -> None:
        """Initialize the tool, building the API wrapper from an explicit key if given."""
        # Create api_wrapper with infoquest_api_key if provided
        if "infoquest_api_key" in kwargs:
            kwargs["api_wrapper"] = InfoQuestAPIWrapper(
                infoquest_api_key=kwargs["infoquest_api_key"]
            )
            logger.debug("API wrapper initialized with provided key")
        super().__init__(**kwargs)
        logger.info(
            "\n============================================\n"
            "🚀 BytePlus InfoQuest Search Initialization 🚀\n"
            "============================================"
        )
        # Fields are guaranteed to exist after super().__init__, so the
        # hasattr() guards from the original were redundant and are dropped.
        time_range_status = f"{self.time_range} days" if self.time_range > 0 else "Disabled"
        site_filter = f"'{self.site}'" if self.site else "Disabled"
        initialization_details = (
            f"\n🔧 Tool Information:\n"
            f"├── Tool Name: {self.name}\n"
            f"├── Time Range Filter: {time_range_status}\n"
            f"└── Site Filter: {site_filter}\n"
            f"📊 Configuration Summary:\n"
            f"├── Response Format: {self.response_format}\n"
        )
        logger.info(initialization_details)
        logger.info("\n" + "*" * 70 + "\n")

    def _format_results(self, raw_results: Dict) -> Tuple[str, int]:
        """Clean raw API output and serialize it; returns (json_str, result_count).

        Shared by the sync and async paths to avoid the duplicated
        clean-and-dump logic the original carried in both.
        """
        cleaned = self.api_wrapper.clean_results_with_images(raw_results["results"])
        return json.dumps(cleaned, ensure_ascii=False), len(cleaned)

    def _run(
        self,
        query: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> Tuple[Union[List[Dict[str, str]], str], Dict]:
        """Use the tool.

        Returns a (json_content, raw_artifact) pair; on failure the content is
        a JSON error object and the artifact is an empty dict.
        """
        try:
            logger.debug(f"Executing search with parameters: time_range={self.time_range}, site={self.site}")
            raw_results = self.api_wrapper.raw_results(
                query,
                self.time_range,
                self.site
            )
            logger.debug("Processing raw search results")
            result_json, result_count = self._format_results(raw_results)
            logger.info(
                f"Search tool execution completed | "
                f"mode=synchronous | "
                f"results_count={result_count}"
            )
            return result_json, raw_results
        except Exception as e:
            logger.error(
                f"Search tool execution failed | "
                f"mode=synchronous | "
                f"error={str(e)}"
            )
            error_result = json.dumps({"error": repr(e)}, ensure_ascii=False)
            return error_result, {}

    async def _arun(
        self,
        query: str,
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> Tuple[Union[List[Dict[str, str]], str], Dict]:
        """Use the tool asynchronously.

        Same contract as :meth:`_run`, delegating to the async API wrapper.
        """
        if logger.isEnabledFor(logging.DEBUG):
            query_truncated = query[:50] + "..." if len(query) > 50 else query
            logger.debug(
                f"Search tool execution started | "
                f"mode=asynchronous | "
                f"query={query_truncated}"
            )
        try:
            logger.debug(f"Executing async search with parameters: time_range={self.time_range}, site={self.site}")
            raw_results = await self.api_wrapper.raw_results_async(
                query,
                self.time_range,
                self.site
            )
            logger.debug("Processing raw async search results")
            result_json, result_count = self._format_results(raw_results)
            logger.debug(
                f"Search tool execution completed | "
                f"mode=asynchronous | "
                f"results_count={result_count}"
            )
            return result_json, raw_results
        except Exception as e:
            logger.error(
                f"Search tool execution failed | "
                f"mode=asynchronous | "
                f"error={str(e)}"
            )
            error_result = json.dumps({"error": repr(e)}, ensure_ascii=False)
            return error_result, {}

View File

@@ -21,6 +21,7 @@ from langchain_community.utilities import (
from src.config import SELECTED_SEARCH_ENGINE, SearchEngine, load_yaml_config
from src.tools.decorators import create_logged_tool
from src.tools.infoquest_search.infoquest_search_results import InfoQuestSearchResults
from src.tools.tavily_search.tavily_search_results_with_images import (
TavilySearchWithImages,
)
@@ -29,6 +30,7 @@ logger = logging.getLogger(__name__)
# Create logged versions of the search tools
LoggedTavilySearch = create_logged_tool(TavilySearchWithImages)
LoggedInfoQuestSearch = create_logged_tool(InfoQuestSearchResults)
LoggedDuckDuckGoSearch = create_logged_tool(DuckDuckGoSearchResults)
LoggedBraveSearch = create_logged_tool(BraveSearch)
LoggedArxivSearch = create_logged_tool(ArxivQueryRun)
@@ -76,6 +78,17 @@ def get_web_search_tool(max_search_results: int):
include_domains=include_domains,
exclude_domains=exclude_domains,
)
elif SELECTED_SEARCH_ENGINE == SearchEngine.INFOQUEST.value:
time_range = search_config.get("time_range", -1)
site = search_config.get("site", "")
logger.info(
f"InfoQuest search configuration loaded: time_range={time_range}, site={site}"
)
return LoggedInfoQuestSearch(
name="web_search",
time_range=time_range,
site=site,
)
elif SELECTED_SEARCH_ENGINE == SearchEngine.DUCKDUCKGO.value:
return LoggedDuckDuckGoSearch(
name="web_search",