import os
import json
from datetime import datetime
from typing import Dict, List, Any, Optional

from .auth_manager import AuthManager
from .message_manager import MessageManager
from .attachment_manager import AttachmentManager
from .utils import save_json


class TicketManager:
    """Look up a ticket over RPC and export it, with its related data, to a directory."""

    # Route passed to ``AuthManager._rpc_call`` for every model call made here.
    _CALL_KW_ENDPOINT = "/web/dataset/call_kw"

    # Fields requested from the ticket model by ``get_ticket_by_code``.
    _TICKET_FIELDS = (
        "id", "name", "description", "stage_id", "project_id", "partner_id",
        "user_id", "date_start", "date_end", "date_deadline", "create_date",
        "write_date", "tag_ids", "priority", "email_from", "email_cc",
        "message_ids", "message_follower_ids", "attachment_ids", "timesheet_ids",
    )

    def __init__(self, auth_manager: AuthManager):
        """
        Store the auth manager and build the message/attachment managers from it.

        Args:
            auth_manager: Object exposing ``_rpc_call``; also handed to the
                message and attachment managers.
        """
        self.auth_manager = auth_manager
        self.message_manager = MessageManager(auth_manager)
        self.attachment_manager = AttachmentManager(auth_manager)
        self.model_name = "project.task"

    def _call_kw(self, model: str, method: str, args: List[Any],
                 kwargs: Optional[Dict[str, Any]] = None) -> Any:
        """Build the ``call_kw`` payload and send it through the auth manager."""
        params = {
            "model": model,
            "method": method,
            "args": args,
            "kwargs": {} if kwargs is None else kwargs,
        }
        return self.auth_manager._rpc_call(self._CALL_KW_ENDPOINT, params)

    def get_ticket_by_code(self, ticket_code: str) -> Dict[str, Any]:
        """
        Fetch a single ticket by its code.

        Args:
            ticket_code: Code of the ticket to search for.

        Returns:
            The first matching record, passed through
            ``resolve_relation_fields``; an empty dict when the RPC result is
            not a non-empty list (a message is printed in that case).
        """
        result = self._call_kw(
            self.model_name,
            "search_read",
            [[["code", "=", ticket_code]], list(self._TICKET_FIELDS)],
            {"limit": 1},
        )

        if isinstance(result, list) and result:
            # Add readable names next to the relational ids.
            return self.resolve_relation_fields(result[0])

        print(f"Aucun ticket trouvé avec le code {ticket_code}")
        return {}

    def resolve_relation_fields(self, ticket: Dict[str, Any]) -> Dict[str, Any]:
        """
        Add readable names for the relational fields of a ticket.

        For each many2one field a ``<field>_name`` key is added; for tags a
        ``tag_names`` list is added. The ``ticket`` dict is modified in place.

        Args:
            ticket: Dictionary holding the ticket data.

        Returns:
            The same ``ticket`` dictionary, with the extra name keys.
        """
        # NOTE(review): "res.stage" is only used for the name_get fallback when
        # stage_id arrives as a bare int — confirm this is the right model name.
        many2one_models = {
            "stage_id": "res.stage",
            "project_id": "project.project",
            "partner_id": "res.partner",
            "user_id": "res.users",
        }
        tags_model = "project.tags"

        for field, model in many2one_models.items():
            value = ticket.get(field)
            if not value:
                continue

            if isinstance(value, list) and len(value) >= 2:
                # Value is already an [id, name] pair.
                ticket[f"{field}_name"] = value[1]
            elif isinstance(value, int):
                # Only the id is known: ask the server for the display name.
                result = self._call_kw(model, "name_get", [[value]])
                if result and isinstance(result, list) and result[0] and len(result[0]) >= 2:
                    ticket[f"{field}_name"] = result[0][1]

        # Tags (many2many): resolved in one call, only when every entry is an id.
        tag_ids = ticket.get("tag_ids")
        if tag_ids and isinstance(tag_ids, list) and all(isinstance(t, int) for t in tag_ids):
            result = self._call_kw(tags_model, "name_get", [tag_ids])
            if result and isinstance(result, list):
                # Same shape check as the many2one branch, so a malformed
                # entry is skipped instead of raising IndexError/TypeError.
                ticket["tag_names"] = [
                    tag[1]
                    for tag in result
                    if isinstance(tag, (list, tuple)) and len(tag) >= 2
                ]

        return ticket

    @staticmethod
    def _build_ticket_summary(ticket_data: Dict[str, Any], ticket_code: str) -> Dict[str, Any]:
        """Pick the headline fields of a resolved ticket into a flat summary dict."""
        return {
            "id": ticket_data["id"],
            "code": ticket_code,
            "name": ticket_data.get("name", "Sans nom"),
            "description": ticket_data.get("description", ""),
            "stage": ticket_data.get("stage_id_name", ""),
            "project": ticket_data.get("project_id_name", ""),
            "partner": ticket_data.get("partner_id_name", ""),
            "assigned_to": ticket_data.get("user_id_name", ""),
            "tags": ticket_data.get("tag_names", []),
            "create_date": ticket_data.get("create_date", ""),
            "write_date": ticket_data.get("write_date", ""),
            "deadline": ticket_data.get("date_deadline", ""),
        }

    def _save_followers(self, follower_ids: List[Any], output_dir: str) -> Optional[str]:
        """Read the given followers and save them; return the file path, or None if nothing was saved."""
        if not follower_ids:
            return None

        followers = self._call_kw(
            "mail.followers",
            "read",
            [follower_ids, ["id", "partner_id", "name", "email"]],
        )
        if not followers:
            return None

        followers_path = os.path.join(output_dir, "followers.json")
        save_json(followers, followers_path)
        return followers_path

    def extract_ticket_data(self, ticket_code: str, output_dir: str) -> Optional[Dict[str, Any]]:
        """
        Extract a ticket and its related data into ``output_dir``.

        Writes ``ticket_info.json``, ``ticket_summary.json``, optionally
        ``followers.json``, and ``structure.json``; messages and attachments
        are delegated to the message and attachment managers.

        Args:
            ticket_code: Code of the ticket to extract.
            output_dir: Output directory (created if missing, even when the
                ticket is then not found).

        Returns:
            Dictionary with the created file paths and counts, or None when
            the ticket could not be found.
        """
        os.makedirs(output_dir, exist_ok=True)

        ticket_data = self.get_ticket_by_code(ticket_code)
        if not ticket_data or "id" not in ticket_data:
            print("Erreur: Ticket non trouvé.")
            return None

        ticket_id = ticket_data["id"]
        ticket_name = ticket_data.get("name", "Sans nom")

        # Full ticket record.
        ticket_info_path = os.path.join(output_dir, "ticket_info.json")
        save_json(ticket_data, ticket_info_path)

        # Condensed summary.
        summary_path = os.path.join(output_dir, "ticket_summary.json")
        save_json(self._build_ticket_summary(ticket_data, ticket_code), summary_path)

        # Messages are processed and saved by the message manager.
        messages_result = self.message_manager.process_messages(
            ticket_id, ticket_code, ticket_name, output_dir
        )

        # Attachments are fetched and saved by the attachment manager.
        # NOTE(review): this method never writes attachments_info.json itself,
        # although structure.json references it — presumably save_attachments
        # does; verify.
        attachments_info = self.attachment_manager.save_attachments(ticket_id, output_dir)

        # Followers, when the ticket lists any.
        followers_path = self._save_followers(
            ticket_data.get("message_follower_ids", []), output_dir
        )

        # structure.json: index of the export (file names are relative to output_dir).
        messages_count = messages_result.get("messages_count", 0)
        attachments_count = len(attachments_info)
        structure = {
            "date_extraction": datetime.now().isoformat(),
            "ticket_id": ticket_id,
            "ticket_code": ticket_code,
            "ticket_name": ticket_name,
            "output_dir": output_dir,
            "files": {
                "ticket_info": "ticket_info.json",
                "ticket_summary": "ticket_summary.json",
                "messages": "all_messages.json",
                "messages_raw": "messages_raw.json",
                "messages_text": "all_messages.txt",
                "attachments": "attachments_info.json",
                "followers": "followers.json" if followers_path else None,
            },
            "stats": {
                "messages_count": messages_count,
                "attachments_count": attachments_count,
            },
        }
        structure_path = os.path.join(output_dir, "structure.json")
        save_json(structure, structure_path)

        return {
            "ticket_info": ticket_info_path,
            "ticket_summary": summary_path,
            "messages_file": messages_result.get("all_messages_path"),
            "messages_count": messages_count,
            "ticket_data_file": structure_path,
            "attachments": attachments_info,
            "attachments_count": attachments_count,
        }