import json import os from .base_agent import BaseAgent from datetime import datetime from typing import Dict, Any, Tuple, Optional import logging import traceback logger = logging.getLogger("AgentReportGenerator") class AgentReportGenerator(BaseAgent): """ Agent pour générer un rapport complet à partir des analyses de ticket et d'images """ def __init__(self, llm): super().__init__("AgentReportGenerator", llm) # Configuration locale de l'agent (remplace AgentConfig) self.temperature = 0.4 # Génération de rapport factuelle mais bien structurée self.top_p = 0.9 self.max_tokens = 2500 self.system_prompt = """Tu es un expert en génération de rapports techniques pour BRG-Lab pour la société CBAO. Ta mission est de synthétiser les analyses (ticket et images) en un rapport structuré et exploitable. EXIGENCE ABSOLUE - TABLEAU DES ÉCHANGES CLIENT/SUPPORT: - Tu DOIS IMPÉRATIVEMENT créer un TABLEAU MARKDOWN des échanges client/support - Le format du tableau DOIT être: | Date | Émetteur (CLIENT/SUPPORT) | Type (Question/Réponse) | Contenu | |------|---------------------------|-------------------------|---------| | date1 | CLIENT | Question | contenu... | | date2 | SUPPORT | Réponse | contenu... | - Chaque message du ticket doit apparaître dans une ligne du tableau - Indique clairement qui est CLIENT et qui est SUPPORT - Tu dois synthétiser au mieux les échanges(le plus court et clair possible) client/support(question/réponse) dans le tableau - TU dois spécifié si la question n'a pas de réponse - Le tableau DOIT être inclus dans la section "Chronologie des échanges" Structure ton rapport: 1. Résumé exécutif: Synthèse du problème initial (nom de la demande + description) 2. Chronologie des échanges: TABLEAU des interactions client/support (format imposé ci-dessus) 3. Analyse des images: Ce que montrent les captures d'écran et leur pertinence 4. Diagnostic technique: Interprétation des informations techniques pertinentes Reste factuel et précis dans ton analyse. Le tableau des échanges client/support est l'élément le plus important du rapport.""" # Appliquer la configuration au LLM self._appliquer_config_locale() logger.info("AgentReportGenerator initialisé") def _appliquer_config_locale(self) -> None: """ Applique la configuration locale au modèle LLM. """ # Appliquer le prompt système if hasattr(self.llm, "prompt_system"): self.llm.prompt_system = self.system_prompt # Appliquer les paramètres if hasattr(self.llm, "configurer"): params = { "temperature": self.temperature, "top_p": self.top_p, "max_tokens": self.max_tokens } # Ajustements selon le type de modèle if "mistral_medium" in self.llm.__class__.__name__.lower(): params["temperature"] += 0.05 params["max_tokens"] = 1000 elif "pixtral" in self.llm.__class__.__name__.lower(): params["temperature"] -= 0.05 elif "ollama" in self.llm.__class__.__name__.lower(): params["temperature"] += 0.1 params.update({ "num_ctx": 2048, "repeat_penalty": 1.1, }) self.llm.configurer(**params) def executer(self, rapport_data: Dict, rapport_dir: str) -> Tuple[Optional[str], Optional[str]]: """ Génère un rapport à partir des analyses effectuées Args: rapport_data: Dictionnaire contenant toutes les données analysées rapport_dir: Répertoire où sauvegarder le rapport Returns: Tuple (chemin vers le rapport JSON, chemin vers le rapport Markdown) """ # Récupérer l'ID du ticket depuis les données ticket_id = rapport_data.get("ticket_id", "") if not ticket_id and "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict): ticket_id = rapport_data["ticket_data"].get("code", "") if not ticket_id: ticket_id = os.path.basename(os.path.dirname(rapport_dir)) if not ticket_id.startswith("T"): # Dernier recours, utiliser le dernier segment du chemin ticket_id = os.path.basename(rapport_dir) logger.info(f"Génération du rapport pour le ticket: {ticket_id}") print(f"AgentReportGenerator: Génération du rapport pour {ticket_id}") # Validation des données d'entrée logger.info("Vérification de la complétude des données d'entrée:") if "ticket_data" in rapport_data: logger.info(f" - Données de ticket présentes: {len(str(rapport_data['ticket_data']))} caractères") else: logger.warning(" - Données de ticket manquantes") # Vérification des analyses ticket_analyse_exists = False if "ticket_analyse" in rapport_data and rapport_data["ticket_analyse"]: ticket_analyse_exists = True logger.info(f" - Analyse du ticket présente: {len(rapport_data['ticket_analyse'])} caractères") elif "analyse_json" in rapport_data and rapport_data["analyse_json"]: ticket_analyse_exists = True logger.info(f" - Analyse JSON présente: {len(rapport_data['analyse_json'])} caractères") else: logger.warning(" - Analyse du ticket manquante") # Vérification des analyses d'images if "analyse_images" in rapport_data and rapport_data["analyse_images"]: n_images = len(rapport_data["analyse_images"]) n_relevant = sum(1 for _, data in rapport_data["analyse_images"].items() if "sorting" in data and isinstance(data["sorting"], dict) and data["sorting"].get("is_relevant", False)) n_analyzed = sum(1 for _, data in rapport_data["analyse_images"].items() if "analysis" in data and data["analysis"]) logger.info(f" - Analyses d'images présentes: {n_images} images, {n_relevant} pertinentes, {n_analyzed} analysées") else: logger.warning(" - Analyses d'images manquantes") # S'assurer que le répertoire existe if not os.path.exists(rapport_dir): os.makedirs(rapport_dir) logger.info(f"Répertoire de rapport créé: {rapport_dir}") try: # Préparer les données formatées pour l'analyse ticket_analyse = None # Vérifier que l'analyse du ticket est disponible sous l'une des clés possibles if "ticket_analyse" in rapport_data and rapport_data["ticket_analyse"]: ticket_analyse = rapport_data["ticket_analyse"] logger.info("Utilisation de ticket_analyse") elif "analyse_json" in rapport_data and rapport_data["analyse_json"]: ticket_analyse = rapport_data["analyse_json"] logger.info("Utilisation de analyse_json en fallback") else: # Créer une analyse par défaut si aucune n'est disponible logger.warning("Aucune analyse de ticket disponible, création d'un message par défaut") ticket_data = rapport_data.get("ticket_data", {}) ticket_name = ticket_data.get("name", "Sans titre") ticket_desc = ticket_data.get("description", "Pas de description disponible") ticket_analyse = f"Analyse par défaut du ticket:\nNom: {ticket_name}\nDescription: {ticket_desc}\n(Aucune analyse détaillée n'a été fournie par l'agent d'analyse de ticket)" # Préparer les données d'analyse d'images images_analyses = [] analyse_images_data = rapport_data.get("analyse_images", {}) # Statistiques pour les métadonnées total_images = len(analyse_images_data) if analyse_images_data else 0 images_pertinentes = 0 # Collecter des informations sur les agents et LLM utilisés agents_info = self._collecter_info_agents(rapport_data) # Transformer les analyses d'images en liste structurée pour le prompt for image_path, analyse_data in analyse_images_data.items(): image_name = os.path.basename(image_path) # Vérifier si l'image est pertinente is_relevant = False if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict): is_relevant = analyse_data["sorting"].get("is_relevant", False) if is_relevant: images_pertinentes += 1 # Récupérer l'analyse détaillée si elle existe et que l'image est pertinente analyse_detail = None if is_relevant: if "analysis" in analyse_data and analyse_data["analysis"]: if isinstance(analyse_data["analysis"], dict) and "analyse" in analyse_data["analysis"]: analyse_detail = analyse_data["analysis"]["analyse"] elif isinstance(analyse_data["analysis"], dict): analyse_detail = str(analyse_data["analysis"]) # Si l'analyse n'a pas été trouvée mais que l'image est pertinente if not analyse_detail: analyse_detail = f"Image marquée comme pertinente. Raison: {analyse_data['sorting'].get('reason', 'Non spécifiée')}" # Ajouter l'analyse à la liste si elle existe if analyse_detail: images_analyses.append({ "image_name": image_name, "analyse": analyse_detail }) num_images = len(images_analyses) # Afficher un résumé des données collectées logger.info(f"Résumé des données préparées pour le rapport:") logger.info(f" - Ticket ID: {ticket_id}") logger.info(f" - Analyse du ticket: {len(ticket_analyse) if ticket_analyse else 0} caractères") logger.info(f" - Images analysées: {total_images}, Images pertinentes: {images_pertinentes}") logger.info(f" - Images avec analyse détaillée: {num_images}") # Mettre à jour les métadonnées avec les statistiques rapport_data.setdefault("metadata", {}).update({ "images_analysees": total_images, "images_pertinentes": images_pertinentes, "analyses_images_disponibles": num_images }) # Extraire les messages pour aider à la création du tableau messages_structure = [] try: ticket_data = rapport_data.get("ticket_data", {}) if "messages" in ticket_data and isinstance(ticket_data["messages"], list): for msg in ticket_data["messages"]: if isinstance(msg, dict): sender = msg.get("author_id", msg.get("from", "Inconnu")) date = msg.get("date", "Date inconnue") content = msg.get("content", "") # Déterminer le type (client/support) sender_type = "CLIENT" if "client" in sender.lower() else "SUPPORT" if "support" in sender.lower() else "AUTRE" messages_structure.append({ "date": date, "emetteur": sender_type, "contenu": content[:100] + "..." if len(content) > 100 else content }) logger.info(f" - {len(messages_structure)} messages extraits pour le tableau") except Exception as e: logger.warning(f"Erreur lors de l'extraction des messages: {e}") # Créer un prompt détaillé en s'assurant que toutes les analyses sont incluses prompt = f"""Génère un rapport technique complet pour le ticket #{ticket_id}, en te basant sur les analyses suivantes. ## ANALYSE DU TICKET {ticket_analyse} ## ANALYSES DES IMAGES ({num_images} images pertinentes sur {total_images} analysées) """ # Ajouter l'analyse de chaque image for i, img_analyse in enumerate(images_analyses, 1): image_name = img_analyse.get("image_name", f"Image {i}") analyse = img_analyse.get("analyse", "Analyse non disponible") prompt += f"\n### IMAGE {i}: {image_name}\n{analyse}\n" # Ajouter des informations sur les messages pour aider à la création du tableau if messages_structure: prompt += "\n## STRUCTURE DES MESSAGES POUR LE TABLEAU\n" for i, msg in enumerate(messages_structure, 1): prompt += f"{i}. Date: {msg['date']} | Émetteur: {msg['emetteur']} | Contenu: {msg['contenu']}\n" prompt += f""" EXIGENCE ABSOLUE - TON RAPPORT DOIT IMPÉRATIVEMENT INCLURE: - Un TABLEAU MARKDOWN structuré avec les échanges client/support au format: | Date | Émetteur | Type | Contenu | |------|----------|------|---------| | date1 | CLIENT | Question | contenu... | | date2 | SUPPORT | Réponse | contenu... | Structure ton rapport: 1. Résumé exécutif: Synthèse concise du problème initial (reprend le nom de la demande + description) 2. Chronologie des échanges: TABLEAU DES INTERACTIONS CLIENT/SUPPORT (format imposé ci-dessus) 3. Analyse des images pertinentes: Ce que montrent les captures d'écran 4. Diagnostic technique: Points clés et interprétation technique Ton rapport doit être factuel et précis. Le tableau des échanges client/support est OBLIGATOIRE. """ # Appeler le LLM pour générer le rapport logger.info("Interrogation du LLM pour la génération du rapport") rapport_contenu = self.llm.interroger(prompt) # Vérifier que le rapport généré contient bien un tableau pour les échanges contains_table = "|" in rapport_contenu and ( "| Date |" in rapport_contenu or "| Émetteur |" in rapport_contenu or "| Type |" in rapport_contenu or "| CLIENT |" in rapport_contenu ) if not contains_table: logger.warning("ATTENTION: Le rapport généré ne semble pas contenir de tableau pour les échanges client/support") print(" ATTENTION: Le rapport ne contient pas de tableau pour les échanges") # Tenter une seconde génération avec un prompt plus direct logger.info("Tentative de régénération du rapport avec focus sur le tableau") # Prompt simplifié, focalisé sur le tableau second_prompt = f"""Pour le ticket #{ticket_id}, crée un rapport incluant IMPÉRATIVEMENT: UN TABLEAU MARKDOWN DES ÉCHANGES CLIENT/SUPPORT: | Date | Émetteur | Type | Contenu | |------|----------|------|---------| | date1 | CLIENT | Question | contenu... | Voici la structure des messages: """ # Ajouter les messages directement for i, msg in enumerate(messages_structure, 1): second_prompt += f"{i}. Date: {msg['date']} | Émetteur: {msg['emetteur']} | Contenu: {msg['contenu']}\n" second_prompt += """ Structure obligatoire: 1. Résumé exécutif (très court) 2. Chronologie des échanges: TABLEAU MARKDOWN (comme ci-dessus) 3. Bref diagnostic Le tableau est l'élément le plus important.""" # Tenter avec un autre prompt second_rapport = self.llm.interroger(second_prompt) # Vérifier à nouveau if "|" in second_rapport and ( "| Date |" in second_rapport or "| Émetteur |" in second_rapport or "| Type |" in second_rapport or "| CLIENT |" in second_rapport ): rapport_contenu = second_rapport logger.info("Succès: Le rapport régénéré contient un tableau") print(" Tableau des échanges généré avec succès dans la seconde tentative") else: logger.warning("Le tableau est toujours absent dans la seconde tentative") # Créer les noms de fichiers pour la sauvegarde timestamp = self._get_timestamp() base_filename = f"{ticket_id}_{timestamp}" json_path = os.path.join(rapport_dir, f"{base_filename}.json") md_path = os.path.join(rapport_dir, f"{base_filename}.md") # Collecter les métadonnées du rapport avec détails sur les agents et LLM utilisés metadata = rapport_data.get("metadata", {}) metadata.update({ "timestamp": timestamp, "model": getattr(self.llm, "modele", str(type(self.llm))), "temperature": self.temperature, "top_p": self.top_p, "max_tokens": self.max_tokens, "system_prompt": self.system_prompt, "agents_info": agents_info, "images_analysees": total_images, "images_pertinentes": images_pertinentes, "analyses_images_incluses": num_images }) # Sauvegarder le rapport au format JSON (données brutes + rapport généré) rapport_data_complet = rapport_data.copy() rapport_data_complet["rapport_genere"] = rapport_contenu rapport_data_complet["metadata"] = metadata # S'assurer que les clés nécessaires pour le markdown sont présentes if "ticket_analyse" not in rapport_data_complet: rapport_data_complet["ticket_analyse"] = ticket_analyse with open(json_path, "w", encoding="utf-8") as f: json.dump(rapport_data_complet, f, ensure_ascii=False, indent=2) # Générer et sauvegarder le rapport au format Markdown basé directement sur le JSON markdown_content = self._generer_markdown_depuis_json(rapport_data_complet) with open(md_path, "w", encoding="utf-8") as f: f.write(markdown_content) logger.info(f"Rapport sauvegardé: {json_path} et {md_path}") logger.info(f"Taille du rapport Markdown: {len(markdown_content)} caractères") except Exception as e: error_message = f"Erreur lors de la génération du rapport: {str(e)}" logger.error(error_message) print(f" ERREUR: {error_message}") return None, None # Enregistrer l'historique self.ajouter_historique("generation_rapport", { "rapport_dir": rapport_dir, "ticket_id": ticket_id, "total_images": total_images, "images_pertinentes": images_pertinentes, "temperature": self.temperature, "top_p": self.top_p, "max_tokens": self.max_tokens }, { "json_path": json_path, "md_path": md_path, "taille_rapport": len(markdown_content) if 'markdown_content' in locals() else 0, "rapport_contenu": rapport_contenu[:300] + ("..." if len(rapport_contenu) > 300 else "") }) message = f"Rapports générés dans: {rapport_dir}" print(f" {message}") print(f" - JSON: {os.path.basename(json_path)}") print(f" - Markdown: {os.path.basename(md_path)}") return json_path, md_path def _collecter_info_agents(self, rapport_data: Dict) -> Dict: """ Collecte des informations sur les agents utilisés dans l'analyse Args: rapport_data: Données du rapport Returns: Dictionnaire contenant les informations sur les agents """ agents_info = {} # Informations sur l'agent JSON Analyser if "analyse_json" in rapport_data: json_analysis = rapport_data["analyse_json"] # Vérifier si l'analyse JSON contient des métadonnées if isinstance(json_analysis, dict) and "metadata" in json_analysis: agents_info["json_analyser"] = json_analysis["metadata"] # Informations sur les agents d'image if "analyse_images" in rapport_data and rapport_data["analyse_images"]: # Image Sorter sorter_info = {} analyser_info = {} for img_path, img_data in rapport_data["analyse_images"].items(): # Collecter info du sorter if "sorting" in img_data and isinstance(img_data["sorting"], dict) and "metadata" in img_data["sorting"]: if "model_info" in img_data["sorting"]["metadata"]: sorter_info = img_data["sorting"]["metadata"]["model_info"] # Collecter info de l'analyser if "analysis" in img_data and img_data["analysis"] and isinstance(img_data["analysis"], dict) and "metadata" in img_data["analysis"]: if "model_info" in img_data["analysis"]["metadata"]: analyser_info = img_data["analysis"]["metadata"]["model_info"] # Une fois qu'on a trouvé les deux, on peut sortir if sorter_info and analyser_info: break if sorter_info: agents_info["image_sorter"] = sorter_info if analyser_info: agents_info["image_analyser"] = analyser_info # Ajouter les informations de l'agent report generator agents_info["report_generator"] = { "model": getattr(self.llm, "modele", str(type(self.llm))), "temperature": self.temperature, "top_p": self.top_p, "max_tokens": self.max_tokens } return agents_info def _generer_markdown_depuis_json(self, rapport_data: Dict) -> str: """ Génère un rapport Markdown directement à partir des données JSON Args: rapport_data: Données JSON complètes du rapport Returns: Contenu Markdown du rapport """ ticket_id = rapport_data.get("ticket_id", "") timestamp = rapport_data.get("metadata", {}).get("timestamp", self._get_timestamp()) # Contenu de base du rapport (partie générée par le LLM) rapport_contenu = rapport_data.get("rapport_genere", "") # Entête du document markdown = f"# Rapport d'analyse du ticket #{ticket_id}\n\n" markdown += f"*Généré le: {timestamp}*\n\n" # Ajouter le rapport principal généré par le LLM markdown += rapport_contenu + "\n\n" # Section séparatrice pour les détails d'analyse markdown += "---\n\n" markdown += "# Détails des analyses effectuées\n\n" # Ajouter un résumé du processus d'analyse complet markdown += "## Processus d'analyse\n\n" # 1. Analyse de ticket ticket_analyse = rapport_data.get("ticket_analyse", "") if not ticket_analyse and "analyse_json" in rapport_data: ticket_analyse = rapport_data.get("analyse_json", "") if ticket_analyse: markdown += "### Étape 1: Analyse du ticket\n\n" markdown += "L'agent d'analyse de ticket a extrait les informations suivantes du ticket d'origine:\n\n" markdown += "
\nCliquez pour voir l'analyse complète du ticket\n\n" markdown += "```\n" + ticket_analyse + "\n```\n\n" markdown += "
\n\n" # 2. Tri des images markdown += "### Étape 2: Tri des images\n\n" markdown += "L'agent de tri d'images a évalué chaque image pour déterminer sa pertinence par rapport au problème client:\n\n" analyse_images_data = rapport_data.get("analyse_images", {}) if analyse_images_data: # Créer un tableau pour le tri des images markdown += "| Image | Pertinence | Raison |\n" markdown += "|-------|------------|--------|\n" for image_path, analyse_data in analyse_images_data.items(): image_name = os.path.basename(image_path) # Information de tri is_relevant = "Non" reason = "Non spécifiée" if "sorting" in analyse_data: sorting_data = analyse_data["sorting"] if isinstance(sorting_data, dict): is_relevant = "Oui" if sorting_data.get("is_relevant", False) else "Non" reason = sorting_data.get("reason", "Non spécifiée") markdown += f"| {image_name} | {is_relevant} | {reason} |\n" markdown += "\n" else: markdown += "*Aucune image n'a été trouvée ou analysée.*\n\n" # 3. Analyse des images pertinentes markdown += "### Étape 3: Analyse détaillée des images pertinentes\n\n" images_pertinentes = 0 for image_path, analyse_data in analyse_images_data.items(): # Vérifier si l'image est pertinente is_relevant = False if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict): is_relevant = analyse_data["sorting"].get("is_relevant", False) if is_relevant: images_pertinentes += 1 image_name = os.path.basename(image_path) # Récupérer l'analyse détaillée analyse_detail = "Analyse non disponible" if "analysis" in analyse_data and analyse_data["analysis"]: if isinstance(analyse_data["analysis"], dict) and "analyse" in analyse_data["analysis"]: analyse_detail = analyse_data["analysis"]["analyse"] elif isinstance(analyse_data["analysis"], dict): analyse_detail = str(analyse_data["analysis"]) markdown += f"#### Image pertinente {images_pertinentes}: {image_name}\n\n" markdown += "
\nCliquez pour voir l'analyse complète de l'image\n\n" markdown += "```\n" + analyse_detail + "\n```\n\n" markdown += "
\n\n" if images_pertinentes == 0: markdown += "*Aucune image pertinente n'a été identifiée pour ce ticket.*\n\n" # 4. Synthèse (rapport final) markdown += "### Étape 4: Génération du rapport de synthèse\n\n" markdown += "L'agent de génération de rapport a synthétisé toutes les analyses précédentes pour produire le rapport ci-dessus.\n\n" # Informations techniques markdown += "## Informations techniques\n\n" # Ajouter les informations sur les agents utilisés agents_info = rapport_data.get("metadata", {}).get("agents_info", {}) if agents_info: markdown += "### Agents et modèles utilisés\n\n" # Agent JSON Analyser if "json_analyser" in agents_info: info = agents_info["json_analyser"] markdown += "#### Agent d'analyse de texte\n" markdown += f"- **Modèle**: {info.get('model', 'Non spécifié')}\n" markdown += f"- **Température**: {info.get('temperature', 'Non spécifiée')}\n" markdown += f"- **Top-p**: {info.get('top_p', 'Non spécifié')}\n" markdown += f"- **Max tokens**: {info.get('max_tokens', 'Non spécifié')}\n\n" # Agent Image Sorter if "image_sorter" in agents_info: info = agents_info["image_sorter"] markdown += "#### Agent de tri d'images\n" markdown += f"- **Modèle**: {info.get('model', 'Non spécifié')}\n" markdown += f"- **Température**: {info.get('temperature', 'Non spécifiée')}\n" markdown += f"- **Top-p**: {info.get('top_p', 'Non spécifié')}\n" markdown += f"- **Max tokens**: {info.get('max_tokens', 'Non spécifié')}\n\n" # Agent Image Analyser if "image_analyser" in agents_info: info = agents_info["image_analyser"] markdown += "#### Agent d'analyse d'images\n" markdown += f"- **Modèle**: {info.get('model', 'Non spécifié')}\n" markdown += f"- **Température**: {info.get('temperature', 'Non spécifiée')}\n" markdown += f"- **Top-p**: {info.get('top_p', 'Non spécifié')}\n" markdown += f"- **Max tokens**: {info.get('max_tokens', 'Non spécifié')}\n\n" # Agent Report Generator if "report_generator" in agents_info: info = agents_info["report_generator"] markdown += "#### Agent de génération de rapport\n" markdown += f"- **Modèle**: {info.get('model', 'Non spécifié')}\n" markdown += f"- **Température**: {info.get('temperature', 'Non spécifiée')}\n" markdown += f"- **Top-p**: {info.get('top_p', 'Non spécifié')}\n" markdown += f"- **Max tokens**: {info.get('max_tokens', 'Non spécifié')}\n\n" # Statistiques d'analyse markdown += "### Statistiques\n\n" total_images = len(analyse_images_data) if analyse_images_data else 0 markdown += f"- **Images analysées**: {total_images}\n" markdown += f"- **Images pertinentes**: {images_pertinentes}\n\n" return markdown def _get_timestamp(self) -> str: """Retourne un timestamp au format YYYYMMDD_HHMMSS""" return datetime.now().strftime("%Y%m%d_%H%M%S")