#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script d'extraction des questions et réponses d'un ticket.
Génère un tableau Markdown avec les questions et réponses identifiées.
"""
import os
import sys
import json
import argparse
import logging
import re
from typing import Dict, List, Any, Optional
# Configuration du logger
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("extract_qr.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("extract_qr")
try:
from llm import Mistral
except ImportError:
logger.error("Module LLM non trouvé. Veuillez vous assurer que le répertoire parent est dans PYTHONPATH.")
sys.exit(1)
class QuestionReponseExtractor:
"""
Agent d'extraction des questions et réponses d'un ticket.
"""
def __init__(self, api_key: Optional[str] = None):
"""
Initialise l'agent d'extraction de questions-réponses.
Args:
api_key: Clé API pour le LLM
"""
self.llm = Mistral(api_key=api_key)
# Configurer le LLM
try:
self.llm.model = "mistral-medium"
self.llm.temperature = 0.3
self.llm.max_tokens = 2000
except Exception as e:
logger.warning(f"Impossible de configurer le modèle: {e}")
# Définir le prompt système par défaut
self.system_prompt = """
Tu es un expert en analyse de conversations de support technique.
Votre mission est d'identifier avec précision:
1. Le rôle de chaque intervenant (client ou support technique)
2. La nature de chaque message (question, réponse, information additionnelle)
3. Le contenu essentiel de chaque message en éliminant les formules de politesse,
signatures, mentions légales et autres éléments non pertinents
Pour l'identification client/support:
- Support: Signatures avec noms d'entreprise fournissant le logiciel, domaines email
comme @cbao.fr, @odoo.com, mentions "support technique", etc.
- Client: Utilisateurs finaux qui signalent des problèmes ou posent des questions
Pour la classification en question/réponse:
- Questions: Demandes explicites (avec "?"), demandes implicites de résolution
de problèmes, descriptions de bugs ou dysfonctionnements
- Réponses: Explications techniques, solutions proposées, instructions fournies
par le support
Concentre-toi uniquement sur le contenu technique utile en ignorant tous les
éléments superflus qui n'apportent pas d'information sur le problème ou sa solution.
"""
self.historique = []
def ajouter_historique(self, action: str, entree: str, resultat: str) -> None:
"""
Ajoute une entrée à l'historique des actions.
Args:
action: Type d'action effectuée
entree: Entrée de l'action
resultat: Résultat de l'action
"""
self.historique.append({
"action": action,
"entree": entree,
"resultat": resultat
})
def nettoyer_contenu(self, texte: str) -> str:
"""
Nettoie le contenu en supprimant signatures, mentions légales, etc.
Args:
texte: Texte brut à nettoyer
Returns:
Texte nettoyé des éléments non pertinents
"""
# Si l'entrée n'est pas une chaîne, convertir en chaîne ou retourner vide
if not isinstance(texte, str):
if texte is None:
return ""
try:
texte = str(texte)
except:
return ""
# Détecter et supprimer les balises HTML avec regex robuste
try:
# Première passe - balises standard
texte_nettoye = re.sub(r'?[a-z]+[^>]*>', ' ', texte, flags=re.IGNORECASE)
# Deuxième passe - balises restantes, y compris les mal formées
texte_nettoye = re.sub(r'<[^>]*>', ' ', texte_nettoye)
# Troisième passe pour les balises qui pourraient avoir échappé
texte_nettoye = re.sub(r'<[^>]*$', ' ', texte_nettoye) # Balises incomplètes à la fin
except Exception as e:
logger.warning(f"Erreur lors du nettoyage HTML: {e}")
texte_nettoye = texte
# Remplacer les références aux images
texte_nettoye = re.sub(r'\[Image:[^\]]+\]', '[Image]', texte_nettoye)
texte_nettoye = re.sub(r'
]+>', '[Image]', texte_nettoye, flags=re.IGNORECASE)
# Supprimer les éléments courants non pertinents
patterns_a_supprimer = [
r'Cordialement,[\s\S]*?$',
r'Bien cordialement,[\s\S]*?$',
r'Bonne réception[\s\S]*?$',
r'À votre disposition[\s\S]*?$',
r'Support technique[\s\S]*?$',
r'L\'objectif du Support Technique[\s\S]*?$',
r'Notre service est ouvert[\s\S]*?$',
r'Dès réception[\s\S]*?$',
r'Confidentialité[\s\S]*?$',
r'Ce message électronique[\s\S]*?$',
r'Droit à la déconnexion[\s\S]*?$',
r'Afin d\'assurer une meilleure traçabilité[\s\S]*?$',
r'tél\s*:\s*[\d\s\+]+',
r'mobile\s*:\s*[\d\s\+]+',
r'www\.[^\s]+\.[a-z]{2,3}',
r'\*{10,}.*?\*{10,}', # Lignes de séparation avec astérisques
r'----.*?----', # Lignes de séparation avec tirets
]
for pattern in patterns_a_supprimer:
texte_nettoye = re.sub(pattern, '', texte_nettoye, flags=re.IGNORECASE)
# Supprimer les lignes multiples vides et espaces multiples
texte_nettoye = re.sub(r'\n\s*\n', '\n', texte_nettoye)
texte_nettoye = re.sub(r'\s+', ' ', texte_nettoye)
# Convertir les entités HTML
html_entities = {
' ': ' ', '<': '<', '>': '>', '&': '&',
'"': '"', ''': "'", '€': '€', '©': '©',
'®': '®', 'é': 'é', 'è': 'è', 'à': 'à',
'ç': 'ç', 'ê': 'ê', 'â': 'â', 'î': 'î',
'ô': 'ô', 'û': 'û'
}
for entity, char in html_entities.items():
texte_nettoye = texte_nettoye.replace(entity, char)
return texte_nettoye.strip()
def detecter_role(self, message: Dict[str, Any]) -> str:
"""
Détecte si un message provient du client ou du support.
Args:
message: Dictionnaire contenant les informations du message
Returns:
"Client" ou "Support"
"""
# Vérifier le champ 'role' s'il existe déjà
if "role" in message and message["role"] in ["Client", "Support"]:
return message["role"]
# Indices de support dans l'email
domaines_support = ["@cbao.fr", "@odoo.com", "support@", "ticket.support"]
indices_nom_support = ["support", "cbao", "technique", "odoo"]
email = message.get("email_from", "").lower()
# Nettoyer le format "Nom "
if "<" in email and ">" in email:
match = re.search(r'<([^>]+)>', email)
if match:
email = match.group(1).lower()
# Vérifier le domaine email
if any(domaine in email for domaine in domaines_support):
return "Support"
# Vérifier le nom d'auteur
auteur = ""
if "author_id" in message and isinstance(message["author_id"], list) and len(message["author_id"]) > 1:
auteur = str(message["author_id"][1]).lower()
elif "auteur" in message:
auteur = str(message["auteur"]).lower()
if any(indice in auteur for indice in indices_nom_support):
return "Support"
# Par défaut, considérer comme client
return "Client"
def extraire_questions_reponses(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Extrait les questions et réponses d'une liste de messages.
Args:
messages: Liste des messages du ticket
Returns:
Dictionnaire avec les questions et réponses extraites
"""
if not messages:
logger.warning("Aucun message à analyser")
return {
"success": False,
"erreur": "Aucun message à analyser",
"paires_qr": []
}
logger.info(f"Extraction des questions et réponses de {len(messages)} messages")
# Préparation des messages
messages_prepares = []
for msg in messages:
# Nettoyer le contenu
contenu = msg.get("body", "") or msg.get("contenu", "")
contenu_nettoye = self.nettoyer_contenu(contenu)
# Détecter le rôle
role = self.detecter_role(msg)
# Ajouter le message préparé si non vide après nettoyage
if contenu_nettoye.strip():
messages_prepares.append({
"id": msg.get("id", "") or msg.get("ID", ""),
"date": msg.get("date", ""),
"role": role,
"body": contenu_nettoye
})
# S'il n'y a pas assez de messages pour une conversation
if len(messages_prepares) < 2:
logger.warning("Pas assez de messages pour une conversation")
return {
"success": True,
"paires_qr": [],
"nb_questions": 0,
"nb_reponses": 0
}
# Trier par date si disponible
messages_prepares.sort(key=lambda x: x.get("date", ""))
# Préparer l'entrée pour le LLM
messages_for_llm = []
for i, msg in enumerate(messages_prepares):
messages_for_llm.append({
"numero": i + 1,
"role": msg.get("role", "Inconnu"),
"date": msg.get("date", ""),
"contenu": msg.get("body", "")
})
# Préparer le prompt pour extraire les paires Q/R
prompt = """
Analyse la conversation suivante et identifie toutes les paires de questions et réponses.
Pour chaque message:
1. Identifie s'il s'agit d'une question, d'une réponse ou d'une information.
2. Extrais le contenu essentiel en ignorant les formules de politesse et les signatures.
Ensuite, forme des paires de questions-réponses en associant chaque question avec sa réponse correspondante.
Réponds en utilisant la structure suivante:
```
MESSAGE 1:
- Rôle: [Client/Support]
- Type: [Question/Réponse/Information]
- Contenu essentiel: [Contenu essentiel du message]
MESSAGE 2:
...
PAIRE 1:
- Question (Client): [Question posée]
- Réponse (Support): [Réponse donnée]
PAIRE 2:
...
```
Si une question n'a pas de réponse, indique-le.
"""
try:
# Appeler le LLM pour l'analyse
from json import dumps
resultat = self.llm.chat_completion([
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": prompt + "\n\nConversation:\n" + dumps(messages_for_llm, indent=2)}
])
contenu = resultat.get("choices", [{}])[0].get("message", {}).get("content", "")
self.ajouter_historique("analyze_messages", f"{len(messages)} messages", "Analyse effectuée")
# Traiter la réponse pour extraire les messages analysés
messages_analyses = []
pattern_messages = r"MESSAGE (\d+):\s*- Rôle: (Client|Support)\s*- Type: (Question|Réponse|Information)\s*- Contenu essentiel: (.*?)(?=MESSAGE \d+:|PAIRE \d+:|$)"
for match in re.finditer(pattern_messages, contenu, re.DOTALL):
num = int(match.group(1))
role = match.group(2)
type_msg = match.group(3)
contenu_essentiel = match.group(4).strip()
# Trouver le message correspondant
msg_idx = num - 1
msg_id = ""
msg_date = ""
if 0 <= msg_idx < len(messages_for_llm):
original_idx = messages_for_llm[msg_idx]["numero"] - 1
if 0 <= original_idx < len(messages_prepares):
msg_id = messages_prepares[original_idx].get("id", "")
msg_date = messages_prepares[original_idx].get("date", "")
messages_analyses.append({
"id": msg_id,
"date": msg_date,
"role": role,
"type": type_msg,
"contenu": contenu_essentiel
})
# Extraire les paires QR
paires_qr = []
pattern_paires = r"PAIRE (\d+):\s*- Question \((Client|Support)\): (.*?)(?:\s*- Réponse \((Client|Support)\): (.*?))?(?=PAIRE \d+:|$)"
for match in re.finditer(pattern_paires, contenu, re.DOTALL):
num = match.group(1)
q_role = match.group(2)
question = match.group(3).strip()
r_role = match.group(4) if match.group(4) else ""
reponse = match.group(5).strip() if match.group(5) else ""
paires_qr.append({
"numero": num,
"question": {
"role": q_role,
"contenu": question
},
"reponse": {
"role": r_role,
"contenu": reponse
} if reponse else None
})
return {
"success": True,
"messages_analyses": messages_analyses,
"paires_qr": paires_qr,
"nb_questions": len(paires_qr),
"nb_reponses": sum(1 for p in paires_qr if p.get("reponse"))
}
except Exception as e:
erreur = f"Erreur lors de l'extraction des questions et réponses: {str(e)}"
logger.error(erreur)
return {
"success": False,
"erreur": erreur,
"paires_qr": []
}
def generer_tableau_markdown(self, paires_qr: List[Dict[str, Any]]) -> str:
"""
Génère un tableau Markdown avec les questions et réponses.
Args:
paires_qr: Liste de paires question/réponse
Returns:
Tableau Markdown formaté
"""
# Créer le tableau
markdown = ["# Analyse des Questions et Réponses\n"]
markdown.append("| Question | Réponse |")
markdown.append("|---------|---------|")
if not paires_qr:
# Si aucune paire n'a été trouvée, laisser le tableau vide
pass
else:
for paire in paires_qr:
question = paire.get("question", {})
reponse = paire.get("reponse", {})
q_role = question.get("role", "Client")
q_contenu = question.get("contenu", "")
if reponse:
r_role = reponse.get("role", "Support")
r_contenu = reponse.get("contenu", "")
markdown.append(f"| **{q_role}**: {q_contenu} | **{r_role}**: {r_contenu} |")
else:
markdown.append(f"| **{q_role}**: {q_contenu} | *Pas de réponse* |")
# Ajouter les informations sur les paramètres LLM utilisés
markdown.append("\n## Paramètres LLM utilisés\n")
markdown.append(f"- **Type de LLM**: Mistral")
markdown.append(f"- **Modèle**: {getattr(self.llm, 'model', 'mistral-medium')}")
markdown.append(f"- **Température**: {getattr(self.llm, 'temperature', 0.3)}")
markdown.append(f"- **Tokens max**: {getattr(self.llm, 'max_tokens', 2000)}")
return "\n".join(markdown)
def charger_config():
"""
Charge la configuration depuis config.json.
Returns:
Configuration chargée
"""
config_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config.json")
if not os.path.exists(config_path):
logger.warning(f"Fichier de configuration non trouvé: {config_path}")
return {"llm": {"api_key": None}}
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config
except Exception as e:
logger.error(f"Erreur lors du chargement de la configuration: {str(e)}")
return {"llm": {"api_key": None}}
def main():
"""
Point d'entrée du script.
"""
parser = argparse.ArgumentParser(description="Extrait les questions et réponses d'un ticket de support.")
parser.add_argument("--messages", "-m", required=True, help="Chemin vers le fichier messages.json")
parser.add_argument("--output", "-o", help="Chemin du fichier de sortie pour le tableau Markdown (par défaut: /questions_reponses.md)")
parser.add_argument("--verbose", "-v", action="store_true", help="Afficher plus d'informations")
args = parser.parse_args()
# Configurer le niveau de log
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Vérifier que le fichier messages existe
if not os.path.exists(args.messages):
logger.error(f"Fichier de messages non trouvé: {args.messages}")
sys.exit(1)
# Charger les messages
try:
with open(args.messages, 'r', encoding='utf-8') as f:
messages = json.load(f)
if not isinstance(messages, list):
logger.error(f"Format de fichier messages.json invalide. Une liste est attendue.")
sys.exit(1)
except Exception as e:
logger.error(f"Erreur lors du chargement des messages: {str(e)}")
sys.exit(1)
# Déterminer le chemin de sortie
output_path = args.output
if not output_path:
# Par défaut, utiliser le même répertoire que le fichier messages
output_dir = os.path.dirname(args.messages)
if not output_dir:
output_dir = "."
output_path = os.path.join(output_dir, "questions_reponses.md")
# Charger la configuration
config = charger_config()
api_key = config.get("llm", {}).get("api_key")
# Initialiser l'extracteur de questions-réponses
extractor = QuestionReponseExtractor(api_key=api_key)
try:
# Extraire les questions et réponses
resultats = extractor.extraire_questions_reponses(messages)
if not resultats.get("success", False):
logger.error(f"Échec de l'extraction: {resultats.get('erreur', 'Erreur inconnue')}")
sys.exit(1)
# Générer le tableau Markdown
tableau_md = extractor.generer_tableau_markdown(resultats.get("paires_qr", []))
# Sauvegarder le tableau
with open(output_path, 'w', encoding='utf-8') as f:
f.write(tableau_md)
logger.info(f"Tableau Markdown sauvegardé: {output_path}")
# Afficher un résumé
print("\nRésumé de l'extraction:")
print(f"Messages analysés: {len(messages)}")
print(f"Questions extraites: {resultats.get('nb_questions', 0)}")
print(f"Réponses extraites: {resultats.get('nb_reponses', 0)}")
print(f"Tableau Markdown sauvegardé: {output_path}")
except Exception as e:
logger.error(f"Erreur lors de l'extraction: {str(e)}")
import traceback
logger.debug(f"Détails: {traceback.format_exc()}")
sys.exit(1)
if __name__ == "__main__":
main()