import os from PIL import Image import imagehash from typing import List, Dict, Optional import json def determiner_repertoire_ticket(ticket_id: str) -> Optional[str]: """ Détermine dynamiquement le répertoire du ticket. Args: ticket_id: str, le code du ticket Returns: str, le chemin du répertoire pour ce ticket ou None si non trouvé """ # Base de recherche des tickets output_dir = "output" # Format attendu du répertoire de ticket ticket_dir = f"ticket_{ticket_id}" ticket_path = os.path.join(output_dir, ticket_dir) if not os.path.exists(ticket_path): print(f"Répertoire de ticket non trouvé: {ticket_path}") return None # Trouver la dernière extraction (par date) extractions = [] for extraction in os.listdir(ticket_path): extraction_path = os.path.join(ticket_path, extraction) if os.path.isdir(extraction_path) and extraction.startswith(ticket_id): extractions.append(extraction_path) if not extractions: print(f"Aucune extraction trouvée pour le ticket {ticket_id}") return None # Trier par date de modification (plus récente en premier) extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True) # Retourner le chemin de la dernière extraction return extractions[0] def filtrer_images_uniques(image_paths: List[str], seuil_hamming: int = 5, ticket_id: str = "") -> List[str]: """ Supprime les doublons perceptuels parmi une liste d'images et sauvegarde les résultats. Args: image_paths: Liste des chemins vers les images. seuil_hamming: Distance de Hamming maximale pour considérer deux images comme identiques. ticket_id: ID du ticket pour le nom du fichier de rapport. Returns: Liste de chemins d'images considérées comme uniques. (Les images avec noms/format les plus courts sont conservées) """ images_uniques = [] hashes: Dict[str, str] = {} rapport = [] # Trier les chemins pour privilégier les noms courts et formats préférés def cle_pertinence(p: str) -> tuple: ext = os.path.splitext(p)[1].lower() poids_format = {'.png': 0, '.jpeg': 1, '.jpg': 2, '.bmp': 3, '.gif': 4} return (poids_format.get(ext, 99), len(os.path.basename(p))) for path in sorted(image_paths, key=cle_pertinence): try: with Image.open(path) as img: img_hash = str(imagehash.phash(img)) doublon = False for existing_hash in hashes.values(): if imagehash.hex_to_hash(existing_hash) - imagehash.hex_to_hash(img_hash) <= seuil_hamming: doublon = True break if not doublon: hashes[path] = img_hash images_uniques.append(path) rapport.append({"image_path": path, "status": "unique"}) else: rapport.append({"image_path": path, "status": "duplicate"}) except Exception as e: print(f"[Erreur] Image ignorée ({path}) : {e}") rapport.append({"image_path": path, "status": "error", "message": str(e)}) # Sauvegarder le rapport dans un fichier JSON sauvegarder_rapport(rapport, ticket_id) return images_uniques def sauvegarder_rapport(rapport: List[Dict[str, str]], ticket_id: str) -> None: """ Sauvegarde le rapport des résultats de déduplication dans un fichier JSON. Args: rapport: Liste des résultats de déduplication. ticket_id: ID du ticket pour le nom du fichier de rapport. """ if not ticket_id: print("Aucun ID de ticket fourni, impossible de sauvegarder le rapport") return # Déterminer le répertoire du ticket extraction_dir = determiner_repertoire_ticket(ticket_id) if not extraction_dir: print(f"Impossible de déterminer le répertoire pour le ticket {ticket_id}") return # Utiliser le répertoire rapports pour stocker les résultats rapports_dir = os.path.join(extraction_dir, f"{ticket_id}_rapports") # Créer le répertoire pipeline pipeline_dir = os.path.join(rapports_dir, "pipeline") os.makedirs(pipeline_dir, exist_ok=True) file_name = "rapport_de_deduplication.json" file_path = os.path.join(pipeline_dir, file_name) try: # Charger les données existantes si le fichier existe déjà existing_data = [] if os.path.exists(file_path): try: with open(file_path, "r", encoding="utf-8") as f: file_content = f.read().strip() if file_content: # Vérifier que le fichier n'est pas vide existing_data = json.loads(file_content) # Si les données existantes ne sont pas une liste, les convertir en liste if not isinstance(existing_data, list): existing_data = [existing_data] except json.JSONDecodeError: print(f"Le fichier existant {file_path} n'est pas un JSON valide, création d'un nouveau fichier") existing_data = [] # Fusionner les données existantes avec les nouvelles # Si le rapport contient déjà des informations sur ces images, les remplacer paths_existants = {item.get("image_path", "") for item in existing_data} for item in rapport: if item.get("image_path", "") in paths_existants: # Supprimer l'ancien élément existing_data = [entry for entry in existing_data if entry.get("image_path", "") != item.get("image_path", "")] existing_data.append(item) with open(file_path, "w", encoding="utf-8") as f: json.dump(existing_data, f, indent=2, ensure_ascii=False) print(f"Rapport de déduplication sauvegardé dans {file_path} ({len(existing_data)} entrées)") except Exception as e: print(f"Erreur lors de la sauvegarde du rapport: {e}")