import os import json from datetime import datetime from typing import Dict, Any, Optional, Union def determiner_repertoire_ticket(ticket_id: str) -> Optional[str]: """ Détermine dynamiquement le répertoire du ticket. Args: ticket_id: str, le code du ticket Returns: str, le chemin du répertoire pour ce ticket ou None si non trouvé """ # Base de recherche des tickets output_dir = "output" # Format attendu du répertoire de ticket ticket_dir = f"ticket_{ticket_id}" ticket_path = os.path.join(output_dir, ticket_dir) if not os.path.exists(ticket_path): print(f"Répertoire de ticket non trouvé: {ticket_path}") return None # Trouver la dernière extraction (par date) extractions = [] for extraction in os.listdir(ticket_path): extraction_path = os.path.join(ticket_path, extraction) if os.path.isdir(extraction_path) and extraction.startswith(ticket_id): extractions.append(extraction_path) if not extractions: print(f"Aucune extraction trouvée pour le ticket {ticket_id}") return None # Trier par date de modification (plus récente en premier) extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True) # Retourner le chemin de la dernière extraction return extractions[0] def extraire_ticket_id(data: Dict[str, Any]) -> Optional[str]: """ Extrait l'ID du ticket à partir des métadonnées ou du chemin de l'image. Args: data: dict, données contenant potentiellement des métadonnées avec l'ID du ticket Returns: str, l'ID du ticket extrait ou None si non trouvé """ if not data: return None # Essayer d'extraire le ticket_id des métadonnées metadata = data.get("metadata", {}) image_path = metadata.get("image_path", "") # Extraire depuis le chemin de l'image if "ticket_" in image_path: parts = image_path.split("ticket_") if len(parts) > 1: ticket_parts = parts[1].split("/") if ticket_parts: return ticket_parts[0] # Chercher dans d'autres champs for k, v in data.items(): if isinstance(v, str) and "ticket_" in v.lower(): parts = v.lower().split("ticket_") if len(parts) > 1: ticket_parts = parts[1].split("/") if ticket_parts and ticket_parts[0].startswith("t"): return ticket_parts[0].upper() return None def generer_version_texte(data: Union[Dict[str, Any], list], ticket_id: str, step_name: str, file_path: str) -> None: """ Génère une version texte lisible du fichier JSON pour faciliter la revue humaine. Args: data: Données à convertir en texte lisible ticket_id: ID du ticket step_name: Nom de l'étape ou de l'agent file_path: Chemin du fichier JSON """ try: # Créer le chemin du fichier texte txt_path = file_path.replace('.json', '.txt') with open(txt_path, 'w', encoding='utf-8') as f: f.write(f"RÉSULTATS DE L'ANALYSE {step_name.upper()} - TICKET {ticket_id}\n") f.write("="*80 + "\n\n") # Si c'est une liste, traiter chaque élément if isinstance(data, list): for i, item in enumerate(data, 1): f.write(f"--- ÉLÉMENT {i} ---\n\n") # Extraire et écrire la réponse si présente if isinstance(item, dict): # Priorité 1: Champ "response" (pour les rapports) if "response" in item: f.write(f"{item['response']}\n\n") # Priorité 2: Champ "analyse" (pour les analyses d'images) elif "analyse" in item and isinstance(item["analyse"], str): f.write(f"{item['analyse']}\n\n") # Priorité 3: Champ "raw_response" (pour les analyses brutes) elif "raw_response" in item and isinstance(item["raw_response"], str): f.write(f"{item['raw_response']}\n\n") # Priorité 4: Structure imbriquée d'analyse elif "analysis" in item: if isinstance(item["analysis"], dict): if "analyse" in item["analysis"]: f.write(f"{item['analysis']['analyse']}\n\n") elif "raw_response" in item["analysis"]: f.write(f"{item['analysis']['raw_response']}\n\n") # Priorité 5: Pour le tri d'images elif "is_relevant" in item: f.write(f"Image pertinente: {item['is_relevant']}\n") if "reason" in item: f.write(f"Raison: {item['reason']}\n\n") # Si aucun contenu n'a été trouvé, afficher des informations de base else: f.write("Aucun contenu d'analyse trouvé.\n\n") # Ajouter des métadonnées si disponibles if "metadata" in item: meta = item["metadata"] if "image_name" in meta: f.write(f"Image: {meta.get('image_name', 'N/A')}\n") if "timestamp" in meta: f.write(f"Horodatage: {meta.get('timestamp', 'N/A')}\n") if "source_agent" in meta: f.write(f"Agent source: {meta.get('source_agent', 'N/A')}\n") else: f.write(str(item) + "\n\n") f.write("-"*40 + "\n\n") # Si c'est un dictionnaire unique elif isinstance(data, dict): # Même logique que ci-dessus, mais pour un seul élément if "response" in data: f.write(f"{data['response']}\n\n") elif "analyse" in data and isinstance(data["analyse"], str): f.write(f"{data['analyse']}\n\n") elif "raw_response" in data: f.write(f"{data['raw_response']}\n\n") elif "analysis" in data and isinstance(data["analysis"], dict): if "analyse" in data["analysis"]: f.write(f"{data['analysis']['analyse']}\n\n") elif "raw_response" in data["analysis"]: f.write(f"{data['analysis']['raw_response']}\n\n") elif "is_relevant" in data: f.write(f"Image pertinente: {data['is_relevant']}\n") if "reason" in data: f.write(f"Raison: {data['reason']}\n\n") else: # Sinon on écrit un format plus général for key, value in data.items(): if key != "prompt" and not isinstance(value, dict): f.write(f"{key}: {value}\n") f.write("\n" + "="*80 + "\n") f.write(f"Fichier original: {os.path.basename(file_path)}") print(f"Version texte générée dans {txt_path}") except Exception as e: print(f"Erreur lors de la génération de la version texte: {e}") def sauvegarder_donnees(ticket_id: Optional[str] = None, step_name: str = "", data: Optional[Dict[str, Any]] = None, base_dir: Optional[str] = None, is_resultat: bool = False) -> None: """ Sauvegarde des données sous forme de fichier JSON. Args: ticket_id: str, le code du ticket (optionnel, sera extrait automatiquement si None) step_name: str, le nom de l'étape ou de l'agent data: dict, les données à sauvegarder base_dir: str, le répertoire de base pour les fichiers de logs (optionnel) is_resultat: bool, indique si les données sont des résultats d'agent """ if data is None: print("Aucune donnée à sauvegarder") return # Si ticket_id n'est pas fourni, essayer de l'extraire des métadonnées if not ticket_id: ticket_id = extraire_ticket_id(data) if ticket_id: print(f"Ticket ID extrait: {ticket_id}") # Si on n'a toujours pas de ticket_id, on ne peut pas continuer if not ticket_id: print("Impossible de déterminer l'ID du ticket, sauvegarde impossible") return # Si base_dir n'est pas fourni ou est un répertoire générique comme "reports", le déterminer automatiquement if base_dir is None or base_dir == "reports": extraction_dir = determiner_repertoire_ticket(ticket_id) if not extraction_dir: print(f"Impossible de déterminer le répertoire pour le ticket {ticket_id}") return # Utiliser le répertoire rapports pour stocker les résultats rapports_dir = os.path.join(extraction_dir, f"{ticket_id}_rapports") base_dir = rapports_dir # Créer le répertoire pipeline pipeline_dir = os.path.join(base_dir, "pipeline") os.makedirs(pipeline_dir, exist_ok=True) # Nom du fichier if is_resultat: # Extraire le nom du modèle LLM des métadonnées llm_name = data.get("metadata", {}).get("model_info", {}).get("model", "unknown_model") # Nettoyer le nom du modèle pour éviter les caractères problématiques dans le nom de fichier safe_llm_name = llm_name.lower().replace("/", "_").replace(" ", "_") file_name = f"{step_name}_{safe_llm_name}_results.json" else: file_name = f"{step_name}.json" file_path = os.path.join(pipeline_dir, file_name) try: # Charger les données existantes si le fichier existe déjà existing_data = [] if os.path.exists(file_path): try: with open(file_path, "r", encoding="utf-8") as f: file_content = f.read().strip() if file_content: # Vérifier que le fichier n'est pas vide existing_data = json.loads(file_content) # Si les données existantes ne sont pas une liste, les convertir en liste if not isinstance(existing_data, list): existing_data = [existing_data] except json.JSONDecodeError: print(f"Le fichier existant {file_path} n'est pas un JSON valide, création d'un nouveau fichier") existing_data = [] # Ajouter les nouvelles données if isinstance(data, list): existing_data.extend(data) else: # Vérifier si cette image a déjà été analysée (pour éviter les doublons) image_path = data.get("metadata", {}).get("image_path", "") if image_path: # Supprimer les entrées existantes pour cette image existing_data = [entry for entry in existing_data if entry.get("metadata", {}).get("image_path", "") != image_path] # Éviter la duplication pour le rapport final if step_name == "rapport_final" and existing_data: existing_data = [data] # Remplacer au lieu d'ajouter else: existing_data.append(data) # Sauvegarder les données combinées with open(file_path, "w", encoding="utf-8") as f: json.dump(existing_data, f, indent=2, ensure_ascii=False) print(f"Données sauvegardées dans {file_path} ({len(existing_data)} entrées)") # Générer une version texte pour tous les fichiers JSON generer_version_texte(existing_data, ticket_id, step_name, file_path) except Exception as e: print(f"Erreur lors de la sauvegarde des données: {e}")