import json import os from .base_agent import BaseAgent from datetime import datetime from typing import Dict, Any, Tuple, Optional, List import logging import traceback import re import sys from .utils.report_utils import extraire_et_traiter_json from .utils.report_formatter import extraire_sections_texte, generer_rapport_markdown, construire_rapport_json logger = logging.getLogger("AgentReportGenerator") class AgentReportGenerator(BaseAgent): """ Agent pour générer un rapport synthétique à partir des analyses de ticket et d'images. L'agent récupère: 1. L'analyse du ticket effectuée par AgentTicketAnalyser 2. Les analyses des images pertinentes effectuées par AgentImageAnalyser Il génère: - Un rapport JSON structuré (format principal) - Un rapport Markdown pour la présentation """ def __init__(self, llm): super().__init__("AgentReportGenerator", llm) # Configuration locale de l'agent self.temperature = 0.2 self.top_p = 0.9 self.max_tokens = 2500 # Prompt système pour la génération de rapport self.system_prompt = """Tu es un expert en génération de rapports techniques pour BRG-Lab pour la société CBAO. Ta mission est de synthétiser les analyses (ticket et images) en un rapport structuré. EXIGENCE ABSOLUE - Ton rapport DOIT inclure: 1. Un résumé du problème initial (nom de la demande + description) 2. Une reconstitution du fil de discussion client/support - tu peux synthétiser si trop long mais GARDE les éléments déterminants (références, normes, éléments techniques importants) 3. Un tableau des informations essentielles avec cette structure: ```json { "chronologie_echanges": [ {"date": "date exacte", "emetteur": "CLIENT ou SUPPORT", "type": "Question ou Réponse ou Information technique", "contenu": "contenu synthétisé fidèlement"} ] } ``` 4. Une analyse des images pertinentes en lien avec le problème (OBLIGATOIRE) 5. Un diagnostic technique des causes probables IMPORTANT POUR LE TABLEAU: - COMMENCE par inclure toute question identifiée dans le NOM DE LA DEMANDE ou la DESCRIPTION initiale - Il doit contenir d'un côté les questions et de l'autre les réponses - Si aucune réponse n'a été fournie, indique "Il ne ressort pas de réponse de l'analyse" - AJOUTE des éléments de l'analyse d'image si cela constitue une réponse plausible à une question - Identifie clairement chaque intervenant (CLIENT ou SUPPORT) - Pour les questions issues du NOM ou de la DESCRIPTION, utilise l'émetteur "CLIENT" et la date d'ouverture du ticket IMPORTANT POUR LA STRUCTURE: - Le rapport doit être clairement divisé en sections avec des titres (## Résumé, ## Fil de discussion, ## Tableau questions/réponses, ## Analyse des images, ## Diagnostic) - Pour l'analyse des images, décris précisément comment chaque image illustre le problème ou la solution - Si aucune image n'est fournie, tu DOIS l'indiquer explicitement dans la section "Analyse des images" - Reste factuel et précis dans ton analyse""" # Version du prompt pour la traçabilité self.prompt_version = "v2.2" # Appliquer la configuration au LLM self._appliquer_config_locale() logger.info("AgentReportGenerator initialisé") def _appliquer_config_locale(self) -> None: """ Applique la configuration locale au modèle LLM. """ # Appliquer le prompt système if hasattr(self.llm, "prompt_system"): self.llm.prompt_system = self.system_prompt # Appliquer les paramètres if hasattr(self.llm, "configurer"): params = { "temperature": self.temperature, "top_p": self.top_p, "max_tokens": self.max_tokens } self.llm.configurer(**params) logger.info(f"Configuration appliquée au modèle: {str(params)}") def _formater_prompt_pour_rapport(self, ticket_analyse: str, images_analyses: List[Dict]) -> str: """ Formate le prompt pour la génération du rapport Args: ticket_analyse: Analyse du ticket images_analyses: Liste des analyses d'images Returns: Prompt formaté pour le LLM """ num_images = len(images_analyses) logger.info(f"Formatage du prompt avec {num_images} analyses d'images") # Construire la section d'analyse du ticket prompt = f"""Génère un rapport technique complet, en te basant sur les analyses suivantes. ## ANALYSE DU TICKET {ticket_analyse} """ # Ajouter la section d'analyse des images si présente if num_images > 0: prompt += f"\n## ANALYSES DES IMAGES ({num_images} images)\n" for i, img_analyse in enumerate(images_analyses, 1): image_name = img_analyse.get("image_name", f"Image {i}") analyse = img_analyse.get("analyse", "Analyse non disponible") prompt += f"\n### IMAGE {i}: {image_name}\n{analyse}\n" else: prompt += "\n## ANALYSES DES IMAGES\nAucune image n'a été fournie pour ce ticket.\n" # Instructions pour le rapport prompt += """ ## INSTRUCTIONS POUR LE RAPPORT 1. TON RAPPORT DOIT AVOIR LA STRUCTURE SUIVANTE: - Titre principal (# Rapport d'analyse: Nom du ticket) - Résumé du problème (## Résumé du problème) - Fil de discussion (## Fil de discussion) - Reconstitution chronologique des échanges - Tableau questions/réponses (## Tableau questions/réponses) - Analyse des images (## Analyse des images) - Diagnostic technique (## Diagnostic technique) 2. DANS LA SECTION "FIL DE DISCUSSION": - Reconstitue chronologiquement les échanges entre client et support - Identifie clairement l'émetteur de chaque message (CLIENT ou SUPPORT) - Tu peux synthétiser mais garde TOUS les éléments déterminants: * Références techniques * Normes citées * Paramètres importants * Informations techniques clés 3. DANS LA SECTION "TABLEAU QUESTIONS/RÉPONSES": - Commence par créer un objet JSON comme suit: ```json { "chronologie_echanges": [ {"date": "date exacte", "emetteur": "CLIENT", "type": "Question", "contenu": "contenu exact de la question"}, {"date": "date exacte", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "contenu exact de la réponse"} ] } ``` - COMMENCE par inclure toutes les questions identifiées dans le NOM DE LA DEMANDE et la DESCRIPTION - Pour ces questions initiales, utilise l'émetteur "CLIENT" et la date d'ouverture du ticket - Pour chaque question, identifie la réponse correspondante - Si une question n'a pas de réponse, indique "Il ne ressort pas de réponse de l'analyse" - Si une image contient une information qui répond à une question, inclus-la dans la réponse - Identifie clairement l'émetteur (CLIENT/SUPPORT) 4. DANS LA SECTION "ANALYSE DES IMAGES": - Si des images sont présentes, explique en détail ce qu'elles montrent et leur lien avec le problème - Décris spécifiquement les éléments mis en évidence (encadrés, entourés) - Établis le lien entre l'image et la discussion quand c'est pertinent - Si une image peut répondre à une question (même issue du nom ou de la description), indique-le explicitement - Si aucune image n'est fournie, indique-le clairement mais conserve cette section 5. DANS LA SECTION "DIAGNOSTIC TECHNIQUE": - Fournis une analyse claire des causes probables - Explique comment la solution proposée répond au problème - Si pertinent, mentionne les aspects techniques spécifiques IMPORTANT: Ce rapport sera utilisé par des techniciens et des développeurs pour comprendre rapidement le problème et sa résolution. Il doit être clair, précis et structuré. """ return prompt def executer(self, rapport_data: Dict, rapport_dir: str) -> Tuple[Optional[str], Optional[str]]: """ Génère un rapport à partir des analyses effectuées Args: rapport_data: Dictionnaire contenant toutes les données analysées rapport_dir: Répertoire où sauvegarder le rapport Returns: Tuple (chemin JSON, chemin Markdown) - Peut contenir None si une génération échoue """ try: # 1. PRÉPARATION ticket_id = self._extraire_ticket_id(rapport_data, rapport_dir) logger.info(f"Génération du rapport pour le ticket: {ticket_id}") print(f"AgentReportGenerator: Génération du rapport pour {ticket_id}") # Créer le répertoire de sortie si nécessaire os.makedirs(rapport_dir, exist_ok=True) # 2. EXTRACTION DES DONNÉES ticket_analyse = self._extraire_analyse_ticket(rapport_data) images_analyses = self._extraire_analyses_images(rapport_data) # 3. COLLECTE DES INFORMATIONS SUR LES AGENTS agents_info = self._collecter_info_agents(rapport_data) prompts_utilises = self._collecter_prompts_agents() # 4. GÉNÉRATION DU RAPPORT prompt = self._formater_prompt_pour_rapport(ticket_analyse, images_analyses) logger.info("Génération du rapport avec le LLM") print(f" Génération du rapport avec le LLM...") # Mesurer le temps d'exécution start_time = datetime.now() rapport_genere = self.llm.interroger(prompt) generation_time = (datetime.now() - start_time).total_seconds() logger.info(f"Rapport généré: {len(rapport_genere)} caractères") print(f" Rapport généré: {len(rapport_genere)} caractères") # 5. EXTRACTION DES DONNÉES DU RAPPORT # Utiliser l'utilitaire de report_utils.py pour extraire les données JSON rapport_traite, echanges_json, _ = extraire_et_traiter_json(rapport_genere) # Vérifier que echanges_json n'est pas None pour éviter l'erreur de type if echanges_json is None: echanges_json = {"chronologie_echanges": []} logger.warning("Aucun échange JSON extrait du rapport, création d'une structure vide") # Extraire les sections textuelles (résumé, diagnostic) resume, analyse_images, diagnostic = extraire_sections_texte(rapport_genere) # 6. CRÉATION DU RAPPORT JSON # Préparer les métadonnées de l'agent agent_metadata = { "model": getattr(self.llm, "modele", str(type(self.llm))), "model_version": getattr(self.llm, "version", "non spécifiée"), "temperature": self.temperature, "top_p": self.top_p, "max_tokens": self.max_tokens, "generation_time": generation_time, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "agents": agents_info } # Construire le rapport JSON rapport_json = construire_rapport_json( rapport_genere=rapport_genere, rapport_data=rapport_data, ticket_id=ticket_id, ticket_analyse=ticket_analyse, images_analyses=images_analyses, generation_time=generation_time, resume=resume, analyse_images=analyse_images, diagnostic=diagnostic, echanges_json=echanges_json, agent_metadata=agent_metadata, prompts_utilises=prompts_utilises ) # 7. SAUVEGARDE DU RAPPORT JSON json_path = os.path.join(rapport_dir, f"{ticket_id}_rapport_final.json") with open(json_path, "w", encoding="utf-8") as f: json.dump(rapport_json, f, ensure_ascii=False, indent=2) logger.info(f"Rapport JSON sauvegardé: {json_path}") print(f" Rapport JSON sauvegardé: {json_path}") # 8. GÉNÉRATION DU RAPPORT MARKDOWN md_path = generer_rapport_markdown(json_path) if md_path: logger.info(f"Rapport Markdown généré: {md_path}") print(f" Rapport Markdown généré: {md_path}") else: logger.error("Échec de la génération du rapport Markdown") print(f" ERREUR: Échec de la génération du rapport Markdown") return json_path, md_path except Exception as e: error_message = f"Erreur lors de la génération du rapport: {str(e)}" logger.error(error_message) logger.error(traceback.format_exc()) print(f" ERREUR: {error_message}") return None, None def _extraire_ticket_id(self, rapport_data: Dict, rapport_dir: str) -> str: """Extrait l'ID du ticket des données ou du chemin""" # Essayer d'extraire depuis les données du rapport ticket_id = rapport_data.get("ticket_id", "") # Si pas d'ID direct, essayer depuis les données du ticket if not ticket_id and "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict): ticket_id = rapport_data["ticket_data"].get("code", "") # En dernier recours, extraire depuis le chemin if not ticket_id: # Essayer d'extraire un ID de ticket (format Txxxx) du chemin match = re.search(r'T\d+', rapport_dir) if match: ticket_id = match.group(0) else: # Sinon, utiliser le dernier segment du chemin ticket_id = os.path.basename(rapport_dir) return ticket_id def _extraire_analyse_ticket(self, rapport_data: Dict) -> str: """Extrait l'analyse du ticket des données""" # Essayer les différentes clés possibles for key in ["ticket_analyse", "analyse_json", "analyse_ticket"]: if key in rapport_data and rapport_data[key]: logger.info(f"Utilisation de {key}") return rapport_data[key] # Créer une analyse par défaut si aucune n'est disponible logger.warning("Aucune analyse de ticket disponible, création d'un message par défaut") ticket_data = rapport_data.get("ticket_data", {}) ticket_name = ticket_data.get("name", "Sans titre") ticket_desc = ticket_data.get("description", "Pas de description disponible") return f"Analyse par défaut du ticket:\nNom: {ticket_name}\nDescription: {ticket_desc}\n(Aucune analyse détaillée n'a été fournie)" def _extraire_analyses_images(self, rapport_data: Dict) -> List[Dict]: """ Extrait et formate les analyses d'images pertinentes Args: rapport_data: Données du rapport contenant les analyses d'images Returns: Liste des analyses d'images pertinentes formatées """ images_analyses = [] analyse_images_data = rapport_data.get("analyse_images", {}) # Parcourir toutes les images for image_path, analyse_data in analyse_images_data.items(): # Vérifier si l'image est pertinente is_relevant = False if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict): is_relevant = analyse_data["sorting"].get("is_relevant", False) # Si l'image est pertinente, extraire son analyse if is_relevant: image_name = os.path.basename(image_path) analyse = self._extraire_analyse_image(analyse_data) if analyse: images_analyses.append({ "image_name": image_name, "image_path": image_path, "analyse": analyse, "sorting_info": analyse_data.get("sorting", {}), "metadata": analyse_data.get("analysis", {}).get("metadata", {}) }) logger.info(f"Analyse de l'image {image_name} ajoutée") return images_analyses def _extraire_analyse_image(self, analyse_data: Dict) -> Optional[str]: """ Extrait l'analyse d'une image depuis les données Args: analyse_data: Données d'analyse de l'image Returns: Texte d'analyse de l'image ou None si aucune analyse n'est disponible """ # Si pas de données d'analyse, retourner None if not "analysis" in analyse_data or not analyse_data["analysis"]: if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict): reason = analyse_data["sorting"].get("reason", "Non spécifiée") return f"Image marquée comme pertinente. Raison: {reason}" return None # Extraire l'analyse selon le format des données analysis = analyse_data["analysis"] # Structure type 1: {"analyse": "texte"} if isinstance(analysis, dict) and "analyse" in analysis: return analysis["analyse"] # Structure type 2: {"error": false, ...} - contient d'autres données utiles if isinstance(analysis, dict) and "error" in analysis and not analysis.get("error", True): return str(analysis) # Structure type 3: texte d'analyse direct if isinstance(analysis, str): return analysis # Structure type 4: autre format de dictionnaire - convertir en JSON if isinstance(analysis, dict): return json.dumps(analysis, ensure_ascii=False, indent=2) # Aucun format reconnu return None def _collecter_info_agents(self, rapport_data: Dict) -> Dict: """ Collecte des informations sur les agents utilisés dans l'analyse Args: rapport_data: Données du rapport Returns: Dictionnaire contenant les informations sur les agents """ agents_info = {} # Informations sur l'agent JSON Analyser (Ticket Analyser) ticket_analyses = {} for key in ["ticket_analyse", "analyse_json", "analyse_ticket"]: if key in rapport_data and isinstance(rapport_data[key], dict) and "metadata" in rapport_data[key]: ticket_analyses = rapport_data[key]["metadata"] break if ticket_analyses: agents_info["ticket_analyser"] = ticket_analyses # Informations sur les agents d'image if "analyse_images" in rapport_data and rapport_data["analyse_images"]: # Image Sorter sorter_info = {} analyser_info = {} for img_path, img_data in rapport_data["analyse_images"].items(): # Collecter info du sorter if "sorting" in img_data and isinstance(img_data["sorting"], dict) and "metadata" in img_data["sorting"]: sorter_info = img_data["sorting"]["metadata"] # Collecter info de l'analyser if "analysis" in img_data and isinstance(img_data["analysis"], dict) and "metadata" in img_data["analysis"]: analyser_info = img_data["analysis"]["metadata"] # Une fois qu'on a trouvé les deux, on peut sortir if sorter_info and analyser_info: break if sorter_info: agents_info["image_sorter"] = sorter_info if analyser_info: agents_info["image_analyser"] = analyser_info # Ajouter les informations de l'agent report generator agents_info["report_generator"] = { "model": getattr(self.llm, "modele", str(type(self.llm))), "temperature": self.temperature, "top_p": self.top_p, "max_tokens": self.max_tokens, "prompt_version": self.prompt_version } return agents_info def _collecter_prompts_agents(self) -> Dict[str, str]: """ Collecte les prompts système de tous les agents impliqués dans l'analyse. Returns: Dictionnaire contenant les prompts des agents """ prompts = { "rapport_generator": self.system_prompt } # Importer les classes d'agents pour accéder à leurs prompts try: # Importer les autres agents from .agent_ticket_analyser import AgentTicketAnalyser from .agent_image_analyser import AgentImageAnalyser from .agent_image_sorter import AgentImageSorter # Créer des instances temporaires pour récupérer les prompts # En passant None comme LLM pour éviter d'initialiser complètement les agents try: ticket_analyser = AgentTicketAnalyser(None) prompts["ticket_analyser"] = ticket_analyser.system_prompt logger.info("Prompt récupéré pour ticket_analyser") except Exception as e: logger.warning(f"Erreur lors de la récupération du prompt ticket_analyser: {str(e)}") try: image_analyser = AgentImageAnalyser(None) prompts["image_analyser"] = image_analyser.system_prompt logger.info("Prompt récupéré pour image_analyser") except Exception as e: logger.warning(f"Erreur lors de la récupération du prompt image_analyser: {str(e)}") try: image_sorter = AgentImageSorter(None) prompts["image_sorter"] = image_sorter.system_prompt logger.info("Prompt récupéré pour image_sorter") except Exception as e: logger.warning(f"Erreur lors de la récupération du prompt image_sorter: {str(e)}") except ImportError as e: logger.warning(f"Erreur lors de l'importation des classes d'agents: {str(e)}") return prompts