mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-16 00:36:52 +01:00
400 lines
16 KiB
Python
400 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Script principal pour l'analyse de tickets de support.
|
|
Ce script coordonne l'extraction de données depuis Odoo et l'analyse avec les agents LLM.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import argparse
|
|
import subprocess
|
|
import shutil
|
|
import re
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
from utils import TicketAnalyzer, TicketManager
|
|
from post_process import transformer_messages, corriger_json_accents, corriger_markdown_accents, reparer_ticket
|
|
|
|
def charger_config(config_path: str) -> Dict[str, Any]:
|
|
"""
|
|
Charge la configuration depuis un fichier JSON.
|
|
|
|
Args:
|
|
config_path: Chemin vers le fichier de configuration
|
|
|
|
Returns:
|
|
Dictionnaire de configuration
|
|
"""
|
|
try:
|
|
with open(config_path, 'r', encoding='utf-8') as f:
|
|
config = json.load(f)
|
|
return config
|
|
except Exception as e:
|
|
print(f"Erreur lors du chargement de la configuration: {e}")
|
|
# Configuration par défaut minimale
|
|
return {
|
|
"odoo": {
|
|
"url": "https://example.odoo.com",
|
|
"db": "db_name",
|
|
"username": "user@example.com",
|
|
"api_key": "your_api_key"
|
|
},
|
|
"llm": {
|
|
"api_key": "your_mistral_api_key"
|
|
},
|
|
"output_dir": "output"
|
|
}
|
|
|
|
def extraire_ticket(config: Dict[str, Any], ticket_code: str, output_dir: str) -> Dict[str, Any]:
|
|
"""
|
|
Extrait les données d'un ticket depuis Odoo.
|
|
|
|
Args:
|
|
config: Configuration avec les paramètres de connexion
|
|
ticket_code: Code du ticket à extraire
|
|
output_dir: Répertoire où sauvegarder les données
|
|
|
|
Returns:
|
|
Données du ticket extraites
|
|
"""
|
|
# Créer le gestionnaire de tickets
|
|
manager = TicketManager(
|
|
url=config["odoo"]["url"],
|
|
db=config["odoo"]["db"],
|
|
username=config["odoo"]["username"],
|
|
api_key=config["odoo"]["api_key"]
|
|
)
|
|
|
|
# Récupérer le ticket par son code
|
|
ticket = manager.get_ticket_by_code(ticket_code)
|
|
if not ticket:
|
|
print(f"Ticket {ticket_code} non trouvé")
|
|
return {}
|
|
|
|
# Extraire toutes les données du ticket
|
|
ticket_dir = os.path.join(output_dir, f"ticket_{ticket_code}")
|
|
ticket_data = manager.extract_ticket_data(ticket["id"], ticket_dir)
|
|
|
|
# Post-traiter immédiatement les messages
|
|
post_traiter_messages(ticket_dir)
|
|
|
|
return ticket_data
|
|
|
|
def post_traiter_messages(ticket_dir: str) -> None:
|
|
"""
|
|
Post-traite les messages du ticket pour une meilleure analyse.
|
|
|
|
Args:
|
|
ticket_dir: Répertoire contenant les données du ticket
|
|
"""
|
|
messages_file = os.path.join(ticket_dir, "messages.json")
|
|
|
|
# Vérifier que le fichier existe
|
|
if not os.path.exists(messages_file):
|
|
print(f"AVERTISSEMENT: Fichier messages.json introuvable dans {ticket_dir}")
|
|
return
|
|
|
|
print(f"Post-traitement des messages du ticket...")
|
|
|
|
# Créer une sauvegarde avant transformation
|
|
backup_file = os.path.join(ticket_dir, "messages.json.backup")
|
|
if not os.path.exists(backup_file):
|
|
shutil.copy2(messages_file, backup_file)
|
|
print(f"Sauvegarde créée: {backup_file}")
|
|
|
|
# Transformer les messages pour un format optimal
|
|
transformer_messages(messages_file)
|
|
|
|
# Vérifier que la transformation a réussi
|
|
try:
|
|
with open(messages_file, 'r', encoding='utf-8') as f:
|
|
messages = json.load(f)
|
|
print(f"Post-traitement terminé, {len(messages)} messages formatés.")
|
|
except Exception as e:
|
|
print(f"ERREUR: Échec du post-traitement: {e}")
|
|
# Restaurer la sauvegarde si nécessaire
|
|
if os.path.exists(backup_file):
|
|
shutil.copy2(backup_file, messages_file)
|
|
print("Restauration de la sauvegarde des messages.")
|
|
|
|
def preparer_donnees_ticket(ticket_dir: str) -> Dict[str, Any]:
|
|
"""
|
|
Prépare les données du ticket pour l'analyse à partir des fichiers stockés.
|
|
|
|
Args:
|
|
ticket_dir: Répertoire contenant les données du ticket
|
|
|
|
Returns:
|
|
Dictionnaire des données du ticket prêtes pour l'analyse
|
|
"""
|
|
# Chemins des fichiers sources
|
|
ticket_file = os.path.join(ticket_dir, "ticket_info.json")
|
|
messages_file = os.path.join(ticket_dir, "messages.json")
|
|
attachments_file = os.path.join(ticket_dir, "attachments_info.json")
|
|
attachments_dir = os.path.join(ticket_dir, "attachments")
|
|
|
|
# Vérifier que les fichiers nécessaires existent
|
|
if not all(os.path.exists(f) for f in [ticket_file, messages_file, attachments_file]):
|
|
missing = [f for f in [ticket_file, messages_file, attachments_file] if not os.path.exists(f)]
|
|
raise FileNotFoundError(f"Fichiers manquants: {', '.join(missing)}")
|
|
|
|
# Charger les données
|
|
try:
|
|
with open(ticket_file, 'r', encoding='utf-8') as f:
|
|
ticket_info = json.load(f)
|
|
|
|
with open(messages_file, 'r', encoding='utf-8') as f:
|
|
messages = json.load(f)
|
|
|
|
with open(attachments_file, 'r', encoding='utf-8') as f:
|
|
attachments = json.load(f)
|
|
|
|
# Vérifier et corriger les chemins des pièces jointes
|
|
for attachment in attachments:
|
|
if "file_path" in attachment:
|
|
# S'assurer que le chemin est absolu
|
|
if not os.path.isabs(attachment["file_path"]):
|
|
attachment["file_path"] = os.path.join(attachments_dir, os.path.basename(attachment["file_path"]))
|
|
|
|
# Vérifier que le fichier existe
|
|
if not os.path.exists(attachment["file_path"]):
|
|
print(f"AVERTISSEMENT: Pièce jointe introuvable: {attachment['file_path']}")
|
|
|
|
return {
|
|
"ticket": ticket_info,
|
|
"messages": messages,
|
|
"attachments": attachments,
|
|
"files": {
|
|
"ticket_info": ticket_file,
|
|
"messages": messages_file,
|
|
"attachments_info": attachments_file,
|
|
"attachments_dir": attachments_dir
|
|
}
|
|
}
|
|
|
|
except Exception as e:
|
|
raise ValueError(f"Erreur lors du chargement des données du ticket: {e}")
|
|
|
|
def analyser_ticket(ticket_data: Dict[str, Any], config: Dict[str, Any], output_dir: str, llm_params: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
|
|
"""
|
|
Analyse un ticket avec les agents LLM.
|
|
|
|
Args:
|
|
ticket_data: Données du ticket extraites
|
|
config: Configuration avec les clés API
|
|
output_dir: Répertoire où sauvegarder les résultats
|
|
llm_params: Paramètres LLM globaux à appliquer
|
|
|
|
Returns:
|
|
Chemins des fichiers générés
|
|
"""
|
|
# Créer l'analyseur de tickets
|
|
analyzer = TicketAnalyzer(api_key=config["llm"]["api_key"], llm_params=llm_params)
|
|
|
|
# Préparer le contexte pour l'analyse des images
|
|
ticket_info = ticket_data.get("ticket", {})
|
|
contexte = f"""
|
|
TICKET: {ticket_info.get('code', 'Inconnu')} - {ticket_info.get('name', 'Sans titre')}
|
|
|
|
DESCRIPTION:
|
|
{ticket_info.get('description', 'Aucune description')}
|
|
"""
|
|
|
|
# Récupérer les chemins des pièces jointes (images)
|
|
attachments = ticket_data.get("attachments", [])
|
|
image_paths = []
|
|
|
|
# Vérification des doublons par nom de fichier
|
|
image_noms = set()
|
|
|
|
for attachment in attachments:
|
|
chemin = attachment.get("file_path")
|
|
nom_fichier = os.path.basename(chemin) if chemin else ""
|
|
|
|
if not chemin or not os.path.exists(chemin):
|
|
continue
|
|
|
|
mimetype = attachment.get("mimetype", "")
|
|
|
|
# Vérifier que c'est une image et qu'on ne l'a pas déjà incluse
|
|
if mimetype.startswith("image/") and nom_fichier not in image_noms:
|
|
image_paths.append(chemin)
|
|
image_noms.add(nom_fichier)
|
|
print(f"Image ajoutée pour analyse: {nom_fichier}")
|
|
|
|
# Filtrer les images pertinentes
|
|
print(f"Filtrage de {len(image_paths)} images...")
|
|
images_pertinentes = analyzer.filtrer_images(image_paths)
|
|
print(f"Images pertinentes: {len(images_pertinentes)}/{len(image_paths)}")
|
|
|
|
# Imprimer les détails pour le débogage
|
|
for i, img in enumerate(image_paths):
|
|
est_pertinente = img in images_pertinentes
|
|
print(f" Image {i+1}: {os.path.basename(img)} - {'Pertinente' if est_pertinente else 'Non pertinente'}")
|
|
|
|
# Analyser les images pertinentes
|
|
print("Analyse des images pertinentes...")
|
|
resultats_images = analyzer.analyser_images(images_pertinentes, contexte)
|
|
print(f"Analyses d'images terminées: {len(resultats_images)}")
|
|
|
|
# Extraire les questions et réponses
|
|
print("Extraction des questions et réponses...")
|
|
messages = ticket_data.get("messages", [])
|
|
|
|
# Vérifier que les messages sont traités et ne contiennent pas de balises HTML
|
|
for msg in messages:
|
|
body = msg.get("body", "")
|
|
if isinstance(body, str) and re.search(r'<[a-z]+[^>]*>', body, re.IGNORECASE):
|
|
print(f"AVERTISSEMENT: Message {msg.get('id', 'inconnu')} contient du HTML non traité")
|
|
|
|
qr_path = os.path.join(output_dir, "questions_reponses.md")
|
|
resultats_qr = analyzer.extraire_questions_reponses(messages, qr_path)
|
|
print(f"Questions extraites: {resultats_qr.get('nb_questions', 0)}")
|
|
print(f"Réponses extraites: {resultats_qr.get('nb_reponses', 0)}")
|
|
|
|
# Générer le rapport final
|
|
print("Génération du rapport final...")
|
|
rapport_dir = os.path.join(output_dir, "rapport")
|
|
fichiers = analyzer.generer_rapport(rapport_dir)
|
|
print(f"Rapport généré: {rapport_dir}")
|
|
|
|
# Corriger les problèmes d'accents dans les fichiers JSON
|
|
corriger_accents_fichiers(rapport_dir)
|
|
|
|
return fichiers
|
|
|
|
def corriger_accents_fichiers(dir_path: str) -> None:
|
|
"""
|
|
Corrige les problèmes d'accents dans tous les fichiers JSON et Markdown d'un répertoire.
|
|
|
|
Args:
|
|
dir_path: Chemin du répertoire contenant les fichiers à corriger
|
|
"""
|
|
print("Correction des problèmes d'accents dans les fichiers...")
|
|
|
|
if not os.path.exists(dir_path):
|
|
print(f"Répertoire non trouvé: {dir_path}")
|
|
return
|
|
|
|
for root, _, files in os.walk(dir_path):
|
|
for file in files:
|
|
# Corriger les fichiers JSON
|
|
if file.endswith(".json"):
|
|
json_file = os.path.join(root, file)
|
|
corriger_json_accents(json_file)
|
|
|
|
# Corriger les fichiers Markdown
|
|
elif file.endswith(".md"):
|
|
md_file = os.path.join(root, file)
|
|
corriger_markdown_accents(md_file)
|
|
|
|
# Corriger également les fichiers Markdown du répertoire de ticket
|
|
ticket_dir = os.path.dirname(dir_path)
|
|
for file in os.listdir(ticket_dir):
|
|
if file.endswith(".md"):
|
|
md_file = os.path.join(ticket_dir, file)
|
|
corriger_markdown_accents(md_file)
|
|
|
|
print("Correction des accents terminée.")
|
|
|
|
def main():
|
|
"""
|
|
Fonction principale du script.
|
|
"""
|
|
# Parser les arguments de la ligne de commande
|
|
parser = argparse.ArgumentParser(description="Analyse de tickets de support")
|
|
parser.add_argument("ticket_code", help="Code du ticket à analyser")
|
|
parser.add_argument("--config", "-c", default="config.json", help="Chemin vers le fichier de configuration")
|
|
parser.add_argument("--output", "-o", default="output", help="Répertoire de sortie")
|
|
parser.add_argument("--skip-extraction", "-s", action="store_true", help="Ignorer l'extraction du ticket (utiliser les données existantes)")
|
|
parser.add_argument("--fix-accents", "-f", action="store_true", help="Corriger les problèmes d'accents dans les fichiers existants")
|
|
parser.add_argument("--llm-params", "-p", type=str, help="Paramètres LLM au format JSON (ex: '{\"temperature\": 0.5}')")
|
|
parser.add_argument("--reprocess", "-r", action="store_true", help="Forcer le retraitement des messages même si l'extraction est ignorée")
|
|
parser.add_argument("--repair", action="store_true", help="Réparer un ticket corrompu avant analyse")
|
|
args = parser.parse_args()
|
|
|
|
# Charger la configuration
|
|
config = charger_config(args.config)
|
|
|
|
# Charger les paramètres LLM supplémentaires si spécifiés
|
|
llm_params = {}
|
|
if args.llm_params:
|
|
try:
|
|
llm_params = json.loads(args.llm_params)
|
|
print(f"Paramètres LLM personnalisés: {llm_params}")
|
|
except json.JSONDecodeError as e:
|
|
print(f"Erreur lors du parsing des paramètres LLM: {e}")
|
|
|
|
# Créer le répertoire de sortie
|
|
os.makedirs(args.output, exist_ok=True)
|
|
|
|
# Construire le chemin du répertoire du ticket
|
|
ticket_dir = os.path.join(args.output, f"ticket_{args.ticket_code}")
|
|
|
|
# Réparer le ticket si demandé
|
|
if args.repair:
|
|
if os.path.exists(ticket_dir):
|
|
print(f"Réparation du ticket {args.ticket_code}...")
|
|
success = reparer_ticket(ticket_dir)
|
|
if not success:
|
|
print("ERREUR: La réparation du ticket a échoué. Impossible de continuer.")
|
|
return
|
|
print(f"Réparation terminée, poursuite de l'analyse...")
|
|
else:
|
|
print(f"Impossible de réparer: répertoire du ticket {args.ticket_code} introuvable.")
|
|
if not args.skip_extraction:
|
|
print("Le ticket sera extrait depuis Odoo.")
|
|
else:
|
|
print("ERREUR: Impossible de continuer sans extraction.")
|
|
return
|
|
|
|
# Si l'option de correction des accents est activée uniquement
|
|
if args.fix_accents and not args.skip_extraction and not args.repair:
|
|
rapport_dir = os.path.join(ticket_dir, "rapport")
|
|
if os.path.exists(rapport_dir):
|
|
corriger_accents_fichiers(rapport_dir)
|
|
return
|
|
|
|
# Extraction ou chargement des données du ticket
|
|
try:
|
|
if not args.skip_extraction:
|
|
# Extraire les données du ticket
|
|
print(f"Extraction du ticket {args.ticket_code}...")
|
|
ticket_data = extraire_ticket(config, args.ticket_code, args.output)
|
|
if not ticket_data:
|
|
print("Impossible de continuer sans données de ticket.")
|
|
return
|
|
else:
|
|
# Si on ignore l'extraction mais qu'on veut retraiter
|
|
if args.reprocess:
|
|
print(f"Retraitement forcé des messages du ticket {args.ticket_code}...")
|
|
post_traiter_messages(ticket_dir)
|
|
|
|
# Charger les données existantes
|
|
print(f"Chargement des données du ticket {args.ticket_code}...")
|
|
ticket_data = preparer_donnees_ticket(ticket_dir)
|
|
print("Données chargées avec succès.")
|
|
|
|
# Analyser le ticket
|
|
print(f"Analyse du ticket {args.ticket_code}...")
|
|
fichiers = analyser_ticket(ticket_data, config, ticket_dir, llm_params)
|
|
|
|
print("\nAnalyse terminée!")
|
|
print(f"Rapport JSON: {fichiers['json']}")
|
|
print(f"Rapport Markdown: {fichiers['markdown']}")
|
|
|
|
except FileNotFoundError as e:
|
|
print(f"ERREUR: {e}")
|
|
print("Utilisez l'extraction ou assurez-vous que tous les fichiers nécessaires existent.")
|
|
print("Ou bien utilisez l'option --repair pour réparer le ticket.")
|
|
except ValueError as e:
|
|
print(f"ERREUR: {e}")
|
|
print("Vous pouvez essayer l'option --repair pour réparer le ticket.")
|
|
except Exception as e:
|
|
print(f"ERREUR inattendue: {e}")
|
|
print("Vous pouvez essayer l'option --repair pour réparer le ticket.")
|
|
|
|
if __name__ == "__main__":
|
|
main() |