llm_ticket3/scripts/analyze_ticket.py
2025-04-02 09:01:55 +02:00

444 lines
17 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script d'analyse de ticket pour extraire les informations essentielles
et générer un rapport d'analyse complet.
"""
import os
import sys
import json
import argparse
import logging
from typing import Dict, List, Any, Optional
# Configuration du logger
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("analyze_ticket.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("analyze_ticket")
try:
from llm import Mistral
except ImportError:
logger.error("Module LLM non trouvé. Veuillez vous assurer que le répertoire parent est dans PYTHONPATH.")
sys.exit(1)
class TicketAnalyzer:
"""
Agent d'analyse de ticket qui extrait les informations pertinentes.
"""
def __init__(self, api_key: Optional[str] = None):
"""
Initialise l'agent d'analyse de ticket.
Args:
api_key: Clé API pour le LLM
"""
self.llm = Mistral(api_key=api_key)
self.llm.set_model("mistral-medium")
self.llm.set_temperature(0.3)
self.llm.set_max_tokens(1000)
# Définir le prompt système par défaut
self.system_prompt = """
Tu es un expert en analyse de tickets de support technique.
Ton objectif est d'analyser un ticket de support pour:
1. Identifier le problème principal
2. Résumer la solution (si présente)
3. Extraire les informations clés
4. Catégoriser le problème et sa gravité
5. Évaluer la qualité de la résolution
Utilise un ton professionnel et factuel.
Concentre-toi uniquement sur les informations pertinentes.
Ne spécule pas au-delà de ce qui est présent dans les données.
Si une information n'est pas disponible, indique-le clairement.
"""
self.historique = []
def ajouter_historique(self, action: str, entree: str, resultat: str) -> None:
"""
Ajoute une entrée à l'historique des actions.
Args:
action: Type d'action effectuée
entree: Entrée de l'action
resultat: Résultat de l'action
"""
self.historique.append({
"action": action,
"entree": entree,
"resultat": resultat
})
def analyser_ticket(self, messages: List[Dict[str, Any]], infos_images: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Analyse un ticket à partir de ses messages et informations d'images.
Args:
messages: Liste des messages du ticket
infos_images: Informations sur les images analysées (optionnel)
Returns:
Résultats de l'analyse du ticket
"""
if not messages:
logger.warning("Aucun message à analyser")
return {
"success": False,
"erreur": "Aucun message à analyser"
}
logger.info(f"Analyse de ticket avec {len(messages)} messages")
# Extraire les informations du ticket depuis le premier message (contexte)
ticket_info = {}
if messages and messages[0].get("role") == "system" and messages[0].get("type") == "contexte":
ticket_info = {
"id": messages[0].get("id", ""),
"code": messages[0].get("code", ""),
"name": messages[0].get("name", ""),
"description": messages[0].get("description", ""),
"date_create": messages[0].get("date_create", "")
}
# Retirer le message de contexte pour l'analyse
actual_messages = messages[1:]
else:
actual_messages = messages
# Préparer le prompt pour l'analyse
prompt = f"""
Analyse ce ticket de support:
TICKET: {ticket_info.get('code', 'N/A')} - {ticket_info.get('name', 'Sans titre')}
DATE: {ticket_info.get('date_create', 'Inconnue')}
DESCRIPTION:
{ticket_info.get('description', 'Aucune description')}
MESSAGES:
"""
# Ajouter les messages
for i, msg in enumerate(actual_messages):
role = msg.get("role", "Inconnu")
date = msg.get("date", "")
body = msg.get("body", "")
prompt += f"\n--- MESSAGE {i+1} ({role}, {date}) ---\n{body}\n"
# Ajouter les informations sur les images si disponibles
if infos_images:
prompt += "\n\nIMAGES ANALYSÉES:\n"
for image_path, analyse in infos_images.get("analyses", {}).items():
if analyse.get("pertinente", False):
prompt += f"- {image_path}: {analyse.get('description', 'Pas de description')} ({analyse.get('type_image', 'type inconnu')})\n"
# Demander une analyse structurée
prompt += """
Fais une analyse complète et structurée du ticket avec les sections suivantes:
1. PROBLÈME: Résume clairement le problème principal en 1-2 phrases
2. CATÉGORIE: Catégorise le problème (bug, demande de fonctionnalité, question, etc.)
3. GRAVITÉ: Évalue la gravité (Critique, Élevée, Moyenne, Faible)
4. SOLUTION: Résume la solution fournie ou indique qu'aucune solution n'a été fournie
5. EFFICACITÉ: Évalue si la solution a résolu le problème (Résolue, Partiellement résolue, Non résolue, Inconnue)
6. RÉSUMÉ: Fournis un résumé complet de l'incident en 3-5 phrases
7. POINTS CLÉS: Liste les 3-5 points les plus importants à retenir de ce ticket
Réponds en format Markdown bien structuré.
"""
try:
# Effectuer l'analyse avec le LLM
resultat = self.llm.generate_text(prompt, system_prompt=self.system_prompt)
self.ajouter_historique("analyze_ticket", f"{len(messages)} messages", "Analyse effectuée")
# Extraire le contenu
analyse_texte = resultat.get("content", "")
# Extraire les différentes sections
sections = {}
current_section = None
current_content = []
for line in analyse_texte.split("\n"):
# Détecter les en-têtes de section
if line.startswith("# "):
if current_section:
sections[current_section] = "\n".join(current_content).strip()
current_section = line[2:].strip().lower()
current_content = []
elif line.startswith("## "):
if current_section:
sections[current_section] = "\n".join(current_content).strip()
current_section = line[3:].strip().lower()
current_content = []
elif ":" in line and not "://" in line and not current_section:
# Cas des lignes "SECTION: contenu" sans formatage Markdown
parts = line.split(":", 1)
if len(parts) == 2 and parts[0].strip().upper() == parts[0].strip():
current_section = parts[0].strip().lower()
current_content = [parts[1].strip()]
else:
if current_section:
current_content.append(line)
else:
if current_section:
current_content.append(line)
# Ajouter la dernière section
if current_section:
sections[current_section] = "\n".join(current_content).strip()
# Si on n'a pas pu extraire les sections, utiliser tout le texte
if not sections:
sections = {
"analyse_complete": analyse_texte
}
# Créer le résultat final
resultat_analyse = {
"success": True,
"ticket_info": ticket_info,
"sections": sections,
"analyse_brute": analyse_texte,
"nb_messages": len(actual_messages)
}
logger.info("Analyse de ticket terminée avec succès")
return resultat_analyse
except Exception as e:
erreur = f"Erreur lors de l'analyse du ticket: {str(e)}"
logger.error(erreur)
return {
"success": False,
"erreur": erreur
}
def generer_rapport_markdown(self, analyse: Dict[str, Any]) -> str:
"""
Génère un rapport Markdown à partir de l'analyse du ticket.
Args:
analyse: Résultat de l'analyse du ticket
Returns:
Rapport au format Markdown
"""
if not analyse.get("success", False):
return f"# Échec de l'analyse\n\nErreur: {analyse.get('erreur', 'Inconnue')}"
ticket_info = analyse.get("ticket_info", {})
sections = analyse.get("sections", {})
# En-tête du rapport
rapport = f"# Rapport d'analyse de ticket\n\n"
rapport += f"**Ticket**: {ticket_info.get('code', 'N/A')} - {ticket_info.get('name', 'Sans titre')}\n"
rapport += f"**Date**: {ticket_info.get('date_create', 'Inconnue')}\n"
rapport += f"**Messages analysés**: {analyse.get('nb_messages', 0)}\n\n"
# Récupérer les sections principales
problem = sections.get("problème", sections.get("probleme", ""))
category = sections.get("catégorie", sections.get("categorie", ""))
severity = sections.get("gravité", sections.get("gravite", ""))
solution = sections.get("solution", "")
efficacy = sections.get("efficacité", sections.get("efficacite", ""))
summary = sections.get("résumé", sections.get("resume", ""))
key_points = sections.get("points clés", sections.get("points cles", ""))
# Ajouter les sections au rapport
if problem:
rapport += f"## Problème\n\n{problem}\n\n"
if category or severity:
rapport += "## Classification\n\n"
if category:
rapport += f"**Catégorie**: {category}\n\n"
if severity:
rapport += f"**Gravité**: {severity}\n\n"
if solution:
rapport += f"## Solution\n\n{solution}\n\n"
if efficacy:
rapport += f"**Efficacité**: {efficacy}\n\n"
if summary:
rapport += f"## Résumé\n\n{summary}\n\n"
if key_points:
rapport += f"## Points clés\n\n{key_points}\n\n"
# Ajouter les autres sections qui n'auraient pas été traitées
for name, content in sections.items():
if name not in ["problème", "probleme", "catégorie", "categorie",
"gravité", "gravite", "solution", "efficacité",
"efficacite", "résumé", "resume", "points clés",
"points cles", "analyse_complete"]:
rapport += f"## {name.title()}\n\n{content}\n\n"
# Ajouter le rapport complet si on n'a pas pu extraire les sections
if "analyse_complete" in sections and len(sections) == 1:
rapport += f"## Analyse complète\n\n{sections['analyse_complete']}\n\n"
# Ajouter les paramètres de l'analyse
rapport += "## Paramètres de l'analyse\n\n"
rapport += f"- **Modèle**: {self.llm.get_model()}\n"
rapport += f"- **Température**: {self.llm.get_temperature()}\n"
return rapport
def charger_config():
"""
Charge la configuration depuis config.json.
Returns:
Configuration chargée
"""
config_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "config.json")
if not os.path.exists(config_path):
logger.warning(f"Fichier de configuration non trouvé: {config_path}")
return {"llm": {"api_key": None}}
try:
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
return config
except Exception as e:
logger.error(f"Erreur lors du chargement de la configuration: {str(e)}")
return {"llm": {"api_key": None}}
def main():
"""
Point d'entrée du script.
"""
parser = argparse.ArgumentParser(description="Analyse un ticket de support.")
parser.add_argument("--messages", "-m", required=True, help="Chemin vers le fichier messages.json")
parser.add_argument("--images-rapport", "-i", help="Chemin vers le rapport d'analyse d'images (filter_report.json)")
parser.add_argument("--output", "-o", help="Répertoire de sortie pour les rapports")
parser.add_argument("--format", "-f", choices=["json", "md", "both"], default="both",
help="Format de sortie (json, md, both)")
parser.add_argument("--verbose", "-v", action="store_true", help="Afficher plus d'informations")
args = parser.parse_args()
# Configurer le niveau de log
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Vérifier que le fichier messages existe
if not os.path.exists(args.messages):
logger.error(f"Fichier de messages non trouvé: {args.messages}")
sys.exit(1)
# Charger les messages
try:
with open(args.messages, 'r', encoding='utf-8') as f:
messages = json.load(f)
if not isinstance(messages, list):
logger.error(f"Format de fichier messages.json invalide. Une liste est attendue.")
sys.exit(1)
except Exception as e:
logger.error(f"Erreur lors du chargement des messages: {str(e)}")
sys.exit(1)
# Charger les informations sur les images si disponibles
images_info = None
if args.images_rapport and os.path.exists(args.images_rapport):
try:
with open(args.images_rapport, 'r', encoding='utf-8') as f:
images_info = json.load(f)
logger.info(f"Informations sur les images chargées: {args.images_rapport}")
except Exception as e:
logger.warning(f"Impossible de charger les informations sur les images: {str(e)}")
# Déterminer le répertoire de sortie
output_dir = args.output
if not output_dir:
# Par défaut, utiliser le même répertoire que le fichier messages
output_dir = os.path.dirname(args.messages)
if not output_dir:
output_dir = "."
# Créer le répertoire de sortie s'il n'existe pas
rapport_dir = os.path.join(output_dir, "rapport")
os.makedirs(rapport_dir, exist_ok=True)
# Charger la configuration
config = charger_config()
api_key = config.get("llm", {}).get("api_key")
# Initialiser l'analyseur de ticket
analyzer = TicketAnalyzer(api_key=api_key)
try:
# Analyser le ticket
resultat = analyzer.analyser_ticket(messages, images_info)
if not resultat.get("success", False):
logger.error(f"Échec de l'analyse: {resultat.get('erreur', 'Erreur inconnue')}")
sys.exit(1)
# Générer le rapport Markdown
rapport_md = analyzer.generer_rapport_markdown(resultat)
# Sauvegarder les résultats selon le format demandé
if args.format in ["json", "both"]:
json_path = os.path.join(rapport_dir, "ticket_analysis.json")
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(resultat, f, indent=2, ensure_ascii=False)
logger.info(f"Rapport JSON sauvegardé: {json_path}")
if args.format in ["md", "both"]:
md_path = os.path.join(rapport_dir, "ticket_analysis.md")
with open(md_path, 'w', encoding='utf-8') as f:
f.write(rapport_md)
logger.info(f"Rapport Markdown sauvegardé: {md_path}")
# Afficher un résumé
print("\nRésumé de l'analyse:")
print(f"Ticket: {resultat.get('ticket_info', {}).get('code', 'N/A')} - {resultat.get('ticket_info', {}).get('name', 'Sans titre')}")
print(f"Messages analysés: {resultat.get('nb_messages', 0)}")
print(f"Sections extraites: {len(resultat.get('sections', {}))}")
# Afficher un extrait du problème et de la solution
sections = resultat.get("sections", {})
probleme = sections.get("problème", sections.get("probleme", ""))
solution = sections.get("solution", "")
if probleme:
probleme_court = probleme[:150] + "..." if len(probleme) > 150 else probleme
print(f"\nProblème: {probleme_court}")
if solution:
solution_court = solution[:150] + "..." if len(solution) > 150 else solution
print(f"\nSolution: {solution_court}")
print(f"\nRappport complet sauvegardé dans: {rapport_dir}")
except Exception as e:
logger.error(f"Erreur lors de l'analyse: {str(e)}")
import traceback
logger.debug(f"Détails: {traceback.format_exc()}")
sys.exit(1)
if __name__ == "__main__":
main()