#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Module pour gérer l'extraction de tickets depuis Odoo. Cette version est simplifiée et indépendante de odoo_toolkit. """ import os import json import base64 from typing import Dict, List, Any, Optional import requests from agents import AgentFiltreImages, AgentAnalyseImage, AgentQuestionReponse class TicketManager: """ Gestionnaire de tickets pour extraire des données depuis Odoo. """ def __init__(self, url: str, db: str, username: str, api_key: str): """ Initialise le gestionnaire de tickets avec les paramètres de connexion. Args: url: URL du serveur Odoo db: Nom de la base de données username: Nom d'utilisateur api_key: Clé API ou mot de passe """ self.url = url self.db = db self.username = username self.api_key = api_key self.uid = None self.session_id = None self.model_name = "project.task" # Modèle par défaut pour les tickets def login(self) -> bool: """ Établit la connexion au serveur Odoo. Returns: True si la connexion réussit, False sinon """ try: # Point d'entrée pour le login login_url = f"{self.url}/web/session/authenticate" # Données pour la requête de login login_data = { "jsonrpc": "2.0", "params": { "db": self.db, "login": self.username, "password": self.api_key } } # Effectuer la requête response = requests.post(login_url, json=login_data) response.raise_for_status() # Extraire les résultats result = response.json() if result.get("error"): print(f"Erreur de connexion: {result['error']['message']}") return False # Récupérer l'ID utilisateur et la session self.uid = result.get("result", {}).get("uid") self.session_id = response.cookies.get("session_id") if not self.uid: print("Erreur: Impossible de récupérer l'ID utilisateur") return False print(f"Connecté avec succès à {self.url} (User ID: {self.uid})") return True except Exception as e: print(f"Erreur de connexion: {str(e)}") return False def _ensure_connection(self) -> bool: """ Vérifie que la connexion est établie, tente de se reconnecter si nécessaire. Returns: True si la connexion est disponible, False sinon """ if not self.uid or not self.session_id: return self.login() return True def _rpc_call(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]: """ Effectue un appel RPC vers le serveur Odoo. Args: endpoint: Point d'entrée de l'API (/web/dataset/call_kw, etc.) params: Paramètres de la requête Returns: Résultat de la requête """ if not self._ensure_connection(): return {"error": "Non connecté"} try: # Préparer la requête full_url = f"{self.url}{endpoint}" headers = {"Content-Type": "application/json"} # Données de la requête data = { "jsonrpc": "2.0", "method": "call", "params": params } # Effectuer la requête response = requests.post( full_url, json=data, headers=headers, cookies={"session_id": self.session_id} if self.session_id else None ) response.raise_for_status() # Traiter la réponse result = response.json() if result.get("error"): return {"error": result["error"]["message"]} return result.get("result", {}) except Exception as e: return {"error": str(e)} def search_read(self, model: str, domain: List, fields: List[str], limit: int = 0) -> List[Dict[str, Any]]: """ Recherche et lit des enregistrements selon un domaine. Args: model: Nom du modèle domain: Domaine de recherche fields: Champs à récupérer limit: Nombre max de résultats (0 pour illimité) Returns: Liste des enregistrements trouvés """ params = { "model": model, "method": "search_read", "args": [domain, fields], "kwargs": {"limit": limit} } result = self._rpc_call("/web/dataset/call_kw", params) if isinstance(result, dict) and "error" in result: print(f"Erreur lors de la recherche: {result['error']}") return [] return result if isinstance(result, list) else [] def read(self, model: str, ids: List[int], fields: List[str]) -> List[Dict[str, Any]]: """ Lit des enregistrements par leurs IDs. Args: model: Nom du modèle ids: Liste des IDs à lire fields: Champs à récupérer Returns: Liste des enregistrements lus """ params = { "model": model, "method": "read", "args": [ids, fields], "kwargs": {} } result = self._rpc_call("/web/dataset/call_kw", params) if isinstance(result, dict) and "error" in result: print(f"Erreur lors de la lecture: {result['error']}") return [] return result if isinstance(result, list) else [] def get_ticket_by_code(self, ticket_code: str) -> Dict[str, Any]: """ Récupère un ticket par son code. Args: ticket_code: Code du ticket à récupérer Returns: Données du ticket ou dictionnaire vide si non trouvé """ # Rechercher l'ID du ticket par son code tickets = self.search_read( model=self.model_name, domain=[("code", "=", ticket_code)], fields=["id"], limit=1 ) if not tickets: print(f"Aucun ticket trouvé avec le code {ticket_code}") return {} # Récupérer toutes les données du ticket ticket_id = tickets[0]["id"] return self.get_ticket_by_id(ticket_id) def get_ticket_by_id(self, ticket_id: int) -> Dict[str, Any]: """ Récupère un ticket par son ID. Args: ticket_id: ID du ticket à récupérer Returns: Données du ticket ou dictionnaire vide si non trouvé """ # Récupérer les champs disponibles pour le modèle fields_info = self._get_model_fields(self.model_name) # Lire les données du ticket tickets = self.read( model=self.model_name, ids=[ticket_id], fields=fields_info ) if not tickets: print(f"Aucun ticket trouvé avec l'ID {ticket_id}") return {} return tickets[0] def _get_model_fields(self, model_name: str) -> List[str]: """ Récupère la liste des champs disponibles pour un modèle. Args: model_name: Nom du modèle Returns: Liste des noms de champs """ params = { "model": model_name, "method": "fields_get", "args": [], "kwargs": {"attributes": ["name", "type"]} } result = self._rpc_call("/web/dataset/call_kw", params) if "error" in result: print(f"Erreur lors de la récupération des champs: {result['error']}") return [] # Filtrer les types de champs problématiques invalid_types = ["many2many", "binary"] valid_fields = [ field for field, info in result.items() if info.get("type") not in invalid_types ] return valid_fields def get_ticket_messages(self, ticket_id: int) -> List[Dict[str, Any]]: """ Récupère les messages d'un ticket. Args: ticket_id: ID du ticket Returns: Liste des messages du ticket """ # D'abord récupérer les IDs des messages ticket = self.read( model=self.model_name, ids=[ticket_id], fields=["message_ids"] ) if not ticket or "message_ids" not in ticket[0]: print(f"Impossible de récupérer les messages pour le ticket {ticket_id}") return [] message_ids = ticket[0]["message_ids"] # Récupérer les détails des messages messages = self.read( model="mail.message", ids=message_ids, fields=["id", "body", "date", "author_id", "email_from", "subject", "parent_id"] ) return messages def get_ticket_attachments(self, ticket_id: int, download_path: Optional[str] = None) -> List[Dict[str, Any]]: """ Récupère les pièces jointes d'un ticket, avec option de téléchargement. Args: ticket_id: ID du ticket download_path: Chemin où télécharger les pièces jointes (optionnel) Returns: Liste des informations sur les pièces jointes """ # Rechercher les pièces jointes liées au ticket attachments = self.search_read( model="ir.attachment", domain=[("res_model", "=", self.model_name), ("res_id", "=", ticket_id)], fields=["id", "name", "mimetype", "create_date", "datas"] ) if not attachments: print(f"Aucune pièce jointe trouvée pour le ticket {ticket_id}") return [] if download_path: # Créer le répertoire si nécessaire os.makedirs(download_path, exist_ok=True) # Télécharger chaque pièce jointe for attachment in attachments: if "datas" in attachment and attachment["datas"]: # Déchiffrer les données base64 binary_data = base64.b64decode(attachment["datas"]) # Nettoyer le nom de fichier safe_name = attachment["name"].replace("/", "_").replace("\\", "_") file_path = os.path.join(download_path, f"{attachment['id']}_{safe_name}") # Sauvegarder le fichier with open(file_path, "wb") as f: f.write(binary_data) # Remplacer les données binaires par le chemin du fichier attachment["file_path"] = file_path del attachment["datas"] return attachments def extract_ticket_data(self, ticket_id: int, output_dir: str) -> Dict[str, Any]: """ Extrait toutes les données d'un ticket, y compris messages et pièces jointes. Args: ticket_id: ID du ticket output_dir: Répertoire de sortie Returns: Dictionnaire avec toutes les données du ticket """ # Créer le répertoire de sortie os.makedirs(output_dir, exist_ok=True) # Récupérer les données du ticket ticket = self.get_ticket_by_id(ticket_id) if not ticket: return {"error": f"Ticket {ticket_id} non trouvé"} # Sauvegarder les données du ticket ticket_path = os.path.join(output_dir, "ticket_info.json") with open(ticket_path, "w", encoding="utf-8") as f: json.dump(ticket, f, indent=2, ensure_ascii=False) # Récupérer et sauvegarder les messages messages = self.get_ticket_messages(ticket_id) messages_path = os.path.join(output_dir, "messages.json") with open(messages_path, "w", encoding="utf-8") as f: json.dump(messages, f, indent=2, ensure_ascii=False) # Récupérer et sauvegarder les pièces jointes attachments_dir = os.path.join(output_dir, "attachments") attachments = self.get_ticket_attachments(ticket_id, attachments_dir) attachments_path = os.path.join(output_dir, "attachments_info.json") with open(attachments_path, "w", encoding="utf-8") as f: json.dump(attachments, f, indent=2, ensure_ascii=False) # Compiler toutes les informations result = { "ticket": ticket, "messages": messages, "attachments": [ {k: v for k, v in a.items() if k != "datas"} for a in attachments ], "files": { "ticket_info": ticket_path, "messages": messages_path, "attachments_info": attachments_path, "attachments_dir": attachments_dir } } return result