#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Script principal pour l'analyse de tickets de support. Ce script coordonne l'extraction de données depuis Odoo et l'analyse avec les agents LLM. """ import os import json import argparse import subprocess import shutil import re from typing import Dict, List, Any, Optional from utils import TicketAnalyzer, TicketManager from post_process import transformer_messages, corriger_json_accents, corriger_markdown_accents, reparer_ticket def charger_config(config_path: str) -> Dict[str, Any]: """ Charge la configuration depuis un fichier JSON. Args: config_path: Chemin vers le fichier de configuration Returns: Dictionnaire de configuration """ try: with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) return config except Exception as e: print(f"Erreur lors du chargement de la configuration: {e}") # Configuration par défaut minimale return { "odoo": { "url": "https://example.odoo.com", "db": "db_name", "username": "user@example.com", "api_key": "your_api_key" }, "llm": { "api_key": "your_mistral_api_key" }, "output_dir": "output" } def extraire_ticket(config: Dict[str, Any], ticket_code: str, output_dir: str) -> Dict[str, Any]: """ Extrait les données d'un ticket depuis Odoo. Args: config: Configuration avec les paramètres de connexion ticket_code: Code du ticket à extraire output_dir: Répertoire où sauvegarder les données Returns: Données du ticket extraites """ # Créer le gestionnaire de tickets manager = TicketManager( url=config["odoo"]["url"], db=config["odoo"]["db"], username=config["odoo"]["username"], api_key=config["odoo"]["api_key"] ) # Récupérer le ticket par son code ticket = manager.get_ticket_by_code(ticket_code) if not ticket: print(f"Ticket {ticket_code} non trouvé") return {} # Extraire toutes les données du ticket ticket_dir = os.path.join(output_dir, f"ticket_{ticket_code}") ticket_data = manager.extract_ticket_data(ticket["id"], ticket_dir) # Post-traiter immédiatement les messages post_traiter_messages(ticket_dir) return ticket_data def post_traiter_messages(ticket_dir: str) -> None: """ Post-traite les messages du ticket pour une meilleure analyse. Args: ticket_dir: Répertoire contenant les données du ticket """ messages_file = os.path.join(ticket_dir, "messages.json") # Vérifier que le fichier existe if not os.path.exists(messages_file): print(f"AVERTISSEMENT: Fichier messages.json introuvable dans {ticket_dir}") return print(f"Post-traitement des messages du ticket...") # Créer une sauvegarde avant transformation backup_file = os.path.join(ticket_dir, "messages.json.backup") if not os.path.exists(backup_file): shutil.copy2(messages_file, backup_file) print(f"Sauvegarde créée: {backup_file}") # Transformer les messages pour un format optimal transformer_messages(messages_file) # Vérifier que la transformation a réussi try: with open(messages_file, 'r', encoding='utf-8') as f: messages = json.load(f) print(f"Post-traitement terminé, {len(messages)} messages formatés.") except Exception as e: print(f"ERREUR: Échec du post-traitement: {e}") # Restaurer la sauvegarde si nécessaire if os.path.exists(backup_file): shutil.copy2(backup_file, messages_file) print("Restauration de la sauvegarde des messages.") def preparer_donnees_ticket(ticket_dir: str) -> Dict[str, Any]: """ Prépare les données du ticket pour l'analyse à partir des fichiers stockés. Args: ticket_dir: Répertoire contenant les données du ticket Returns: Dictionnaire des données du ticket prêtes pour l'analyse """ # Chemins des fichiers sources ticket_file = os.path.join(ticket_dir, "ticket_info.json") messages_file = os.path.join(ticket_dir, "messages.json") attachments_file = os.path.join(ticket_dir, "attachments_info.json") attachments_dir = os.path.join(ticket_dir, "attachments") # Vérifier que les fichiers nécessaires existent if not all(os.path.exists(f) for f in [ticket_file, messages_file, attachments_file]): missing = [f for f in [ticket_file, messages_file, attachments_file] if not os.path.exists(f)] raise FileNotFoundError(f"Fichiers manquants: {', '.join(missing)}") # Charger les données try: with open(ticket_file, 'r', encoding='utf-8') as f: ticket_info = json.load(f) with open(messages_file, 'r', encoding='utf-8') as f: messages = json.load(f) with open(attachments_file, 'r', encoding='utf-8') as f: attachments = json.load(f) # Vérifier et corriger les chemins des pièces jointes for attachment in attachments: if "file_path" in attachment: # S'assurer que le chemin est absolu if not os.path.isabs(attachment["file_path"]): attachment["file_path"] = os.path.join(attachments_dir, os.path.basename(attachment["file_path"])) # Vérifier que le fichier existe if not os.path.exists(attachment["file_path"]): print(f"AVERTISSEMENT: Pièce jointe introuvable: {attachment['file_path']}") return { "ticket": ticket_info, "messages": messages, "attachments": attachments, "files": { "ticket_info": ticket_file, "messages": messages_file, "attachments_info": attachments_file, "attachments_dir": attachments_dir } } except Exception as e: raise ValueError(f"Erreur lors du chargement des données du ticket: {e}") def analyser_ticket(ticket_data: Dict[str, Any], config: Dict[str, Any], output_dir: str, llm_params: Optional[Dict[str, Any]] = None) -> Dict[str, str]: """ Analyse un ticket avec les agents LLM. Args: ticket_data: Données du ticket extraites config: Configuration avec les clés API output_dir: Répertoire où sauvegarder les résultats llm_params: Paramètres LLM globaux à appliquer Returns: Chemins des fichiers générés """ # Créer l'analyseur de tickets analyzer = TicketAnalyzer(api_key=config["llm"]["api_key"], llm_params=llm_params) # Préparer le contexte pour l'analyse des images ticket_info = ticket_data.get("ticket", {}) contexte = f""" TICKET: {ticket_info.get('code', 'Inconnu')} - {ticket_info.get('name', 'Sans titre')} DESCRIPTION: {ticket_info.get('description', 'Aucune description')} """ # Récupérer les chemins des pièces jointes (images) attachments = ticket_data.get("attachments", []) image_paths = [] # Vérification des doublons par nom de fichier image_noms = set() for attachment in attachments: chemin = attachment.get("file_path") nom_fichier = os.path.basename(chemin) if chemin else "" if not chemin or not os.path.exists(chemin): continue mimetype = attachment.get("mimetype", "") # Vérifier que c'est une image et qu'on ne l'a pas déjà incluse if mimetype.startswith("image/") and nom_fichier not in image_noms: image_paths.append(chemin) image_noms.add(nom_fichier) print(f"Image ajoutée pour analyse: {nom_fichier}") # Filtrer les images pertinentes print(f"Filtrage de {len(image_paths)} images...") images_pertinentes = analyzer.filtrer_images(image_paths) print(f"Images pertinentes: {len(images_pertinentes)}/{len(image_paths)}") # Imprimer les détails pour le débogage for i, img in enumerate(image_paths): est_pertinente = img in images_pertinentes print(f" Image {i+1}: {os.path.basename(img)} - {'Pertinente' if est_pertinente else 'Non pertinente'}") # Analyser les images pertinentes print("Analyse des images pertinentes...") resultats_images = analyzer.analyser_images(images_pertinentes, contexte) print(f"Analyses d'images terminées: {len(resultats_images)}") # Extraire les questions et réponses print("Extraction des questions et réponses...") messages = ticket_data.get("messages", []) # Vérifier que les messages sont traités et ne contiennent pas de balises HTML for msg in messages: body = msg.get("body", "") if isinstance(body, str) and re.search(r'<[a-z]+[^>]*>', body, re.IGNORECASE): print(f"AVERTISSEMENT: Message {msg.get('id', 'inconnu')} contient du HTML non traité") qr_path = os.path.join(output_dir, "questions_reponses.md") resultats_qr = analyzer.extraire_questions_reponses(messages, qr_path) print(f"Questions extraites: {resultats_qr.get('nb_questions', 0)}") print(f"Réponses extraites: {resultats_qr.get('nb_reponses', 0)}") # Générer le rapport final print("Génération du rapport final...") rapport_dir = os.path.join(output_dir, "rapport") fichiers = analyzer.generer_rapport(rapport_dir) print(f"Rapport généré: {rapport_dir}") # Corriger les problèmes d'accents dans les fichiers JSON corriger_accents_fichiers(rapport_dir) return fichiers def corriger_accents_fichiers(dir_path: str) -> None: """ Corrige les problèmes d'accents dans tous les fichiers JSON et Markdown d'un répertoire. Args: dir_path: Chemin du répertoire contenant les fichiers à corriger """ print("Correction des problèmes d'accents dans les fichiers...") if not os.path.exists(dir_path): print(f"Répertoire non trouvé: {dir_path}") return for root, _, files in os.walk(dir_path): for file in files: # Corriger les fichiers JSON if file.endswith(".json"): json_file = os.path.join(root, file) corriger_json_accents(json_file) # Corriger les fichiers Markdown elif file.endswith(".md"): md_file = os.path.join(root, file) corriger_markdown_accents(md_file) # Corriger également les fichiers Markdown du répertoire de ticket ticket_dir = os.path.dirname(dir_path) for file in os.listdir(ticket_dir): if file.endswith(".md"): md_file = os.path.join(ticket_dir, file) corriger_markdown_accents(md_file) print("Correction des accents terminée.") def main(): """ Fonction principale du script. """ # Parser les arguments de la ligne de commande parser = argparse.ArgumentParser(description="Analyse de tickets de support") parser.add_argument("ticket_code", help="Code du ticket à analyser") parser.add_argument("--config", "-c", default="config.json", help="Chemin vers le fichier de configuration") parser.add_argument("--output", "-o", default="output", help="Répertoire de sortie") parser.add_argument("--skip-extraction", "-s", action="store_true", help="Ignorer l'extraction du ticket (utiliser les données existantes)") parser.add_argument("--fix-accents", "-f", action="store_true", help="Corriger les problèmes d'accents dans les fichiers existants") parser.add_argument("--llm-params", "-p", type=str, help="Paramètres LLM au format JSON (ex: '{\"temperature\": 0.5}')") parser.add_argument("--reprocess", "-r", action="store_true", help="Forcer le retraitement des messages même si l'extraction est ignorée") parser.add_argument("--repair", action="store_true", help="Réparer un ticket corrompu avant analyse") args = parser.parse_args() # Charger la configuration config = charger_config(args.config) # Charger les paramètres LLM supplémentaires si spécifiés llm_params = {} if args.llm_params: try: llm_params = json.loads(args.llm_params) print(f"Paramètres LLM personnalisés: {llm_params}") except json.JSONDecodeError as e: print(f"Erreur lors du parsing des paramètres LLM: {e}") # Créer le répertoire de sortie os.makedirs(args.output, exist_ok=True) # Construire le chemin du répertoire du ticket ticket_dir = os.path.join(args.output, f"ticket_{args.ticket_code}") # Réparer le ticket si demandé if args.repair: if os.path.exists(ticket_dir): print(f"Réparation du ticket {args.ticket_code}...") success = reparer_ticket(ticket_dir) if not success: print("ERREUR: La réparation du ticket a échoué. Impossible de continuer.") return print(f"Réparation terminée, poursuite de l'analyse...") else: print(f"Impossible de réparer: répertoire du ticket {args.ticket_code} introuvable.") if not args.skip_extraction: print("Le ticket sera extrait depuis Odoo.") else: print("ERREUR: Impossible de continuer sans extraction.") return # Si l'option de correction des accents est activée uniquement if args.fix_accents and not args.skip_extraction and not args.repair: rapport_dir = os.path.join(ticket_dir, "rapport") if os.path.exists(rapport_dir): corriger_accents_fichiers(rapport_dir) return # Extraction ou chargement des données du ticket try: if not args.skip_extraction: # Extraire les données du ticket print(f"Extraction du ticket {args.ticket_code}...") ticket_data = extraire_ticket(config, args.ticket_code, args.output) if not ticket_data: print("Impossible de continuer sans données de ticket.") return else: # Si on ignore l'extraction mais qu'on veut retraiter if args.reprocess: print(f"Retraitement forcé des messages du ticket {args.ticket_code}...") post_traiter_messages(ticket_dir) # Charger les données existantes print(f"Chargement des données du ticket {args.ticket_code}...") ticket_data = preparer_donnees_ticket(ticket_dir) print("Données chargées avec succès.") # Analyser le ticket print(f"Analyse du ticket {args.ticket_code}...") fichiers = analyser_ticket(ticket_data, config, ticket_dir, llm_params) print("\nAnalyse terminée!") print(f"Rapport JSON: {fichiers['json']}") print(f"Rapport Markdown: {fichiers['markdown']}") except FileNotFoundError as e: print(f"ERREUR: {e}") print("Utilisez l'extraction ou assurez-vous que tous les fichiers nécessaires existent.") print("Ou bien utilisez l'option --repair pour réparer le ticket.") except ValueError as e: print(f"ERREUR: {e}") print("Vous pouvez essayer l'option --repair pour réparer le ticket.") except Exception as e: print(f"ERREUR inattendue: {e}") print("Vous pouvez essayer l'option --repair pour réparer le ticket.") if __name__ == "__main__": main()