from odoo_connection import OdooConnection import os import json import re from utils import save_json, ensure_export_directory, print_error from config import EXPORT_DIR from data_filter import filter_ticket_data import odoorpc class TicketManager: """Gestionnaire de tickets simplifié avec seulement les fonctionnalités essentielles""" def __init__(self): """Initialise le gestionnaire de tickets""" self.conn = OdooConnection() self.odoo = self.conn.get_odoo_instance() self.model_name = "project.task" def _ensure_connection(self): """Vérifie et établit la onnexion si nécessaire""" if not self.odoo: self.odoo = self.conn.get_odoo_instance() return self.odoo is not None def _safe_execute(self, model, method, *args): """ Exécute une méthode Odoo en toute sécurité via odoorpc. Vérifie la connexion avant d'exécuter. """ # Vérifier que la connexion est bien établie if not self._ensure_connection(): print_error("Connexion Odoo indisponible.") return None try: # Exécuter la méthode sur le modèle via OdooRPC return self.odoo.execute(model, method, *args) # type: ignore except odoorpc.error.RPCError as e: # type: ignore print_error(f" Erreur RPC lors de '{method}' sur '{model}': {e}") return None except Exception as e: print_error(f" Erreur inattendue lors de '{method}' sur '{model}': {e}") return None def extract_invalid_field(self, error_message): """Extrait le nom du champ invalide depuis un message d'erreur RPC""" match = re.search(r"la colonne ([\w.]+) n'existe pas", error_message) if match: return match.group(1) #Retourne le champ détecté return None def get_model_fields(self, model_name): """Récupére tous les champs disponibles pour un modèle donné, en filtrant ceux qui ne sont pas exploitables""" fields_info = self._safe_execute(model_name, 'fields_get', [], ['name', 'type']) if not fields_info: print_error(f"Impossible de récupérer les champs pour {model_name}") return [] #On filtre les champs qui ne sont pas exploitables invalid_types = [] valid_fields = [field for field, info in fields_info.items() if info.get("type") not in invalid_types] return valid_fields #Retourne la liste des champs exploitables def save_raw_ticket_data(self, ticket_data, filename="raw_ticket_data.json"): """Sauvegarde les données brutes du ticket dans un fichier JSON""" file_path = os.path.join(EXPORT_DIR, filename) with open(file_path, "w", encoding="utf-8") as f: json.dump(ticket_data, f, indent=4, ensure_ascii=False) print(f"Données brutes du ticket sauvegardées dans : {file_path}") def get_ticket_by_id(self, ticket_id): """Récupère les détails d'un ticket par son ID et exclut dynamiquement les champs invalides""" fields_to_read = self.get_model_fields(self.model_name) # Récupère tous les champs excluded_fields = set() # Liste des champs à exclure while True: try: # Mise à jour de fields_to_read en supprimant les champs invalides valid_fields = [field for field in fields_to_read if field not in excluded_fields] if not valid_fields: print_error("Aucun champ valide disponible pour la requête.") return None # Récupérer les données du ticket avec les champs filtrés ticket_data = self._safe_execute(self.model_name, 'read', [ticket_id], valid_fields) if not ticket_data: print_error(f"Aucun ticket trouvé avec l'ID {ticket_id}") return None # Sauvegarde des données brutes pour analyse self.save_raw_ticket_data(ticket_data, f"ticket_{ticket_id}_raw.json") return ticket_data[0] except odoorpc.error.RPCError as e: error_message = str(e) print_error(f"Erreur RPC détectée : {error_message}") # Identifier le champ problématique invalid_field = self.extract_invalid_field(error_message) if invalid_field: print(f"Exclusion du champ invalide : {invalid_field}") excluded_fields.add(invalid_field) # Relancer la boucle immédiatement avec le champ exclu continue else: print_error("Impossible d'identifier le champ problématique.") return None def get_ticket_by_code(self, ticket_code): """Récupérer un ticket via son code""" domain = [('code', '=', ticket_code)] ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, 1) if not ticket_ids: return None # Sauvegarde des données brutes pour analyse self.save_raw_ticket_data(ticket_ids, f"ticket_{ticket_code}_raw.json") return self.get_ticket_by_id(ticket_ids[0]) def get_available_projects(self): """Retourne la liste des projets disponibles""" projects = self._safe_execute('project.project', 'search_read', [], ['id', 'name']) if not projects: print_error("Aucun projet trouvé.") return {} return {proj['id']: proj['name'] for proj in projects} def export_tickets_by_project_and_stage(self, project_id): """ Exporte les tickets d'un projet classés par étape """ # Vérifier la connexion Odoo if not self._ensure_connection(): print_error("Connexion Odoo indisponible.") return # Récupérer les tickets du projet domain = [('project_id', '=', project_id)] ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, 1000) if not ticket_ids: print_error(f"Aucun ticket trouvé pour le projet {project_id}.") return # Lire les détails des tickets tickets = [] for ticket_id in ticket_ids: ticket = self.get_ticket_by_id(ticket_id) if ticket: tickets.append(ticket) #trier les tickets par étape tickets_by_stage = {} for ticket in tickets: champs_relationnels = ticket.get("Champs Relationnels", {}) stage_data = champs_relationnels.get("stage_id", [0, "Non classé"]) stage_id = stage_data[0] if isinstance(stage_data, list) and len(stage_data) > 0 else 0 stage_name = stage_data[1] if isinstance(stage_data, list) and len(stage_data) > 1 else "Non classé" key = f"{stage_id}_{stage_name}" tickets_by_stage.setdefault(key, []).append(ticket) # Sauvegarde des fichiers project_dir = ensure_export_directory(f"project_{project_id}") for stage_key, stage_tickets in tickets_by_stage.items(): stage_dir = os.path.join(project_dir, stage_key) os.makedirs(stage_dir, exist_ok=True) save_json(os.path.join(stage_dir, "all_tickets.json"), stage_tickets) print(f"Exportation terminée. Les fichiers sont enregistrés dans : {project_dir}/")