llm_ticket3/test_image_analyser.py
2025-04-17 08:48:26 +02:00

244 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script pour tester directement l'agent d'analyse d'images (AgentImageAnalyser) avec Pixtral-Large.
Permet d'obtenir une analyse détaillée d'images pertinentes pour un ticket.
"""
import os
import sys
import json
import argparse
import glob
from typing import List, Optional
from llm_classes.pixtral_large import PixtralLarge
from agents.pixtral_large.agent_image_analyser import AgentImageAnalyser
from utils.image_dedup import filtrer_images_uniques
def get_images_in_extraction(extraction_path: str) -> List[str]:
"""
Récupère toutes les images dans un répertoire d'extraction
Args:
extraction_path: Chemin vers le répertoire d'extraction
Returns:
Liste des chemins d'accès aux images
"""
# Extensions d'images courantes
image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.gif', '*.webp']
# Liste pour stocker les chemins d'images
images = []
# Chercher dans le répertoire principal
for extension in image_extensions:
pattern = os.path.join(extraction_path, extension)
images.extend(glob.glob(pattern))
# Chercher dans le sous-répertoire "attachments"
attachments_path = os.path.join(extraction_path, "attachments")
if os.path.exists(attachments_path) and os.path.isdir(attachments_path):
for extension in image_extensions:
pattern = os.path.join(attachments_path, extension)
images.extend(glob.glob(pattern))
# Chercher dans les sous-répertoires de "attachments" (si des ID sont utilisés)
for subdir in os.listdir(attachments_path):
subdir_path = os.path.join(attachments_path, subdir)
if os.path.isdir(subdir_path):
for extension in image_extensions:
pattern = os.path.join(subdir_path, extension)
images.extend(glob.glob(pattern))
return images
def extraire_contexte_ticket(extraction_path: str) -> str:
"""
Extrait le contexte du ticket à partir des fichiers disponibles
Args:
extraction_path: Chemin vers le répertoire d'extraction
Returns:
Contexte du ticket sous forme de texte
"""
# Essayer d'abord de lire le all_messages.json
all_messages_path = os.path.join(extraction_path, "all_messages.json")
if os.path.exists(all_messages_path):
try:
with open(all_messages_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Extraire le résumé du ticket
ticket_summary = data.get("ticket_summary", {})
name = ticket_summary.get("name", "")
description = ticket_summary.get("description", "")
# Construire un contexte concis
contexte = f"Titre du ticket: {name}\n\nDescription: {description}\n\n"
# Ajouter les 3 premiers messages pour plus de contexte
messages = data.get("messages", [])
if messages:
contexte += "Extraits des premiers messages:\n"
for i, msg in enumerate(messages[:3]):
body = msg.get("body", "")
if body:
# Limiter la taille de chaque message
if len(body) > 500:
body = body[:500] + "..."
contexte += f"- Message {i+1}: {body}\n\n"
return contexte
except Exception as e:
print(f"Erreur lors de la lecture du fichier all_messages.json: {str(e)}")
# En cas d'échec, essayer avec le ticket_summary.json
summary_path = os.path.join(extraction_path, "ticket_summary.json")
if os.path.exists(summary_path):
try:
with open(summary_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Extraire les informations de base
name = data.get("name", "")
description = data.get("description", "")
return f"Titre du ticket: {name}\n\nDescription: {description}"
except Exception as e:
print(f"Erreur lors de la lecture du fichier ticket_summary.json: {str(e)}")
# En dernier recours
return "Contexte du ticket non disponible. Analysez uniquement le contenu de l'image."
def main():
# Configuration de l'analyseur d'arguments
parser = argparse.ArgumentParser(description="Tester l'agent d'analyse d'images directement.")
parser.add_argument("ticket_id", help="ID du ticket à analyser (ex: T1234)")
parser.add_argument("--image", help="Chemin spécifique vers une image à analyser")
parser.add_argument("--output_dir", default="output", help="Répertoire de sortie contenant les tickets")
parser.add_argument("--save", action="store_true", help="Sauvegarder les résultats d'analyse dans un fichier")
parser.add_argument("--no-dedup", action="store_true", help="Désactiver le préfiltrage des doublons")
parser.add_argument("--seuil", type=int, default=5, help="Seuil de similarité pour détecter les doublons (0-10, défaut=5)")
parser.add_argument("--verbose", action="store_true", help="Afficher des informations détaillées")
# Analyser les arguments
args = parser.parse_args()
# Construire le chemin vers le ticket
ticket_path = os.path.join(args.output_dir, f"ticket_{args.ticket_id}")
# Vérifier que le répertoire du ticket existe
if not os.path.exists(ticket_path):
print(f"ERREUR: Le ticket {args.ticket_id} n'existe pas dans {args.output_dir}")
return 1
# Rechercher la dernière extraction (la plus récente)
extractions = [d for d in os.listdir(ticket_path) if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(args.ticket_id)]
if not extractions:
print(f"ERREUR: Aucune extraction trouvée pour le ticket {args.ticket_id}")
return 1
# Trier par ordre décroissant pour avoir la plus récente en premier
extractions.sort(reverse=True)
latest_extraction = extractions[0]
extraction_path = os.path.join(ticket_path, latest_extraction)
print(f"Utilisation de l'extraction: {latest_extraction}")
# Extraire le contexte du ticket
contexte = extraire_contexte_ticket(extraction_path)
print(f"Contexte du ticket extrait ({len(contexte)} caractères)")
# Initialiser le modèle Pixtral-Large
try:
print("Initialisation du modèle Pixtral-Large...")
model = PixtralLarge()
agent = AgentImageAnalyser(model)
print("Agent d'analyse d'images initialisé avec succès")
except Exception as e:
print(f"ERREUR: Impossible d'initialiser le modèle: {str(e)}")
return 1
# Si une image spécifique est fournie
if args.image:
image_path = args.image
if not os.path.exists(image_path):
print(f"ERREUR: L'image spécifiée n'existe pas: {image_path}")
return 1
print(f"Analyse de l'image: {os.path.basename(image_path)}")
resultat = agent.executer(image_path, contexte)
# Afficher le résultat
print("\nRésultat de l'analyse:")
print(resultat.get('analyse', 'Analyse non disponible'))
# Sauvegarder le résultat si demandé
if args.save:
output_file = os.path.join(extraction_path, f"analyse_{os.path.basename(image_path)}.txt")
with open(output_file, 'w', encoding='utf-8') as f:
f.write(resultat.get('analyse', 'Analyse non disponible'))
print(f"\nAnalyse sauvegardée dans: {output_file}")
else:
# Récupérer toutes les images de l'extraction
images = get_images_in_extraction(extraction_path)
if not images:
print(f"Aucune image trouvée dans l'extraction {latest_extraction}")
return 1
print(f"Nombre d'images trouvées: {len(images)}")
# Appliquer le préfiltrage de doublons si activé
if not args.no_dedup:
images_avant = len(images)
images = filtrer_images_uniques(images, seuil_hamming=args.seuil)
images_apres = len(images)
if images_avant > images_apres:
print(f"Préfiltrage des doublons: {images_avant}{images_apres} images ({images_avant - images_apres} doublons supprimés)")
else:
print("Préfiltrage terminé: aucun doublon détecté")
# Analyser chaque image
results = []
for image_path in images:
image_name = os.path.basename(image_path)
print(f"\nAnalyse de l'image: {image_name}")
resultat = agent.executer(image_path, contexte)
results.append(resultat)
# Afficher le résultat
print("\nRésultat de l'analyse:")
if args.verbose:
print(resultat.get('analyse', 'Analyse non disponible'))
else:
# Afficher seulement les premières lignes pour un aperçu
analyse = resultat.get('analyse', 'Analyse non disponible')
lignes = analyse.split('\n')
apercu = '\n'.join(lignes[:min(5, len(lignes))]) + ('\n...' if len(lignes) > 5 else '')
print(apercu)
# Sauvegarder le résultat si demandé
if args.save:
output_file = os.path.join(extraction_path, f"analyse_{image_name}.txt")
with open(output_file, 'w', encoding='utf-8') as f:
f.write(resultat.get('analyse', 'Analyse non disponible'))
print(f"Analyse sauvegardée dans: {output_file}")
# Afficher un résumé à la fin
print(f"\nRésumé: {len(results)} images analysées")
if args.save:
print(f"Les résultats ont été sauvegardés dans le répertoire: {extraction_path}")
return 0
if __name__ == "__main__":
sys.exit(main())