import os
from datetime import datetime
from typing import Dict, Any

from .auth_manager import AuthManager
from .message_manager import MessageManager
from .attachment_manager import AttachmentManager
from .utils import save_json


class DataExtractor:
    """Look up a ticket by its code and dump its data to JSON files.

    The extractor queries the ``project.task`` model through the supplied
    ``AuthManager`` and delegates message and attachment retrieval to a
    ``MessageManager`` and an ``AttachmentManager`` built on the same
    ``AuthManager``.
    """

    def __init__(self, auth_manager: AuthManager):
        """Store the auth manager and build the helper managers from it.

        Args:
            auth_manager: Object used for every RPC call made by this
                extractor and by the message/attachment managers.
        """
        self.auth_manager = auth_manager
        self.message_manager = MessageManager(auth_manager)
        self.attachment_manager = AttachmentManager(auth_manager)
        self.model_name = "project.task"

    def get_ticket_by_code(self, ticket_code: str) -> Dict[str, Any]:
        """Return the first record whose ``code`` equals ``ticket_code``.

        Issues a ``search_read`` call limited to one record via
        ``auth_manager.rpc_call``.

        Args:
            ticket_code: Value matched against the ``code`` field.

        Returns:
            The first element of the RPC result when that result is a
            non-empty list; otherwise an empty dict (after printing a
            "not found" message).
        """
        domain = [["code", "=", ticket_code]]
        wanted_fields = [
            "id",
            "name",
            "description",
            "stage_id",
            "project_id",
            "partner_id",
            "user_id",
            "date_start",
            "date_end",
            "create_date",
            "write_date",
            "message_ids",
            "message_follower_ids",
            "attachment_ids",
            "timesheet_ids",
        ]
        params = {
            "model": self.model_name,
            "method": "search_read",
            "args": [domain, wanted_fields],
            "kwargs": {"limit": 1},
        }

        records = self.auth_manager.rpc_call("/web/dataset/call_kw", params)

        # Anything other than a non-empty list is treated as "not found".
        if not isinstance(records, list) or len(records) == 0:
            print(f"Aucun ticket trouvé avec le code {ticket_code}")
            return {}
        return records[0]

    def extract_ticket_data(self, ticket_code: str, output_dir: str):
        """Write the ticket, its messages and its attachments info to disk.

        ``output_dir`` is created (if needed) before the ticket lookup.
        Four JSON files are then written into it, in this order:
        ``ticket_info.json``, ``all_messages.json``,
        ``attachments_info.json`` and ``structure.json``.

        Args:
            ticket_code: Code of the ticket to extract.
            output_dir: Directory receiving the generated files.

        Returns:
            A dict mapping ``ticket_info``, ``messages_file``,
            ``ticket_data_file`` and ``attachments`` to the paths of the
            written files, or ``None`` when no ticket with an ``id`` was
            found.
        """
        info_name = "ticket_info.json"
        messages_name = "all_messages.json"
        attachments_name = "attachments_info.json"

        def in_output_dir(file_name):
            # Build the path of a file located directly in output_dir.
            return os.path.join(output_dir, file_name)

        os.makedirs(output_dir, exist_ok=True)

        ticket = self.get_ticket_by_code(ticket_code)
        if not (ticket and "id" in ticket):
            print(f"Erreur: Ticket non trouvé.")
            return None
        ticket_id = ticket["id"]

        # Save the raw ticket record.
        info_file = in_output_dir(info_name)
        save_json(ticket, info_file)

        # Save the ticket's messages.
        messages = self.message_manager.get_ticket_messages(ticket_id)
        messages_file = in_output_dir(messages_name)
        save_json(messages, messages_file)

        # Save the attachments information; the attachment manager is also
        # given output_dir.
        attachments = self.attachment_manager.get_ticket_attachments(
            ticket_id, output_dir
        )
        attachments_file = in_output_dir(attachments_name)
        save_json(attachments, attachments_file)

        # Write an index describing what was produced and when.
        index = {
            "date_extraction": datetime.now().isoformat(),
            "ticket_dir": output_dir,
            "fichiers_json": [info_name, messages_name, attachments_name],
        }
        index_file = in_output_dir("structure.json")
        save_json(index, index_file)

        return {
            "ticket_info": info_file,
            "messages_file": messages_file,
            "ticket_data_file": index_file,
            "attachments": attachments_file,
        }