#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import sys import time import logging import json import traceback from datetime import datetime # Import des agents from agents.mistral_large.agent_ticket_analyser import AgentTicketAnalyser from agents.pixtral_large.agent_image_sorter import AgentImageSorter from agents.pixtral_large.agent_image_analyser import AgentImageAnalyser # Import des modèles LLM from llm_classes.mistral_large import MistralLarge from llm_classes.pixtral_large import PixtralLarge # Import pour filtrer les images dupliquées from utils.image_dedup import filtrer_images_uniques # Configuration du logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', filename='test_image_analyse.log', filemode='w') logger = logging.getLogger("TestImageAnalyse") def test_image_analyse(ticket_id): """ Exécute l'analyse d'un ticket, le tri des images et l'analyse d'images contextualisée Args: ticket_id: Identifiant du ticket à traiter (obligatoire) """ if not ticket_id: logger.error("Erreur: Aucun ID de ticket fourni.") print("ERREUR: Vous devez fournir un ID de ticket comme argument.") return # Vérifier que le dossier output existe if not os.path.exists("output/"): logger.error("Le dossier output/ n'existe pas") print("ERREUR: Le dossier output/ n'existe pas") return # Vérifier que le ticket existe ticket_path = os.path.join("output", f"ticket_{ticket_id}") if not os.path.exists(ticket_path): logger.error(f"Le ticket {ticket_id} n'existe pas") print(f"ERREUR: Le ticket {ticket_id} n'existe pas") return print(f"Ticket à traiter: {ticket_id}") # Initialisation des LLM print("Initialisation des modèles LLM...") start_time = time.time() # Mistral Large pour l'analyse de ticket ticket_llm = MistralLarge() logger.info("LLM MistralLarge initialisé pour l'analyse du ticket") # Pixtral Large pour le tri et l'analyse d'images image_sorter_llm = PixtralLarge() logger.info("LLM PixtralLarge initialisé pour le tri d'images") image_analyser_llm = PixtralLarge() logger.info("LLM PixtralLarge initialisé pour l'analyse d'images") llm_init_time = time.time() - start_time print(f"Tous les modèles LLM ont été initialisés en {llm_init_time:.2f} secondes") # Création des agents print("Création des agents...") ticket_agent = AgentTicketAnalyser(ticket_llm) image_sorter = AgentImageSorter(image_sorter_llm) image_analyser = AgentImageAnalyser(image_analyser_llm) print("Tous les agents ont été créés") # ÉTAPE 1: Trouver l'extraction la plus récente du ticket extractions = [d for d in os.listdir(ticket_path) if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(ticket_id)] if not extractions: logger.error(f"Aucune extraction trouvée pour le ticket {ticket_id}") print(f"ERREUR: Aucune extraction trouvée pour le ticket {ticket_id}") return # Trier par ordre décroissant pour avoir la plus récente en premier extractions.sort(reverse=True) latest_extraction = extractions[0] extraction_path = os.path.join(ticket_path, latest_extraction) print(f"Utilisation de l'extraction: {latest_extraction}") # ÉTAPE 2: Chercher le fichier JSON du ticket rapport_dir = os.path.join(extraction_path, f"{ticket_id}_rapports") # Chercher dans le répertoire des rapports json_file = None if os.path.exists(rapport_dir): for file in os.listdir(rapport_dir): if file.lower().endswith('.json'): json_file = os.path.join(rapport_dir, file) break # Si pas trouvé, chercher directement dans l'extraction if not json_file: for file in os.listdir(extraction_path): if file.lower().endswith('.json') and ticket_id in file: json_file = os.path.join(extraction_path, file) break if not json_file: logger.error(f"Aucun fichier JSON trouvé pour le ticket {ticket_id}") print(f"ERREUR: Aucun fichier JSON trouvé pour le ticket {ticket_id}") return print(f"Fichier JSON du ticket trouvé: {os.path.basename(json_file)}") # ÉTAPE 3: Charger les données du ticket try: with open(json_file, 'r', encoding='utf-8') as f: ticket_data = json.load(f) print("Données du ticket chargées avec succès") except Exception as e: logger.error(f"Erreur lors du chargement du fichier JSON: {e}") print(f"ERREUR: Impossible de charger le fichier JSON: {e}") return # Ajouter le code du ticket si absent if "code" not in ticket_data: ticket_data["code"] = ticket_id # ÉTAPE 4: Analyse du ticket print("Analyse du ticket en cours...") try: ticket_analysis = ticket_agent.executer(ticket_data) print(f"Analyse du ticket terminée: {len(ticket_analysis) if ticket_analysis else 0} caractères") except Exception as e: logger.error(f"Erreur lors de l'analyse du ticket: {e}") print(f"ERREUR: Impossible d'analyser le ticket: {e}") ticket_analysis = "Erreur d'analyse du ticket" # ÉTAPE 5: Recherche et tri des images attachments_dir = os.path.join(extraction_path, "attachments") if not os.path.exists(attachments_dir): logger.warning(f"Répertoire des pièces jointes non trouvé: {attachments_dir}") print("AVERTISSEMENT: Aucune pièce jointe trouvée pour ce ticket") return # Récupérer et filtrer les images images = [f for f in os.listdir(attachments_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))] image_paths = [os.path.join(attachments_dir, img) for img in images] image_paths_uniques = filtrer_images_uniques(image_paths) images = [os.path.basename(p) for p in image_paths_uniques] print(f"Images trouvées: {len(images)}") # ÉTAPE 6: Tri des images relevant_images = [] for img in images: img_path = os.path.join(attachments_dir, img) print(f"Évaluation de l'image: {img}") try: sorting_result = image_sorter.executer(img_path) is_relevant = sorting_result.get("is_relevant", False) reason = sorting_result.get("reason", "") if is_relevant: print(f" => Pertinente: {reason[:50]}...") relevant_images.append(img_path) else: print(f" => Non pertinente: {reason[:50]}...") except Exception as e: logger.error(f"Erreur lors du tri de l'image {img}: {e}") print(f"ERREUR: Impossible de trier l'image {img}: {e}") print(f"Images pertinentes trouvées: {len(relevant_images)}/{len(images)}") # ÉTAPE 7: Analyse des images pertinentes avec contexte if relevant_images: print("Analyse des images pertinentes avec contexte...") for img_path in relevant_images: img_name = os.path.basename(img_path) print(f"Analyse approfondie de l'image: {img_name}") try: analysis_result = image_analyser.executer(img_path, contexte=ticket_analysis) print(f" => Analyse terminée: {len(analysis_result.get('analyse', '')) if analysis_result else 0} caractères") except Exception as e: logger.error(f"Erreur lors de l'analyse de l'image {img_name}: {e}") print(f"ERREUR: Impossible d'analyser l'image {img_name}: {e}") else: print("Aucune image pertinente à analyser") print(f"\nProcessus d'analyse terminé avec succès.") if __name__ == "__main__": print("Démarrage du test d'analyse d'images contextualisée") # Vérifier si un ID de ticket est passé en argument ticket_id = None if len(sys.argv) > 1: ticket_id = sys.argv[1] print(f"ID de ticket fourni en argument: {ticket_id}") else: print("ERREUR: Vous devez fournir un ID de ticket comme argument.") sys.exit(1) test_image_analyse(ticket_id) print("Test terminé")