mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-13 10:46:51 +01:00
152 lines
6.1 KiB
Python
152 lines
6.1 KiB
Python
import os
|
|
from PIL import Image
|
|
import imagehash
|
|
from typing import List, Dict, Optional
|
|
import json
|
|
|
|
def determiner_repertoire_ticket(ticket_id: str) -> Optional[str]:
|
|
"""
|
|
Détermine dynamiquement le répertoire du ticket.
|
|
|
|
Args:
|
|
ticket_id: str, le code du ticket
|
|
|
|
Returns:
|
|
str, le chemin du répertoire pour ce ticket ou None si non trouvé
|
|
"""
|
|
# Base de recherche des tickets
|
|
output_dir = "output"
|
|
|
|
# Format attendu du répertoire de ticket
|
|
ticket_dir = f"ticket_{ticket_id}"
|
|
ticket_path = os.path.join(output_dir, ticket_dir)
|
|
|
|
if not os.path.exists(ticket_path):
|
|
print(f"Répertoire de ticket non trouvé: {ticket_path}")
|
|
return None
|
|
|
|
# Trouver la dernière extraction (par date)
|
|
extractions = []
|
|
for extraction in os.listdir(ticket_path):
|
|
extraction_path = os.path.join(ticket_path, extraction)
|
|
if os.path.isdir(extraction_path) and extraction.startswith(ticket_id):
|
|
extractions.append(extraction_path)
|
|
|
|
if not extractions:
|
|
print(f"Aucune extraction trouvée pour le ticket {ticket_id}")
|
|
return None
|
|
|
|
# Trier par date de modification (plus récente en premier)
|
|
extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True)
|
|
|
|
# Retourner le chemin de la dernière extraction
|
|
return extractions[0]
|
|
|
|
def filtrer_images_uniques(image_paths: List[str], seuil_hamming: int = 5, ticket_id: str = "") -> List[str]:
|
|
"""
|
|
Supprime les doublons perceptuels parmi une liste d'images et sauvegarde les résultats.
|
|
|
|
Args:
|
|
image_paths: Liste des chemins vers les images.
|
|
seuil_hamming: Distance de Hamming maximale pour considérer deux images comme identiques.
|
|
ticket_id: ID du ticket pour le nom du fichier de rapport.
|
|
|
|
Returns:
|
|
Liste de chemins d'images considérées comme uniques.
|
|
(Les images avec noms/format les plus courts sont conservées)
|
|
"""
|
|
images_uniques = []
|
|
hashes: Dict[str, str] = {}
|
|
rapport = []
|
|
|
|
# Trier les chemins pour privilégier les noms courts et formats préférés
|
|
def cle_pertinence(p: str) -> tuple:
|
|
ext = os.path.splitext(p)[1].lower()
|
|
poids_format = {'.png': 0, '.jpeg': 1, '.jpg': 2, '.bmp': 3, '.gif': 4}
|
|
return (poids_format.get(ext, 99), len(os.path.basename(p)))
|
|
|
|
for path in sorted(image_paths, key=cle_pertinence):
|
|
try:
|
|
with Image.open(path) as img:
|
|
img_hash = str(imagehash.phash(img))
|
|
|
|
doublon = False
|
|
for existing_hash in hashes.values():
|
|
if imagehash.hex_to_hash(existing_hash) - imagehash.hex_to_hash(img_hash) <= seuil_hamming:
|
|
doublon = True
|
|
break
|
|
|
|
if not doublon:
|
|
hashes[path] = img_hash
|
|
images_uniques.append(path)
|
|
rapport.append({"image_path": path, "status": "unique"})
|
|
else:
|
|
rapport.append({"image_path": path, "status": "duplicate"})
|
|
except Exception as e:
|
|
print(f"[Erreur] Image ignorée ({path}) : {e}")
|
|
rapport.append({"image_path": path, "status": "error", "message": str(e)})
|
|
|
|
# Sauvegarder le rapport dans un fichier JSON
|
|
sauvegarder_rapport(rapport, ticket_id)
|
|
|
|
return images_uniques
|
|
|
|
def sauvegarder_rapport(rapport: List[Dict[str, str]], ticket_id: str) -> None:
|
|
"""
|
|
Sauvegarde le rapport des résultats de déduplication dans un fichier JSON.
|
|
|
|
Args:
|
|
rapport: Liste des résultats de déduplication.
|
|
ticket_id: ID du ticket pour le nom du fichier de rapport.
|
|
"""
|
|
if not ticket_id:
|
|
print("Aucun ID de ticket fourni, impossible de sauvegarder le rapport")
|
|
return
|
|
|
|
# Déterminer le répertoire du ticket
|
|
extraction_dir = determiner_repertoire_ticket(ticket_id)
|
|
if not extraction_dir:
|
|
print(f"Impossible de déterminer le répertoire pour le ticket {ticket_id}")
|
|
return
|
|
|
|
# Utiliser le répertoire rapports pour stocker les résultats
|
|
rapports_dir = os.path.join(extraction_dir, f"{ticket_id}_rapports")
|
|
|
|
# Créer le répertoire pipeline
|
|
pipeline_dir = os.path.join(rapports_dir, "pipeline")
|
|
os.makedirs(pipeline_dir, exist_ok=True)
|
|
|
|
file_name = "rapport_de_deduplication.json"
|
|
file_path = os.path.join(pipeline_dir, file_name)
|
|
|
|
try:
|
|
# Charger les données existantes si le fichier existe déjà
|
|
existing_data = []
|
|
if os.path.exists(file_path):
|
|
try:
|
|
with open(file_path, "r", encoding="utf-8") as f:
|
|
file_content = f.read().strip()
|
|
if file_content: # Vérifier que le fichier n'est pas vide
|
|
existing_data = json.loads(file_content)
|
|
# Si les données existantes ne sont pas une liste, les convertir en liste
|
|
if not isinstance(existing_data, list):
|
|
existing_data = [existing_data]
|
|
except json.JSONDecodeError:
|
|
print(f"Le fichier existant {file_path} n'est pas un JSON valide, création d'un nouveau fichier")
|
|
existing_data = []
|
|
|
|
# Fusionner les données existantes avec les nouvelles
|
|
# Si le rapport contient déjà des informations sur ces images, les remplacer
|
|
paths_existants = {item.get("image_path", "") for item in existing_data}
|
|
for item in rapport:
|
|
if item.get("image_path", "") in paths_existants:
|
|
# Supprimer l'ancien élément
|
|
existing_data = [entry for entry in existing_data
|
|
if entry.get("image_path", "") != item.get("image_path", "")]
|
|
existing_data.append(item)
|
|
|
|
with open(file_path, "w", encoding="utf-8") as f:
|
|
json.dump(existing_data, f, indent=2, ensure_ascii=False)
|
|
print(f"Rapport de déduplication sauvegardé dans {file_path} ({len(existing_data)} entrées)")
|
|
except Exception as e:
|
|
print(f"Erreur lors de la sauvegarde du rapport: {e}") |