llm_ticket3/test_corrections/test_tri_images.py
2025-04-23 11:27:45 +02:00

199 lines
6.6 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script de test pour vérifier le tri d'images avec llama-vision après les corrections.
"""
import os
import sys
import json
import logging
from typing import List, Dict, Any
from pprint import pprint
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('TestTriImages')
def verifier_rapport_deduplication(ticket_id: str) -> List[str]:
"""
Vérifie le rapport de déduplication pour extraire les chemins des images uniques.
Args:
ticket_id: ID du ticket à analyser
Returns:
Liste des chemins d'images uniques
"""
# Déterminer le répertoire du ticket
base_dir = "output"
ticket_dir = f"ticket_{ticket_id}"
ticket_path = os.path.join(base_dir, ticket_dir)
# Trouver la dernière extraction
extractions = []
for extract in os.listdir(ticket_path):
extraction_path = os.path.join(ticket_path, extract)
if os.path.isdir(extraction_path) and extract.startswith(ticket_id):
extractions.append(extraction_path)
# Trier par date (plus récente en premier)
extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True)
latest_extraction = extractions[0]
# Chemin du rapport de déduplication
pipeline_dir = os.path.join(latest_extraction, f"{ticket_id}_rapports", "pipeline")
dedup_path = os.path.join(pipeline_dir, "rapport_de_deduplication.json")
if not os.path.exists(dedup_path):
logger.error(f"Rapport de déduplication introuvable: {dedup_path}")
return []
# Charger le rapport
with open(dedup_path, 'r', encoding='utf-8') as f:
dedup_data = json.load(f)
# Extraire les images uniques
images_uniques = [item['image_path'] for item in dedup_data if item.get('status') == 'unique']
logger.info(f"Trouvé {len(images_uniques)} images uniques sur {len(dedup_data)} total")
return images_uniques
def verifier_resultats_tri(ticket_id: str) -> Dict[str, Any]:
"""
Vérifie les résultats du tri d'images.
Args:
ticket_id: ID du ticket à analyser
Returns:
Dictionnaire avec les statistiques des résultats
"""
# Déterminer le répertoire du ticket
base_dir = "output"
ticket_dir = f"ticket_{ticket_id}"
ticket_path = os.path.join(base_dir, ticket_dir)
# Trouver la dernière extraction
extractions = []
for extract in os.listdir(ticket_path):
extraction_path = os.path.join(ticket_path, extract)
if os.path.isdir(extraction_path) and extract.startswith(ticket_id):
extractions.append(extraction_path)
# Trier par date (plus récente en premier)
extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True)
latest_extraction = extractions[0]
# Chemin des résultats de tri
pipeline_dir = os.path.join(latest_extraction, f"{ticket_id}_rapports", "pipeline")
# Trouver le fichier de résultats de tri
tri_files = [f for f in os.listdir(pipeline_dir) if f.startswith("tri_image_") and f.endswith("_results.json")]
if not tri_files:
logger.error(f"Aucun fichier de résultats de tri trouvé dans {pipeline_dir}")
return {}
# Charger le fichier le plus récent
tri_files.sort(key=lambda x: os.path.getmtime(os.path.join(pipeline_dir, x)), reverse=True)
tri_path = os.path.join(pipeline_dir, tri_files[0])
with open(tri_path, 'r', encoding='utf-8') as f:
tri_data = json.load(f)
# Analyser les résultats
images_analysees = []
pertinentes = 0
non_pertinentes = 0
if isinstance(tri_data, list):
for item in tri_data:
if "metadata" in item and "image_path" in item["metadata"]:
images_analysees.append(item["metadata"]["image_path"])
if item.get("is_relevant", False):
pertinentes += 1
else:
non_pertinentes += 1
# Préparer les statistiques
stats = {
"images_analysees": len(images_analysees),
"images_pertinentes": pertinentes,
"images_non_pertinentes": non_pertinentes,
"chemins_analyses": images_analysees,
"fichier_resultats": tri_path
}
return stats
def comparer_images(images_uniques: List[str], stats: Dict[str, Any]) -> None:
"""
Compare les images uniques avec celles analysées.
Args:
images_uniques: Liste des chemins d'images uniques
stats: Statistiques des résultats de tri
"""
images_analysees = stats.get("chemins_analyses", [])
# Vérifier si toutes les images uniques ont été analysées
manquantes = set(images_uniques) - set(images_analysees)
en_trop = set(images_analysees) - set(images_uniques)
if manquantes:
logger.warning(f"Images uniques non analysées ({len(manquantes)}):")
for img in manquantes:
logger.warning(f" - {os.path.basename(img)}")
if en_trop:
logger.warning(f"Images analysées qui ne sont pas uniques ({len(en_trop)}):")
for img in en_trop:
logger.warning(f" - {os.path.basename(img)}")
if not manquantes and not en_trop:
logger.info("Toutes les images uniques ont été correctement analysées")
def main():
"""
Fonction principale du script de test.
"""
if len(sys.argv) < 2:
logger.error("Usage: python test_tri_images.py <ticket_id>")
sys.exit(1)
ticket_id = sys.argv[1]
logger.info(f"Vérification du tri d'images pour le ticket {ticket_id}")
# Vérifier le rapport de déduplication
images_uniques = verifier_rapport_deduplication(ticket_id)
if not images_uniques:
logger.error("Aucune image unique trouvée, impossible de continuer")
sys.exit(1)
# Vérifier les résultats du tri
stats = verifier_resultats_tri(ticket_id)
if not stats:
logger.error("Aucun résultat de tri trouvé, impossible de continuer")
sys.exit(1)
# Afficher les résultats
logger.info("Résumé du tri d'images:")
logger.info(f" Images uniques: {len(images_uniques)}")
logger.info(f" Images analysées: {stats['images_analysees']}")
logger.info(f" Images pertinentes: {stats['images_pertinentes']}")
logger.info(f" Images non pertinentes: {stats['images_non_pertinentes']}")
# Comparer les images
comparer_images(images_uniques, stats)
return 0
if __name__ == "__main__":
sys.exit(main())