From 5085bf8ee9f98e658dbf8a06db9af15d2625088f Mon Sep 17 00:00:00 2001 From: lele3436 Date: Tue, 16 Sep 2025 20:25:59 +0800 Subject: [PATCH] feat: support for moi in RAG module (#571) * feat: add support for moi * small adjust * small adjust * according 2 comments * add more intro * add more intro --- .env.example | 6 ++ README_zh.md | 10 +++ src/config/tools.py | 1 + src/rag/__init__.py | 2 + src/rag/builder.py | 3 + src/rag/moi.py | 154 ++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 176 insertions(+) create mode 100644 src/rag/moi.py diff --git a/.env.example b/.env.example index 7bb51a6..8fb6ee6 100644 --- a/.env.example +++ b/.env.example @@ -41,6 +41,12 @@ TAVILY_API_KEY=tvly-xxx # RAGFLOW_RETRIEVAL_SIZE=10 # RAGFLOW_CROSS_LANGUAGES=English,Chinese,Spanish,French,German,Japanese,Korean # Optional. To use RAGFlow's cross-language search, please separate each language with a single comma +# MOI is a hybrid database that mainly serves enterprise users (https://www.matrixorigin.io/matrixone-intelligence) +# RAG_PROVIDER=moi +# MOI_API_URL="https://freetier-01.cn-hangzhou.cluster.matrixonecloud.cn" +# MOI_API_KEY="xxx-xxx-xxx-xxx" +# MOI_RETRIEVAL_SIZE=10 +# MOI_LIST_LIMIT=10 # RAG_PROVIDER: milvus (using free milvus instance on zilliz cloud: https://docs.zilliz.com/docs/quick-start ) # RAG_PROVIDER=milvus diff --git a/README_zh.md b/README_zh.md index 00535fa..e45443c 100644 --- a/README_zh.md +++ b/README_zh.md @@ -183,6 +183,16 @@ DeerFlow 支持基于私有域知识的检索,您可以将文档上传到多 RAGFLOW_RETRIEVAL_SIZE=10 ``` +- **[MOI]**:AI 原生多模态数据智能平台 + ``` + # 参照示例进行配置 .env.example + RAG_PROVIDER=moi + MOI_API_URL="https://freetier-01.cn-hangzhou.cluster.matrixonecloud.cn" + MOI_API_KEY="xxx-xxx-xxx-xxx" + MOI_RETRIEVAL_SIZE=10 + MOI_LIST_LIMIT=10 + ``` + - **[VikingDB 知识库](https://www.volcengine.com/docs/84313/1254457)**:火山引擎提供的公有云知识库引擎 > 注意先从 [火山引擎](https://www.volcengine.com/docs/84313/1254485) 获取账号 AK/SK ``` diff --git a/src/config/tools.py b/src/config/tools.py index ff9ab58..be5c9f5 100644 --- a/src/config/tools.py +++ b/src/config/tools.py @@ -24,6 +24,7 @@ SELECTED_SEARCH_ENGINE = os.getenv("SEARCH_API", SearchEngine.TAVILY.value) class RAGProvider(enum.Enum): RAGFLOW = "ragflow" VIKINGDB_KNOWLEDGE_BASE = "vikingdb_knowledge_base" + MOI = "moi" MILVUS = "milvus" diff --git a/src/rag/__init__.py b/src/rag/__init__.py index 33519dc..4451543 100644 --- a/src/rag/__init__.py +++ b/src/rag/__init__.py @@ -3,6 +3,7 @@ from .builder import build_retriever from .ragflow import RAGFlowProvider +from .moi import MOIProvider from .retriever import Chunk, Document, Resource, Retriever from .vikingdb_knowledge_base import VikingDBKnowledgeBaseProvider @@ -11,6 +12,7 @@ __all__ = [ Document, Resource, RAGFlowProvider, + MOIProvider, VikingDBKnowledgeBaseProvider, Chunk, build_retriever, diff --git a/src/rag/builder.py b/src/rag/builder.py index 1032649..d3e2f15 100644 --- a/src/rag/builder.py +++ b/src/rag/builder.py @@ -3,6 +3,7 @@ from src.config.tools import SELECTED_RAG_PROVIDER, RAGProvider from src.rag.ragflow import RAGFlowProvider +from src.rag.moi import MOIProvider from src.rag.retriever import Retriever from src.rag.vikingdb_knowledge_base import VikingDBKnowledgeBaseProvider from src.rag.milvus import MilvusProvider @@ -11,6 +12,8 @@ from src.rag.milvus import MilvusProvider def build_retriever() -> Retriever | None: if SELECTED_RAG_PROVIDER == RAGProvider.RAGFLOW.value: return RAGFlowProvider() + elif SELECTED_RAG_PROVIDER == RAGProvider.MOI.value: + return MOIProvider() elif SELECTED_RAG_PROVIDER == RAGProvider.VIKINGDB_KNOWLEDGE_BASE.value: return VikingDBKnowledgeBaseProvider() elif SELECTED_RAG_PROVIDER == RAGProvider.MILVUS.value: diff --git a/src/rag/moi.py b/src/rag/moi.py new file mode 100644 index 0000000..0d5c9c1 --- /dev/null +++ b/src/rag/moi.py @@ -0,0 +1,154 @@ +# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates +# SPDX-License-Identifier: MIT + +import os +from urllib.parse import urlparse + +import requests + +from src.rag.retriever import Chunk, Document, Resource, Retriever + + +class MOIProvider(Retriever): + """ + MatrixOne Intelligence (MOI) is a multimodal data AI processing platform. + It supports connecting, processing, managing, and using both structured and unstructured data. + Through steps such as parsing, extraction, segmentation, cleaning, and enhancement, + it transforms raw data like documents, images, and audio/video into AI-ready application data. + With its self-developed data service layer (the MatrixOne database), + it can directly provide retrieval services for the processed data. + + The open-source repository is available at: https://github.com/matrixorigin/matrixone + For more information, please visit the website: https://www.matrixorigin.io/matrixone-intelligence + Documentation: https://docs.matrixorigin.cn/zh/m1intelligence/MatrixOne-Intelligence/Workspace-Mgmt/overview/ + Online Demo: https://www.matrixorigin.io/demo + """ + + def __init__(self): + # Initialize MOI API configuration from environment variables + self.api_url = os.getenv("MOI_API_URL") + if not self.api_url: + raise ValueError("MOI_API_URL is not set") + + # Add /byoa suffix to the API URL for MOI compatibility + if not self.api_url.endswith("/byoa"): + self.api_url = self.api_url + "/byoa" + + self.api_key = os.getenv("MOI_API_KEY") + if not self.api_key: + raise ValueError("MOI_API_KEY is not set") + + # Set page size for document retrieval + self.page_size = 10 + moi_size = os.getenv("MOI_RETRIEVAL_SIZE") + if moi_size: + self.page_size = int(moi_size) + + # Set MOI-specific list limit parameter + self.moi_list_limit = None + moi_list_limit = os.getenv("MOI_LIST_LIMIT") + if moi_list_limit: + self.moi_list_limit = int(moi_list_limit) + + def query_relevant_documents( + self, query: str, resources: list[Resource] = [] + ) -> list[Document]: + """ + Query relevant documents from MOI API using the provided resources. + """ + headers = { + "moi-key": f"{self.api_key}", + "Content-Type": "application/json", + } + + dataset_ids: list[str] = [] + document_ids: list[str] = [] + + for resource in resources: + dataset_id, document_id = self._parse_uri(resource.uri) + dataset_ids.append(dataset_id) + if document_id: + document_ids.append(document_id) + + payload = { + "question": query, + "dataset_ids": dataset_ids, + "document_ids": document_ids, + "page_size": self.page_size, + } + + response = requests.post( + f"{self.api_url}/api/v1/retrieval", headers=headers, json=payload + ) + + if response.status_code != 200: + raise Exception(f"Failed to query documents: {response.text}") + + result = response.json() + data = result.get("data", {}) + doc_aggs = data.get("doc_aggs", []) + docs: dict[str, Document] = { + doc.get("doc_id"): Document( + id=doc.get("doc_id"), + title=doc.get("doc_name"), + chunks=[], + ) + for doc in doc_aggs + } + + for chunk in data.get("chunks", []): + doc = docs.get(chunk.get("document_id")) + if doc: + doc.chunks.append( + Chunk( + content=chunk.get("content"), + similarity=chunk.get("similarity"), + ) + ) + + return list(docs.values()) + + def list_resources(self, query: str | None = None) -> list[Resource]: + """ + List resources from MOI API with optional query filtering and limit support. + """ + headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + } + + params = {} + if query: + params["name"] = query + + if self.moi_list_limit: + params["limit"] = self.moi_list_limit + + response = requests.get( + f"{self.api_url}/api/v1/datasets", headers=headers, params=params + ) + + if response.status_code != 200: + raise Exception(f"Failed to list resources: {response.text}") + + result = response.json() + resources = [] + + for item in result.get("data", []): + resource = Resource( + uri=f"rag://dataset/{item.get('id')}", + title=item.get("name", ""), + description=item.get("description", ""), + ) + resources.append(resource) + + return resources + + def _parse_uri(self, uri: str) -> tuple[str, str]: + """ + Parse URI to extract dataset ID and document ID. + """ + parsed = urlparse(uri) + if parsed.scheme != "rag": + raise ValueError(f"Invalid URI: {uri}") + return parsed.path.split("/")[1], parsed.fragment