""" Utilitaires pour la génération de rapports d'analyse, extraits de AgentReportGenerator. """ import re import json import logging from typing import Dict, List, Any, Tuple, Optional from datetime import datetime logger = logging.getLogger("report_utils") def extraire_question_initiale(echanges: List[Dict]) -> Optional[Dict]: """ Extrait la question initiale à partir du premier message de support qui cite souvent la demande du client. Args: echanges: Liste des échanges client/support Returns: Dict contenant la question initiale ou None si aucune question n'est trouvée """ if not echanges: return None # Chercher le premier message (qui est souvent une réponse du support) premier_message = echanges[0] # Si c'est déjà une question du client, pas besoin d'extraire if premier_message.get("emetteur", "").upper() == "CLIENT" and premier_message.get("type", "").upper() == "QUESTION": return None # Si c'est une réponse du support, chercher une citation de la demande cliente if premier_message.get("emetteur", "").upper() == "SUPPORT": contenu = premier_message.get("contenu", "") # Patterns communs de citation d'une demande initiale patterns = [ r"(?:concernant|au sujet de)(?:\s*:|\s*\n)(.+?)(?:\n\n|Je viens)", r"(?:demande|ticket)(?:\s*:|\s*\n)(.+?)(?:\n\n|Je viens)", r"suite à ta demande\s*:(.+?)(?:\n\n|Je viens)", r"(?:Bonjour|Salut).*?\n(.+?)(?:\n\n|Je viens)", ] for pattern in patterns: match = re.search(pattern, contenu, re.DOTALL) if match: question_text = match.group(1).strip() # Créer un nouvel échange pour la question initiale # Utiliser la date du premier message ou une estimation antérieure date_support = premier_message.get("date", "") if date_support: # Si on a une date, estimer une date légèrement antérieure if ":" in date_support: # Format avec heure parts = date_support.split(" ") if len(parts) >= 2 and ":" in parts[1]: time_parts = parts[1].split(":") if len(time_parts) >= 2: # Réduire l'heure de quelques minutes hour = int(time_parts[0]) minute = int(time_parts[1]) minute = max(0, minute - 30) # 30 minutes avant if minute < 0: hour = max(0, hour - 1) minute += 60 date_question = f"{parts[0]} {hour:02d}:{minute:02d}" else: date_question = date_support else: date_question = date_support else: date_question = date_support else: date_question = "Date inconnue" return { "date": date_question, "emetteur": "CLIENT", "type": "Question", "contenu": question_text } return None def get_timestamp() -> str: """ Retourne un timestamp au format YYYYMMDD_HHMMSS pour identifier les fichiers et données. Returns: Chaîne formatée avec le timestamp actuel """ return datetime.now().strftime("%Y%m%d_%H%M%S") def generer_tableau_questions_reponses(echanges: List[Dict]) -> str: """ Génère un tableau question/réponse simplifié à partir des échanges Args: echanges: Liste des échanges client/support Returns: Tableau au format markdown """ if not echanges: return "Aucun échange trouvé dans ce ticket." # Initialiser le tableau tableau = "\n## Tableau récapitulatif des échanges\n\n" tableau += "| Question (Client) | Réponse (Support) |\n" tableau += "|------------------|-------------------|\n" # Variables pour suivre les questions et réponses question_courante = None questions_sans_reponse = [] # Parcourir tous les échanges pour identifier les questions et réponses for echange in echanges: emetteur = echange.get("emetteur", "").lower() type_msg = echange.get("type", "").lower() contenu = echange.get("contenu", "") date = echange.get("date", "") # Formater le contenu (synthétiser si trop long) contenu_formate = synthétiser_contenu(contenu, 150) # Si c'est une question du client if emetteur == "client" and (type_msg == "question" or "?" in contenu): # Si une question précédente n'a pas de réponse, l'ajouter à la liste if question_courante: questions_sans_reponse.append(question_courante) # Enregistrer la nouvelle question courante question_courante = f"{contenu_formate} _(date: {date})_" # Si c'est une réponse du support et qu'il y a une question en attente elif emetteur == "support" and question_courante: # Ajouter la paire question/réponse au tableau tableau += f"| {question_courante} | {contenu_formate} _(date: {date})_ |\n" question_courante = None # Réinitialiser la question courante # Traiter toute question restante sans réponse if question_courante: questions_sans_reponse.append(question_courante) # Ajouter les questions sans réponse au tableau for q in questions_sans_reponse: tableau += f"| {q} | **Aucune réponse du support** |\n" # Ajouter une note si aucun échange support n'a été trouvé if not any(echange.get("emetteur", "").lower() == "support" for echange in echanges): tableau += "\n**Note: Aucune réponse du support n'a été trouvée dans ce ticket.**\n" return tableau def synthétiser_contenu(contenu: str, longueur_max: int) -> str: """ Synthétise le contenu s'il est trop long Args: contenu: Contenu à synthétiser longueur_max: Longueur maximale souhaitée Returns: Contenu synthétisé """ if len(contenu) <= longueur_max: return contenu # Extraire les premiers caractères debut = contenu[:longueur_max//2].strip() # Extraire les derniers caractères fin = contenu[-(longueur_max//2):].strip() return f"{debut}... {fin}" def extraire_et_traiter_json(texte_rapport: str) -> Tuple[str, Optional[Dict], Optional[str]]: """ Extrait l'objet JSON des échanges du texte du rapport et le convertit en Markdown Args: texte_rapport: Texte complet du rapport généré par le LLM Returns: Tuple (rapport_traité, echanges_json, echanges_markdown) """ # Remplacer CBAD par CBAO dans tout le rapport texte_rapport = texte_rapport.replace("CBAD", "CBAO") # Patterns de recherche plus variés pour s'adapter aux différents modèles patterns = [ r'```json\s*({.*?})\s*```', # Pattern standard avec backticks triples r'```\s*({.*?"chronologie_echanges".*?})\s*```', # Pattern sans spécifier json mais avec le contenu attendu r'{[\s\n]*"chronologie_echanges"[\s\n]*:[\s\n]*\[.*?\][\s\n]*}', # Pattern sans backticks r'(.*?)' # Pattern alternatif avec balises xml ] # Essayer chaque pattern json_text = None json_match = None for pattern in patterns: json_match = re.search(pattern, texte_rapport, re.DOTALL) if json_match: json_text = json_match.group(1).strip() logger.info(f"JSON trouvé avec le pattern: {pattern[:20]}...") break # Si aucun pattern n'a fonctionné, tenter une approche plus agressive pour extraire le JSON if not json_text: # Chercher des indices de début de JSON dans le texte potential_starts = [ texte_rapport.find('{"chronologie_echanges"'), texte_rapport.find('{\n "chronologie_echanges"'), texte_rapport.find('{ "chronologie_echanges"') ] # Filtrer les indices valides (non -1) valid_starts = [idx for idx in potential_starts if idx != -1] if valid_starts: # Prendre l'indice le plus petit (premier dans le texte) start_idx = min(valid_starts) # Chercher la fin du JSON (accolade fermante suivie d'une nouvelle ligne ou de la fin du texte) json_extract = texte_rapport[start_idx:] # Compter les accolades pour trouver la fermeture du JSON open_braces = 0 close_idx = -1 for i, char in enumerate(json_extract): if char == '{': open_braces += 1 elif char == '}': open_braces -= 1 if open_braces == 0: close_idx = i break if close_idx != -1: json_text = json_extract[:close_idx + 1] logger.info(f"JSON extrait par analyse d'accolades: {len(json_text)} caractères") if not json_text: logger.warning("Aucun JSON trouvé dans le rapport") return texte_rapport, None, None # Nettoyage supplémentaire du JSON # Enlever caractères non imprimables ou indésirables qui pourraient être ajoutés par certains modèles json_text = re.sub(r'[\x00-\x1F\x7F]', '', json_text) try: # Vérifier que le texte commence par { et se termine par } if not (json_text.startswith('{') and json_text.endswith('}')): logger.warning(f"Format JSON incorrect, tentative de correction. Texte: {json_text[:50]}...") # Chercher les délimiteurs du JSON start = json_text.find('{') end = json_text.rfind('}') if start != -1 and end != -1 and start < end: json_text = json_text[start:end+1] echanges_json = json.loads(json_text) logger.info(f"JSON extrait avec succès: {len(json_text)} caractères") # Vérifier si le JSON a la structure attendue if not isinstance(echanges_json, dict) or "chronologie_echanges" not in echanges_json: # Tenter de corriger la structure si possible if len(echanges_json) > 0 and isinstance(list(echanges_json.values())[0], list): # Prendre la première liste comme chronologie key = list(echanges_json.keys())[0] echanges_json = {"chronologie_echanges": echanges_json[key]} logger.info(f"Structure JSON corrigée en utilisant la clé: {key}") else: logger.warning("Structure JSON incorrecte et non réparable") return texte_rapport, None, None # Vérifier s'il faut ajouter une question initiale if "chronologie_echanges" in echanges_json and len(echanges_json["chronologie_echanges"]) > 0: question_initiale = extraire_question_initiale(echanges_json["chronologie_echanges"]) if question_initiale: # Insérer la question initiale au début de la chronologie echanges_json["chronologie_echanges"].insert(0, question_initiale) logger.info("Question initiale extraite et ajoutée à la chronologie") # Convertir en tableau Markdown echanges_markdown = "| Date | Émetteur | Type | Contenu | Statut |\n" echanges_markdown += "|------|---------|------|---------|--------|\n" if "chronologie_echanges" in echanges_json and isinstance(echanges_json["chronologie_echanges"], list): # Pré-traitement pour vérifier les questions sans réponse questions_sans_reponse = {} for i, echange in enumerate(echanges_json["chronologie_echanges"]): if echange.get("type", "").lower() == "question" and echange.get("emetteur", "").lower() == "client": has_response = False # Vérifier si la question a une réponse for j in range(i+1, len(echanges_json["chronologie_echanges"])): next_echange = echanges_json["chronologie_echanges"][j] if next_echange.get("type", "").lower() == "réponse" and next_echange.get("emetteur", "").lower() == "support": has_response = True break questions_sans_reponse[i] = not has_response # Générer le tableau for i, echange in enumerate(echanges_json["chronologie_echanges"]): date = echange.get("date", "-") emetteur = echange.get("emetteur", "-") type_msg = echange.get("type", "-") contenu = echange.get("contenu", "-") # Ajouter un statut pour les questions sans réponse statut = "" if emetteur.lower() == "client" and type_msg.lower() == "question" and questions_sans_reponse.get(i, False): statut = "**Sans réponse**" echanges_markdown += f"| {date} | {emetteur} | {type_msg} | {contenu} | {statut} |\n" # Ajouter une note si aucune réponse du support n'a été trouvée if not any(echange.get("emetteur", "").lower() == "support" for echange in echanges_json["chronologie_echanges"]): echanges_markdown += "\n**Note: Aucune réponse du support n'a été trouvée dans ce ticket.**\n\n" # Ajouter un tableau questions/réponses simplifié tableau_qr = generer_tableau_questions_reponses(echanges_json["chronologie_echanges"]) echanges_markdown += f"\n{tableau_qr}\n" # Remplacer le JSON dans le texte par le tableau Markdown # Si le JSON était entouré de backticks, remplacer tout le bloc if json_match: rapport_traite = texte_rapport.replace(json_match.group(0), echanges_markdown) else: # Sinon, remplacer juste le texte JSON rapport_traite = texte_rapport.replace(json_text, echanges_markdown) return rapport_traite, echanges_json, echanges_markdown except json.JSONDecodeError as e: logger.error(f"Erreur lors du décodage JSON: {e}") logger.debug(f"Contenu JSON problématique: {json_text[:100]}...") return texte_rapport, None, None