llm_ticket3/agents/utils/pipeline_logger.py
2025-04-17 14:04:38 +02:00

166 lines
6.7 KiB
Python

import os
import json
from datetime import datetime
from typing import Dict, Any, Optional, Union
def determiner_repertoire_ticket(ticket_id: str) -> Optional[str]:
"""
Détermine dynamiquement le répertoire du ticket.
Args:
ticket_id: str, le code du ticket
Returns:
str, le chemin du répertoire pour ce ticket ou None si non trouvé
"""
# Base de recherche des tickets
output_dir = "output"
# Format attendu du répertoire de ticket
ticket_dir = f"ticket_{ticket_id}"
ticket_path = os.path.join(output_dir, ticket_dir)
if not os.path.exists(ticket_path):
print(f"Répertoire de ticket non trouvé: {ticket_path}")
return None
# Trouver la dernière extraction (par date)
extractions = []
for extraction in os.listdir(ticket_path):
extraction_path = os.path.join(ticket_path, extraction)
if os.path.isdir(extraction_path) and extraction.startswith(ticket_id):
extractions.append(extraction_path)
if not extractions:
print(f"Aucune extraction trouvée pour le ticket {ticket_id}")
return None
# Trier par date de modification (plus récente en premier)
extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True)
# Retourner le chemin de la dernière extraction
return extractions[0]
def extraire_ticket_id(data: Dict[str, Any]) -> Optional[str]:
"""
Extrait l'ID du ticket à partir des métadonnées ou du chemin de l'image.
Args:
data: dict, données contenant potentiellement des métadonnées avec l'ID du ticket
Returns:
str, l'ID du ticket extrait ou None si non trouvé
"""
if not data:
return None
# Essayer d'extraire le ticket_id des métadonnées
metadata = data.get("metadata", {})
image_path = metadata.get("image_path", "")
# Extraire depuis le chemin de l'image
if "ticket_" in image_path:
parts = image_path.split("ticket_")
if len(parts) > 1:
ticket_parts = parts[1].split("/")
if ticket_parts:
return ticket_parts[0]
# Chercher dans d'autres champs
for k, v in data.items():
if isinstance(v, str) and "ticket_" in v.lower():
parts = v.lower().split("ticket_")
if len(parts) > 1:
ticket_parts = parts[1].split("/")
if ticket_parts and ticket_parts[0].startswith("t"):
return ticket_parts[0].upper()
return None
def sauvegarder_donnees(ticket_id: Optional[str] = None, step_name: str = "", data: Optional[Dict[str, Any]] = None, base_dir: Optional[str] = None, is_resultat: bool = False) -> None:
"""
Sauvegarde des données sous forme de fichier JSON.
Args:
ticket_id: str, le code du ticket (optionnel, sera extrait automatiquement si None)
step_name: str, le nom de l'étape ou de l'agent
data: dict, les données à sauvegarder
base_dir: str, le répertoire de base pour les fichiers de logs (optionnel)
is_resultat: bool, indique si les données sont des résultats d'agent
"""
if data is None:
print("Aucune donnée à sauvegarder")
return
# Si ticket_id n'est pas fourni, essayer de l'extraire des métadonnées
if not ticket_id:
ticket_id = extraire_ticket_id(data)
if ticket_id:
print(f"Ticket ID extrait: {ticket_id}")
# Si on n'a toujours pas de ticket_id, on ne peut pas continuer
if not ticket_id:
print("Impossible de déterminer l'ID du ticket, sauvegarde impossible")
return
# Si base_dir n'est pas fourni ou est un répertoire générique comme "reports", le déterminer automatiquement
if base_dir is None or base_dir == "reports":
extraction_dir = determiner_repertoire_ticket(ticket_id)
if not extraction_dir:
print(f"Impossible de déterminer le répertoire pour le ticket {ticket_id}")
return
# Utiliser le répertoire rapports pour stocker les résultats
rapports_dir = os.path.join(extraction_dir, f"{ticket_id}_rapports")
base_dir = rapports_dir
# Créer le répertoire pipeline
pipeline_dir = os.path.join(base_dir, "pipeline")
os.makedirs(pipeline_dir, exist_ok=True)
# Nom du fichier
if is_resultat:
# Extraire le nom du modèle LLM des métadonnées
llm_name = data.get("metadata", {}).get("model_info", {}).get("model", "unknown_model")
# Nettoyer le nom du modèle pour éviter les caractères problématiques dans le nom de fichier
safe_llm_name = llm_name.lower().replace("/", "_").replace(" ", "_")
file_name = f"{step_name}_{safe_llm_name}_results.json"
else:
file_name = f"{step_name}.json"
file_path = os.path.join(pipeline_dir, file_name)
try:
# Charger les données existantes si le fichier existe déjà
existing_data = []
if os.path.exists(file_path):
try:
with open(file_path, "r", encoding="utf-8") as f:
file_content = f.read().strip()
if file_content: # Vérifier que le fichier n'est pas vide
existing_data = json.loads(file_content)
# Si les données existantes ne sont pas une liste, les convertir en liste
if not isinstance(existing_data, list):
existing_data = [existing_data]
except json.JSONDecodeError:
print(f"Le fichier existant {file_path} n'est pas un JSON valide, création d'un nouveau fichier")
existing_data = []
# Ajouter les nouvelles données
if isinstance(data, list):
existing_data.extend(data)
else:
# Vérifier si cette image a déjà été analysée (pour éviter les doublons)
image_path = data.get("metadata", {}).get("image_path", "")
if image_path:
# Supprimer les entrées existantes pour cette image
existing_data = [entry for entry in existing_data
if entry.get("metadata", {}).get("image_path", "") != image_path]
existing_data.append(data)
# Sauvegarder les données combinées
with open(file_path, "w", encoding="utf-8") as f:
json.dump(existing_data, f, indent=2, ensure_ascii=False)
print(f"Données sauvegardées dans {file_path} ({len(existing_data)} entrées)")
except Exception as e:
print(f"Erreur lors de la sauvegarde des données: {e}")