from odoo_connection import OdooConnection import os import json import base64 from utils import print_error from config import EXPORT_DIR class TicketManager: """Gestionnaire de tickets simplifié avec seulement les fonctionnalités essentielles""" def __init__(self): """Initialise le gestionnaire de tickets""" self.conn = OdooConnection() self.odoo = self.conn.get_odoo_instance() self.model_name = "project.task" def _ensure_connection(self): """Vérifie et établit la onnexion si nécessaire""" if not self.odoo: self.odoo = self.conn.get_odoo_instance() return self.odoo is not None def _safe_execute(self, model, method, *args): """ Exécute une méthode Odoo en toute sécurité via odoorpc. Vérifie la connexion avant d'exécuter. """ # Vérifier que la connexion est bien établie if not self._ensure_connection(): print_error("Connexion Odoo indisponible.") return None try: # Exécuter la méthode sur le modèle via OdooRPC return self.odoo.execute(model, method, *args) # type: ignore except odoorpc.error.RPCError as e: # type: ignore print_error(f" Erreur RPC lors de '{method}' sur '{model}': {e}") return None except Exception as e: print_error(f" Erreur inattendue lors de '{method}' sur '{model}': {e}") return None def get_model_fields(self, model_name): """Récupére tous les champs disponibles pour un modèle donné, en filtrant ceux qui ne sont pas exploitables""" fields_info = self._safe_execute(model_name, 'fields_get', [], ['name', 'type']) if not fields_info: print_error(f"Impossible de récupérer les champs pour {model_name}") return [] #On filtre les champs qui ne sont pas exploitables invalid_types = ['many2many'] valid_fields = [field for field, info in fields_info.items() if info.get("type") not in invalid_types] return valid_fields #Retourne la liste des champs exploitables def save_raw_ticket_data(self, ticket_data, filename="raw_ticket_data.json"): """Sauvegarde les données brutes du ticket dans un fichier JSON""" file_path = os.path.join(EXPORT_DIR, filename) with open(file_path, "w", encoding="utf-8") as f: json.dump(ticket_data, f, indent=4, ensure_ascii=False) print(f"Données brutes du ticket sauvegardées dans : {file_path}") def get_all_models(self): """Récupérer et sauvegarder tous les modèles disponibles""" models = self._safe_execute('ir.model', 'search_read', [], ['model', 'name'] ) if not models: print_error("Impossible de récupérer la liste des modèles.") return None #Convertir en dictionnaire {nom_du_modèle: description} models_dict = {model['model']: model['name'] for model in models} #Sauvegarder dans un fichier JSON self.save_raw_ticket_data(models_dict, "all_models.json") print(f"Liste des modèles sauvegardée dans 'available_models.json") return models_dict def get_model_fields_with_types(self, model_name): """Récupère et sauvegarde les champs d'un modèle avec tous leurs attributs""" # Récupérer tous les attributs disponibles pour chaque champ fields_info = self._safe_execute(model_name, 'fields_get', []) if not fields_info: print_error(f"Impossible de récupérer les champs pour {model_name}") return {} # Sauvegarde en JSON (tous les attributs sont conservés) self.save_raw_ticket_data(fields_info, f"fields_{model_name}_complete.json") print(f"Liste complète des champs du modèle '{model_name}' sauvegardée dans 'fields_{model_name}_complete.json'") # Pour la compatibilité, construire aussi le dictionnaire simplifié fields_dict = { field: { "type": info.get("type", "unknown"), "relation": info.get("relation", None), "string": info.get("string", field), "help": info.get("help", ""), "required": info.get("required", False), "readonly": info.get("readonly", False), "selection": info.get("selection", []) if info.get("type") == "selection" else None } for field, info in fields_info.items() } # Sauvegarde en JSON self.save_raw_ticket_data(fields_dict, f"fields_{model_name}.json") print(f"Liste des champs du modèle '{model_name}' sauvegardée dans 'fields_{model_name}.json'") return fields_dict def get_ticket_by_id(self, ticket_id): """Récupère les détails d'un ticket par son ID et exclut dynamiquement les champs invalides""" fields_to_read = self.get_model_fields(self.model_name) # Récupère tous les champs ticket_data = self._safe_execute(self.model_name, 'read', [ticket_id], fields_to_read) if ticket_data: ticket = self.resolve_relational_fields(ticket_data[0]) self.save_raw_ticket_data(ticket, f"ticket_{ticket_id}_raw.json") return ticket print_error(f"Aucun ticket trouvé avec l'ID {ticket_id}") return None def resolve_relational_fields(self, ticket): """Ajoute les valeurs des champs relationnels""" fields_info = self._safe_execute(self.model_name, 'fields_get', [], ['type']) for field, info in fields_info.items(): # type: ignore if info.get("type") == "many2one" and isinstance(ticket.get(field), list): ticket[f"{field}_value"] = ticket[field][1] if len(ticket[field]) > 1 else None #Ajoute la valeur lisible en plus du ID return ticket def get_ticket_by_code(self, ticket_code): """Récupérer un ticket via son code""" domain = [('code', '=', ticket_code)] ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, 1) if not ticket_ids: return None # Sauvegarde des données brutes pour analyse self.save_raw_ticket_data(ticket_ids, f"ticket_{ticket_code}_raw.json") return self.get_ticket_by_id(ticket_ids[0]) def get_available_projects(self): """Retourne la liste des projets disponibles""" projects = self._safe_execute('project.project', 'search_read', [], ['id', 'name']) if not projects: print_error("Aucun projet trouvé.") return {} return {proj['id']: proj['name'] for proj in projects} def export_tickets_by_project_and_stage(self, project_id=None, stage_id=None): """ Exporte les tickets selon des critères de projet et d'étape Args: project_id: ID du projet à filtrer (optionnel) stage_id: ID de l'étape à filtrer (optionnel) """ # Vérifier la connexion Odoo if not self._ensure_connection(): print_error("Connexion Odoo indisponible.") return # Construire le domaine de recherche en fonction des paramètres domain = [] if project_id: domain.append(('project_id', '=', project_id)) if stage_id: domain.append(('stage_id', '=', stage_id)) # Récupérer les IDs des tickets correspondant aux critères ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, 1000) if not ticket_ids: filters = [] if project_id: project_name = self._safe_execute('project.project', 'read', [project_id], ['name']) project_name = project_name[0]['name'] if project_name else str(project_id) filters.append(f"projet '{project_name}'") if stage_id: stage_name = self._safe_execute('project.task.type', 'read', [stage_id], ['name']) stage_name = stage_name[0]['name'] if stage_name else str(stage_id) filters.append(f"étape '{stage_name}'") filter_text = " et ".join(filters) if filters else "critères spécifiés" print_error(f"Aucun ticket trouvé pour les {filter_text}.") return # Récupérer les détails des tickets sans appeler get_ticket_by_id # pour éviter la création de fichiers individuels fields_to_read = self.get_model_fields(self.model_name) all_tickets_data = self._safe_execute(self.model_name, 'read', ticket_ids, fields_to_read) if not all_tickets_data: print_error("Impossible de récupérer les détails des tickets.") return # Organisation des tickets par projet et par étape tickets_by_project_stage = {} for ticket_data in all_tickets_data: # Résoudre les champs relationnels ticket = self.resolve_relational_fields(ticket_data) # Extraire les informations du projet project_data = ticket.get('project_id', [0, "Sans projet"]) project_id_value = project_data[0] if isinstance(project_data, list) and len(project_data) > 0 else 0 project_name = project_data[1] if isinstance(project_data, list) and len(project_data) > 1 else "Sans projet" # Extraire les informations de l'étape stage_data = ticket.get('stage_id', [0, "Non classé"]) stage_id_value = stage_data[0] if isinstance(stage_data, list) and len(stage_data) > 0 else 0 stage_name = stage_data[1] if isinstance(stage_data, list) and len(stage_data) > 1 else "Non classé" # Clés pour l'organisation des dossiers project_key = f"{project_id_value}_{project_name}" stage_key = f"{stage_id_value}_{stage_name}" # Organiser la structure de données if project_key not in tickets_by_project_stage: tickets_by_project_stage[project_key] = {} if stage_key not in tickets_by_project_stage[project_key]: tickets_by_project_stage[project_key][stage_key] = {} # Utiliser l'ID du ticket comme clé pour éviter les doublons ticket_id = ticket.get('id', 0) tickets_by_project_stage[project_key][stage_key][str(ticket_id)] = ticket # Création des répertoires d'exportation export_base_dir = os.path.join(EXPORT_DIR, "tickets_export") os.makedirs(export_base_dir, exist_ok=True) # Sauvegarder les tickets un par un dans des fichiers individuels # organisés par répertoires de projet et d'étape for project_key, stages in tickets_by_project_stage.items(): project_dir = os.path.join(export_base_dir, project_key) os.makedirs(project_dir, exist_ok=True) for stage_key, tickets in stages.items(): stage_dir = os.path.join(project_dir, stage_key) os.makedirs(stage_dir, exist_ok=True) # Créer un index de tous les tickets dans l'étape tickets_index = {} for ticket_id, ticket in tickets.items(): ticket_name = ticket.get('name', 'Sans nom') tickets_index[ticket_id] = { "name": ticket_name, "code": ticket.get('code', ''), "filename": f"ticket_{ticket_id}.json" } # Sauvegarder l'index des tickets index_path = os.path.join(stage_dir, "index.json") with open(index_path, "w", encoding="utf-8") as f: json.dump(tickets_index, f, indent=4, ensure_ascii=False) # Sauvegarder chaque ticket dans un fichier séparé for ticket_id, ticket in tickets.items(): ticket_path = os.path.join(stage_dir, f"ticket_{ticket_id}.json") with open(ticket_path, "w", encoding="utf-8") as f: json.dump(ticket, f, indent=4, ensure_ascii=False) print(f"Sauvegarde de {len(tickets)} tickets pour le projet '{project_key}', étape '{stage_key}'") print(f"Exportation terminée. Les fichiers sont organisés dans : {export_base_dir}/") def extract_ticket_attachments_and_messages(self, ticket_id): """ Extrait toutes les pièces jointes et messages d'un ticket et les sauvegarde dans un répertoire dédié. Args: ticket_id: ID du ticket à extraire """ # Récupérer les informations du ticket ticket = self.get_ticket_by_id(ticket_id) if not ticket: print_error(f"Impossible de trouver le ticket avec l'ID {ticket_id}") return # Créer un répertoire pour ce ticket ticket_name = ticket.get('name', 'Sans nom').replace('/', '_').replace('\\', '_') ticket_dir = os.path.join(EXPORT_DIR, f"ticket_{ticket_id}_{ticket_name}") os.makedirs(ticket_dir, exist_ok=True) # Sauvegarder les données du ticket self.save_raw_ticket_data(ticket, os.path.join(ticket_dir, "ticket_info.json")) # Récupérer les informations de contact supplémentaires si disponibles contact_info = {} if ticket.get('partner_id'): partner_id = ticket.get('partner_id')[0] if isinstance(ticket.get('partner_id'), list) else ticket.get('partner_id') partner_details = self._safe_execute('res.partner', 'read', [partner_id], ['name', 'email', 'phone', 'mobile', 'street', 'city', 'zip', 'country_id', 'comment']) if partner_details: contact_info = partner_details[0] self.save_raw_ticket_data(contact_info, os.path.join(ticket_dir, "contact_info.json")) print(f"Informations de contact extraites pour le partenaire {partner_id}") # Récupérer les activités liées au ticket activity_ids = ticket.get('activity_ids', []) if activity_ids: activities = self._safe_execute('mail.activity', 'read', activity_ids, ['id', 'res_id', 'activity_type_id', 'summary', 'note', 'date_deadline', 'user_id', 'create_date']) if activities: self.save_raw_ticket_data(activities, os.path.join(ticket_dir, "activities.json")) print(f"{len(activities)} activités extraites pour le ticket {ticket_id}") # Récupérer les messages (discussions) message_ids = ticket.get('message_ids', []) if message_ids: # Récupérer tous les messages avec leurs détails messages = self._safe_execute('mail.message', 'read', message_ids, ['id', 'body', 'date', 'author_id', 'email_from', 'subject', 'parent_id', 'message_type', 'subtype_id', 'attachment_ids']) if messages: # Sauvegarder tous les messages dans un fichier self.save_raw_ticket_data(messages, os.path.join(ticket_dir, "messages.json")) # Organiser les messages par fil de discussion messages_by_thread = {} for message in messages: parent_id = message.get('parent_id', False) parent_id = parent_id[0] if isinstance(parent_id, list) and len(parent_id) > 0 else False if not parent_id: # Message principal thread_id = message['id'] if thread_id not in messages_by_thread: messages_by_thread[thread_id] = { 'main_message': message, 'replies': [] } else: # Réponse à un message if parent_id not in messages_by_thread: messages_by_thread[parent_id] = { 'main_message': None, 'replies': [] } messages_by_thread[parent_id]['replies'].append(message) # Sauvegarder les fils de discussion self.save_raw_ticket_data(messages_by_thread, os.path.join(ticket_dir, "message_threads.json")) # Sauvegarder chaque message dans un fichier séparé pour une meilleure lisibilité messages_dir = os.path.join(ticket_dir, "messages") os.makedirs(messages_dir, exist_ok=True) for message in messages: message_id = message.get('id', 0) message_date = message.get('date', '').replace(':', '-').replace(' ', '_') message_path = os.path.join(messages_dir, f"message_{message_id}_{message_date}.json") # Récupérer les détails de l'auteur si disponible if message.get('author_id') and isinstance(message.get('author_id'), list) and len(message.get('author_id')) > 0: author_id = message.get('author_id')[0] author_details = self._safe_execute('res.partner', 'read', [author_id], ['name', 'email', 'phone', 'function', 'company_id']) if author_details: message['author_details'] = author_details[0] # Récupérer les détails du sous-type si disponible if message.get('subtype_id') and isinstance(message.get('subtype_id'), list) and len(message.get('subtype_id')) > 0: subtype_id = message.get('subtype_id')[0] subtype_details = self._safe_execute('mail.message.subtype', 'read', [subtype_id], ['name', 'description', 'default']) if subtype_details: message['subtype_details'] = subtype_details with open(message_path, "w", encoding="utf-8") as f: json.dump(message, f, indent=4, ensure_ascii=False) print(f"{len(messages)} messages extraits pour le ticket {ticket_id}") # Récupérer les suiveurs du ticket follower_ids = ticket.get('message_follower_ids', []) if follower_ids: followers = self._safe_execute('mail.followers', 'read', follower_ids, ['id', 'partner_id', 'name', 'email', 'subtype_ids']) if followers: # Enrichir les informations des suiveurs for follower in followers: if follower.get('partner_id') and isinstance(follower.get('partner_id'), list) and len(follower.get('partner_id')) > 0: partner_id = follower.get('partner_id')[0] partner_details = self._safe_execute('res.partner', 'read', [partner_id], ['name', 'email', 'phone', 'function', 'company_id']) if partner_details: follower['partner_details'] = partner_details[0] # Récupérer les détails des sous-types if follower.get('subtype_ids'): subtype_details = self._safe_execute('mail.message.subtype', 'read', follower.get('subtype_ids'), ['name', 'description', 'default']) if subtype_details: follower['subtype_details'] = subtype_details self.save_raw_ticket_data(followers, os.path.join(ticket_dir, "followers.json")) print(f"{len(followers)} suiveurs extraits pour le ticket {ticket_id}") # Récupérer les pièces jointes attachment_ids = self._safe_execute('ir.attachment', 'search', [('res_model', '=', self.model_name), ('res_id', '=', ticket_id)]) if attachment_ids: attachments = self._safe_execute('ir.attachment', 'read', attachment_ids, ['id', 'name', 'datas', 'mimetype', 'create_date', 'create_uid', 'description', 'res_name', 'public', 'type']) if attachments: # Sauvegarder les métadonnées des pièces jointes attachments_info = [{ 'id': att.get('id'), 'name': att.get('name'), 'mimetype': att.get('mimetype'), 'create_date': att.get('create_date'), 'description': att.get('description'), 'res_name': att.get('res_name'), 'type': att.get('type'), 'file_path': f"attachments/{att.get('id')}_{att.get('name', '').replace('/', '_')}" } for att in attachments] self.save_raw_ticket_data(attachments_info, os.path.join(ticket_dir, "attachments_info.json")) # Créer un répertoire pour les pièces jointes attachments_dir = os.path.join(ticket_dir, "attachments") os.makedirs(attachments_dir, exist_ok=True) # Sauvegarder chaque pièce jointe for attachment in attachments: attachment_id = attachment.get('id', 0) attachment_name = attachment.get('name', f"attachment_{attachment_id}").replace('/', '_') attachment_data = attachment.get('datas') if attachment_data: try: # Décoder les données base64 binary_data = base64.b64decode(attachment_data) file_path = os.path.join(attachments_dir, f"{attachment_id}_{attachment_name}") with open(file_path, "wb") as f: f.write(binary_data) print(f"Pièce jointe {attachment_name} sauvegardée dans {file_path}") except Exception as e: print_error(f"Erreur lors de la sauvegarde de la pièce jointe {attachment_name}: {e}") print(f"{len(attachments)} pièces jointes extraites pour le ticket {ticket_id}") # Extraire les historiques de timesheet si disponibles timesheet_ids = ticket.get('timesheet_ids', []) if timesheet_ids: timesheets = self._safe_execute('account.analytic.line', 'read', timesheet_ids, ['id', 'date', 'user_id', 'name', 'unit_amount', 'employee_id', 'department_id']) if timesheets: self.save_raw_ticket_data(timesheets, os.path.join(ticket_dir, "timesheets.json")) print(f"{len(timesheets)} feuilles de temps extraites pour le ticket {ticket_id}") print(f"Extraction terminée. Les fichiers sont disponibles dans: {ticket_dir}") return ticket_dir