# ragflow/ragflow_local.py
from typing import Any, Dict, List

import requests

from .base_ragflow import BaseRagflow


class RagflowLocal(BaseRagflow):
    """BaseRagflow implementation that talks to a local Ragflow server over HTTP.

    Every method reads ``self.base_url`` and ``self.collection``; both are
    presumably initialised by ``BaseRagflow`` (not visible in this file --
    confirm against the base class).
    """

    # Seconds passed as ``timeout`` to every ``requests`` call. Without a
    # timeout, ``requests`` waits forever on an unresponsive server. Override
    # on a subclass or instance if indexing large documents takes longer.
    request_timeout: float = 60.0

    def indexer(self, contenu: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Send one document to the server for indexing.

        POSTs to ``{base_url}/api/documents`` with the collection name and
        the document content/metadata as a JSON body.

        Args:
            contenu: Text content of the document.
            metadata: Metadata sent alongside the content.

        Returns:
            The decoded JSON body of the server response. The HTTP status is
            not checked, so an error payload is returned as-is.

        Raises:
            requests.exceptions.RequestException: On connection failure or
                timeout.
            ValueError: If the response body is not valid JSON.
        """
        url = f"{self.base_url}/api/documents"
        payload = {
            "collection": self.collection,
            "document": {
                "content": contenu,
                "metadata": metadata,
            },
        }
        response = requests.post(url, json=payload, timeout=self.request_timeout)
        return response.json()

    def rechercher(self, question: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """Query the collection and return the matching results.

        POSTs to ``{base_url}/api/query`` with the collection name, the query
        text and ``top_k`` as a JSON body.

        Args:
            question: Query text.
            top_k: Value sent as ``top_k`` in the request (default 5).

        Returns:
            The ``"results"`` entry of the decoded JSON response, or an empty
            list when that key is absent. The HTTP status is not checked, so
            an error response without ``"results"`` also yields ``[]``.

        Raises:
            requests.exceptions.RequestException: On connection failure or
                timeout.
            ValueError: If the response body is not valid JSON.
        """
        url = f"{self.base_url}/api/query"
        payload = {
            "collection": self.collection,
            "query": question,
            "top_k": top_k,
        }
        response = requests.post(url, json=payload, timeout=self.request_timeout)
        return response.json().get("results", [])

    def supprimer_collection(self) -> bool:
        """Delete the current collection on the server.

        Sends DELETE to ``{base_url}/api/collections/{collection}``.

        Returns:
            True only when the server answers with HTTP 200; any other status
            code yields False.

        Raises:
            requests.exceptions.RequestException: On connection failure or
                timeout.
        """
        url = f"{self.base_url}/api/collections/{self.collection}"
        response = requests.delete(url, timeout=self.request_timeout)
        return response.status_code == 200