#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Script de test pour vérifier le tri d'images avec llama-vision après les corrections. """ import os import sys import json import logging from typing import List, Dict, Any from pprint import pprint # Configuration du logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('TestTriImages') def verifier_rapport_deduplication(ticket_id: str) -> List[str]: """ Vérifie le rapport de déduplication pour extraire les chemins des images uniques. Args: ticket_id: ID du ticket à analyser Returns: Liste des chemins d'images uniques """ # Déterminer le répertoire du ticket base_dir = "output" ticket_dir = f"ticket_{ticket_id}" ticket_path = os.path.join(base_dir, ticket_dir) # Trouver la dernière extraction extractions = [] for extract in os.listdir(ticket_path): extraction_path = os.path.join(ticket_path, extract) if os.path.isdir(extraction_path) and extract.startswith(ticket_id): extractions.append(extraction_path) # Trier par date (plus récente en premier) extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True) latest_extraction = extractions[0] # Chemin du rapport de déduplication pipeline_dir = os.path.join(latest_extraction, f"{ticket_id}_rapports", "pipeline") dedup_path = os.path.join(pipeline_dir, "rapport_de_deduplication.json") if not os.path.exists(dedup_path): logger.error(f"Rapport de déduplication introuvable: {dedup_path}") return [] # Charger le rapport with open(dedup_path, 'r', encoding='utf-8') as f: dedup_data = json.load(f) # Extraire les images uniques images_uniques = [item['image_path'] for item in dedup_data if item.get('status') == 'unique'] logger.info(f"Trouvé {len(images_uniques)} images uniques sur {len(dedup_data)} total") return images_uniques def verifier_resultats_tri(ticket_id: str) -> Dict[str, Any]: """ Vérifie les résultats du tri d'images. Args: ticket_id: ID du ticket à analyser Returns: Dictionnaire avec les statistiques des résultats """ # Déterminer le répertoire du ticket base_dir = "output" ticket_dir = f"ticket_{ticket_id}" ticket_path = os.path.join(base_dir, ticket_dir) # Trouver la dernière extraction extractions = [] for extract in os.listdir(ticket_path): extraction_path = os.path.join(ticket_path, extract) if os.path.isdir(extraction_path) and extract.startswith(ticket_id): extractions.append(extraction_path) # Trier par date (plus récente en premier) extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True) latest_extraction = extractions[0] # Chemin des résultats de tri pipeline_dir = os.path.join(latest_extraction, f"{ticket_id}_rapports", "pipeline") # Trouver le fichier de résultats de tri tri_files = [f for f in os.listdir(pipeline_dir) if f.startswith("tri_image_") and f.endswith("_results.json")] if not tri_files: logger.error(f"Aucun fichier de résultats de tri trouvé dans {pipeline_dir}") return {} # Charger le fichier le plus récent tri_files.sort(key=lambda x: os.path.getmtime(os.path.join(pipeline_dir, x)), reverse=True) tri_path = os.path.join(pipeline_dir, tri_files[0]) with open(tri_path, 'r', encoding='utf-8') as f: tri_data = json.load(f) # Analyser les résultats images_analysees = [] pertinentes = 0 non_pertinentes = 0 if isinstance(tri_data, list): for item in tri_data: if "metadata" in item and "image_path" in item["metadata"]: images_analysees.append(item["metadata"]["image_path"]) if item.get("is_relevant", False): pertinentes += 1 else: non_pertinentes += 1 # Préparer les statistiques stats = { "images_analysees": len(images_analysees), "images_pertinentes": pertinentes, "images_non_pertinentes": non_pertinentes, "chemins_analyses": images_analysees, "fichier_resultats": tri_path } return stats def comparer_images(images_uniques: List[str], stats: Dict[str, Any]) -> None: """ Compare les images uniques avec celles analysées. Args: images_uniques: Liste des chemins d'images uniques stats: Statistiques des résultats de tri """ images_analysees = stats.get("chemins_analyses", []) # Vérifier si toutes les images uniques ont été analysées manquantes = set(images_uniques) - set(images_analysees) en_trop = set(images_analysees) - set(images_uniques) if manquantes: logger.warning(f"Images uniques non analysées ({len(manquantes)}):") for img in manquantes: logger.warning(f" - {os.path.basename(img)}") if en_trop: logger.warning(f"Images analysées qui ne sont pas uniques ({len(en_trop)}):") for img in en_trop: logger.warning(f" - {os.path.basename(img)}") if not manquantes and not en_trop: logger.info("Toutes les images uniques ont été correctement analysées") def main(): """ Fonction principale du script de test. """ if len(sys.argv) < 2: logger.error("Usage: python test_tri_images.py ") sys.exit(1) ticket_id = sys.argv[1] logger.info(f"Vérification du tri d'images pour le ticket {ticket_id}") # Vérifier le rapport de déduplication images_uniques = verifier_rapport_deduplication(ticket_id) if not images_uniques: logger.error("Aucune image unique trouvée, impossible de continuer") sys.exit(1) # Vérifier les résultats du tri stats = verifier_resultats_tri(ticket_id) if not stats: logger.error("Aucun résultat de tri trouvé, impossible de continuer") sys.exit(1) # Afficher les résultats logger.info("Résumé du tri d'images:") logger.info(f" Images uniques: {len(images_uniques)}") logger.info(f" Images analysées: {stats['images_analysees']}") logger.info(f" Images pertinentes: {stats['images_pertinentes']}") logger.info(f" Images non pertinentes: {stats['images_non_pertinentes']}") # Comparer les images comparer_images(images_uniques, stats) return 0 if __name__ == "__main__": sys.exit(main())