import os import json import base64 import requests from typing import Dict, List, Any, Optional from datetime import datetime import re from html import unescape class TicketManager: def __init__(self, url: str, db: str, username: str, api_key: str): self.url = url self.db = db self.username = username self.api_key = api_key self.uid = None self.session_id = None self.model_name = "project.task" def login(self) -> bool: login_url = f"{self.url}/web/session/authenticate" login_data = { "jsonrpc": "2.0", "params": { "db": self.db, "login": self.username, "password": self.api_key } } response = requests.post(login_url, json=login_data) result = response.json() if result.get("error"): print(f"Erreur de connexion: {result['error']['message']}") return False self.uid = result.get("result", {}).get("uid") self.session_id = response.cookies.get("session_id") return True if self.uid else False def _rpc_call(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]: full_url = f"{self.url}{endpoint}" headers = {"Content-Type": "application/json"} data = {"jsonrpc": "2.0", "method": "call", "params": params} response = requests.post(full_url, json=data, headers=headers, cookies={"session_id": self.session_id}) return response.json().get("result", {}) def get_ticket(self, ticket_id: int) -> Dict[str, Any]: params = { "model": self.model_name, "method": "read", "args": [[ticket_id]], "kwargs": {"fields": ["id", "name", "description", "stage_id", "user_id", "partner_id", "create_date", "write_date", "date_deadline", "priority", "tag_ids", "code", "project_id"]} } result = self._rpc_call("/web/dataset/call_kw", params) # Afficher le résultat brut pour le débogage print(f"Réponse brute de l'appel RPC pour le ticket ID {ticket_id} : {result}") if isinstance(result, list) and len(result) > 0: return result[0] else: print(f"Aucun ticket trouvé avec l'ID {ticket_id} ou une erreur est survenue.") return {} def get_ticket_messages(self, ticket_id: int) -> List[Dict[str, Any]]: messages = self._rpc_call("/web/dataset/call_kw", { "model": "mail.message", "method": "search_read", "args": [[["res_id", "=", ticket_id], ["model", "=", "project.task"]]], "kwargs": {"fields": ["id", "body", "author_id", "date"]} }) return messages def get_ticket_by_code(self, ticket_code: str) -> Dict[str, Any]: params = { "model": self.model_name, "method": "search_read", "args": [[["code", "=", ticket_code]]], "kwargs": {"fields": ["id", "name", "description", "stage_id", "user_id", "partner_id", "create_date", "write_date"], "limit": 1} } print(f"Recherche du ticket avec code: {ticket_code}") result = self._rpc_call("/web/dataset/call_kw", params) if result and len(result) > 0: print(f"Ticket trouvé avec le code {ticket_code}: ID={result[0]['id']}") return result[0] else: print(f"Aucun ticket trouvé avec le code {ticket_code}") return {} def save_json(self, data: Any, path: str): with open(path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) def extract_ticket_data(self, ticket_id: int, output_dir: str): os.makedirs(output_dir, exist_ok=True) ticket_data = self.get_ticket(ticket_id) # Vérifier si le ticket a bien été trouvé if not ticket_data or "id" not in ticket_data: print(f"Impossible d'extraire les données pour le ticket ID {ticket_id}") return {"error": "Ticket non trouvé", "ticket_info": None, "messages_file": None, "ticket_data_file": None, "attachments": []} ticket_info = { "id": ticket_data["id"], "name": ticket_data["name"], "description": ticket_data.get("description", ""), "stage_id": ticket_data.get("stage_id", ["", ""])[1] if ticket_data.get("stage_id") else "Non défini" } self.save_json(ticket_info, os.path.join(output_dir, "ticket_info.json")) messages = self.get_ticket_messages(ticket_id) # Sauvegarde brute des messages self.save_json(messages, os.path.join(output_dir, "messages_raw.json")) # Nettoyage des messages cleaned_messages = self._clean_messages(messages) self.save_json(cleaned_messages, os.path.join(output_dir, "all_messages.json")) # Génération de structure.json structure = { "date_extraction": datetime.now().isoformat(), "ticket_dir": output_dir, "fichiers_json": [ "ticket_info.json", "messages_raw.json", "all_messages.json" ] } self.save_json(structure, os.path.join(output_dir, "structure.json")) return { "ticket_info": os.path.join(output_dir, "ticket_info.json"), "messages_file": os.path.join(output_dir, "all_messages.json"), "ticket_data_file": os.path.join(output_dir, "structure.json"), "attachments": [] # Si vous implémentez la gestion des pièces jointes } def _clean_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: signatures = [ r'Droit à la déconnexion.*', r'Ce message électronique et tous les fichiers attachés.*', r'Direction des Infrastructures.*', r'Service d\'Appui aux Politiques d\'Aménagement.*', r'tél :.*', r'mobile :.*', r'email :.*', r'Cordialement,.*', r'Bonne réception.*', r'---.*' ] cleaned_messages = [] for msg in messages: body = msg.get("body", "") body = re.sub(r'<.*?>', '', body) body = unescape(body) body = re.sub(r'\s+', ' ', body).strip() # Supprime les signatures courantes for signature in signatures: body = re.sub(signature, '', body, flags=re.IGNORECASE | re.DOTALL) msg["body"] = body cleaned_messages.append(msg) return cleaned_messages