#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Script d'extraction des questions et réponses d'un ticket. Génère un tableau Markdown avec les questions et réponses identifiées. """ import os import sys import json import argparse import logging import re from typing import Dict, List, Any, Optional # Configuration du logger logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("extract_qr.log"), logging.StreamHandler() ] ) logger = logging.getLogger("extract_qr") try: from llm import Mistral except ImportError: logger.error("Module LLM non trouvé. Veuillez vous assurer que le répertoire parent est dans PYTHONPATH.") sys.exit(1) class QuestionReponseExtractor: """ Agent d'extraction des questions et réponses d'un ticket. """ def __init__(self, api_key: Optional[str] = None): """ Initialise l'agent d'extraction de questions-réponses. Args: api_key: Clé API pour le LLM """ self.llm = Mistral(api_key=api_key) # Configurer le LLM try: self.llm.model = "mistral-medium" self.llm.temperature = 0.3 self.llm.max_tokens = 2000 except Exception as e: logger.warning(f"Impossible de configurer le modèle: {e}") # Définir le prompt système par défaut self.system_prompt = """ Tu es un expert en analyse de conversations de support technique. Votre mission est d'identifier avec précision: 1. Le rôle de chaque intervenant (client ou support technique) 2. La nature de chaque message (question, réponse, information additionnelle) 3. Le contenu essentiel de chaque message en éliminant les formules de politesse, signatures, mentions légales et autres éléments non pertinents Pour l'identification client/support: - Support: Signatures avec noms d'entreprise fournissant le logiciel, domaines email comme @cbao.fr, @odoo.com, mentions "support technique", etc. - Client: Utilisateurs finaux qui signalent des problèmes ou posent des questions Pour la classification en question/réponse: - Questions: Demandes explicites (avec "?"), demandes implicites de résolution de problèmes, descriptions de bugs ou dysfonctionnements - Réponses: Explications techniques, solutions proposées, instructions fournies par le support Concentre-toi uniquement sur le contenu technique utile en ignorant tous les éléments superflus qui n'apportent pas d'information sur le problème ou sa solution. """ self.historique = [] def ajouter_historique(self, action: str, entree: str, resultat: str) -> None: """ Ajoute une entrée à l'historique des actions. Args: action: Type d'action effectuée entree: Entrée de l'action resultat: Résultat de l'action """ self.historique.append({ "action": action, "entree": entree, "resultat": resultat }) def nettoyer_contenu(self, texte: str) -> str: """ Nettoie le contenu en supprimant signatures, mentions légales, etc. Args: texte: Texte brut à nettoyer Returns: Texte nettoyé des éléments non pertinents """ # Si l'entrée n'est pas une chaîne, convertir en chaîne ou retourner vide if not isinstance(texte, str): if texte is None: return "" try: texte = str(texte) except: return "" # Détecter et supprimer les balises HTML avec regex robuste try: # Première passe - balises standard texte_nettoye = re.sub(r']*>', ' ', texte, flags=re.IGNORECASE) # Deuxième passe - balises restantes, y compris les mal formées texte_nettoye = re.sub(r'<[^>]*>', ' ', texte_nettoye) # Troisième passe pour les balises qui pourraient avoir échappé texte_nettoye = re.sub(r'<[^>]*$', ' ', texte_nettoye) # Balises incomplètes à la fin except Exception as e: logger.warning(f"Erreur lors du nettoyage HTML: {e}") texte_nettoye = texte # Remplacer les références aux images texte_nettoye = re.sub(r'\[Image:[^\]]+\]', '[Image]', texte_nettoye) texte_nettoye = re.sub(r']+>', '[Image]', texte_nettoye, flags=re.IGNORECASE) # Supprimer les éléments courants non pertinents patterns_a_supprimer = [ r'Cordialement,[\s\S]*?$', r'Bien cordialement,[\s\S]*?$', r'Bonne réception[\s\S]*?$', r'À votre disposition[\s\S]*?$', r'Support technique[\s\S]*?$', r'L\'objectif du Support Technique[\s\S]*?$', r'Notre service est ouvert[\s\S]*?$', r'Dès réception[\s\S]*?$', r'Confidentialité[\s\S]*?$', r'Ce message électronique[\s\S]*?$', r'Droit à la déconnexion[\s\S]*?$', r'Afin d\'assurer une meilleure traçabilité[\s\S]*?$', r'tél\s*:\s*[\d\s\+]+', r'mobile\s*:\s*[\d\s\+]+', r'www\.[^\s]+\.[a-z]{2,3}', r'\*{10,}.*?\*{10,}', # Lignes de séparation avec astérisques r'----.*?----', # Lignes de séparation avec tirets ] for pattern in patterns_a_supprimer: texte_nettoye = re.sub(pattern, '', texte_nettoye, flags=re.IGNORECASE) # Supprimer les lignes multiples vides et espaces multiples texte_nettoye = re.sub(r'\n\s*\n', '\n', texte_nettoye) texte_nettoye = re.sub(r'\s+', ' ', texte_nettoye) # Convertir les entités HTML html_entities = { ' ': ' ', '<': '<', '>': '>', '&': '&', '"': '"', ''': "'", '€': '€', '©': '©', '®': '®', 'é': 'é', 'è': 'è', 'à': 'à', 'ç': 'ç', 'ê': 'ê', 'â': 'â', 'î': 'î', 'ô': 'ô', 'û': 'û' } for entity, char in html_entities.items(): texte_nettoye = texte_nettoye.replace(entity, char) return texte_nettoye.strip() def detecter_role(self, message: Dict[str, Any]) -> str: """ Détecte si un message provient du client ou du support. Args: message: Dictionnaire contenant les informations du message Returns: "Client" ou "Support" """ # Vérifier le champ 'role' s'il existe déjà if "role" in message and message["role"] in ["Client", "Support"]: return message["role"] # Indices de support dans l'email domaines_support = ["@cbao.fr", "@odoo.com", "support@", "ticket.support"] indices_nom_support = ["support", "cbao", "technique", "odoo"] email = message.get("email_from", "").lower() # Nettoyer le format "Nom " if "<" in email and ">" in email: match = re.search(r'<([^>]+)>', email) if match: email = match.group(1).lower() # Vérifier le domaine email if any(domaine in email for domaine in domaines_support): return "Support" # Vérifier le nom d'auteur auteur = "" if "author_id" in message and isinstance(message["author_id"], list) and len(message["author_id"]) > 1: auteur = str(message["author_id"][1]).lower() elif "auteur" in message: auteur = str(message["auteur"]).lower() if any(indice in auteur for indice in indices_nom_support): return "Support" # Par défaut, considérer comme client return "Client" def extraire_questions_reponses(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]: """ Extrait les questions et réponses d'une liste de messages. Args: messages: Liste des messages du ticket Returns: Dictionnaire avec les questions et réponses extraites """ if not messages: logger.warning("Aucun message à analyser") return { "success": False, "erreur": "Aucun message à analyser", "paires_qr": [] } logger.info(f"Extraction des questions et réponses de {len(messages)} messages") # Préparation des messages messages_prepares = [] for msg in messages: # Nettoyer le contenu contenu = msg.get("body", "") or msg.get("contenu", "") contenu_nettoye = self.nettoyer_contenu(contenu) # Détecter le rôle role = self.detecter_role(msg) # Ajouter le message préparé si non vide après nettoyage if contenu_nettoye.strip(): messages_prepares.append({ "id": msg.get("id", "") or msg.get("ID", ""), "date": msg.get("date", ""), "role": role, "body": contenu_nettoye }) # S'il n'y a pas assez de messages pour une conversation if len(messages_prepares) < 2: logger.warning("Pas assez de messages pour une conversation") return { "success": True, "paires_qr": [], "nb_questions": 0, "nb_reponses": 0 } # Trier par date si disponible messages_prepares.sort(key=lambda x: x.get("date", "")) # Préparer l'entrée pour le LLM messages_for_llm = [] for i, msg in enumerate(messages_prepares): messages_for_llm.append({ "numero": i + 1, "role": msg.get("role", "Inconnu"), "date": msg.get("date", ""), "contenu": msg.get("body", "") }) # Préparer le prompt pour extraire les paires Q/R prompt = """ Analyse la conversation suivante et identifie toutes les paires de questions et réponses. Pour chaque message: 1. Identifie s'il s'agit d'une question, d'une réponse ou d'une information. 2. Extrais le contenu essentiel en ignorant les formules de politesse et les signatures. Ensuite, forme des paires de questions-réponses en associant chaque question avec sa réponse correspondante. Réponds en utilisant la structure suivante: ``` MESSAGE 1: - Rôle: [Client/Support] - Type: [Question/Réponse/Information] - Contenu essentiel: [Contenu essentiel du message] MESSAGE 2: ... PAIRE 1: - Question (Client): [Question posée] - Réponse (Support): [Réponse donnée] PAIRE 2: ... ``` Si une question n'a pas de réponse, indique-le. """ try: # Appeler le LLM pour l'analyse from json import dumps resultat = self.llm.chat_completion([ {"role": "system", "content": self.system_prompt}, {"role": "user", "content": prompt + "\n\nConversation:\n" + dumps(messages_for_llm, indent=2)} ]) contenu = resultat.get("choices", [{}])[0].get("message", {}).get("content", "") self.ajouter_historique("analyze_messages", f"{len(messages)} messages", "Analyse effectuée") # Traiter la réponse pour extraire les messages analysés messages_analyses = [] pattern_messages = r"MESSAGE (\d+):\s*- Rôle: (Client|Support)\s*- Type: (Question|Réponse|Information)\s*- Contenu essentiel: (.*?)(?=MESSAGE \d+:|PAIRE \d+:|$)" for match in re.finditer(pattern_messages, contenu, re.DOTALL): num = int(match.group(1)) role = match.group(2) type_msg = match.group(3) contenu_essentiel = match.group(4).strip() # Trouver le message correspondant msg_idx = num - 1 msg_id = "" msg_date = "" if 0 <= msg_idx < len(messages_for_llm): original_idx = messages_for_llm[msg_idx]["numero"] - 1 if 0 <= original_idx < len(messages_prepares): msg_id = messages_prepares[original_idx].get("id", "") msg_date = messages_prepares[original_idx].get("date", "") messages_analyses.append({ "id": msg_id, "date": msg_date, "role": role, "type": type_msg, "contenu": contenu_essentiel }) # Extraire les paires QR paires_qr = [] pattern_paires = r"PAIRE (\d+):\s*- Question \((Client|Support)\): (.*?)(?:\s*- Réponse \((Client|Support)\): (.*?))?(?=PAIRE \d+:|$)" for match in re.finditer(pattern_paires, contenu, re.DOTALL): num = match.group(1) q_role = match.group(2) question = match.group(3).strip() r_role = match.group(4) if match.group(4) else "" reponse = match.group(5).strip() if match.group(5) else "" paires_qr.append({ "numero": num, "question": { "role": q_role, "contenu": question }, "reponse": { "role": r_role, "contenu": reponse } if reponse else None }) return { "success": True, "messages_analyses": messages_analyses, "paires_qr": paires_qr, "nb_questions": len(paires_qr), "nb_reponses": sum(1 for p in paires_qr if p.get("reponse")) } except Exception as e: erreur = f"Erreur lors de l'extraction des questions et réponses: {str(e)}" logger.error(erreur) return { "success": False, "erreur": erreur, "paires_qr": [] } def generer_tableau_markdown(self, paires_qr: List[Dict[str, Any]]) -> str: """ Génère un tableau Markdown avec les questions et réponses. Args: paires_qr: Liste de paires question/réponse Returns: Tableau Markdown formaté """ # Créer le tableau markdown = ["# Analyse des Questions et Réponses\n"] markdown.append("| Question | Réponse |") markdown.append("|---------|---------|") if not paires_qr: # Si aucune paire n'a été trouvée, laisser le tableau vide pass else: for paire in paires_qr: question = paire.get("question", {}) reponse = paire.get("reponse", {}) q_role = question.get("role", "Client") q_contenu = question.get("contenu", "") if reponse: r_role = reponse.get("role", "Support") r_contenu = reponse.get("contenu", "") markdown.append(f"| **{q_role}**: {q_contenu} | **{r_role}**: {r_contenu} |") else: markdown.append(f"| **{q_role}**: {q_contenu} | *Pas de réponse* |") # Ajouter les informations sur les paramètres LLM utilisés markdown.append("\n## Paramètres LLM utilisés\n") markdown.append(f"- **Type de LLM**: Mistral") markdown.append(f"- **Modèle**: {getattr(self.llm, 'model', 'mistral-medium')}") markdown.append(f"- **Température**: {getattr(self.llm, 'temperature', 0.3)}") markdown.append(f"- **Tokens max**: {getattr(self.llm, 'max_tokens', 2000)}") return "\n".join(markdown) def charger_config(): """ Charge la configuration depuis config.json. Returns: Configuration chargée """ config_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config.json") if not os.path.exists(config_path): logger.warning(f"Fichier de configuration non trouvé: {config_path}") return {"llm": {"api_key": None}} try: with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) return config except Exception as e: logger.error(f"Erreur lors du chargement de la configuration: {str(e)}") return {"llm": {"api_key": None}} def main(): """ Point d'entrée du script. """ parser = argparse.ArgumentParser(description="Extrait les questions et réponses d'un ticket de support.") parser.add_argument("--messages", "-m", required=True, help="Chemin vers le fichier messages.json") parser.add_argument("--output", "-o", help="Chemin du fichier de sortie pour le tableau Markdown (par défaut: /questions_reponses.md)") parser.add_argument("--verbose", "-v", action="store_true", help="Afficher plus d'informations") args = parser.parse_args() # Configurer le niveau de log if args.verbose: logging.getLogger().setLevel(logging.DEBUG) # Vérifier que le fichier messages existe if not os.path.exists(args.messages): logger.error(f"Fichier de messages non trouvé: {args.messages}") sys.exit(1) # Charger les messages try: with open(args.messages, 'r', encoding='utf-8') as f: messages = json.load(f) if not isinstance(messages, list): logger.error(f"Format de fichier messages.json invalide. Une liste est attendue.") sys.exit(1) except Exception as e: logger.error(f"Erreur lors du chargement des messages: {str(e)}") sys.exit(1) # Déterminer le chemin de sortie output_path = args.output if not output_path: # Par défaut, utiliser le même répertoire que le fichier messages output_dir = os.path.dirname(args.messages) if not output_dir: output_dir = "." output_path = os.path.join(output_dir, "questions_reponses.md") # Charger la configuration config = charger_config() api_key = config.get("llm", {}).get("api_key") # Initialiser l'extracteur de questions-réponses extractor = QuestionReponseExtractor(api_key=api_key) try: # Extraire les questions et réponses resultats = extractor.extraire_questions_reponses(messages) if not resultats.get("success", False): logger.error(f"Échec de l'extraction: {resultats.get('erreur', 'Erreur inconnue')}") sys.exit(1) # Générer le tableau Markdown tableau_md = extractor.generer_tableau_markdown(resultats.get("paires_qr", [])) # Sauvegarder le tableau with open(output_path, 'w', encoding='utf-8') as f: f.write(tableau_md) logger.info(f"Tableau Markdown sauvegardé: {output_path}") # Afficher un résumé print("\nRésumé de l'extraction:") print(f"Messages analysés: {len(messages)}") print(f"Questions extraites: {resultats.get('nb_questions', 0)}") print(f"Réponses extraites: {resultats.get('nb_reponses', 0)}") print(f"Tableau Markdown sauvegardé: {output_path}") except Exception as e: logger.error(f"Erreur lors de l'extraction: {str(e)}") import traceback logger.debug(f"Détails: {traceback.format_exc()}") sys.exit(1) if __name__ == "__main__": main()