llm_ticket3/test_analyse_image_large.py
2025-04-18 12:05:46 +02:00

217 lines
8.3 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import time
import logging
import json
import traceback
from datetime import datetime
# Import des agents
from agents.mistral_large.agent_ticket_analyser import AgentTicketAnalyser
from agents.pixtral_large.agent_image_sorter import AgentImageSorter
from agents.pixtral_large.agent_image_analyser import AgentImageAnalyser
# Import des modèles LLM
from llm_classes.mistral_large import MistralLarge
from llm_classes.pixtral_large import PixtralLarge
# Import pour filtrer les images dupliquées
from utils.image_dedup import filtrer_images_uniques
# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
filename='test_image_analyse.log', filemode='w')
logger = logging.getLogger("TestImageAnalyse")
def test_image_analyse(ticket_id):
"""
Exécute l'analyse d'un ticket, le tri des images et l'analyse d'images contextualisée
Args:
ticket_id: Identifiant du ticket à traiter (obligatoire)
"""
if not ticket_id:
logger.error("Erreur: Aucun ID de ticket fourni.")
print("ERREUR: Vous devez fournir un ID de ticket comme argument.")
return
# Vérifier que le dossier output existe
if not os.path.exists("output/"):
logger.error("Le dossier output/ n'existe pas")
print("ERREUR: Le dossier output/ n'existe pas")
return
# Vérifier que le ticket existe
ticket_path = os.path.join("output", f"ticket_{ticket_id}")
if not os.path.exists(ticket_path):
logger.error(f"Le ticket {ticket_id} n'existe pas")
print(f"ERREUR: Le ticket {ticket_id} n'existe pas")
return
print(f"Ticket à traiter: {ticket_id}")
# Initialisation des LLM
print("Initialisation des modèles LLM...")
start_time = time.time()
# Mistral Large pour l'analyse de ticket
ticket_llm = MistralLarge()
logger.info("LLM MistralLarge initialisé pour l'analyse du ticket")
# Pixtral Large pour le tri et l'analyse d'images
image_sorter_llm = PixtralLarge()
logger.info("LLM PixtralLarge initialisé pour le tri d'images")
image_analyser_llm = PixtralLarge()
logger.info("LLM PixtralLarge initialisé pour l'analyse d'images")
llm_init_time = time.time() - start_time
print(f"Tous les modèles LLM ont été initialisés en {llm_init_time:.2f} secondes")
# Création des agents
print("Création des agents...")
ticket_agent = AgentTicketAnalyser(ticket_llm)
image_sorter = AgentImageSorter(image_sorter_llm)
image_analyser = AgentImageAnalyser(image_analyser_llm)
print("Tous les agents ont été créés")
# ÉTAPE 1: Trouver l'extraction la plus récente du ticket
extractions = [d for d in os.listdir(ticket_path) if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(ticket_id)]
if not extractions:
logger.error(f"Aucune extraction trouvée pour le ticket {ticket_id}")
print(f"ERREUR: Aucune extraction trouvée pour le ticket {ticket_id}")
return
# Trier par ordre décroissant pour avoir la plus récente en premier
extractions.sort(reverse=True)
latest_extraction = extractions[0]
extraction_path = os.path.join(ticket_path, latest_extraction)
print(f"Utilisation de l'extraction: {latest_extraction}")
# ÉTAPE 2: Chercher le fichier JSON du ticket
rapport_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
# Chercher dans le répertoire des rapports
json_file = None
if os.path.exists(rapport_dir):
for file in os.listdir(rapport_dir):
if file.lower().endswith('.json'):
json_file = os.path.join(rapport_dir, file)
break
# Si pas trouvé, chercher directement dans l'extraction
if not json_file:
for file in os.listdir(extraction_path):
if file.lower().endswith('.json') and ticket_id in file:
json_file = os.path.join(extraction_path, file)
break
if not json_file:
logger.error(f"Aucun fichier JSON trouvé pour le ticket {ticket_id}")
print(f"ERREUR: Aucun fichier JSON trouvé pour le ticket {ticket_id}")
return
print(f"Fichier JSON du ticket trouvé: {os.path.basename(json_file)}")
# ÉTAPE 3: Charger les données du ticket
try:
with open(json_file, 'r', encoding='utf-8') as f:
ticket_data = json.load(f)
print("Données du ticket chargées avec succès")
except Exception as e:
logger.error(f"Erreur lors du chargement du fichier JSON: {e}")
print(f"ERREUR: Impossible de charger le fichier JSON: {e}")
return
# Ajouter le code du ticket si absent
if "code" not in ticket_data:
ticket_data["code"] = ticket_id
# ÉTAPE 4: Analyse du ticket
print("Analyse du ticket en cours...")
try:
ticket_analysis = ticket_agent.executer(ticket_data)
print(f"Analyse du ticket terminée: {len(ticket_analysis) if ticket_analysis else 0} caractères")
except Exception as e:
logger.error(f"Erreur lors de l'analyse du ticket: {e}")
print(f"ERREUR: Impossible d'analyser le ticket: {e}")
ticket_analysis = "Erreur d'analyse du ticket"
# ÉTAPE 5: Recherche et tri des images
attachments_dir = os.path.join(extraction_path, "attachments")
if not os.path.exists(attachments_dir):
logger.warning(f"Répertoire des pièces jointes non trouvé: {attachments_dir}")
print("AVERTISSEMENT: Aucune pièce jointe trouvée pour ce ticket")
return
# Récupérer et filtrer les images
images = [f for f in os.listdir(attachments_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
image_paths = [os.path.join(attachments_dir, img) for img in images]
image_paths_uniques = filtrer_images_uniques(image_paths)
images = [os.path.basename(p) for p in image_paths_uniques]
print(f"Images trouvées: {len(images)}")
# ÉTAPE 6: Tri des images
relevant_images = []
for img in images:
img_path = os.path.join(attachments_dir, img)
print(f"Évaluation de l'image: {img}")
try:
sorting_result = image_sorter.executer(img_path)
is_relevant = sorting_result.get("is_relevant", False)
reason = sorting_result.get("reason", "")
if is_relevant:
print(f" => Pertinente: {reason[:50]}...")
relevant_images.append(img_path)
else:
print(f" => Non pertinente: {reason[:50]}...")
except Exception as e:
logger.error(f"Erreur lors du tri de l'image {img}: {e}")
print(f"ERREUR: Impossible de trier l'image {img}: {e}")
print(f"Images pertinentes trouvées: {len(relevant_images)}/{len(images)}")
# ÉTAPE 7: Analyse des images pertinentes avec contexte
if relevant_images:
print("Analyse des images pertinentes avec contexte...")
for img_path in relevant_images:
img_name = os.path.basename(img_path)
print(f"Analyse approfondie de l'image: {img_name}")
try:
analysis_result = image_analyser.executer(img_path, contexte=ticket_analysis)
print(f" => Analyse terminée: {len(analysis_result.get('analyse', '')) if analysis_result else 0} caractères")
except Exception as e:
logger.error(f"Erreur lors de l'analyse de l'image {img_name}: {e}")
print(f"ERREUR: Impossible d'analyser l'image {img_name}: {e}")
else:
print("Aucune image pertinente à analyser")
print(f"\nProcessus d'analyse terminé avec succès.")
if __name__ == "__main__":
print("Démarrage du test d'analyse d'images contextualisée")
# Vérifier si un ID de ticket est passé en argument
ticket_id = None
if len(sys.argv) > 1:
ticket_id = sys.argv[1]
print(f"ID de ticket fourni en argument: {ticket_id}")
else:
print("ERREUR: Vous devez fournir un ID de ticket comme argument.")
sys.exit(1)
test_image_analyse(ticket_id)
print("Test terminé")