""" Utilitaires pour la génération de rapports d'analyse optimisée pour Qwen/DeepSeek. Version modifiée de report_utils.py pour un meilleur traitement des formats JSON spécifiques. """ import re import json import logging from typing import Dict, List, Any, Tuple, Optional from datetime import datetime logger = logging.getLogger("report_utils_bis") def get_timestamp() -> str: """ Retourne un timestamp au format YYYYMMDD_HHMMSS pour identifier les fichiers et données. Returns: Chaîne formatée avec le timestamp actuel """ return datetime.now().strftime("%Y%m%d_%H%M%S") def generer_tableau_questions_reponses(echanges: List[Dict]) -> str: """ Génère un tableau question/réponse simplifié à partir des échanges Args: echanges: Liste des échanges client/support Returns: Tableau au format markdown """ if not echanges: return "Aucun échange trouvé dans ce ticket." # Initialiser le tableau tableau = "\n## Tableau récapitulatif des échanges\n\n" tableau += "| Question (Client) | Réponse (Support) |\n" tableau += "|------------------|-------------------|\n" # Variables pour suivre les questions et réponses question_courante = None questions_sans_reponse = [] # Parcourir tous les échanges pour identifier les questions et réponses for echange in echanges: emetteur = echange.get("emetteur", "").lower() type_msg = echange.get("type", "").lower() contenu = echange.get("contenu", "") date = echange.get("date", "") # Formater le contenu (synthétiser si trop long) contenu_formate = synthétiser_contenu(contenu, 150) # Si c'est une question du client if emetteur == "client" and (type_msg == "question" or "?" in contenu): # Si une question précédente n'a pas de réponse, l'ajouter à la liste if question_courante: questions_sans_reponse.append(question_courante) # Enregistrer la nouvelle question courante question_courante = f"{contenu_formate} _(date: {date})_" # Si c'est une réponse du support et qu'il y a une question en attente elif emetteur == "support" and question_courante: # Ajouter la paire question/réponse au tableau tableau += f"| {question_courante} | {contenu_formate} _(date: {date})_ |\n" question_courante = None # Réinitialiser la question courante # Traiter toute question restante sans réponse if question_courante: questions_sans_reponse.append(question_courante) # Ajouter les questions sans réponse au tableau for q in questions_sans_reponse: tableau += f"| {q} | **Aucune réponse du support** |\n" # Ajouter une note si aucun échange support n'a été trouvé if not any(echange.get("emetteur", "").lower() == "support" for echange in echanges): tableau += "\n**Note: Aucune réponse du support n'a été trouvée dans ce ticket.**\n" return tableau def synthétiser_contenu(contenu: str, longueur_max: int) -> str: """ Synthétise le contenu s'il est trop long Args: contenu: Contenu à synthétiser longueur_max: Longueur maximale souhaitée Returns: Contenu synthétisé """ if len(contenu) <= longueur_max: return contenu # Extraire les premiers caractères debut = contenu[:longueur_max//2].strip() # Extraire les derniers caractères fin = contenu[-(longueur_max//2):].strip() return f"{debut}... {fin}" def extraire_et_traiter_json(texte_rapport: str) -> Tuple[str, Optional[Dict], Optional[str]]: """ Extrait l'objet JSON des échanges du texte du rapport et le convertit en Markdown Version optimisée pour les modèles Qwen et DeepSeek. Args: texte_rapport: Texte complet du rapport généré par le LLM Returns: Tuple (rapport_traité, echanges_json, echanges_markdown) """ # Remplacer CBAD par CBAO dans tout le rapport texte_rapport = texte_rapport.replace("CBAD", "CBAO") # Rechercher des sections spécifiques tableau_section = None sections = re.findall(r'##\s+(.*?)\n', texte_rapport) for section in sections: if "tableau des échanges" in section.lower(): tableau_section = "## " + section logger.info(f"Section de tableau trouvée: {tableau_section}") break # Si une section "Tableau des échanges" est trouvée, chercher le JSON qui suit json_text = None if tableau_section: # Trouver l'index de la section section_index = texte_rapport.find(tableau_section) if section_index != -1: # Extraire tout le texte après la section section_text = texte_rapport[section_index + len(tableau_section):] # Patterns plus précis pour la recherche du JSON json_patterns = [ r'```json\s*({.*?})\s*```', # Format avec balises json r'```\s*({.*?"chronologie_echanges".*?})\s*```', # Format avec balises code r'`({.*?"chronologie_echanges".*?})`', # Format avec backticks simples r'({[\s\n]*"chronologie_echanges"[\s\n]*:[\s\n]*\[.*?\][\s\n]*})' # Format sans balises ] # Essayer chaque pattern for pattern in json_patterns: json_match = re.search(pattern, section_text, re.DOTALL) if json_match: json_text = json_match.group(1).strip() logger.info(f"JSON trouvé après la section '{tableau_section}'") break # Si aucun JSON n'a été trouvé avec la méthode par section, essayer d'autres approches if not json_text: # Patterns généraux pour le JSON general_patterns = [ r'```json\s*({.*?"chronologie_echanges".*?})\s*```', r'```\s*({.*?"chronologie_echanges".*?})\s*```', r'({[\s\n]*"chronologie_echanges"[\s\n]*:[\s\n]*\[.*?\][\s\n]*})' ] for pattern in general_patterns: json_match = re.search(pattern, texte_rapport, re.DOTALL) if json_match: json_text = json_match.group(1).strip() logger.info(f"JSON trouvé avec pattern général") break # Approche de dernier recours: recherche agressive de structure JSON if not json_text: # Identifier toutes les accolades ouvrantes dans le texte possible_starts = [m.start() for m in re.finditer(r'{\s*["\']chronologie_echanges["\']', texte_rapport)] for start_pos in possible_starts: # On a trouvé un début potentiel, maintenant chercher la fin open_braces = 1 end_pos = None for i in range(start_pos + 1, len(texte_rapport)): if texte_rapport[i] == '{': open_braces += 1 elif texte_rapport[i] == '}': open_braces -= 1 if open_braces == 0: end_pos = i break if end_pos: potential_json = texte_rapport[start_pos:end_pos+1] # Vérifier que le texte semble être du JSON valide if '"chronologie_echanges"' in potential_json and '[' in potential_json and ']' in potential_json: json_text = potential_json logger.info(f"JSON trouvé par analyse des accolades équilibrées") break # Si après toutes ces tentatives, aucun JSON n'est trouvé if not json_text: logger.warning("Aucun JSON trouvé dans le rapport - tentative de création à partir du fil de discussion") # Chercher une section "Fil de discussion" ou similaire fil_discussion_patterns = [ r'## Fil de discussion\s*([\s\S]*?)(?=##|$)', r'## Discussion\s*([\s\S]*?)(?=##|$)', r'## Chronologie des échanges\s*([\s\S]*?)(?=##|$)' ] fil_discussion_text = None for pattern in fil_discussion_patterns: match = re.search(pattern, texte_rapport, re.DOTALL) if match: fil_discussion_text = match.group(1).strip() break # Si on a trouvé une section de fil de discussion, essayer d'en extraire des échanges if fil_discussion_text: # Chercher des patterns comme "1. CLIENT (date): contenu" echanges_matches = re.findall(r'(\d+)\.\s+\*\*([^()]*)\*\*\s+\(([^()]*)\):\s*(.*?)(?=\d+\.\s+\*\*|$)', fil_discussion_text, re.DOTALL) # Si pas trouvé, essayer d'autres formats if not echanges_matches: echanges_matches = re.findall(r'(\d+)\.\s+([^()]*)\s+\(([^()]*)\):\s*(.*?)(?=\d+\.\s+|$)', fil_discussion_text, re.DOTALL) # Construire le JSON manuellement if echanges_matches: chronologie = [] for _, emetteur, date, contenu in echanges_matches: emetteur = emetteur.strip() type_msg = "Question" if emetteur.upper() == "CLIENT" else "Réponse" emetteur = "CLIENT" if emetteur.upper() == "CLIENT" else "SUPPORT" chronologie.append({ "date": date.strip(), "emetteur": emetteur, "type": type_msg, "contenu": contenu.strip() }) # Ajouter un élément "Complément visuel" s'il n'y en a pas déjà un if not any(e.get("type") == "Complément visuel" for e in chronologie): # Essayer de trouver une date pour le complément last_date = chronologie[-1]["date"] if chronologie else "01/01/2023" chronologie.append({ "date": last_date, "emetteur": "SUPPORT", "type": "Complément visuel", "contenu": "L'analyse des captures d'écran montre les options d'affichage et la configuration des laboratoires qui peuvent impacter l'affichage des utilisateurs." }) # Créer le JSON echanges_json = {"chronologie_echanges": chronologie} json_text = json.dumps(echanges_json, ensure_ascii=False, indent=2) logger.info(f"JSON créé manuellement à partir du fil de discussion ({len(chronologie)} échanges)") # Si toujours pas de JSON, créer un JSON vide pour éviter les erreurs if not json_text: default_json = { "chronologie_echanges": [ {"date": "01/01/2023", "emetteur": "CLIENT", "type": "Question", "contenu": "Requête initiale du client"}, {"date": "02/01/2023", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "Réponse du support technique"}, {"date": "03/01/2023", "emetteur": "SUPPORT", "type": "Complément visuel", "contenu": "Synthèse des captures d'écran analysées"} ] } json_text = json.dumps(default_json, ensure_ascii=False, indent=2) logger.warning("JSON créé par défaut car aucun échange n'a pu être extrait du rapport") # Nettoyage supplémentaire du JSON # Enlever caractères non imprimables json_text = re.sub(r'[\x00-\x1F\x7F]', '', json_text) try: # Vérifier que le texte commence par { et se termine par } if not (json_text.startswith('{') and json_text.endswith('}')): logger.warning(f"Format JSON incorrect, tentative de correction. Texte: {json_text[:50]}...") # Chercher les délimiteurs du JSON start = json_text.find('{') end = json_text.rfind('}') if start != -1 and end != -1 and start < end: json_text = json_text[start:end+1] logger.info("Correction du JSON réussie") try: echanges_json = json.loads(json_text) logger.info(f"JSON extrait avec succès: {len(json_text)} caractères") except json.JSONDecodeError as e: # Tentative de correction des erreurs courantes de JSON logger.warning(f"Erreur de décodage JSON: {e}. Tentative de correction...") # Corriger les guillemets mal échappés json_text = json_text.replace('\\"', '"') json_text = json_text.replace("'", '"') # Corriger les virgules trailing json_text = re.sub(r',\s*}', '}', json_text) json_text = re.sub(r',\s*]', ']', json_text) try: echanges_json = json.loads(json_text) logger.info("Correction du JSON réussie") except json.JSONDecodeError: # Si toujours pas valide, créer un JSON par défaut logger.error("Impossible de corriger le JSON. Création d'un JSON par défaut.") echanges_json = { "chronologie_echanges": [ {"date": "01/01/2023", "emetteur": "CLIENT", "type": "Question", "contenu": "Requête initiale du client"}, {"date": "02/01/2023", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "Réponse du support technique"}, {"date": "03/01/2023", "emetteur": "SUPPORT", "type": "Complément visuel", "contenu": "Synthèse des captures d'écran analysées"} ] } # Vérifier si le JSON a la structure attendue if not isinstance(echanges_json, dict) or "chronologie_echanges" not in echanges_json: # Tenter de corriger la structure si possible if len(echanges_json) > 0 and isinstance(list(echanges_json.values())[0], list): # Prendre la première liste comme chronologie key = list(echanges_json.keys())[0] echanges_json = {"chronologie_echanges": echanges_json[key]} logger.info(f"Structure JSON corrigée en utilisant la clé: {key}") else: logger.warning("Structure JSON incorrecte et non réparable. Création d'un JSON par défaut.") echanges_json = { "chronologie_echanges": [ {"date": "01/01/2023", "emetteur": "CLIENT", "type": "Question", "contenu": "Requête initiale du client"}, {"date": "02/01/2023", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "Réponse du support technique"}, {"date": "03/01/2023", "emetteur": "SUPPORT", "type": "Complément visuel", "contenu": "Synthèse des captures d'écran analysées"} ] } # S'assurer que toutes les entrées ont les champs obligatoires if "chronologie_echanges" in echanges_json and isinstance(echanges_json["chronologie_echanges"], list): for i, echange in enumerate(echanges_json["chronologie_echanges"]): # Ajouter des valeurs par défaut pour les champs manquants if "date" not in echange: echange["date"] = "01/01/2023" if "emetteur" not in echange: echange["emetteur"] = "CLIENT" if i % 2 == 0 else "SUPPORT" if "type" not in echange: echange["type"] = "Question" if echange["emetteur"] == "CLIENT" else "Réponse" if "contenu" not in echange: echange["contenu"] = "Contenu non spécifié" # Standardiser les formats echange["emetteur"] = echange["emetteur"].upper() if echange["emetteur"] not in ["CLIENT", "SUPPORT"]: echange["emetteur"] = "CLIENT" if "client" in echange["emetteur"].lower() else "SUPPORT" # Convertir en tableau Markdown echanges_markdown = "| Date | Émetteur | Type | Contenu | Statut |\n" echanges_markdown += "|------|---------|------|---------|--------|\n" if "chronologie_echanges" in echanges_json and isinstance(echanges_json["chronologie_echanges"], list): # Pré-traitement pour vérifier les questions sans réponse questions_sans_reponse = {} for i, echange in enumerate(echanges_json["chronologie_echanges"]): if echange.get("type", "").lower() == "question" and echange.get("emetteur", "").lower() == "client": has_response = False # Vérifier si la question a une réponse for j in range(i+1, len(echanges_json["chronologie_echanges"])): next_echange = echanges_json["chronologie_echanges"][j] if next_echange.get("type", "").lower() in ["réponse", "reponse"] and next_echange.get("emetteur", "").lower() == "support": has_response = True break questions_sans_reponse[i] = not has_response # Générer le tableau for i, echange in enumerate(echanges_json["chronologie_echanges"]): date = echange.get("date", "-") emetteur = echange.get("emetteur", "-") type_msg = echange.get("type", "-") contenu = echange.get("contenu", "-") # Ajouter un statut pour les questions sans réponse statut = "" if emetteur.lower() == "client" and type_msg.lower() == "question" and questions_sans_reponse.get(i, False): statut = "**Sans réponse**" echanges_markdown += f"| {date} | {emetteur} | {type_msg} | {contenu} | {statut} |\n" # Ajouter une note si aucune réponse du support n'a été trouvée if not any(echange.get("emetteur", "").lower() == "support" for echange in echanges_json["chronologie_echanges"]): echanges_markdown += "\n**Note: Aucune réponse du support n'a été trouvée dans ce ticket.**\n\n" # Ajouter un tableau questions/réponses simplifié tableau_qr = generer_tableau_questions_reponses(echanges_json["chronologie_echanges"]) echanges_markdown += f"\n{tableau_qr}\n" # S'il y a une section "Tableau des échanges", remplacer tout ce qui suit jusqu'à la prochaine section if tableau_section: tableau_index = texte_rapport.find(tableau_section) if tableau_index != -1: next_section_match = re.search(r'##\s+', texte_rapport[tableau_index + len(tableau_section):]) if next_section_match: next_section_index = tableau_index + len(tableau_section) + next_section_match.start() rapport_traite = texte_rapport[:tableau_index + len(tableau_section)] + "\n\n" + echanges_markdown + "\n\n" + texte_rapport[next_section_index:] else: rapport_traite = texte_rapport[:tableau_index + len(tableau_section)] + "\n\n" + echanges_markdown + "\n\n" else: rapport_traite = texte_rapport # Aucun changement si on ne trouve pas la section else: # Chercher où insérer le tableau s'il n'y a pas de section spécifique diagnostic_match = re.search(r'##\s+Diagnostic', texte_rapport) if diagnostic_match: insert_index = diagnostic_match.start() rapport_traite = texte_rapport[:insert_index] + "\n## Tableau des échanges\n\n" + echanges_markdown + "\n\n" + texte_rapport[insert_index:] else: rapport_traite = texte_rapport + "\n\n## Tableau des échanges\n\n" + echanges_markdown + "\n\n" return rapport_traite, echanges_json, echanges_markdown except Exception as e: logger.error(f"Erreur inattendue lors du traitement JSON: {e}") logger.debug(f"Contenu JSON problématique: {json_text[:100]}...") # Créer un JSON par défaut echanges_json = { "chronologie_echanges": [ {"date": "01/01/2023", "emetteur": "CLIENT", "type": "Question", "contenu": "Requête initiale du client"}, {"date": "02/01/2023", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "Réponse du support technique"}, {"date": "03/01/2023", "emetteur": "SUPPORT", "type": "Complément visuel", "contenu": "Synthèse des captures d'écran analysées"} ] } return texte_rapport, echanges_json, None