""" Module pour gérer l'extraction de tickets depuis Odoo. Cette version est simplifiée et indépendante de odoo_toolkit. """ import os import json import base64 from typing import Dict, List, Any, Optional import requests class TicketManager: """ Gestionnaire de tickets pour extraire des données depuis Odoo. """ def __init__(self, url: str, db: str, username: str, api_key: str): """ Initialise le gestionnaire de tickets avec les paramètres de connexion. Args: url: URL du serveur Odoo db: Nom de la base de données username: Nom d'utilisateur api_key: Clé API ou mot de passe """ self.url = url self.db = db self.username = username self.api_key = api_key self.uid = None self.session_id = None self.model_name = "project.task" # Modèle par défaut pour les tickets def login(self) -> bool: """ Établit la connexion au serveur Odoo. Returns: True si la connexion réussit, False sinon """ try: # Point d'entrée pour le login login_url = f"{self.url}/web/session/authenticate" # Données pour la requête de login login_data = { "jsonrpc": "2.0", "params": { "db": self.db, "login": self.username, "password": self.api_key } } # Effectuer la requête response = requests.post(login_url, json=login_data) response.raise_for_status() # Extraire les résultats result = response.json() if result.get("error"): print(f"Erreur de connexion: {result['error']['message']}") return False # Récupérer l'ID utilisateur et la session self.uid = result.get("result", {}).get("uid") self.session_id = response.cookies.get("session_id") if not self.uid: print("Erreur: Impossible de récupérer l'ID utilisateur") return False print(f"Connecté avec succès à {self.url} (User ID: {self.uid})") return True except Exception as e: print(f"Erreur de connexion: {str(e)}") return False def _ensure_connection(self) -> bool: """ Vérifie que la connexion est établie, tente de se reconnecter si nécessaire. Returns: True si la connexion est disponible, False sinon """ if not self.uid or not self.session_id: return self.login() return True def _rpc_call(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]: """ Effectue un appel RPC vers le serveur Odoo. Args: endpoint: Point d'entrée de l'API (/web/dataset/call_kw, etc.) params: Paramètres de la requête Returns: Résultat de la requête """ if not self._ensure_connection(): return {"error": "Non connecté"} try: # Préparer la requête full_url = f"{self.url}{endpoint}" headers = {"Content-Type": "application/json"} # Données de la requête data = { "jsonrpc": "2.0", "method": "call", "params": params } # Effectuer la requête response = requests.post( full_url, json=data, headers=headers, cookies={"session_id": self.session_id} if self.session_id else None ) response.raise_for_status() # Traiter la réponse result = response.json() if result.get("error"): return {"error": result["error"]["message"]} return result.get("result", {}) except Exception as e: return {"error": str(e)} def search_read(self, model: str, domain: List, fields: List[str], limit: int = 0) -> List[Dict[str, Any]]: """ Recherche et lit des enregistrements selon un domaine. Args: model: Nom du modèle domain: Domaine de recherche fields: Champs à récupérer limit: Nombre max de résultats (0 pour illimité) Returns: Liste des enregistrements trouvés """ params = { "model": model, "method": "search_read", "args": [domain, fields], "kwargs": {"limit": limit} } result = self._rpc_call("/web/dataset/call_kw", params) if isinstance(result, dict) and "error" in result: print(f"Erreur lors de la recherche: {result['error']}") return [] return result if isinstance(result, list) else [] def read(self, model: str, ids: List[int], fields: List[str]) -> List[Dict[str, Any]]: """ Lit des enregistrements par leurs IDs. Args: model: Nom du modèle ids: Liste des IDs à lire fields: Champs à récupérer Returns: Liste des enregistrements lus """ params = { "model": model, "method": "read", "args": [ids, fields], "kwargs": {} } result = self._rpc_call("/web/dataset/call_kw", params) if isinstance(result, dict) and "error" in result: print(f"Erreur lors de la lecture: {result['error']}") return [] return result if isinstance(result, list) else [] def get_ticket_by_code(self, ticket_code: str) -> Dict[str, Any]: """ Récupère un ticket par son code. Args: ticket_code: Code du ticket à récupérer Returns: Données du ticket ou dictionnaire vide si non trouvé """ # Rechercher l'ID du ticket par son code tickets = self.search_read( model=self.model_name, domain=[("code", "=", ticket_code)], fields=["id"], limit=1 ) if not tickets: print(f"Aucun ticket trouvé avec le code {ticket_code}") return {} # Récupérer toutes les données du ticket ticket_id = tickets[0]["id"] return self.get_ticket_by_id(ticket_id) def get_ticket_by_id(self, ticket_id: int) -> Dict[str, Any]: """ Récupère un ticket par son ID. Args: ticket_id: ID du ticket à récupérer Returns: Données du ticket ou dictionnaire vide si non trouvé """ # Récupérer les champs disponibles pour le modèle fields_info = self._get_model_fields(self.model_name) # Lire les données du ticket tickets = self.read( model=self.model_name, ids=[ticket_id], fields=fields_info ) if not tickets: print(f"Aucun ticket trouvé avec l'ID {ticket_id}") return {} return tickets[0] def _get_model_fields(self, model_name: str) -> List[str]: """ Récupère la liste des champs disponibles pour un modèle. Args: model_name: Nom du modèle Returns: Liste des noms de champs """ params = { "model": model_name, "method": "fields_get", "args": [], "kwargs": {"attributes": ["name", "type"]} } result = self._rpc_call("/web/dataset/call_kw", params) if "error" in result: print(f"Erreur lors de la récupération des champs: {result['error']}") return [] # Filtrer les types de champs problématiques invalid_types = ["many2many", "binary"] valid_fields = [ field for field, info in result.items() if info.get("type") not in invalid_types ] return valid_fields def get_ticket_messages(self, ticket_id: int) -> List[Dict[str, Any]]: """ Récupère les messages d'un ticket. Args: ticket_id: ID du ticket Returns: Liste des messages du ticket """ # D'abord récupérer les IDs des messages ticket = self.read( model=self.model_name, ids=[ticket_id], fields=["message_ids"] ) if not ticket or "message_ids" not in ticket[0]: print(f"Impossible de récupérer les messages pour le ticket {ticket_id}") return [] message_ids = ticket[0]["message_ids"] # Récupérer les détails des messages messages = self.read( model="mail.message", ids=message_ids, fields=["id", "body", "date", "author_id", "email_from", "subject", "parent_id"] ) return messages def get_ticket_attachments(self, ticket_id: int, download_path: Optional[str] = None) -> List[Dict[str, Any]]: """ Récupère les pièces jointes d'un ticket, avec option de téléchargement. Args: ticket_id: ID du ticket download_path: Chemin où télécharger les pièces jointes (optionnel) Returns: Liste des informations sur les pièces jointes """ # Rechercher les pièces jointes liées au ticket attachments = self.search_read( model="ir.attachment", domain=[("res_model", "=", self.model_name), ("res_id", "=", ticket_id)], fields=["id", "name", "mimetype", "create_date", "datas"] ) if not attachments: print(f"Aucune pièce jointe trouvée pour le ticket {ticket_id}") return [] if download_path: # Créer le répertoire si nécessaire os.makedirs(download_path, exist_ok=True) # Télécharger chaque pièce jointe for attachment in attachments: if "datas" in attachment and attachment["datas"]: # Déchiffrer les données base64 binary_data = base64.b64decode(attachment["datas"]) # Nettoyer le nom de fichier safe_name = attachment["name"].replace("/", "_").replace("\\", "_") file_path = os.path.join(download_path, f"{attachment['id']}_{safe_name}") # Sauvegarder le fichier with open(file_path, "wb") as f: f.write(binary_data) # Remplacer les données binaires par le chemin du fichier attachment["file_path"] = file_path del attachment["datas"] return attachments def extract_ticket_data(self, ticket_id: int, output_dir: str) -> Dict[str, Any]: """ Extrait toutes les données d'un ticket, y compris messages et pièces jointes. Args: ticket_id: ID du ticket output_dir: Répertoire de sortie Returns: Dictionnaire avec toutes les données du ticket """ # Créer le répertoire de sortie os.makedirs(output_dir, exist_ok=True) # Récupérer les données du ticket ticket = self.get_ticket_by_id(ticket_id) if not ticket: return {"error": f"Ticket {ticket_id} non trouvé"} # Sauvegarder les données du ticket ticket_path = os.path.join(output_dir, "ticket_info.json") with open(ticket_path, "w", encoding="utf-8") as f: json.dump(ticket, f, indent=2, ensure_ascii=False) # Récupérer et sauvegarder les messages messages = self.get_ticket_messages(ticket_id) # Nettoyer le contenu HTML des messages cleaned_messages = self._clean_messages(messages) messages_path = os.path.join(output_dir, "messages.json") with open(messages_path, "w", encoding="utf-8") as f: json.dump(cleaned_messages, f, indent=2, ensure_ascii=False) # Récupérer et sauvegarder les pièces jointes attachments_dir = os.path.join(output_dir, "attachments") attachments = self.get_ticket_attachments(ticket_id, attachments_dir) attachments_path = os.path.join(output_dir, "attachments_info.json") with open(attachments_path, "w", encoding="utf-8") as f: json.dump(attachments, f, indent=2, ensure_ascii=False) # Compiler toutes les informations result = { "ticket": ticket, "messages": cleaned_messages, "attachments": [ {k: v for k, v in a.items() if k != "datas"} for a in attachments ], "files": { "ticket_info": ticket_path, "messages": messages_path, "attachments_info": attachments_path, "attachments_dir": attachments_dir } } return result def _clean_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Nettoie le contenu HTML des messages et filtre les messages indésirables. Args: messages: Liste des messages à nettoyer Returns: Liste des messages nettoyés """ import re from html import unescape # Restauration du contenu original du premier message original_content = None for message in messages: if message.get("body") and isinstance(message.get("body"), str) and not message.get("author_id", [0])[0] == 2: if original_content is None and "body_original" in message: # Récupérer le corps original du message initial (généralement la demande client) body_original = message["body_original"] # Extraire le contenu de la question initiale if body_original: # Suppression des balises d'image avec leurs attributs content = re.sub(r']*?>', '', body_original) # Supprimer les balises de style et script content = re.sub(r']*?>.*?', '', content, flags=re.DOTALL) content = re.sub(r']*?>.*?', '', content, flags=re.DOTALL) # Supprimer les attributs de style des balises content = re.sub(r' style="[^"]*"', '', content) content = re.sub(r' id="[^"]*"', '', content) content = re.sub(r' class="[^"]*"', '', content) content = re.sub(r' width="[^"]*"', '', content) content = re.sub(r' height="[^"]*"', '', content) content = re.sub(r' border="[^"]*"', '', content) # Remplacer les balises

,

,
par des sauts de ligne content = re.sub(r'<(?:p|div)[^>]*?>', '\n', content) content = re.sub(r'', '\n', content) content = re.sub(r']*?>', '\n', content) # Remplacer les listes content = re.sub(r']*?>', '\n- ', content) content = re.sub(r'', '', content) # Remplacer les liens par leur texte et URL def replace_link(match): link_text = re.sub(r'<[^>]*?>', '', match.group(2)) href = re.search(r'href="([^"]*)"', match.group(1)) if href and href.group(1) != link_text: return f"{link_text} ({href.group(1)})" return link_text content = re.sub(r']*?)>(.*?)', replace_link, content) # Supprimer toutes les autres balises HTML content = re.sub(r'<[^>]*?>', '', content) # Convertir les entités HTML en caractères correspondants content = unescape(content) # Supprimer les signatures et autres textes communs des emails signatures = [ r'Droit à la déconnexion.*', r'Ce message électronique et tous les fichiers attachés.*', r'Direction des Infrastructures.*', r'Service d\'Appui aux Politiques d\'Aménagement.*', r'tél :.*', r'mobile :.*', ] for sig_pattern in signatures: content = re.sub(sig_pattern, '', content, flags=re.DOTALL | re.IGNORECASE) # Supprimer les espaces et sauts de ligne multiples content = re.sub(r'\n\s*\n', '\n\n', content) content = re.sub(r' +', ' ', content) # Supprimer les espaces en début et fin de chaîne content = content.strip() original_content = content message["body"] = content break # On arrête après avoir traité le premier message client cleaned_messages = [] for message in messages: # Ignorer les messages d'OdooBot if message.get("author_id") and message["author_id"][0] == 2 and message["author_id"][1] == "OdooBot": continue # Ignorer les messages vides sans contenu if not message.get("body"): # Vérifier si c'est un message qui ne contient aucune information utile if not message.get("subject") and not message.get("email_from"): continue cleaned_message = message.copy() # Nettoyer le contenu du message si existe if "body" in cleaned_message and cleaned_message["body"]: # Vérifier que body est une chaîne de caractères if isinstance(cleaned_message["body"], str): # Conserver le corps original pour référence cleaned_message["body_original"] = message["body"] # Supprimer les balises HTML body = cleaned_message["body"] # Si ce n'est pas le premier message et qu'on n'a pas déjà nettoyé if body != original_content: # Supprimer les balises d'image avec leurs attributs body = re.sub(r']*?>', '', body) # Supprimer les balises de style et script body = re.sub(r']*?>.*?', '', body, flags=re.DOTALL) body = re.sub(r']*?>.*?', '', body, flags=re.DOTALL) # Supprimer les attributs de style des balises body = re.sub(r' style="[^"]*"', '', body) body = re.sub(r' id="[^"]*"', '', body) body = re.sub(r' class="[^"]*"', '', body) body = re.sub(r' width="[^"]*"', '', body) body = re.sub(r' height="[^"]*"', '', body) body = re.sub(r' border="[^"]*"', '', body) # Remplacer les balises

,

,
par des sauts de ligne body = re.sub(r'<(?:p|div)[^>]*?>', '\n', body) body = re.sub(r'', '\n', body) body = re.sub(r']*?>', '\n', body) # Remplacer les listes body = re.sub(r']*?>', '\n- ', body) body = re.sub(r'', '', body) # Remplacer les liens par leur texte et URL def replace_link(match): link_text = re.sub(r'<[^>]*?>', '', match.group(2)) href = re.search(r'href="([^"]*)"', match.group(1)) if href and href.group(1) != link_text: return f"{link_text} ({href.group(1)})" return link_text body = re.sub(r']*?)>(.*?)', replace_link, body) # Supprimer toutes les autres balises HTML body = re.sub(r'<[^>]*?>', '', body) # Convertir les entités HTML en caractères correspondants body = unescape(body) # Supprimer les parties de signature standard et de footer signatures = [ r'---\s*\n.*Support technique.*', r'Afin d\'assurer une meilleure traçabilité.*', r'Confidentialité :.*', r'Ce message électronique et tous les fichiers attachés.*', r'Droit à la déconnexion :.*', r'L\'objectif du Support Technique est de vous aider.*', r'.*www\.cbao\.fr.*', r'.*tél.*\+33.*', r'.*\@.*\.fr.*', r' 60: signature_section = False # Filtrer les lignes qui contiennent probablement une signature if not signature_section: filtered_lines.append(line) body = '\n'.join(filtered_lines) # Supprimer les espaces et sauts de ligne multiples body = re.sub(r'\n\s*\n', '\n\n', body) body = re.sub(r' +', ' ', body) # Supprimer les espaces en début et fin de chaîne body = body.strip() cleaned_message["body"] = body cleaned_messages.append(cleaned_message) return cleaned_messages if __name__ == "__main__": import sys import argparse parser = argparse.ArgumentParser(description="Extraction de tickets Odoo") parser.add_argument("ticket_code", help="Code du ticket à extraire (ex: T0167)") parser.add_argument("--config", default="config.json", help="Chemin vers le fichier de configuration") parser.add_argument("--output-dir", help="Répertoire de sortie (par défaut: output/ticket_CODE)") parser.add_argument("--verbose", "-v", action="store_true", help="Afficher plus d'informations") parser.add_argument("--keep-html", action="store_true", help="Conserver le contenu HTML dans les messages") parser.add_argument("--no-original", action="store_true", help="Ne pas conserver le corps de message original") args = parser.parse_args() # Charger la configuration try: with open(args.config, "r", encoding="utf-8") as f: config = json.load(f) if args.verbose: print(f"Configuration chargée depuis {args.config}") except Exception as e: print(f"Erreur lors du chargement de la configuration: {e}") sys.exit(1) # Extraire les informations de connexion odoo_config = config.get("odoo", {}) url = odoo_config.get("url") db = odoo_config.get("db") username = odoo_config.get("username") api_key = odoo_config.get("api_key") if not all([url, db, username, api_key]): print("Informations de connexion Odoo manquantes dans le fichier de configuration") sys.exit(1) # Définir le répertoire de sortie output_dir = args.output_dir or os.path.join(config.get("output_dir", "output"), f"ticket_{args.ticket_code}") # Créer et connecter le gestionnaire de tickets ticket_manager = TicketManager(url, db, username, api_key) # Personnaliser le nettoyage des messages HTML si demandé if args.keep_html: # Remplacer la méthode de nettoyage par une qui ne fait rien ticket_manager._clean_messages = lambda messages: [ {**msg, "body_original": msg["body"] if isinstance(msg.get("body"), str) else msg.get("body")} for msg in messages ] elif args.no_original: # Modifier la méthode pour ne pas conserver le corps original original_clean_method = ticket_manager._clean_messages ticket_manager._clean_messages = lambda messages: [ {k: v for k, v in msg.items() if k != "body_original"} for msg in original_clean_method(messages) ] if not ticket_manager.login(): print("Échec de la connexion à Odoo") sys.exit(1) # Récupérer le ticket if args.verbose: print(f"Recherche du ticket {args.ticket_code}...") ticket = ticket_manager.get_ticket_by_code(args.ticket_code) if not ticket: print(f"Ticket {args.ticket_code} non trouvé") sys.exit(1) if args.verbose: print(f"Ticket {args.ticket_code} trouvé (ID: {ticket.get('id')})") print(f"Extraction des données vers {output_dir}...") # Extraire et sauvegarder toutes les données result = ticket_manager.extract_ticket_data(ticket["id"], output_dir) if "error" in result: print(f"Erreur: {result['error']}") sys.exit(1) print(f"Extraction terminée avec succès") print(f"- Informations du ticket: {result['files']['ticket_info']}") print(f"- Messages: {result['files']['messages']}") print(f"- Pièces jointes: {result['files']['attachments_info']}") print(f"- Dossier des pièces jointes: {result['files']['attachments_dir']}") # Afficher un résumé print(f"\nRésumé du ticket {args.ticket_code}:") print(f"- Nom: {ticket.get('name', 'N/A')}") print(f"- Messages: {len(result['messages'])}") print(f"- Pièces jointes: {len(result['attachments'])}")