import base64
import json
import os
import re
import sys
from datetime import datetime
from html import unescape
from typing import Any, Dict, List, Optional

import requests

# Patterns used to derive a plain-text version of an HTML message body.
# <style>/<script> elements are removed together with their content, because
# that content is CSS/JS source and not text written by a person.
_STYLE_BLOCK_RE = re.compile(r"<style\b[^>]*>.*?</style\s*>", re.DOTALL | re.IGNORECASE)
_SCRIPT_BLOCK_RE = re.compile(r"<script\b[^>]*>.*?</script\s*>", re.DOTALL | re.IGNORECASE)
_TAG_RE = re.compile(r"<[^>]+>")
_WHITESPACE_RE = re.compile(r"\s+")
# Anything that is not a word character, a dot or a dash is replaced in file names.
_UNSAFE_FILENAME_CHARS_RE = re.compile(r"[^\w\.-]")


class TicketManager:
    """Ticket manager that extracts ticket data from Odoo.

    Talks to the server through the JSON-RPC web endpoints
    (``/web/session/authenticate`` and ``/web/dataset/call_kw``) and can dump
    a ticket, its messages and its attachments to a directory.
    """

    def __init__(
        self,
        url: str,
        db: str,
        username: str,
        api_key: str,
        timeout: Optional[float] = 60.0,
    ):
        """Store the connection settings; no network access happens here.

        Args:
            url: Base URL of the server, without a trailing slash (endpoints
                are appended to it verbatim).
            db: Database name sent in the authentication request.
            username: Login sent in the authentication request.
            api_key: Sent as the ``password`` field of the authentication request.
            timeout: Timeout in seconds passed to every HTTP request.
                ``None`` disables the timeout (requests may then block
                indefinitely).
        """
        self.url = url
        self.db = db
        self.username = username
        self.api_key = api_key
        self.timeout = timeout
        self.uid = None  # user id, set by a successful login()
        self.session_id = None  # "session_id" cookie, set by a successful login()
        self.model_name = "project.task"

    def login(self) -> bool:
        """Authenticate against the server and remember uid and session cookie.

        Returns:
            True when a user id was obtained, False otherwise. Failures are
            printed, never raised.
        """
        login_url = f"{self.url}/web/session/authenticate"
        login_data = {
            "jsonrpc": "2.0",
            "params": {
                "db": self.db,
                "login": self.username,
                "password": self.api_key,
            },
        }
        try:
            response = requests.post(login_url, json=login_data, timeout=self.timeout)
            response.raise_for_status()

            result = response.json()
            if result.get("error"):
                print(f"Erreur de connexion: {result['error']['message']}")
                return False

            self.uid = result.get("result", {}).get("uid")
            self.session_id = response.cookies.get("session_id")
            if not self.uid:
                print("Erreur: Impossible de récupérer l'ID utilisateur")
                return False

            print(f"Connecté avec succès à {self.url} (User ID: {self.uid})")
            return True
        except Exception as e:
            # Deliberately broad: this is the network boundary and callers
            # rely on a boolean result instead of exceptions.
            print(f"Erreur de connexion: {str(e)}")
            return False

    def _rpc_call(self, endpoint: str, params: Dict[str, Any]) -> Any:
        """Send one JSON-RPC ``call`` request, logging in first if needed.

        Args:
            endpoint: Path appended to the base URL.
            params: Value of the ``params`` member of the JSON-RPC payload.

        Returns:
            The ``result`` member of the response (``{}`` when absent), whose
            type depends on the remote method, or ``{"error": <message>}``
            when not connected, when the server reports an error or when the
            request fails.
        """
        if not self.uid and not self.login():
            return {"error": "Non connecté"}

        full_url = f"{self.url}{endpoint}"
        headers = {"Content-Type": "application/json"}
        data = {"jsonrpc": "2.0", "method": "call", "params": params}
        cookies = {"session_id": self.session_id} if self.session_id else None
        try:
            response = requests.post(
                full_url,
                json=data,
                headers=headers,
                cookies=cookies,
                timeout=self.timeout,
            )
            response.raise_for_status()

            result = response.json()
            if result.get("error"):
                return {"error": result["error"]["message"]}
            return result.get("result", {})
        except Exception as e:
            # Deliberately broad: every failure is reported through the
            # returned error dictionary, which callers treat as "no data".
            return {"error": str(e)}

    def search_read(
        self,
        model: str,
        domain: List,
        fields: List[str],
        order: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """Call ``search_read`` on ``model``.

        Args:
            model: Model name.
            domain: Search domain, passed through unchanged.
            fields: Field names to read.
            order: Optional ordering clause; omitted from the call when None.
            limit: Optional maximum number of records; omitted when None.

        Returns:
            The list of records, or an empty list when the call did not
            return a list (including on error).
        """
        kwargs: Dict[str, Any] = {}
        if order is not None:
            kwargs["order"] = order
        if limit is not None:
            kwargs["limit"] = limit

        params = {
            "model": model,
            "method": "search_read",
            "args": [domain, fields],
            "kwargs": kwargs,
        }
        result = self._rpc_call("/web/dataset/call_kw", params)
        return result if isinstance(result, list) else []

    def read(self, model: str, ids: List[int], fields: List[str]) -> List[Dict[str, Any]]:
        """Call ``read`` on ``model`` for the given record ids.

        Returns:
            The list of records, or an empty list when the call did not
            return a list (including on error).
        """
        params = {"model": model, "method": "read", "args": [ids, fields], "kwargs": {}}
        result = self._rpc_call("/web/dataset/call_kw", params)
        return result if isinstance(result, list) else []

    def get_ticket_by_code(self, ticket_code: str) -> Dict[str, Any]:
        """Return the ticket whose ``code`` field equals ``ticket_code``.

        Returns:
            The ticket as returned by get_ticket_by_id(), or ``{}`` when no
            ticket matches.
        """
        # Only the first match is used, so do not fetch more than one record.
        tickets = self.search_read(
            self.model_name, [("code", "=", ticket_code)], ["id"], limit=1
        )
        if not tickets:
            return {}
        return self.get_ticket_by_id(tickets[0]["id"])

    def get_ticket_by_id(self, ticket_id: int) -> Dict[str, Any]:
        """Read the main fields of one ticket.

        Returns:
            The ticket record, or ``{}`` when nothing was returned.
        """
        ticket_fields = [
            "id", "name", "description", "stage_id", "user_id", "partner_id",
            "create_date", "write_date", "date_deadline", "priority",
            "tag_ids", "code", "project_id", "kanban_state", "color",
            "active", "company_id", "display_name",
        ]
        tickets = self.read(self.model_name, [ticket_id], ticket_fields)
        return tickets[0] if tickets else {}

    def get_ticket_messages(self, ticket_id: int) -> List[Dict[str, Any]]:
        """Fetch the ``mail.message`` records attached to a ticket, oldest first.

        Only messages of type ``comment``, ``notification`` or ``email`` are
        requested. The records are post-processed by _clean_messages().
        """
        messages = self.search_read(
            "mail.message",
            [
                ("res_id", "=", ticket_id),
                ("model", "=", self.model_name),
                ("message_type", "in", ["comment", "notification", "email"]),
            ],
            ["id", "body", "date", "author_id", "email_from", "message_type",
             "parent_id", "subtype_id", "tracking_value_ids"],
            order="date asc",
        )
        return self._clean_messages(messages)

    def _clean_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Add HTML/plain-text bodies to messages and normalise ``parent_id``.

        The message dictionaries are modified in place. For every message
        with a non-empty ``body``:

        * ``body_html`` receives the body with HTML entities unescaped;
        * ``body_text`` receives a plain-text version: ``<style>`` and
          ``<script>`` elements removed with their content, remaining tags
          replaced by spaces, entities unescaped, whitespace collapsed.

        A truthy ``parent_id`` given as a list/tuple is replaced by its first
        element.

        Returns:
            A new list containing every input message, in the same order.
        """
        cleaned_messages = []
        for message in messages:
            raw_body = message.get("body")
            if raw_body:
                # Keep the (entity-unescaped) HTML version.
                message["body_html"] = unescape(raw_body)

                # Strip markup from the raw HTML *before* unescaping, so that
                # escaped text such as "&lt;b&gt;" survives as the literal
                # characters the author typed instead of being mistaken for
                # a tag and deleted.
                text = _STYLE_BLOCK_RE.sub("", raw_body)
                text = _SCRIPT_BLOCK_RE.sub("", text)
                text = _TAG_RE.sub(" ", text)
                text = unescape(text)
                message["body_text"] = _WHITESPACE_RE.sub(" ", text).strip()

            # Reduce parent_id to a bare id so messages can be threaded
            # (presumably the server sends an [id, display_name] pair —
            # only the first element is kept).
            parent = message.get("parent_id")
            if parent:
                if isinstance(parent, (list, tuple)):
                    parent = parent[0]
                message["parent_id"] = parent

            cleaned_messages.append(message)
        return cleaned_messages

    def get_ticket_attachments(self, ticket_id: int) -> List[Dict[str, Any]]:
        """Fetch the attachments linked to a ticket.

        Args:
            ticket_id: Ticket id.

        Returns:
            List of ``ir.attachment`` records with their metadata and the
            ``datas`` field (decoded as base64 by download_attachment()).
        """
        return self.search_read(
            "ir.attachment",
            [
                ("res_id", "=", ticket_id),
                ("res_model", "=", self.model_name),
            ],
            ["id", "name", "mimetype", "file_size", "create_date",
             "create_uid", "datas", "description"],
        )

    def download_attachment(self, attachment: Dict[str, Any], output_dir: str) -> str:
        """Decode an attachment and save it under ``<output_dir>/attachments``.

        Args:
            attachment: Attachment record; ``datas`` must hold the
                base64-encoded content.
            output_dir: Directory in which the ``attachments`` sub-directory
                is created.

        Returns:
            Path of the written file, or ``""`` when the attachment has no
            data or could not be decoded/written.
        """
        if not attachment.get("datas"):
            return ""

        # Create the attachments directory if it does not exist yet.
        attachments_dir = os.path.join(output_dir, "attachments")
        os.makedirs(attachments_dir, exist_ok=True)

        # Build a safe file name. A missing or falsy name falls back to an
        # id-based one instead of raising before the error handling below.
        raw_name = attachment.get("name") or f"attachment_{attachment.get('id', 'unknown')}"
        filename = _UNSAFE_FILENAME_CHARS_RE.sub("_", str(raw_name))
        file_path = os.path.join(attachments_dir, filename)

        # Decode and write the content.
        try:
            file_content = base64.b64decode(attachment["datas"])
            with open(file_path, "wb") as f:
                f.write(file_content)
            return file_path
        except (OSError, ValueError, TypeError) as e:
            # ValueError covers invalid base64 (binascii.Error), TypeError a
            # non-string payload, OSError any file-system failure.
            print(f"Erreur lors du téléchargement de la pièce jointe {raw_name}: {str(e)}")
            return ""

    def organize_messages_by_thread(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Arrange messages into discussion threads.

        Args:
            messages: Messages to organise; ``parent_id`` is expected to be a
                bare id (as produced by _clean_messages()).

        Returns:
            Root messages sorted by ``date``, each a shallow copy of the
            input message with a ``replies`` list holding its direct answers
            (themselves carrying ``replies``). A message whose parent is not
            part of ``messages`` is kept as a root so that it is not lost.
        """
        # Index shallow copies by id so the input dictionaries stay untouched.
        messages_by_id = {msg["id"]: {**msg, "replies": []} for msg in messages}

        root_messages = []
        for msg in messages_by_id.values():
            parent = messages_by_id.get(msg.get("parent_id")) if msg.get("parent_id") else None
            if parent is None:
                # No parent, or the parent was not fetched: treat as a root.
                root_messages.append(msg)
            else:
                parent["replies"].append(msg)

        # Sort the roots by date; a missing/falsy date sorts first.
        root_messages.sort(key=lambda m: m.get("date") or "")
        return root_messages

    def extract_ticket_data(self, ticket_id: int, output_dir: str) -> Dict[str, Any]:
        """Extract everything about a ticket, including messages and attachments.

        Writes ``ticket_data.json``, ``messages.json``, ``extraction_log.txt``
        and the attachment files into ``output_dir`` (created if needed).

        Args:
            ticket_id: Ticket id.
            output_dir: Output directory.

        Returns:
            A dictionary with the ticket (``ticket_info``), the paths of the
            created files (``messages_file``, ``ticket_data_file``,
            ``log_file``) and the saved attachments (``attachments``), or
            ``{"error": ...}`` when the ticket was not found.
        """
        os.makedirs(output_dir, exist_ok=True)

        ticket = self.get_ticket_by_id(ticket_id)
        if not ticket:
            return {"error": f"Ticket {ticket_id} non trouvé"}

        # Messages of the ticket, flat and organised into threads.
        messages = self.get_ticket_messages(ticket_id)
        thread_messages = self.organize_messages_by_thread(messages)

        # Download the attachments.
        attachments = self.get_ticket_attachments(ticket_id)
        attachment_files = []
        for attachment in attachments:
            file_path = self.download_attachment(attachment, output_dir)
            if file_path:
                # Drop the binary payload before it goes into the JSON output.
                attachment_info = {k: v for k, v in attachment.items() if k != "datas"}
                attachment_info["local_path"] = file_path
                attachment_files.append(attachment_info)

        # Complete ticket data.
        ticket_data = {
            **ticket,
            "messages": messages,
            "threads": thread_messages,
            "attachments": attachment_files,
        }

        ticket_path = os.path.join(output_dir, "ticket_data.json")
        with open(ticket_path, "w", encoding="utf-8") as f:
            json.dump(ticket_data, f, indent=2, ensure_ascii=False)

        # The messages are also saved separately, for compatibility.
        messages_path = os.path.join(output_dir, "messages.json")
        with open(messages_path, "w", encoding="utf-8") as f:
            json.dump({"ticket": ticket, "messages": messages}, f, indent=2, ensure_ascii=False)

        # Extraction log, for reference.
        log_path = os.path.join(output_dir, "extraction_log.txt")
        with open(log_path, "w", encoding="utf-8") as f:
            f.write(f"Extraction du ticket {ticket_id} le {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"Nom du ticket: {ticket.get('name', 'N/A')}\n")
            f.write(f"Nombre de messages: {len(messages)}\n")
            f.write(f"Nombre de pièces jointes: {len(attachments)}\n")

        print(f"Données complètes sauvegardées dans {ticket_path}")
        print(f"Pièces jointes ({len(attachment_files)}) sauvegardées dans {os.path.join(output_dir, 'attachments')}")

        return {
            "ticket_info": ticket,
            "messages_file": messages_path,
            "ticket_data_file": ticket_path,
            "attachments": attachment_files,
            "log_file": log_path,
        }


def main() -> None:
    """Command-line entry point: extract the ticket whose code is ``argv[1]``."""
    if len(sys.argv) < 2:
        print("Usage: python retrieve_ticket.py <ticket_code>")
        sys.exit(1)

    ticket_code = sys.argv[1]
    output_dir = f"output/ticket_{ticket_code}"

    # Placeholder connection settings; replace with real values before use.
    config = {
        "url": "https://odoo.example.com",
        "db": "your_db_name",
        "username": "your_username",
        "api_key": "your_api_key",
    }

    manager = TicketManager(config["url"], config["db"], config["username"], config["api_key"])
    if manager.login():
        ticket = manager.get_ticket_by_code(ticket_code)
        if ticket:
            result = manager.extract_ticket_data(ticket["id"], output_dir)
            if "error" in result:
                # Do not announce success when the extraction reported an error.
                print(f"Erreur: {result['error']}")
            else:
                print(f"Extraction terminée. Données disponibles dans {output_dir}")
        else:
            print(f"Ticket avec code {ticket_code} non trouvé.")


if __name__ == "__main__":
    main()