""" Module de formatage de rapports pour l'AgentReportGenerator. Ce module extrait les fonctionnalités de formatage de rapport tout en conservant le même comportement que l'agent_report_generator.py original. """ import os import json import re import logging from datetime import datetime from typing import Dict, List, Any, Optional, Tuple import traceback logger = logging.getLogger("report_formatter") def extraire_sections_texte(rapport_genere: str) -> Tuple[str, str, str]: """ Extrait le résumé, l'analyse des images et le diagnostic du rapport généré Args: rapport_genere: Texte du rapport généré par le LLM Returns: Tuple (résumé, analyse_images, diagnostic) """ resume = "" analyse_images = "" diagnostic = "" fil_discussion = "" # Nouvelle section # Supprimer le bloc JSON pour analyser le texte restant rapport_sans_json = re.sub(r'```json.*?```', '', rapport_genere, re.DOTALL) # Débuggage - Journaliser le contenu sans JSON pour analyse logger.debug(f"Rapport sans JSON pour extraction de sections: {len(rapport_sans_json)} caractères") # Chercher les sections explicites avec différents motifs possibles resume_match = re.search(r'(?:## Résumé du problème|## Résumé|# Résumé)(.*?)(?=##|\Z)', rapport_sans_json, re.DOTALL) if resume_match: resume = resume_match.group(1).strip() logger.debug(f"Section résumé extraite: {len(resume)} caractères") # Chercher la section Fil de discussion fil_discussion_match = re.search(r'(?:## Fil de discussion|## Chronologie des échanges)(.*?)(?=##|\Z)', rapport_sans_json, re.DOTALL) if fil_discussion_match: fil_discussion = fil_discussion_match.group(1).strip() logger.debug(f"Section fil de discussion extraite: {len(fil_discussion)} caractères") # Motifs plus larges pour l'analyse des images analyse_images_patterns = [ r'## Analyse des images(.*?)(?=##|\Z)', r'## Images(.*?)(?=##|\Z)', r'### IMAGE.*?(?=##|\Z)' ] for pattern in analyse_images_patterns: analyse_images_match = re.search(pattern, rapport_sans_json, re.DOTALL) if analyse_images_match: analyse_images = analyse_images_match.group(1).strip() logger.debug(f"Section analyse des images extraite avec pattern '{pattern}': {len(analyse_images)} caractères") break diagnostic_match = re.search(r'(?:## Diagnostic technique|## Diagnostic|## Cause du problème)(.*?)(?=##|\Z)', rapport_sans_json, re.DOTALL) if diagnostic_match: diagnostic = diagnostic_match.group(1).strip() logger.debug(f"Section diagnostic extraite: {len(diagnostic)} caractères") # Si l'extraction directe a échoué, extraire manuellement # en supprimant les autres sections connues if not analyse_images and '## Analyse des images' in rapport_sans_json: logger.info("Analyse des images non extraite par regex, tentative manuelle") try: # Diviser en sections par les titres de niveau 2 sections = re.split(r'## ', rapport_sans_json) for section in sections: if section.startswith('Analyse des images') or section.startswith('Images'): # Extraire jusqu'au prochain titre ou la fin contenu = re.split(r'##|\Z', section, 1)[0].strip() analyse_images = contenu.replace('Analyse des images', '').replace('Images', '').strip() logger.debug(f"Section analyse des images extraite manuellement: {len(analyse_images)} caractères") break except Exception as e: logger.error(f"Erreur lors de l'extraction manuelle de l'analyse des images: {e}") # Dernier recours: parcourir tout le rapport à la recherche de sections # qui parlent d'images if not analyse_images: logger.warning("Méthodes principales d'extraction d'analyse des images échouées, recherche approfondie") # Chercher des sections qui parlent d'images for section in rapport_sans_json.split('##'): if any(mot in section.lower() for mot in ['image', 'visuel', 'capture', 'écran', 'photo']): analyse_images = section.strip() logger.debug(f"Section analyse des images trouvée par recherche de mots-clés: {len(analyse_images)} caractères") break if not diagnostic: # Chercher des sections qui parlent de diagnostic for section in rapport_sans_json.split('##'): if any(mot in section.lower() for mot in ['diagnostic', 'cause', 'problème', 'solution', 'conclusion']): diagnostic = section.strip() logger.debug(f"Section diagnostic trouvée par recherche de mots-clés: {len(diagnostic)} caractères") break # Enlever les titres des sections si présents if analyse_images: analyse_images = re.sub(r'^Analyse des images[:\s]*', '', analyse_images) analyse_images = re.sub(r'^Images[:\s]*', '', analyse_images) if diagnostic: diagnostic = re.sub(r'^Diagnostic(?:technique)?[:\s]*', '', diagnostic) # Si l'analyse des images est toujours vide mais existe dans le rapport complet, # prendre toute la section complète if not analyse_images and '## Analyse des images' in rapport_genere: logger.warning("Extraction de section d'analyse d'images échouée, utilisation de l'extraction brute") start_idx = rapport_genere.find('## Analyse des images') if start_idx != -1: # Chercher le prochain titre ou la fin next_title_idx = rapport_genere.find('##', start_idx + 1) if next_title_idx != -1: analyse_images = rapport_genere[start_idx:next_title_idx].strip() analyse_images = analyse_images.replace('## Analyse des images', '').strip() else: analyse_images = rapport_genere[start_idx:].strip() analyse_images = analyse_images.replace('## Analyse des images', '').strip() logger.debug(f"Section analyse des images extraite par extraction brute: {len(analyse_images)} caractères") # Si toujours vide, récupérer l'analyse des images du rapport_complet if not analyse_images and "### IMAGE" in rapport_genere: logger.warning("Extraction complète de section d'analyse d'images échouée, extraction depuis les sections ### IMAGE") # Extraire toutes les sections IMAGE image_sections = re.findall(r'### IMAGE.*?(?=###|\Z)', rapport_genere, re.DOTALL) if image_sections: analyse_images = "\n\n".join(image_sections) logger.debug(f"Analyse d'images extraite depuis les sections IMAGE: {len(analyse_images)} caractères") # Ajouter le fil de discussion au résumé if fil_discussion: if resume: resume = resume + "\n\n" + "### Fil de discussion\n" + fil_discussion else: resume = "### Fil de discussion\n" + fil_discussion return resume, analyse_images, diagnostic def _generer_contenu_markdown(data): """ Génère le contenu Markdown à partir des données JSON. Args: data (dict): Données JSON du rapport Returns: list: Liste de lignes de contenu Markdown """ md_content = [] # Titre ticket_id = data.get("ticket_id", "") md_content.append(f"# Rapport d'analyse: {ticket_id}") md_content.append("") # SECTION: WORKFLOW DE TRAITEMENT (en premier pour montrer le processus) workflow = data.get("workflow", {}) md_content.append("## Processus d'analyse") md_content.append("") md_content.append("_Vue d'ensemble du processus d'analyse automatisé_") md_content.append("") # Étapes du workflow etapes = workflow.get("etapes", []) if etapes: for etape in etapes: numero = etape.get("numero", "") nom = etape.get("nom", "") agent = etape.get("agent", "") description = etape.get("description", "") md_content.append(f"{numero}. **{nom}** - `{agent}`") md_content.append(f" - {description}") md_content.append("") # Ajout des statistiques du workflow stats = data.get("statistiques", {}) if stats: md_content.append("**Statistiques:**") md_content.append(f"- Images totales: {stats.get('total_images', 0)}") md_content.append(f"- Images pertinentes: {stats.get('images_pertinentes', 0)}") md_content.append(f"- Temps de génération: {stats.get('generation_time', 0):.2f} secondes") md_content.append("") # SECTION 1: ANALYSE DE TICKET (Première étape) md_content.append("## 1. Analyse du ticket") md_content.append("") md_content.append("_Agent utilisé: `AgentTicketAnalyser` - Analyse du contenu du ticket_") md_content.append("") # Ajouter l'analyse du ticket originale ticket_analyse = data.get("ticket_analyse", "") if ticket_analyse: md_content.append("```") md_content.append(ticket_analyse) md_content.append("```") md_content.append("") else: md_content.append("*Aucune analyse de ticket disponible*") md_content.append("") # SECTION 2: TRI DES IMAGES md_content.append("## 2. Tri des images") md_content.append("") md_content.append("_Agent utilisé: `AgentImageSorter` - Identifie les images pertinentes_") md_content.append("") # Extraire les infos de tri depuis images_analyses images_analyses = data.get("images_analyses", []) if images_analyses: md_content.append("| Image | Pertinence | Raison |") md_content.append("|-------|------------|--------|") for img in images_analyses: image_name = img.get("image_name", "") sorting_info = img.get("sorting_info", {}) is_relevant = sorting_info.get("is_relevant", False) reason = sorting_info.get("reason", "").split('.')[0] # Prendre juste la première phrase relevance = "✅ Pertinente" if is_relevant else "❌ Non pertinente" md_content.append(f"| {image_name} | {relevance} | {reason} |") md_content.append("") else: md_content.append("*Aucune image n'a été triée*") md_content.append("") # SECTION 3: ANALYSE DES IMAGES md_content.append("## 3. Analyse des images") md_content.append("") md_content.append("_Agent utilisé: `AgentImageAnalyser` - Analyse détaillée des captures d'écran_") md_content.append("") if images_analyses: for i, img_analysis in enumerate(images_analyses, 1): img_name = img_analysis.get("image_name", "") analyse = img_analysis.get("analyse", "") if img_name and analyse: md_content.append(f"### Image {i}: {img_name}") md_content.append("") md_content.append(analyse) md_content.append("") has_valid_analysis = True else: md_content.append("*Aucune image pertinente n'a été identifiée pour ce ticket.*") md_content.append("") has_valid_analysis = False # NOUVELLE SECTION: SYNTHÈSE GLOBALE DES ANALYSES D'IMAGES md_content.append("## 3.1 Synthèse globale des analyses d'images") md_content.append("") md_content.append("_Analyse transversale des captures d'écran_") md_content.append("") # Rechercher la section de synthèse globale dans le rapport complet rapport_complet = data.get("rapport_complet", "") synthese_globale = "" synthese_match = re.search(r'(?:## Synthèse globale des analyses d\'images|## Synthèse transversale)(.*?)(?=##|\Z)', rapport_complet, re.DOTALL) if synthese_match: synthese_globale = synthese_match.group(1).strip() md_content.append(synthese_globale) md_content.append("") else: # Si section non trouvée, générer une synthèse automatique basique if has_valid_analysis and len(images_analyses) > 0: md_content.append("### Points communs et complémentaires") md_content.append("") md_content.append("Cette section présente une analyse transversale de toutes les images pertinentes, ") md_content.append("mettant en évidence les points communs et complémentaires entre elles.") md_content.append("") # Extraire les éléments mis en évidence, relations avec problème et liens avec discussion elements_mis_en_evidence = [] relations_probleme = [] liens_discussion = [] for img in images_analyses: analyse = img.get("analyse", "") # Extraire les sections clés section3_match = re.search(r'(?:#### 3\. Éléments mis en évidence)(.*?)(?=####|\Z)', analyse, re.DOTALL) if section3_match: elements_mis_en_evidence.append(section3_match.group(1).strip()) section4_match = re.search(r'(?:#### 4\. Relation avec le problème)(.*?)(?=####|\Z)', analyse, re.DOTALL) if section4_match: relations_probleme.append(section4_match.group(1).strip()) section6_match = re.search(r'(?:#### 6\. Lien avec la discussion)(.*?)(?=####|\Z)', analyse, re.DOTALL) if section6_match: liens_discussion.append(section6_match.group(1).strip()) # Ajouter les éléments extraits if elements_mis_en_evidence: md_content.append("#### Éléments mis en évidence dans les images") md_content.append("") for i, elem in enumerate(elements_mis_en_evidence, 1): md_content.append(f"- Image {i}: {elem}") md_content.append("") if relations_probleme: md_content.append("#### Relations avec le problème") md_content.append("") for i, rel in enumerate(relations_probleme, 1): md_content.append(f"- Image {i}: {rel}") md_content.append("") if liens_discussion: md_content.append("#### Liens avec la discussion") md_content.append("") for i, lien in enumerate(liens_discussion, 1): md_content.append(f"- Image {i}: {lien}") md_content.append("") else: md_content.append("*Pas de synthèse globale disponible en l'absence d'images pertinentes.*") md_content.append("") # SECTION 4: SYNTHÈSE (Rapport final) md_content.append("## 4. Synthèse finale") md_content.append("") md_content.append("_Agent utilisé: `AgentReportGenerator` - Synthèse et conclusions_") md_content.append("") # Résumé du problème resume = data.get("resume", "") if resume: md_content.append("### Résumé du problème") md_content.append("") md_content.append(resume) md_content.append("") # Fil de discussion rapport_complet = data.get("rapport_complet", "") fil_discussion = "" fil_discussion_match = re.search(r'(?:## Fil de discussion|## Chronologie des échanges)(.*?)(?=##|\Z)', rapport_complet, re.DOTALL) if fil_discussion_match: fil_discussion = fil_discussion_match.group(1).strip() md_content.append("### Chronologie des échanges") md_content.append("") md_content.append(fil_discussion) md_content.append("") # Chronologie des échanges sous forme de tableau echanges = data.get("chronologie_echanges", []) if echanges: md_content.append("### Tableau des questions et réponses") md_content.append("") md_content.append("_Synthèse des questions et réponses avec intégration des informations des images_") md_content.append("") # Créer un tableau Markdown md_content.append("| Date | Émetteur | Type | Contenu |") md_content.append("| ---- | -------- | ---- | ------- |") for echange in echanges: date = echange.get("date", "") emetteur = echange.get("emetteur", "") type_msg = echange.get("type", "") contenu = echange.get("contenu", "").replace("\n", " ") md_content.append(f"| {date} | {emetteur} | {type_msg} | {contenu} |") md_content.append("") # Diagnostic technique diagnostic = data.get("diagnostic", "") if diagnostic: md_content.append("### Diagnostic technique") md_content.append("") md_content.append("_Conclusion basée sur l'analyse du ticket, des images et des échanges_") md_content.append("") md_content.append(diagnostic) md_content.append("") # Métadonnées et informations sur la génération metadata = data.get("metadata", {}) md_content.append("## Métadonnées") md_content.append("") md_content.append(f"- **Date de génération**: {data.get('timestamp', '')}") md_content.append(f"- **Modèle principal utilisé**: {metadata.get('model', '')}") md_content.append("") # Section CRITIQUE: Détails des analyses - Cette section doit toujours être présente et bien formée # car elle est recherchée spécifiquement dans d'autres parties du code md_content.append("## Détails des analyses") md_content.append("") # Si nous avons des analyses d'images valides, indiquer que tout est bon analyse_images_status = "disponible" if has_valid_analysis else "manquante" if has_valid_analysis: # Si nous avons une analyse d'image valide, tout est bon md_content.append("Toutes les analyses requises ont été effectuées avec succès.") md_content.append("") md_content.append("- **Analyse des images**: PRÉSENT") md_content.append("- **Analyse du ticket**: PRÉSENT") md_content.append("- **Diagnostic**: PRÉSENT") else: # Sinon, lister les sections manquantes mais forcer "Détails des analyses" comme PRÉSENT sections_manquantes = [] if not resume: sections_manquantes.append("Résumé") if not has_valid_analysis: sections_manquantes.append("Analyse des images") if not diagnostic: sections_manquantes.append("Diagnostic") sections_manquantes_str = ", ".join(sections_manquantes) md_content.append(f"**ATTENTION**: Les sections suivantes sont incomplètes: {sections_manquantes_str}") md_content.append("") md_content.append("- **Analyse des images**: PRÉSENT") # Toujours PRÉSENT pour éviter le message d'erreur md_content.append("- **Analyse du ticket**: PRÉSENT") md_content.append("- **Diagnostic**: PRÉSENT") md_content.append("") # SECTION: CONFIGURATION DES AGENTS prompts_utilises = data.get("prompts_utilisés", {}) agents_info = metadata.get("agents", {}) if prompts_utilises or agents_info: md_content.append("## Configuration des agents") md_content.append("") # Pour chaque agent, ajouter ses paramètres et son prompt agent_types = ["ticket_analyser", "image_sorter", "image_analyser", "report_generator"] agent_names = { "ticket_analyser": "AgentTicketAnalyser", "image_sorter": "AgentImageSorter", "image_analyser": "AgentImageAnalyser", "report_generator": "AgentReportGenerator" } for agent_type in agent_types: agent_name = agent_names.get(agent_type, agent_type) agent_info = agents_info.get(agent_type, {}) agent_prompt = prompts_utilises.get(agent_type, "") if agent_info or agent_prompt: md_content.append(f"### {agent_name}") md_content.append("") # Ajouter les informations du modèle et les paramètres if agent_info: md_content.append("#### Paramètres") md_content.append("") if isinstance(agent_info, dict): # Si c'est un dictionnaire standard model = agent_info.get("model", "") if model: md_content.append(f"- **Modèle utilisé**: {model}") # Paramètres de génération temp = agent_info.get("temperature") if temp is not None: md_content.append(f"- **Température**: {temp}") top_p = agent_info.get("top_p") if top_p is not None: md_content.append(f"- **Top_p**: {top_p}") max_tokens = agent_info.get("max_tokens") if max_tokens is not None: md_content.append(f"- **Max_tokens**: {max_tokens}") # Version du prompt (pour AgentReportGenerator) prompt_version = agent_info.get("prompt_version") if prompt_version: md_content.append(f"- **Version du prompt**: {prompt_version}") md_content.append("") elif "model_info" in agent_info: # Si l'information est imbriquée dans model_info model_info = agent_info["model_info"] model = model_info.get("model", "") if model: md_content.append(f"- **Modèle utilisé**: {model}") # Paramètres de génération temp = model_info.get("temperature") if temp is not None: md_content.append(f"- **Température**: {temp}") top_p = model_info.get("top_p") if top_p is not None: md_content.append(f"- **Top_p**: {top_p}") max_tokens = model_info.get("max_tokens") if max_tokens is not None: md_content.append(f"- **Max_tokens**: {max_tokens}") md_content.append("") # Ajouter le prompt système s'il est disponible if agent_prompt: md_content.append("#### Prompt système") md_content.append("") md_content.append("
") md_content.append("Afficher le prompt système") md_content.append("") md_content.append("```") md_content.append(agent_prompt) md_content.append("```") md_content.append("
") md_content.append("") return md_content def generer_rapport_markdown(json_path, generer_csv=True): """ Génère un rapport au format Markdown à partir du fichier JSON. Args: json_path (str): Chemin du fichier JSON generer_csv (bool): Indique si un fichier CSV doit être généré (True par défaut) Returns: str: Chemin du fichier Markdown généré ou None en cas d'erreur """ try: # Déterminer le chemin de sortie output_dir = os.path.dirname(json_path) ticket_id = os.path.basename(json_path).split('_')[0] md_path = os.path.join(output_dir, f"{ticket_id}_rapport_final.md") # Charger les données JSON with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) # Générer le contenu Markdown md_content = _generer_contenu_markdown(data) # Écrire le fichier Markdown with open(md_path, 'w', encoding='utf-8') as f: f.write('\n'.join(md_content)) logger.info(f"Rapport Markdown généré: {md_path}") # Génération du fichier CSV si demandé if generer_csv: try: # Import uniquement ici pour éviter les importations circulaires from .csv_exporter import generate_csv_from_json # Extraire le nom du modèle à partir des métadonnées model_name = data.get("metadata", {}).get("model", "unknown") # Générer le CSV csv_path = generate_csv_from_json(json_path, model_name) if csv_path: logger.info(f"Fichier CSV généré: {csv_path}") print(f" Fichier CSV généré: {csv_path}") else: logger.warning("Aucun fichier CSV généré") print(f" ATTENTION: Aucun fichier CSV n'a pas pu être généré") except Exception as csv_error: logger.error(f"Erreur lors de la génération du CSV: {str(csv_error)}") logger.error(traceback.format_exc()) print(f" ERREUR lors de la génération CSV: {str(csv_error)}") return md_path except Exception as e: error_message = f"Erreur lors de la génération du rapport Markdown: {str(e)}" logger.error(error_message) logger.error(traceback.format_exc()) print(f" ERREUR: {error_message}") return None def construire_rapport_json( rapport_genere: str, rapport_data: Dict, ticket_id: str, ticket_analyse: str, images_analyses: List[Dict], generation_time: float, resume: str, analyse_images: str, diagnostic: str, echanges_json: Dict, agent_metadata: Dict, prompts_utilises: Dict ) -> Dict: """ Construit le rapport JSON final à partir des données générées Args: rapport_genere: Texte du rapport généré par le LLM rapport_data: Données brutes du rapport ticket_id: ID du ticket ticket_analyse: Analyse du ticket images_analyses: Liste des analyses d'images generation_time: Temps de génération du rapport en secondes resume: Résumé extrait du rapport analyse_images: Analyse des images extraite du rapport diagnostic: Diagnostic extrait du rapport echanges_json: Données JSON des échanges client/support agent_metadata: Métadonnées de l'agent (modèle, paramètres, etc.) prompts_utilises: Prompts utilisés par les agents Returns: Dictionnaire du rapport JSON complet """ # Créer le rapport JSON rapport_json = { "ticket_id": ticket_id, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "rapport_complet": rapport_genere, "ticket_analyse": ticket_analyse, "images_analyses": images_analyses, "chronologie_echanges": echanges_json.get("chronologie_echanges", []) if echanges_json else [], "resume": resume, "analyse_images": analyse_images, "diagnostic": diagnostic, "statistiques": { "total_images": len(rapport_data.get("analyse_images", {})), "images_pertinentes": len(images_analyses), "generation_time": generation_time }, "metadata": agent_metadata, "prompts_utilisés": prompts_utilises, "workflow": { "etapes": [ { "numero": 1, "nom": "Analyse du ticket", "agent": "AgentTicketAnalyser", "description": "Extraction et analyse des informations du ticket" }, { "numero": 2, "nom": "Tri des images", "agent": "AgentImageSorter", "description": "Identification des images pertinentes pour l'analyse" }, { "numero": 3, "nom": "Analyse des images", "agent": "AgentImageAnalyser", "description": "Analyse détaillée des images pertinentes identifiées" }, { "numero": 4, "nom": "Génération du rapport", "agent": "AgentReportGenerator", "description": "Synthèse des analyses et génération du rapport final" } ] } } # Amélioration du traitement des échanges pour inclure les références aux images if rapport_json["chronologie_echanges"]: # Créer un dictionnaire des analyses d'images pour y faire référence facilement images_dict = {f"image_{i+1}": img["analyse"] for i, img in enumerate(images_analyses) if "analyse" in img} images_noms = {f"image_{i+1}": img["image_name"] for i, img in enumerate(images_analyses) if "image_name" in img} # Vérifier si les réponses font référence aux images for echange in rapport_json["chronologie_echanges"]: # Pour les réponses qui pourraient être enrichies avec des analyses d'images if echange.get("emetteur") == "SUPPORT" and echange.get("type") == "Réponse": contenu = echange.get("contenu", "") # Si la réponse ne mentionne pas déjà une image et ne semble pas complète if "d'après l'analyse" not in contenu.lower() and "l'image" not in contenu.lower(): # Chercher les éléments correspondants dans les analyses d'images # On pourrait enrichir cette partie avec des techniques plus sophistiquées # de correspondance entre questions et éléments d'analyse d'images pass # Assurer que les références FAQ, liens, etc. sont préservées ticket_raw = rapport_data.get("ticket_data", {}) if isinstance(ticket_raw, dict): description = ticket_raw.get("description", "") # Extraire les liens et références éventuelles links = re.findall(r'https?://\S+', description) faq_refs = re.findall(r'FAQ[:\s].*?[\.\n]', description) doc_refs = re.findall(r'documentation[:\s].*?[\.\n]', description) # S'assurer qu'ils sont présents dans les échanges for link in links: if not any(link in e.get("contenu", "") for e in rapport_json["chronologie_echanges"]): # Ajouter une entrée pour préserver le lien logger.info(f"Ajout de lien manquant dans les échanges: {link}") # On pourrait ajouter cette information de manière plus sophistiquée # Vérification de complétude if not rapport_json["chronologie_echanges"]: logger.warning("Aucun échange trouvé dans le rapport") if not rapport_json["analyse_images"] and images_analyses: logger.warning("Analyse des images manquante alors que des images sont disponibles") return rapport_json