feat: support for moi in RAG module (#571)

* feat: add support for moi

* small adjust

* small adjust

* according 2 comments

* add more intro

* add more intro
This commit is contained in:
lele3436
2025-09-16 20:25:59 +08:00
committed by GitHub
parent ea0fe62971
commit 5085bf8ee9
6 changed files with 176 additions and 0 deletions

View File

@@ -41,6 +41,12 @@ TAVILY_API_KEY=tvly-xxx
# RAGFLOW_RETRIEVAL_SIZE=10
# RAGFLOW_CROSS_LANGUAGES=English,Chinese,Spanish,French,German,Japanese,Korean # Optional. To use RAGFlow's cross-language search, please separate each language with a single comma
# MOI is a hybrid database that mainly serves enterprise users (https://www.matrixorigin.io/matrixone-intelligence)
# RAG_PROVIDER=moi
# MOI_API_URL="https://freetier-01.cn-hangzhou.cluster.matrixonecloud.cn"
# MOI_API_KEY="xxx-xxx-xxx-xxx"
# MOI_RETRIEVAL_SIZE=10
# MOI_LIST_LIMIT=10
# RAG_PROVIDER: milvus (using free milvus instance on zilliz cloud: https://docs.zilliz.com/docs/quick-start )
# RAG_PROVIDER=milvus

View File

@@ -183,6 +183,16 @@ DeerFlow 支持基于私有域知识的检索,您可以将文档上传到多
RAGFLOW_RETRIEVAL_SIZE=10
```
- **[MOI]**AI 原生多模态数据智能平台
```
# 参照示例进行配置 .env.example
RAG_PROVIDER=moi
MOI_API_URL="https://freetier-01.cn-hangzhou.cluster.matrixonecloud.cn"
MOI_API_KEY="xxx-xxx-xxx-xxx"
MOI_RETRIEVAL_SIZE=10
MOI_LIST_LIMIT=10
```
- **[VikingDB 知识库](https://www.volcengine.com/docs/84313/1254457)**:火山引擎提供的公有云知识库引擎
> 注意先从 [火山引擎](https://www.volcengine.com/docs/84313/1254485) 获取账号 AK/SK
```

View File

@@ -24,6 +24,7 @@ SELECTED_SEARCH_ENGINE = os.getenv("SEARCH_API", SearchEngine.TAVILY.value)
class RAGProvider(enum.Enum):
RAGFLOW = "ragflow"
VIKINGDB_KNOWLEDGE_BASE = "vikingdb_knowledge_base"
MOI = "moi"
MILVUS = "milvus"

View File

@@ -3,6 +3,7 @@
from .builder import build_retriever
from .ragflow import RAGFlowProvider
from .moi import MOIProvider
from .retriever import Chunk, Document, Resource, Retriever
from .vikingdb_knowledge_base import VikingDBKnowledgeBaseProvider
@@ -11,6 +12,7 @@ __all__ = [
Document,
Resource,
RAGFlowProvider,
MOIProvider,
VikingDBKnowledgeBaseProvider,
Chunk,
build_retriever,

View File

@@ -3,6 +3,7 @@
from src.config.tools import SELECTED_RAG_PROVIDER, RAGProvider
from src.rag.ragflow import RAGFlowProvider
from src.rag.moi import MOIProvider
from src.rag.retriever import Retriever
from src.rag.vikingdb_knowledge_base import VikingDBKnowledgeBaseProvider
from src.rag.milvus import MilvusProvider
@@ -11,6 +12,8 @@ from src.rag.milvus import MilvusProvider
def build_retriever() -> Retriever | None:
if SELECTED_RAG_PROVIDER == RAGProvider.RAGFLOW.value:
return RAGFlowProvider()
elif SELECTED_RAG_PROVIDER == RAGProvider.MOI.value:
return MOIProvider()
elif SELECTED_RAG_PROVIDER == RAGProvider.VIKINGDB_KNOWLEDGE_BASE.value:
return VikingDBKnowledgeBaseProvider()
elif SELECTED_RAG_PROVIDER == RAGProvider.MILVUS.value:

154
src/rag/moi.py Normal file
View File

@@ -0,0 +1,154 @@
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# SPDX-License-Identifier: MIT
import os
from urllib.parse import urlparse
import requests
from src.rag.retriever import Chunk, Document, Resource, Retriever
class MOIProvider(Retriever):
"""
MatrixOne Intelligence (MOI) is a multimodal data AI processing platform.
It supports connecting, processing, managing, and using both structured and unstructured data.
Through steps such as parsing, extraction, segmentation, cleaning, and enhancement,
it transforms raw data like documents, images, and audio/video into AI-ready application data.
With its self-developed data service layer (the MatrixOne database),
it can directly provide retrieval services for the processed data.
The open-source repository is available at: https://github.com/matrixorigin/matrixone
For more information, please visit the website: https://www.matrixorigin.io/matrixone-intelligence
Documentation: https://docs.matrixorigin.cn/zh/m1intelligence/MatrixOne-Intelligence/Workspace-Mgmt/overview/
Online Demo: https://www.matrixorigin.io/demo
"""
def __init__(self):
# Initialize MOI API configuration from environment variables
self.api_url = os.getenv("MOI_API_URL")
if not self.api_url:
raise ValueError("MOI_API_URL is not set")
# Add /byoa suffix to the API URL for MOI compatibility
if not self.api_url.endswith("/byoa"):
self.api_url = self.api_url + "/byoa"
self.api_key = os.getenv("MOI_API_KEY")
if not self.api_key:
raise ValueError("MOI_API_KEY is not set")
# Set page size for document retrieval
self.page_size = 10
moi_size = os.getenv("MOI_RETRIEVAL_SIZE")
if moi_size:
self.page_size = int(moi_size)
# Set MOI-specific list limit parameter
self.moi_list_limit = None
moi_list_limit = os.getenv("MOI_LIST_LIMIT")
if moi_list_limit:
self.moi_list_limit = int(moi_list_limit)
def query_relevant_documents(
self, query: str, resources: list[Resource] = []
) -> list[Document]:
"""
Query relevant documents from MOI API using the provided resources.
"""
headers = {
"moi-key": f"{self.api_key}",
"Content-Type": "application/json",
}
dataset_ids: list[str] = []
document_ids: list[str] = []
for resource in resources:
dataset_id, document_id = self._parse_uri(resource.uri)
dataset_ids.append(dataset_id)
if document_id:
document_ids.append(document_id)
payload = {
"question": query,
"dataset_ids": dataset_ids,
"document_ids": document_ids,
"page_size": self.page_size,
}
response = requests.post(
f"{self.api_url}/api/v1/retrieval", headers=headers, json=payload
)
if response.status_code != 200:
raise Exception(f"Failed to query documents: {response.text}")
result = response.json()
data = result.get("data", {})
doc_aggs = data.get("doc_aggs", [])
docs: dict[str, Document] = {
doc.get("doc_id"): Document(
id=doc.get("doc_id"),
title=doc.get("doc_name"),
chunks=[],
)
for doc in doc_aggs
}
for chunk in data.get("chunks", []):
doc = docs.get(chunk.get("document_id"))
if doc:
doc.chunks.append(
Chunk(
content=chunk.get("content"),
similarity=chunk.get("similarity"),
)
)
return list(docs.values())
def list_resources(self, query: str | None = None) -> list[Resource]:
"""
List resources from MOI API with optional query filtering and limit support.
"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
params = {}
if query:
params["name"] = query
if self.moi_list_limit:
params["limit"] = self.moi_list_limit
response = requests.get(
f"{self.api_url}/api/v1/datasets", headers=headers, params=params
)
if response.status_code != 200:
raise Exception(f"Failed to list resources: {response.text}")
result = response.json()
resources = []
for item in result.get("data", []):
resource = Resource(
uri=f"rag://dataset/{item.get('id')}",
title=item.get("name", ""),
description=item.get("description", ""),
)
resources.append(resource)
return resources
def _parse_uri(self, uri: str) -> tuple[str, str]:
"""
Parse URI to extract dataset ID and document ID.
"""
parsed = urlparse(uri)
if parsed.scheme != "rag":
raise ValueError(f"Invalid URI: {uri}")
return parsed.path.split("/")[1], parsed.fragment