import json
import os

from odoo_connection import OdooConnection
from utils import save_json
from config import EXPORT_DIR
from data_filter import filter_ticket_data
from ticket_search import TicketSearch
from ticket_display import TicketDisplay
from ticket_classification import TicketClassification


class TicketManager:
    """Retrieve tickets from Odoo and post-process them.

    Tickets are read from the ``project.task`` model through the object
    returned by ``OdooConnection.get_odoo_instance()``.  Every remote call
    goes through :meth:`_safe_execute`, which reports failures on stdout and
    returns ``None`` instead of raising.
    """

    def __init__(self):
        """Open the Odoo connection and create the helper objects."""
        self.conn = OdooConnection()
        self.odoo = self.conn.get_odoo_instance()
        self.model_name = "project.task"
        self.search = TicketSearch()
        self.display = TicketDisplay()
        self.classification = TicketClassification()

    @staticmethod
    def _safe_path_component(value):
        """Return ``str(value)`` with ``/`` and ``\\`` replaced by ``_``.

        Used for every directory or file name built from ticket data so that
        a value cannot introduce extra path segments.
        """
        return str(value).replace("/", "_").replace("\\", "_")

    def _check_connection(self):
        """Return True when an Odoo instance is available.

        When ``self.odoo`` is ``None`` a single reconnection attempt is made
        first; a failure of that attempt is printed, not raised.
        """
        if self.odoo is None:
            print("Erreur: La connexion à Odoo n'est pas établie.")
            print("Tentative de reconnexion...")
            try:
                self.conn = OdooConnection()
                self.odoo = self.conn.get_odoo_instance()
            except Exception as e:
                print(f"Erreur lors de la tentative de reconnexion: {e}")
                self.odoo = None
        return self.odoo is not None

    def _safe_execute(self, model, method, *args):
        """Call ``self.odoo.execute(model, method, *args)`` defensively.

        Returns the remote result, or ``None`` when there is no connection or
        the call raises (the error is printed).
        """
        # _check_connection() only returns True when self.odoo is not None,
        # so no second None check is needed before the call.
        if not self._check_connection():
            return None
        try:
            return self.odoo.execute(model, method, *args)
        except Exception as e:
            print(f"Erreur lors de l'exécution de {method} sur {model}: {e}")
            return None

    def get_ticket_fields(self):
        """Describe the fields of the ticket model, skipping many2many ones.

        Returns:
            A pair ``(simple_fields, related_fields)`` where ``simple_fields``
            is a list of ``(name, type)`` tuples and ``related_fields`` a list
            of ``(name, type, relation_model)`` tuples for many2one/one2many
            fields.  Both lists are empty when ``fields_get`` fails.
        """
        fields_info = self._safe_execute(self.model_name, 'fields_get')
        if fields_info is None:
            return [], []

        simple_fields = []
        related_fields = []
        for field_name, field_data in fields_info.items():
            field_type = field_data["type"]
            if field_type == "many2many":
                # Many2many fields are deliberately left out (the original
                # author notes they trigger an error).
                continue
            if field_type in ("many2one", "one2many"):
                related_fields.append(
                    (field_name, field_type, field_data.get("relation", "Inconnu"))
                )
            else:
                simple_fields.append((field_name, field_type))
        return simple_fields, related_fields

    def list_available_tickets(self, limit=10):
        """Print and return up to ``limit`` tickets (id, name, code).

        Returns an empty list when the search or the read fails.
        """
        ticket_ids = self._safe_execute(self.model_name, 'search', [], 0, limit)
        if ticket_ids is None:
            return []

        tickets_info = self._safe_execute(
            self.model_name, 'read', ticket_ids, ['id', 'name', 'code']
        )
        if tickets_info is None:
            return []

        print("\nListe des tickets disponibles:")
        for ticket in tickets_info:
            code = ticket.get('code', 'N/A')
            print(f"Code: {code} - ID: {ticket['id']} - Nom: {ticket['name']}")
        return tickets_info

    def get_field_values(self, field_name, limit=50):
        """Print and return the records a relational ticket field can point to.

        Args:
            field_name: Name of a many2one/one2many field of the ticket model.
            limit: Maximum number of records read from the related model.

        Returns:
            A list of ``{'id', 'name'}`` records, or ``[]`` when the field is
            not a known relational field or a remote call fails.
        """
        _, related_fields = self.get_ticket_fields()

        # First related field whose name matches gives the target model.
        relation_model = next(
            (relation for field, _, relation in related_fields if field == field_name),
            None,
        )
        if not relation_model:
            print(f"Le champ {field_name} n'est pas un champ relationnel connu.")
            return []

        value_ids = self._safe_execute(relation_model, 'search', [], 0, limit)
        if value_ids is None:
            return []

        values = self._safe_execute(relation_model, 'read', value_ids, ['id', 'name'])
        if values is None:
            return []

        print(f"\nValeurs disponibles pour le champ {field_name}:")
        for value in values:
            print(f" - ID: {value['id']} - Nom: {value['name']}")
        return values

    def _read_ticket(self, ticket_id, simple_fields, related_fields):
        """Read one ticket and format it, given already-fetched field lists.

        Split out of :meth:`get_ticket_by_id` so callers that load many
        tickets can query the field definitions once instead of per ticket.
        Returns the formatted ticket dict, or ``None`` when the read fails or
        yields nothing.
        """
        all_fields = [spec[0] for spec in simple_fields + related_fields]
        ticket_data = self._safe_execute(self.model_name, 'read', [ticket_id], all_fields)
        if not ticket_data:
            return None

        ticket = ticket_data[0]
        formatted_ticket = {
            "ID du Ticket": ticket["id"],
            "Nom": ticket["name"],
            "Code": ticket.get("code", "N/A"),
            "Date Limite": ticket.get("date_deadline", "Non défini"),
            "Champs Simples": {},
            "Champs Relationnels": {},
            "Discussions": [],
        }

        for field, _ in simple_fields:
            formatted_ticket["Champs Simples"][field] = ticket.get(field, "Non défini")

        for field, _, _ in related_fields:
            formatted_ticket["Champs Relationnels"][field] = ticket.get(field, "Aucun")

        if "message_ids" in ticket and ticket["message_ids"]:
            formatted_ticket["Discussions"] = self.get_ticket_discussions(
                ticket["message_ids"]
            )

        print(f"\nTicket sélectionné : {formatted_ticket['Code']} - {formatted_ticket['Nom']}")
        return formatted_ticket

    def get_ticket_by_id(self, ticket_id):
        """Return the formatted ticket with database id ``ticket_id``.

        The result is a dict with the keys ``"ID du Ticket"``, ``"Nom"``,
        ``"Code"``, ``"Date Limite"``, ``"Champs Simples"``,
        ``"Champs Relationnels"`` and ``"Discussions"``; ``None`` is returned
        when the ticket cannot be read.
        """
        simple_fields, related_fields = self.get_ticket_fields()
        return self._read_ticket(ticket_id, simple_fields, related_fields)

    def get_ticket_by_code(self, code):
        """Return the first ticket whose ``code`` equals ``code``, or ``None``."""
        domain = [('code', '=', code)]
        ticket_ids = self._safe_execute(self.model_name, 'search', domain)
        if not ticket_ids:
            print(f"Aucun ticket trouvé avec le code : {code}")
            return None

        # Reuse the by-id path for the actual read and formatting.
        return self.get_ticket_by_id(ticket_ids[0])

    def get_tickets_by_codes(self, codes):
        """Return the formatted tickets found for ``codes``; misses are skipped."""
        tickets = []
        for code in codes:
            ticket = self.get_ticket_by_code(code)
            if ticket:
                tickets.append(ticket)
        return tickets

    def search_tickets_by_fields(self, field_criteria, limit=50):
        """Return the tickets matching every ``field == value`` criterion.

        Args:
            field_criteria: Mapping of field name to the exact value required.
            limit: Maximum number of tickets returned by the search.

        Returns:
            A list of formatted tickets (see :meth:`get_ticket_by_id`), empty
            when nothing matches or the search fails.
        """
        domain = [(field, '=', value) for field, value in field_criteria.items()]

        print(f"\nRecherche de tickets avec les critères: {field_criteria}")

        ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, limit)
        if not ticket_ids:
            print("Aucun ticket ne correspond aux critères de recherche.")
            return []

        print(f"Nombre de tickets trouvés: {len(ticket_ids)}")

        # The field definitions are the same for every ticket: fetch them once
        # rather than issuing one fields_get call per ticket.
        simple_fields, related_fields = self.get_ticket_fields()

        tickets = []
        for ticket_id in ticket_ids:
            ticket = self._read_ticket(ticket_id, simple_fields, related_fields)
            if ticket:
                tickets.append(ticket)
        return tickets

    def get_ticket_discussions(self, message_ids):
        """Read and clean the ``mail.message`` records attached to a ticket.

        Messages whose cleaned body is empty are dropped.  Returns a list of
        dicts with the keys ``"ID Message"``, ``"Sujet"``, ``"Contenu"``,
        ``"Auteur"`` and ``"Date"``; ``[]`` when there is nothing to read or
        the read fails.
        """
        # NOTE(review): function-scope import kept as in the original,
        # presumably to avoid an import cycle with data_formatter — confirm.
        from data_formatter import clean_ticket_data

        model_name = "mail.message"
        if not message_ids:
            return []

        messages = self._safe_execute(
            model_name, 'read', message_ids, ["id", "subject", "body", "author_id", "date"]
        )
        if messages is None:
            return []

        formatted_messages = []
        for msg in messages:
            cleaned_content = clean_ticket_data({"description": msg["body"]})["description"]
            if cleaned_content:
                formatted_messages.append({
                    "ID Message": msg["id"],
                    "Sujet": msg["subject"],
                    "Contenu": cleaned_content,
                    # author_id is indexed as a pair; element 1 is shown as
                    # the author's name.
                    "Auteur": msg["author_id"][1] if msg["author_id"] else "Inconnu",
                    "Date": msg["date"],
                })
        return formatted_messages

    def display_field(self, ticket, field_path):
        """Print one field of a formatted ticket.

        ``field_path`` is either ``"field"`` (top-level key) or
        ``"section.field"`` (key inside a nested dict).  Paths with more than
        two components are ignored.
        """
        if not ticket:
            print("Aucun ticket à afficher.")
            return

        parts = field_path.split('.')
        if len(parts) == 1:
            # Top-level field.
            name = parts[0]
            if name in ticket:
                print(f"{name}: {ticket[name]}")
            else:
                print(f"Champ {name} introuvable.")
        elif len(parts) == 2:
            # Field inside a section.
            section, field = parts
            if section in ticket and field in ticket[section]:
                print(f"{section}.{field}: {ticket[section][field]}")
            else:
                print(f"Champ {field} introuvable dans {section}.")

    def _classification_folder(self, ticket, field_name):
        """Locate ``field_name`` in a formatted ticket.

        Returns ``(found, folder)``: ``found`` tells whether the field exists
        in the ticket, ``folder`` is the sanitised folder name derived from
        its value, or ``None`` when the value is unset.
        """
        if field_name in ticket["Champs Simples"]:
            value = ticket["Champs Simples"][field_name]
            if value is None:
                return True, None
            return True, self._safe_path_component(value)

        if field_name in ticket["Champs Relationnels"]:
            value = ticket["Champs Relationnels"][field_name]
            if value is None or value == "Aucun":
                return True, None
            # For list values with at least two items, the second item is
            # used as the folder name instead of the whole list.
            if isinstance(value, list) and len(value) > 1:
                value = value[1]
            return True, self._safe_path_component(value)

        if field_name in ticket and ticket[field_name] is not None:
            return True, self._safe_path_component(ticket[field_name])

        return False, None

    def classify_tickets_by_field(self, codes, field_name):
        """Group tickets by the value of ``field_name`` and export them.

        Each ticket is passed through ``filter_ticket_data`` and written as
        JSON under ``EXPORT_DIR/classified_by_<field_name>/<value>/``.

        Args:
            codes: Ticket codes to load.
            field_name: Field to group on; looked up in the simple fields,
                then the relational fields, then the top-level keys.

        Returns:
            A dict mapping each folder name to its list of tickets, or
            ``None`` when there is nothing to classify.
        """
        tickets = self.get_tickets_by_codes(codes)
        if not tickets:
            print("Aucun ticket à classer.")
            return

        classified_tickets = {}
        field_found = False
        for ticket in tickets:
            # Clean the ticket (discussions) before it is stored.
            ticket = filter_ticket_data(ticket)
            found, folder = self._classification_folder(ticket, field_name)
            field_found = field_found or found
            if folder is not None:
                classified_tickets.setdefault(folder, []).append(ticket)

        if not field_found:
            print(f"Le champ '{field_name}' n'a pas été trouvé dans les tickets.")
            return

        if not classified_tickets:
            print(f"Aucun ticket avec des valeurs pour le champ '{field_name}'.")
            return

        field_dir = os.path.join(EXPORT_DIR, f"classified_by_{field_name}")
        os.makedirs(field_dir, exist_ok=True)

        print(f"\nClassification des tickets par le champ '{field_name}':")

        for value, value_tickets in classified_tickets.items():
            value_dir = os.path.join(field_dir, value)
            os.makedirs(value_dir, exist_ok=True)
            print(f"- Valeur '{value}': {len(value_tickets)} ticket(s)")

            for ticket in value_tickets:
                # The code is sanitised like the folder names so a separator
                # in it cannot redirect the write.
                safe_code = self._safe_path_component(ticket['Code'])
                filename = f"ticket_{safe_code}_{ticket['ID du Ticket']}.json"
                save_json(os.path.join(value_dir, filename), ticket)
                print(f" • Ticket {ticket['Code']} sauvegardé")

        print(f"\nTickets classés et sauvegardés dans {field_dir}/")
        return classified_tickets

    def classify_tickets_by_project_and_stage(self, project_id):
        """Export the tickets of one project, grouped by stage.

        Tickets are written as JSON under
        ``EXPORT_DIR/classified_by_project_id/<project_id>/<stage_id>_<stage_name>/``.
        Errors are printed, never raised; nothing is returned.

        Args:
            project_id: Project identifier; anything ``int()`` accepts.
        """
        # Validate the id on its own so that only a bad id produces the
        # "not a valid number" message.
        try:
            project_id = int(project_id)
        except (TypeError, ValueError):
            print(f"Erreur: Le project_id '{project_id}' n'est pas un nombre valide.")
            return

        try:
            tickets = self.search_tickets_by_fields({'project_id': project_id})
            if not tickets:
                print(f"Aucun ticket trouvé pour le project_id : {project_id}")
                return

            base_dir = os.path.join(EXPORT_DIR, "classified_by_project_id")
            os.makedirs(base_dir, exist_ok=True)

            project_dir = os.path.join(base_dir, str(project_id))
            os.makedirs(project_dir, exist_ok=True)

            # The project name is taken from the first ticket when available.
            project_name = "Inconnu"
            first_ticket = tickets[0]
            if ('Champs Relationnels' in first_ticket
                    and 'project_id' in first_ticket['Champs Relationnels']):
                project_ref = first_ticket['Champs Relationnels']['project_id']
                if isinstance(project_ref, list) and len(project_ref) > 1:
                    project_name = project_ref[1]

            print(f"\nClassification des tickets pour le projet {project_id} ({project_name}):")
            print(f"Nombre de tickets trouvés: {len(tickets)}")

            # Group on an (id, name) pair; both are stringified so the key is
            # always hashable and never has to be parsed back.
            tickets_by_stage = {}
            for ticket in tickets:
                ticket = filter_ticket_data(ticket)
                if ('Champs Relationnels' not in ticket
                        or 'stage_id' not in ticket['Champs Relationnels']):
                    continue
                stage_ref = ticket['Champs Relationnels']['stage_id']
                if isinstance(stage_ref, list) and len(stage_ref) > 0:
                    stage_id = str(stage_ref[0])
                    stage_name = str(stage_ref[1]) if len(stage_ref) > 1 else "Inconnu"
                    tickets_by_stage.setdefault((stage_id, stage_name), []).append(ticket)

            if not tickets_by_stage:
                print("Aucun ticket n'a pu être classé par stage_id.")
                return

            for (stage_id, stage_name), stage_tickets in tickets_by_stage.items():
                safe_stage_name = self._safe_path_component(stage_name)
                stage_dir = os.path.join(project_dir, f"{stage_id}_{safe_stage_name}")
                os.makedirs(stage_dir, exist_ok=True)

                total = len(stage_tickets)
                print(f"- Stage '{stage_name}' (ID: {stage_id}): {total} tickets")

                for i, ticket in enumerate(stage_tickets):
                    safe_code = self._safe_path_component(ticket['Code'])
                    ticket_file = os.path.join(stage_dir, f"ticket_{safe_code}.json")
                    with open(ticket_file, 'w', encoding='utf-8') as f:
                        json.dump(ticket, f, ensure_ascii=False, indent=4)

                    # Progress line on every fifth ticket and on the last one.
                    if i % 5 == 0 or i == total - 1:
                        print(f" • {i+1}/{total} tickets sauvegardés")

            print(f"\nTickets classés et sauvegardés dans {project_dir}/")
        except Exception as e:
            # Best-effort export: report the failure instead of propagating.
            print(f"Erreur lors de la classification des tickets: {e}")

    def analyze_common_values(self, tickets):
        """Return the fields that hold the same non-empty value in all tickets.

        Supports both the structured format (``"Champs Simples"`` /
        ``"Champs Relationnels"`` sections, reported as ``"<section>.<field>"``)
        and flat tickets, for which a fixed list of well-known fields is
        compared.  At least two tickets are required; otherwise ``{}`` is
        returned.
        """
        if not tickets or len(tickets) < 2:
            print("L'analyse des valeurs communes nécessite au moins 2 tickets.")
            return {}

        common_fields = {}
        first_ticket = tickets[0]
        others = tickets[1:]

        def shared_in_section(section, field, value):
            # True when every other ticket has section[field] == value.
            return all(
                section in other
                and field in other[section]
                and other[section][field] == value
                for other in others
            )

        def shared_top_level(field, value):
            # True when every other ticket has the top-level field == value.
            return all(field in other and other[field] == value for other in others)

        # Simple fields.
        if 'Champs Simples' in first_ticket:
            for field, value in first_ticket['Champs Simples'].items():
                # Empty values are never reported as common.
                if value and shared_in_section('Champs Simples', field, value):
                    common_fields[f"Champs Simples.{field}"] = value
        else:
            simple_fields = ['description', 'priority', 'sequence',
                             'date_deadline', 'create_date', 'write_date']
            for field in simple_fields:
                if field in first_ticket and first_ticket[field]:
                    value = first_ticket[field]
                    if shared_top_level(field, value):
                        common_fields[field] = value

        # Relational fields.
        if 'Champs Relationnels' in first_ticket:
            for field, value in first_ticket['Champs Relationnels'].items():
                if value and shared_in_section('Champs Relationnels', field, value):
                    common_fields[f"Champs Relationnels.{field}"] = value
        else:
            relational_fields = ['stage_id', 'project_id', 'user_id']
            for field in relational_fields:
                if field in first_ticket and first_ticket[field]:
                    value = first_ticket[field]
                    if shared_top_level(field, value):
                        # Show "name (ID: id)" when the value is a list of
                        # at least two items.
                        if isinstance(value, list) and len(value) > 1:
                            common_fields[field] = f"{value[1]} (ID: {value[0]})"
                        else:
                            common_fields[field] = value

        return common_fields

    def list_models(self):
        """Print and return the ``ir.model`` records (``model`` and ``name``).

        Returns ``[]`` when the call fails or yields nothing.
        """
        models = self._safe_execute('ir.model', 'search_read', [], ['model', 'name'])
        if not models:
            print("Aucun modèle disponible.")
            return []

        print("\nListe des modèles disponibles:")
        for model in models:
            print(f"Modèle: {model['name']} (ID: {model['model']})")
        return models

    def list_model_fields(self, model_name):
        """Print and return the ``fields_get`` description of ``model_name``.

        Returns the mapping of field name to field description, or ``[]``
        when nothing could be retrieved.
        """
        fields_info = self._safe_execute(model_name, 'fields_get')
        if not fields_info:
            print(f"Aucun champ trouvé pour le modèle {model_name}.")
            return []

        print(f"\nChamps du modèle {model_name}:")
        for field_name, field_data in fields_info.items():
            print(f"Champ: {field_name} - Type: {field_data['type']}")
        return fields_info

    def export_model_fields_to_json(self, model_name, filename):
        """Write the field descriptions of ``model_name`` to a JSON file.

        The file is ``EXPORT_DIR/<filename>.json``.

        Returns:
            True on success, False when no fields were found or the file
            could not be written (the error is printed).
        """
        fields_info = self.list_model_fields(model_name)
        if not fields_info:
            return False

        file_path = os.path.join(EXPORT_DIR, f"{filename}.json")
        try:
            # Create the export directory on first use, as the classify
            # methods do for their own output folders.
            os.makedirs(EXPORT_DIR, exist_ok=True)
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(fields_info, f, indent=4, ensure_ascii=False)
            print(f"Informations des champs exportées dans {file_path}")
            return True
        except Exception as e:
            print(f"Erreur lors de l'exportation des champs: {str(e)}")
            return False