1804-17:34

This commit is contained in:
Ladebeze66 2025-04-18 17:34:21 +02:00
parent 7419e4d1e1
commit 407cf39ea0
21 changed files with 1660 additions and 1732 deletions

View File

@ -44,7 +44,7 @@ Reste strictement factuel. Ne fais aucune hypothèse. Ne suggère pas d'étapes
if hasattr(self.llm, "configurer"):
self.llm.configurer(**self.params)
def executer(self, rapport_data: Dict[str, Any], dossier_destination: str) -> str:
def executer(self, rapport_data: Dict[str, Any]) -> str:
ticket_id = rapport_data.get("ticket_id", "Inconnu")
print(f"AgentReportGenerator : génération du rapport pour le ticket {ticket_id}")
@ -52,8 +52,6 @@ Reste strictement factuel. Ne fais aucune hypothèse. Ne suggère pas d'étapes
prompt = self._generer_prompt(rapport_data)
response = self.llm.interroger(prompt)
logger.info(f"Rapport généré pour le ticket {ticket_id}, longueur: {len(response)} caractères")
result = {
"prompt": prompt,
"response": response,
@ -68,7 +66,7 @@ Reste strictement factuel. Ne fais aucune hypothèse. Ne suggère pas d'étapes
}
}
sauvegarder_donnees(ticket_id, "rapport_final", result, base_dir=dossier_destination, is_resultat=True)
sauvegarder_donnees(ticket_id, "rapport_final", result, base_dir="reports", is_resultat=True)
self.ajouter_historique("rapport_final", {
"ticket_id": ticket_id,
@ -82,6 +80,7 @@ Reste strictement factuel. Ne fais aucune hypothèse. Ne suggère pas d'étapes
logger.error(f"Erreur lors de la génération du rapport : {str(e)}")
return f"ERREUR: {str(e)}"
def _generer_prompt(self, rapport_data: Dict[str, Any]) -> str:
ticket_text = rapport_data.get("ticket_analyse", "")
image_blocs = []

View File

@ -15,8 +15,8 @@ class AgentTicketAnalyser(BaseAgent):
self.params = {
"temperature": 0.1,
"top_p": 0.2,
"max_tokens": 7000
"top_p": 0.5,
"max_tokens": 4000
}
self.system_prompt = """Tu es un expert en analyse de tickets pour le support informatique de BRG-Lab pour la société CBAO.

View File

@ -1,217 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import time
import logging
import json
import traceback
from datetime import datetime
# Import des agents
from agents.mistral_large.agent_ticket_analyser import AgentTicketAnalyser
from agents.pixtral_large.agent_image_sorter import AgentImageSorter
from agents.pixtral_large.agent_image_analyser import AgentImageAnalyser
# Import des modèles LLM
from llm_classes.mistral_large import MistralLarge
from llm_classes.pixtral_large import PixtralLarge
# Import pour filtrer les images dupliquées
from utils.image_dedup import filtrer_images_uniques
# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
filename='test_image_analyse.log', filemode='w')
logger = logging.getLogger("TestImageAnalyse")
def test_image_analyse(ticket_id):
"""
Exécute l'analyse d'un ticket, le tri des images et l'analyse d'images contextualisée
Args:
ticket_id: Identifiant du ticket à traiter (obligatoire)
"""
if not ticket_id:
logger.error("Erreur: Aucun ID de ticket fourni.")
print("ERREUR: Vous devez fournir un ID de ticket comme argument.")
return
# Vérifier que le dossier output existe
if not os.path.exists("output/"):
logger.error("Le dossier output/ n'existe pas")
print("ERREUR: Le dossier output/ n'existe pas")
return
# Vérifier que le ticket existe
ticket_path = os.path.join("output", f"ticket_{ticket_id}")
if not os.path.exists(ticket_path):
logger.error(f"Le ticket {ticket_id} n'existe pas")
print(f"ERREUR: Le ticket {ticket_id} n'existe pas")
return
print(f"Ticket à traiter: {ticket_id}")
# Initialisation des LLM
print("Initialisation des modèles LLM...")
start_time = time.time()
# Mistral Large pour l'analyse de ticket
ticket_llm = MistralLarge()
logger.info("LLM MistralLarge initialisé pour l'analyse du ticket")
# Pixtral Large pour le tri et l'analyse d'images
image_sorter_llm = PixtralLarge()
logger.info("LLM PixtralLarge initialisé pour le tri d'images")
image_analyser_llm = PixtralLarge()
logger.info("LLM PixtralLarge initialisé pour l'analyse d'images")
llm_init_time = time.time() - start_time
print(f"Tous les modèles LLM ont été initialisés en {llm_init_time:.2f} secondes")
# Création des agents
print("Création des agents...")
ticket_agent = AgentTicketAnalyser(ticket_llm)
image_sorter = AgentImageSorter(image_sorter_llm)
image_analyser = AgentImageAnalyser(image_analyser_llm)
print("Tous les agents ont été créés")
# ÉTAPE 1: Trouver l'extraction la plus récente du ticket
extractions = [d for d in os.listdir(ticket_path) if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(ticket_id)]
if not extractions:
logger.error(f"Aucune extraction trouvée pour le ticket {ticket_id}")
print(f"ERREUR: Aucune extraction trouvée pour le ticket {ticket_id}")
return
# Trier par ordre décroissant pour avoir la plus récente en premier
extractions.sort(reverse=True)
latest_extraction = extractions[0]
extraction_path = os.path.join(ticket_path, latest_extraction)
print(f"Utilisation de l'extraction: {latest_extraction}")
# ÉTAPE 2: Chercher le fichier JSON du ticket
rapport_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
# Chercher dans le répertoire des rapports
json_file = None
if os.path.exists(rapport_dir):
for file in os.listdir(rapport_dir):
if file.lower().endswith('.json'):
json_file = os.path.join(rapport_dir, file)
break
# Si pas trouvé, chercher directement dans l'extraction
if not json_file:
for file in os.listdir(extraction_path):
if file.lower().endswith('.json') and ticket_id in file:
json_file = os.path.join(extraction_path, file)
break
if not json_file:
logger.error(f"Aucun fichier JSON trouvé pour le ticket {ticket_id}")
print(f"ERREUR: Aucun fichier JSON trouvé pour le ticket {ticket_id}")
return
print(f"Fichier JSON du ticket trouvé: {os.path.basename(json_file)}")
# ÉTAPE 3: Charger les données du ticket
try:
with open(json_file, 'r', encoding='utf-8') as f:
ticket_data = json.load(f)
print("Données du ticket chargées avec succès")
except Exception as e:
logger.error(f"Erreur lors du chargement du fichier JSON: {e}")
print(f"ERREUR: Impossible de charger le fichier JSON: {e}")
return
# Ajouter le code du ticket si absent
if "code" not in ticket_data:
ticket_data["code"] = ticket_id
# ÉTAPE 4: Analyse du ticket
print("Analyse du ticket en cours...")
try:
ticket_analysis = ticket_agent.executer(ticket_data)
print(f"Analyse du ticket terminée: {len(ticket_analysis) if ticket_analysis else 0} caractères")
except Exception as e:
logger.error(f"Erreur lors de l'analyse du ticket: {e}")
print(f"ERREUR: Impossible d'analyser le ticket: {e}")
ticket_analysis = "Erreur d'analyse du ticket"
# ÉTAPE 5: Recherche et tri des images
attachments_dir = os.path.join(extraction_path, "attachments")
if not os.path.exists(attachments_dir):
logger.warning(f"Répertoire des pièces jointes non trouvé: {attachments_dir}")
print("AVERTISSEMENT: Aucune pièce jointe trouvée pour ce ticket")
return
# Récupérer et filtrer les images
images = [f for f in os.listdir(attachments_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
image_paths = [os.path.join(attachments_dir, img) for img in images]
image_paths_uniques = filtrer_images_uniques(image_paths)
images = [os.path.basename(p) for p in image_paths_uniques]
print(f"Images trouvées: {len(images)}")
# ÉTAPE 6: Tri des images
relevant_images = []
for img in images:
img_path = os.path.join(attachments_dir, img)
print(f"Évaluation de l'image: {img}")
try:
sorting_result = image_sorter.executer(img_path)
is_relevant = sorting_result.get("is_relevant", False)
reason = sorting_result.get("reason", "")
if is_relevant:
print(f" => Pertinente: {reason[:50]}...")
relevant_images.append(img_path)
else:
print(f" => Non pertinente: {reason[:50]}...")
except Exception as e:
logger.error(f"Erreur lors du tri de l'image {img}: {e}")
print(f"ERREUR: Impossible de trier l'image {img}: {e}")
print(f"Images pertinentes trouvées: {len(relevant_images)}/{len(images)}")
# ÉTAPE 7: Analyse des images pertinentes avec contexte
if relevant_images:
print("Analyse des images pertinentes avec contexte...")
for img_path in relevant_images:
img_name = os.path.basename(img_path)
print(f"Analyse approfondie de l'image: {img_name}")
try:
analysis_result = image_analyser.executer(img_path, contexte=ticket_analysis)
print(f" => Analyse terminée: {len(analysis_result.get('analyse', '')) if analysis_result else 0} caractères")
except Exception as e:
logger.error(f"Erreur lors de l'analyse de l'image {img_name}: {e}")
print(f"ERREUR: Impossible d'analyser l'image {img_name}: {e}")
else:
print("Aucune image pertinente à analyser")
print(f"\nProcessus d'analyse terminé avec succès.")
if __name__ == "__main__":
print("Démarrage du test d'analyse d'images contextualisée")
# Vérifier si un ID de ticket est passé en argument
ticket_id = None
if len(sys.argv) > 1:
ticket_id = sys.argv[1]
print(f"ID de ticket fourni en argument: {ticket_id}")
else:
print("ERREUR: Vous devez fournir un ID de ticket comme argument.")
sys.exit(1)
test_image_analyse(ticket_id)
print("Test terminé")

View File

@ -1,82 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script pour tester le workflow d'analyse d'images avec contexte.
Utilise la structure modulaire créée dans le répertoire tests/.
"""
import sys
import os
import argparse
import logging
# Import du workflow
from tests.workflows.image_analyser_workflow import execute_workflow
def main():
# Configuration de l'analyseur d'arguments
parser = argparse.ArgumentParser(description="Tester le workflow d'analyse d'images avec contexte")
parser.add_argument("ticket_id", help="ID du ticket à analyser (ex: T1234)")
parser.add_argument("--output_dir", default="output", help="Répertoire de sortie contenant les tickets")
parser.add_argument("--text_model", default="mistral_large", help="Modèle LLM à utiliser pour l'analyse de texte")
parser.add_argument("--vision_model", default="pixtral_large", help="Modèle LLM à utiliser pour l'analyse d'image")
parser.add_argument("--verbose", "-v", action="store_true", help="Afficher plus d'informations")
# Analyser les arguments
args = parser.parse_args()
# Configurer le logging
log_level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
handlers=[
logging.FileHandler(f"image_analyse_workflow_{args.ticket_id}.log"),
logging.StreamHandler()
]
)
# Vérifier que le ticket existe
ticket_path = os.path.join(args.output_dir, f"ticket_{args.ticket_id}")
if not os.path.exists(ticket_path):
print(f"ERREUR: Le ticket {args.ticket_id} n'existe pas dans {args.output_dir}")
return 1
# Afficher les informations de début
print("="*80)
print(f"TEST DU WORKFLOW D'ANALYSE D'IMAGES AVEC CONTEXTE")
print(f"Ticket: {args.ticket_id}")
print(f"Modèle texte: {args.text_model}")
print(f"Modèle vision: {args.vision_model}")
print("="*80)
print()
# Exécuter le workflow
results = execute_workflow(
ticket_id=args.ticket_id,
output_dir=args.output_dir,
text_model=args.text_model,
vision_model=args.vision_model
)
# Afficher un résumé des résultats
if "error" in results:
print(f"\nErreur lors de l'exécution du workflow: {results['error']}")
print(f"Étape en échec: {results.get('stage', 'inconnue')}")
return 1
print("\n" + "="*80)
print("RÉSUMÉ DU WORKFLOW")
print("="*80)
print(f"Ticket: {results['ticket_id']}")
print(f"Analyse du ticket: {len(results['ticket_analysis']) if results['ticket_analysis'] else 0} caractères")
print(f"Images totales: {results['images_count']}")
print(f"Images pertinentes: {len(results['relevant_images'])}")
print(f"Images analysées: {len(results['analysis_results'])}")
print(f"Temps d'exécution: {results['execution_time']:.2f} secondes")
print("\nWorkflow exécuté avec succès!")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -1,244 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script pour tester directement l'agent d'analyse d'images (AgentImageAnalyser) avec Pixtral-Large.
Permet d'obtenir une analyse détaillée d'images pertinentes pour un ticket.
"""
import os
import sys
import json
import argparse
import glob
from typing import List, Optional
from llm_classes.pixtral_large import PixtralLarge
from agents.pixtral_large.agent_image_analyser import AgentImageAnalyser
from utils.image_dedup import filtrer_images_uniques
def get_images_in_extraction(extraction_path: str) -> List[str]:
"""
Récupère toutes les images dans un répertoire d'extraction
Args:
extraction_path: Chemin vers le répertoire d'extraction
Returns:
Liste des chemins d'accès aux images
"""
# Extensions d'images courantes
image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.gif', '*.webp']
# Liste pour stocker les chemins d'images
images = []
# Chercher dans le répertoire principal
for extension in image_extensions:
pattern = os.path.join(extraction_path, extension)
images.extend(glob.glob(pattern))
# Chercher dans le sous-répertoire "attachments"
attachments_path = os.path.join(extraction_path, "attachments")
if os.path.exists(attachments_path) and os.path.isdir(attachments_path):
for extension in image_extensions:
pattern = os.path.join(attachments_path, extension)
images.extend(glob.glob(pattern))
# Chercher dans les sous-répertoires de "attachments" (si des ID sont utilisés)
for subdir in os.listdir(attachments_path):
subdir_path = os.path.join(attachments_path, subdir)
if os.path.isdir(subdir_path):
for extension in image_extensions:
pattern = os.path.join(subdir_path, extension)
images.extend(glob.glob(pattern))
return images
def extraire_contexte_ticket(extraction_path: str) -> str:
"""
Extrait le contexte du ticket à partir des fichiers disponibles
Args:
extraction_path: Chemin vers le répertoire d'extraction
Returns:
Contexte du ticket sous forme de texte
"""
# Essayer d'abord de lire le all_messages.json
all_messages_path = os.path.join(extraction_path, "all_messages.json")
if os.path.exists(all_messages_path):
try:
with open(all_messages_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Extraire le résumé du ticket
ticket_summary = data.get("ticket_summary", {})
name = ticket_summary.get("name", "")
description = ticket_summary.get("description", "")
# Construire un contexte concis
contexte = f"Titre du ticket: {name}\n\nDescription: {description}\n\n"
# Ajouter les 3 premiers messages pour plus de contexte
messages = data.get("messages", [])
if messages:
contexte += "Extraits des premiers messages:\n"
for i, msg in enumerate(messages[:3]):
body = msg.get("body", "")
if body:
# Limiter la taille de chaque message
if len(body) > 500:
body = body[:500] + "..."
contexte += f"- Message {i+1}: {body}\n\n"
return contexte
except Exception as e:
print(f"Erreur lors de la lecture du fichier all_messages.json: {str(e)}")
# En cas d'échec, essayer avec le ticket_summary.json
summary_path = os.path.join(extraction_path, "ticket_summary.json")
if os.path.exists(summary_path):
try:
with open(summary_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Extraire les informations de base
name = data.get("name", "")
description = data.get("description", "")
return f"Titre du ticket: {name}\n\nDescription: {description}"
except Exception as e:
print(f"Erreur lors de la lecture du fichier ticket_summary.json: {str(e)}")
# En dernier recours
return "Contexte du ticket non disponible. Analysez uniquement le contenu de l'image."
def main():
# Configuration de l'analyseur d'arguments
parser = argparse.ArgumentParser(description="Tester l'agent d'analyse d'images directement.")
parser.add_argument("ticket_id", help="ID du ticket à analyser (ex: T1234)")
parser.add_argument("--image", help="Chemin spécifique vers une image à analyser")
parser.add_argument("--output_dir", default="output", help="Répertoire de sortie contenant les tickets")
parser.add_argument("--save", action="store_true", help="Sauvegarder les résultats d'analyse dans un fichier")
parser.add_argument("--no-dedup", action="store_true", help="Désactiver le préfiltrage des doublons")
parser.add_argument("--seuil", type=int, default=5, help="Seuil de similarité pour détecter les doublons (0-10, défaut=5)")
parser.add_argument("--verbose", action="store_true", help="Afficher des informations détaillées")
# Analyser les arguments
args = parser.parse_args()
# Construire le chemin vers le ticket
ticket_path = os.path.join(args.output_dir, f"ticket_{args.ticket_id}")
# Vérifier que le répertoire du ticket existe
if not os.path.exists(ticket_path):
print(f"ERREUR: Le ticket {args.ticket_id} n'existe pas dans {args.output_dir}")
return 1
# Rechercher la dernière extraction (la plus récente)
extractions = [d for d in os.listdir(ticket_path) if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(args.ticket_id)]
if not extractions:
print(f"ERREUR: Aucune extraction trouvée pour le ticket {args.ticket_id}")
return 1
# Trier par ordre décroissant pour avoir la plus récente en premier
extractions.sort(reverse=True)
latest_extraction = extractions[0]
extraction_path = os.path.join(ticket_path, latest_extraction)
print(f"Utilisation de l'extraction: {latest_extraction}")
# Extraire le contexte du ticket
contexte = extraire_contexte_ticket(extraction_path)
print(f"Contexte du ticket extrait ({len(contexte)} caractères)")
# Initialiser le modèle Pixtral-Large
try:
print("Initialisation du modèle Pixtral-Large...")
model = PixtralLarge()
agent = AgentImageAnalyser(model)
print("Agent d'analyse d'images initialisé avec succès")
except Exception as e:
print(f"ERREUR: Impossible d'initialiser le modèle: {str(e)}")
return 1
# Si une image spécifique est fournie
if args.image:
image_path = args.image
if not os.path.exists(image_path):
print(f"ERREUR: L'image spécifiée n'existe pas: {image_path}")
return 1
print(f"Analyse de l'image: {os.path.basename(image_path)}")
resultat = agent.executer(image_path, contexte)
# Afficher le résultat
print("\nRésultat de l'analyse:")
print(resultat.get('analyse', 'Analyse non disponible'))
# Sauvegarder le résultat si demandé
if args.save:
output_file = os.path.join(extraction_path, f"analyse_{os.path.basename(image_path)}.txt")
with open(output_file, 'w', encoding='utf-8') as f:
f.write(resultat.get('analyse', 'Analyse non disponible'))
print(f"\nAnalyse sauvegardée dans: {output_file}")
else:
# Récupérer toutes les images de l'extraction
images = get_images_in_extraction(extraction_path)
if not images:
print(f"Aucune image trouvée dans l'extraction {latest_extraction}")
return 1
print(f"Nombre d'images trouvées: {len(images)}")
# Appliquer le préfiltrage de doublons si activé
if not args.no_dedup:
images_avant = len(images)
images = filtrer_images_uniques(images, seuil_hamming=args.seuil)
images_apres = len(images)
if images_avant > images_apres:
print(f"Préfiltrage des doublons: {images_avant}{images_apres} images ({images_avant - images_apres} doublons supprimés)")
else:
print("Préfiltrage terminé: aucun doublon détecté")
# Analyser chaque image
results = []
for image_path in images:
image_name = os.path.basename(image_path)
print(f"\nAnalyse de l'image: {image_name}")
resultat = agent.executer(image_path, contexte)
results.append(resultat)
# Afficher le résultat
print("\nRésultat de l'analyse:")
if args.verbose:
print(resultat.get('analyse', 'Analyse non disponible'))
else:
# Afficher seulement les premières lignes pour un aperçu
analyse = resultat.get('analyse', 'Analyse non disponible')
lignes = analyse.split('\n')
apercu = '\n'.join(lignes[:min(5, len(lignes))]) + ('\n...' if len(lignes) > 5 else '')
print(apercu)
# Sauvegarder le résultat si demandé
if args.save:
output_file = os.path.join(extraction_path, f"analyse_{image_name}.txt")
with open(output_file, 'w', encoding='utf-8') as f:
f.write(resultat.get('analyse', 'Analyse non disponible'))
print(f"Analyse sauvegardée dans: {output_file}")
# Afficher un résumé à la fin
print(f"\nRésumé: {len(results)} images analysées")
if args.save:
print(f"Les résultats ont été sauvegardés dans le répertoire: {extraction_path}")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -1,179 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script de test pour exécuter l'orchestrateur sur un ticket spécifique.
Utilisation: python test_orchestrator.py [code_ticket]
Exemple: python test_orchestrator.py T0101
"""
import os
import sys
import time
import logging
import traceback
from datetime import datetime
# Import des agents
from agents.deepseek.agent_ticket_analyser import AgentTicketAnalyser
from agents.pixtral_large.agent_image_sorter import AgentImageSorter
from agents.pixtral_large.agent_image_analyser import AgentImageAnalyser
from agents.deepseek.agent_report_generator import AgentReportGenerator
# Import des modèles LLM
from llm_classes.deepseek import DeepSeek
from llm_classes.pixtral_large import PixtralLarge
# Import de l'orchestrateur
from orchestrator import Orchestrator
# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
filename='test_orchestrator.log', filemode='w')
logger = logging.getLogger("TestOrchestrator")
def test_orchestrator(ticket_id=None):
"""
Exécute l'orchestrateur avec les agents définis
Args:
ticket_id: Identifiant du ticket à traiter (optionnel)
"""
# Vérifier que le dossier output existe
if not os.path.exists("output/"):
os.makedirs("output/")
logger.warning("Le dossier output/ n'existait pas et a été créé")
print("ATTENTION: Le dossier output/ n'existait pas et a été créé")
# Vérifier le contenu du dossier output
tickets = [d for d in os.listdir("output/") if d.startswith("ticket_") and os.path.isdir(os.path.join("output/", d))]
logger.info(f"Tickets trouvés dans output/: {len(tickets)}")
print(f"Tickets existants dans output/: {len(tickets)}")
if len(tickets) == 0:
logger.error("Aucun ticket trouvé dans le dossier output/")
print("ERREUR: Aucun ticket trouvé dans le dossier output/")
return
# Initialisation des LLM
print("Initialisation des modèles LLM...")
start_time = time.time()
json_llm = DeepSeek()
logger.info("LLM DeepSeek initialisé pour l'analyse JSON")
# Utilisation de Pixtral12b pour le tri et l'analyse d'images
image_sorter_llm = PixtralLarge()
logger.info("LLM Pixtral12b initialisé pour le tri d'images")
image_analyser_llm = PixtralLarge()
logger.info("LLM Pixtral12b initialisé pour l'analyse d'images")
report_generator_llm = DeepSeek()
logger.info("LLM DeepSeeek initialisé pour la génération de rapports")
llm_init_time = time.time() - start_time
print(f"Tous les modèles LLM ont été initialisés en {llm_init_time:.2f} secondes")
# Création des agents
print("Création des agents...")
ticket_agent = AgentTicketAnalyser(json_llm)
image_sorter = AgentImageSorter(image_sorter_llm)
image_analyser = AgentImageAnalyser(image_analyser_llm)
report_generator = AgentReportGenerator(report_generator_llm)
print("Tous les agents ont été créés")
# Initialisation de l'orchestrateur avec les agents
logger.info("Initialisation de l'orchestrateur")
print("Initialisation de l'orchestrateur")
orchestrator = Orchestrator(
output_dir="output/",
ticket_agent=ticket_agent,
image_sorter=image_sorter,
image_analyser=image_analyser,
report_generator=report_generator
)
# Vérification du ticket spécifique si fourni
specific_ticket_path = None
if ticket_id:
target_ticket = f"ticket_{ticket_id}"
specific_ticket_path = os.path.join("output", target_ticket)
if not os.path.exists(specific_ticket_path):
logger.error(f"Le ticket {target_ticket} n'existe pas")
print(f"ERREUR: Le ticket {target_ticket} n'existe pas")
return
logger.info(f"Ticket spécifique à traiter: {specific_ticket_path}")
print(f"Ticket spécifique à traiter: {target_ticket}")
# Exécution de l'orchestrateur
total_start_time = time.time()
logger.info("Début de l'exécution de l'orchestrateur")
print("Début de l'exécution de l'orchestrateur")
try:
orchestrator.executer(ticket_id)
# Vérifier le rapport généré et afficher un résumé
if ticket_id:
# Chercher le rapport Markdown le plus récent
ticket_dir = os.path.join("output", f"ticket_{ticket_id}")
latest_md = None
for extraction in os.listdir(ticket_dir):
extraction_path = os.path.join(ticket_dir, extraction)
if os.path.isdir(extraction_path):
rapports_dir = os.path.join(extraction_path, f"{ticket_id}_rapports", f"{ticket_id}")
if os.path.exists(rapports_dir):
md_files = [f for f in os.listdir(rapports_dir) if f.endswith('.md')]
if md_files:
md_files.sort(reverse=True) # Le plus récent en premier
latest_md = os.path.join(rapports_dir, md_files[0])
break
if latest_md:
print(f"\nVérification du rapport: {latest_md}")
try:
with open(latest_md, 'r', encoding='utf-8') as f:
content = f.read()
# Vérifier si le tableau des échanges est présent
has_table = "| Date | " in content
has_details = "## Détails des analyses" in content
print(f"- Tableau des échanges: {'Présent' if has_table else 'MANQUANT'}")
print(f"- Détails des analyses: {'Présent' if has_details else 'MANQUANT'}")
if not has_table:
print("\nATTENTION: Le tableau des échanges client/support est manquant!")
print("Vérifiez le system prompt de l'agent de rapport et la transmission des données.")
except Exception as e:
print(f"Erreur lors de la vérification du rapport: {e}")
except Exception as e:
logger.error(f"Erreur lors de l'exécution de l'orchestrateur: {str(e)}")
print(f"ERREUR: {str(e)}")
traceback.print_exc()
total_time = time.time() - total_start_time
logger.info(f"Fin de l'exécution de l'orchestrateur (durée: {total_time:.2f} secondes)")
print(f"Fin de l'exécution de l'orchestrateur (durée: {total_time:.2f} secondes)")
if __name__ == "__main__":
print("Démarrage du test de l'orchestrateur")
# Vérifier si un ID de ticket est passé en argument
ticket_id = None
if len(sys.argv) > 1:
ticket_id = sys.argv[1]
print(f"ID de ticket fourni en argument: {ticket_id}")
test_orchestrator(ticket_id)
print("Test terminé")

View File

@ -1,173 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import time
import logging
import traceback
from datetime import datetime
# Import des agents
from agents.mistral_large.agent_ticket_analyser import AgentTicketAnalyser
from agents.pixtral_large.agent_image_sorter import AgentImageSorter
from agents.pixtral_large.agent_image_analyser import AgentImageAnalyser
from agents.mistral_large.agent_report_generator import AgentReportGenerator
# Import des modèles LLM
from llm_classes.mistral_large import MistralLarge
from llm_classes.pixtral_large import PixtralLarge
# Import de l'orchestrateur
from orchestrator import Orchestrator
# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
filename='test_orchestrator.log', filemode='w')
logger = logging.getLogger("TestOrchestrator")
def test_orchestrator(ticket_id=None):
"""
Exécute l'orchestrateur avec les agents définis
Args:
ticket_id: Identifiant du ticket à traiter (optionnel)
"""
# Vérifier que le dossier output existe
if not os.path.exists("output/"):
os.makedirs("output/")
logger.warning("Le dossier output/ n'existait pas et a été créé")
print("ATTENTION: Le dossier output/ n'existait pas et a été créé")
# Vérifier le contenu du dossier output
tickets = [d for d in os.listdir("output/") if d.startswith("ticket_") and os.path.isdir(os.path.join("output/", d))]
logger.info(f"Tickets trouvés dans output/: {len(tickets)}")
print(f"Tickets existants dans output/: {len(tickets)}")
if len(tickets) == 0:
logger.error("Aucun ticket trouvé dans le dossier output/")
print("ERREUR: Aucun ticket trouvé dans le dossier output/")
return
# Initialisation des LLM
print("Initialisation des modèles LLM...")
start_time = time.time()
# Utilisation de Mistral Medium pour l'analyse JSON et la génération de rapports
json_llm = MistralLarge()
logger.info("LLM MistralMedium initialisé pour l'analyse JSON")
# Utilisation de Pixtral12b pour le tri et l'analyse d'images
image_sorter_llm = PixtralLarge()
logger.info("LLM Pixtral12b initialisé pour le tri d'images")
image_analyser_llm = PixtralLarge()
logger.info("LLM Pixtral12b initialisé pour l'analyse d'images")
report_generator_llm = MistralLarge()
logger.info("LLM MistralMedium initialisé pour la génération de rapports")
llm_init_time = time.time() - start_time
print(f"Tous les modèles LLM ont été initialisés en {llm_init_time:.2f} secondes")
# Création des agents
print("Création des agents...")
ticket_agent = AgentTicketAnalyser(json_llm)
image_sorter = AgentImageSorter(image_sorter_llm)
image_analyser = AgentImageAnalyser(image_analyser_llm)
report_generator = AgentReportGenerator(report_generator_llm)
print("Tous les agents ont été créés")
# Initialisation de l'orchestrateur avec les agents
logger.info("Initialisation de l'orchestrateur")
print("Initialisation de l'orchestrateur")
orchestrator = Orchestrator(
output_dir="output/",
ticket_agent=ticket_agent,
image_sorter=image_sorter,
image_analyser=image_analyser,
report_generator=report_generator
)
# Vérification du ticket spécifique si fourni
specific_ticket_path = None
if ticket_id:
target_ticket = f"ticket_{ticket_id}"
specific_ticket_path = os.path.join("output", target_ticket)
if not os.path.exists(specific_ticket_path):
logger.error(f"Le ticket {target_ticket} n'existe pas")
print(f"ERREUR: Le ticket {target_ticket} n'existe pas")
return
logger.info(f"Ticket spécifique à traiter: {specific_ticket_path}")
print(f"Ticket spécifique à traiter: {target_ticket}")
# Exécution de l'orchestrateur
total_start_time = time.time()
logger.info("Début de l'exécution de l'orchestrateur")
print("Début de l'exécution de l'orchestrateur")
try:
orchestrator.executer(ticket_id)
# Vérifier le rapport généré et afficher un résumé
if ticket_id:
# Chercher le rapport Markdown le plus récent
ticket_dir = os.path.join("output", f"ticket_{ticket_id}")
latest_md = None
for extraction in os.listdir(ticket_dir):
extraction_path = os.path.join(ticket_dir, extraction)
if os.path.isdir(extraction_path):
rapports_dir = os.path.join(extraction_path, f"{ticket_id}_rapports", f"{ticket_id}")
if os.path.exists(rapports_dir):
md_files = [f for f in os.listdir(rapports_dir) if f.endswith('.md')]
if md_files:
md_files.sort(reverse=True) # Le plus récent en premier
latest_md = os.path.join(rapports_dir, md_files[0])
break
if latest_md:
print(f"\nVérification du rapport: {latest_md}")
try:
with open(latest_md, 'r', encoding='utf-8') as f:
content = f.read()
# Vérifier si le tableau des échanges est présent
has_table = "| Date | " in content
has_details = "## Détails des analyses" in content
print(f"- Tableau des échanges: {'Présent' if has_table else 'MANQUANT'}")
print(f"- Détails des analyses: {'Présent' if has_details else 'MANQUANT'}")
if not has_table:
print("\nATTENTION: Le tableau des échanges client/support est manquant!")
print("Vérifiez le system prompt de l'agent de rapport et la transmission des données.")
except Exception as e:
print(f"Erreur lors de la vérification du rapport: {e}")
except Exception as e:
logger.error(f"Erreur lors de l'exécution de l'orchestrateur: {str(e)}")
print(f"ERREUR: {str(e)}")
traceback.print_exc()
total_time = time.time() - total_start_time
logger.info(f"Fin de l'exécution de l'orchestrateur (durée: {total_time:.2f} secondes)")
print(f"Fin de l'exécution de l'orchestrateur (durée: {total_time:.2f} secondes)")
if __name__ == "__main__":
print("Démarrage du test de l'orchestrateur")
# Vérifier si un ID de ticket est passé en argument
ticket_id = None
if len(sys.argv) > 1:
ticket_id = sys.argv[1]
print(f"ID de ticket fourni en argument: {ticket_id}")
test_orchestrator(ticket_id)
print("Test terminé")

View File

@ -1,211 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script de test pour exécuter l'orchestrateur sur un ticket spécifique
avec le modèle LlamaVision pour tous les agents.
Utilisation: python test_orchestrator_llama_vision.py [code_ticket]
Exemple: python test_orchestrator_llama_vision.py T0101
"""
import os
import sys
import time
import logging
import traceback
from datetime import datetime
# Import des agents
from agents.llama_vision.agent_ticket_analyser import AgentTicketAnalyser
from agents.llama_vision.agent_image_sorter import AgentImageSorter
from agents.llama_vision.agent_image_analyser import AgentImageAnalyser
from agents.llama_vision.agent_report_generator import AgentReportGenerator
# Import du modèle LLM LlamaVision
from llm_classes.llama_vision import LlamaVision
# Import de l'orchestrateur
from orchestrator import Orchestrator
# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
filename='test_orchestrator_llama_vision.log', filemode='w')
logger = logging.getLogger("TestOrchestratorLlamaVision")
def test_orchestrator_llama_vision(ticket_id=None):
"""
Exécute l'orchestrateur avec les agents définis utilisant tous LlamaVision
Args:
ticket_id: Identifiant du ticket à traiter (optionnel)
"""
# Vérifier que le dossier output existe
if not os.path.exists("output/"):
os.makedirs("output/")
logger.warning("Le dossier output/ n'existait pas et a été créé")
print("ATTENTION: Le dossier output/ n'existait pas et a été créé")
# Vérifier le contenu du dossier output
tickets = [d for d in os.listdir("output/") if d.startswith("ticket_") and os.path.isdir(os.path.join("output/", d))]
logger.info(f"Tickets trouvés dans output/: {len(tickets)}")
print(f"Tickets existants dans output/: {len(tickets)}")
if len(tickets) == 0:
logger.error("Aucun ticket trouvé dans le dossier output/")
print("ERREUR: Aucun ticket trouvé dans le dossier output/")
return
# Initialisation des LLM
print("Initialisation du modèle LlamaVision...")
start_time = time.time()
# Valeurs de timeout pour chaque étape
request_timeout = 400 # 6,5 minutes
# Utilisation de LlamaVision pour tous les agents avec paramètres adaptés
json_llm = LlamaVision()
json_llm.configurer(
temperature=0.1,
top_p=0.7,
num_ctx=2048,
num_predict=2048,
request_timeout=request_timeout
)
logger.info("LLM LlamaVision initialisé pour l'analyse JSON")
image_sorter_llm = LlamaVision()
image_sorter_llm.configurer(
temperature=0.1,
top_p=0.8,
top_k=30,
num_ctx=1024,
request_timeout=request_timeout
)
logger.info("LLM LlamaVision initialisé pour le tri d'images")
image_analyser_llm = LlamaVision()
image_analyser_llm.configurer(
temperature=0.2,
top_p=0.8,
num_ctx=2048,
request_timeout=request_timeout
)
logger.info("LLM LlamaVision initialisé pour l'analyse d'images")
report_generator_llm = LlamaVision()
report_generator_llm.configurer(
temperature=0.3,
top_p=0.8,
num_ctx=4096,
num_predict=3000,
request_timeout=request_timeout
)
logger.info("LLM LlamaVision initialisé pour la génération de rapports")
llm_init_time = time.time() - start_time
print(f"Tous les modèles LlamaVision ont été initialisés en {llm_init_time:.2f} secondes")
# Création des agents
print("Création des agents...")
ticket_agent = AgentTicketAnalyser(json_llm)
image_sorter = AgentImageSorter(image_sorter_llm)
image_analyser = AgentImageAnalyser(image_analyser_llm)
# Utiliser AgentReportGeneratorQwen pour le rapport (plus stable)
report_generator = AgentReportGenerator(report_generator_llm)
print("Tous les agents ont été créés")
# Initialisation de l'orchestrateur avec les agents
logger.info("Initialisation de l'orchestrateur")
print("Initialisation de l'orchestrateur")
orchestrator = Orchestrator(
output_dir="output/",
ticket_agent=ticket_agent,
image_sorter=image_sorter,
image_analyser=image_analyser,
report_generator=report_generator
)
# Vérification du ticket spécifique si fourni
specific_ticket_path = None
if ticket_id:
target_ticket = f"ticket_{ticket_id}"
specific_ticket_path = os.path.join("output", target_ticket)
if not os.path.exists(specific_ticket_path):
logger.error(f"Le ticket {target_ticket} n'existe pas")
print(f"ERREUR: Le ticket {target_ticket} n'existe pas")
return
logger.info(f"Ticket spécifique à traiter: {specific_ticket_path}")
print(f"Ticket spécifique à traiter: {target_ticket}")
# Exécution de l'orchestrateur
total_start_time = time.time()
logger.info("Début de l'exécution de l'orchestrateur avec LlamaVision")
print("Début de l'exécution de l'orchestrateur avec LlamaVision")
try:
orchestrator.executer(ticket_id)
# Vérifier le rapport généré et afficher un résumé
if ticket_id:
# Chercher le rapport Markdown le plus récent
ticket_dir = os.path.join("output", f"ticket_{ticket_id}")
latest_md = None
for extraction in os.listdir(ticket_dir):
extraction_path = os.path.join(ticket_dir, extraction)
if os.path.isdir(extraction_path):
rapports_dir = os.path.join(extraction_path, f"{ticket_id}_rapports", f"{ticket_id}")
if os.path.exists(rapports_dir):
md_files = [f for f in os.listdir(rapports_dir) if f.endswith('.md')]
if md_files:
md_files.sort(reverse=True) # Le plus récent en premier
latest_md = os.path.join(rapports_dir, md_files[0])
break
if latest_md:
print(f"\nVérification du rapport: {latest_md}")
try:
with open(latest_md, 'r', encoding='utf-8') as f:
content = f.read()
# Vérifier si le tableau des échanges est présent
has_table = "| Date | " in content
has_details = "## Détails des analyses" in content
print(f"- Tableau des échanges: {'Présent' if has_table else 'MANQUANT'}")
print(f"- Détails des analyses: {'Présent' if has_details else 'MANQUANT'}")
if not has_table:
print("\nATTENTION: Le tableau des échanges client/support est manquant!")
print("Vérifiez le system prompt de l'agent de rapport et la transmission des données.")
except Exception as e:
print(f"Erreur lors de la vérification du rapport: {e}")
except Exception as e:
logger.error(f"Erreur lors de l'exécution de l'orchestrateur: {str(e)}")
print(f"ERREUR: {str(e)}")
traceback.print_exc()
total_time = time.time() - total_start_time
logger.info(f"Fin de l'exécution de l'orchestrateur (durée: {total_time:.2f} secondes)")
print(f"Fin de l'exécution de l'orchestrateur (durée: {total_time:.2f} secondes)")
if __name__ == "__main__":
print("Démarrage du test de l'orchestrateur avec LlamaVision")
# Vérifier si un ID de ticket est passé en argument
ticket_id = None
if len(sys.argv) > 1:
ticket_id = sys.argv[1]
print(f"ID de ticket fourni en argument: {ticket_id}")
test_orchestrator_llama_vision(ticket_id)
print("Test terminé")

View File

@ -1,180 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script de test pour exécuter l'orchestrateur sur un ticket spécifique.
Utilisation: python test_orchestrator.py [code_ticket]
Exemple: python test_orchestrator.py T0101
"""
import os
import sys
import time
import logging
import traceback
from datetime import datetime
# Import des agents
from agents.agent_ticket_analyser import AgentTicketAnalyser
from agents.agent_image_sorter import AgentImageSorter
from agents.agent_image_analyser import AgentImageAnalyser
from agents.agent_report_generator import AgentReportGenerator
# Import des modèles LLM
from llm_classes.mistral_medium import MistralMedium
from llm_classes.pixtral_12b import Pixtral12b
# Import de l'orchestrateur
from orchestrator import Orchestrator
# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
filename='test_orchestrator.log', filemode='w')
logger = logging.getLogger("TestOrchestrator")
def test_orchestrator(ticket_id=None):
"""
Exécute l'orchestrateur avec les agents définis
Args:
ticket_id: Identifiant du ticket à traiter (optionnel)
"""
# Vérifier que le dossier output existe
if not os.path.exists("output/"):
os.makedirs("output/")
logger.warning("Le dossier output/ n'existait pas et a été créé")
print("ATTENTION: Le dossier output/ n'existait pas et a été créé")
# Vérifier le contenu du dossier output
tickets = [d for d in os.listdir("output/") if d.startswith("ticket_") and os.path.isdir(os.path.join("output/", d))]
logger.info(f"Tickets trouvés dans output/: {len(tickets)}")
print(f"Tickets existants dans output/: {len(tickets)}")
if len(tickets) == 0:
logger.error("Aucun ticket trouvé dans le dossier output/")
print("ERREUR: Aucun ticket trouvé dans le dossier output/")
return
# Initialisation des LLM
print("Initialisation des modèles LLM...")
start_time = time.time()
# Utilisation de Mistral Medium pour l'analyse JSON et la génération de rapports
json_llm = MistralMedium()
logger.info("LLM MistralMedium initialisé pour l'analyse JSON")
# Utilisation de Pixtral12b pour le tri et l'analyse d'images
image_sorter_llm = Pixtral12b()
logger.info("LLM Pixtral12b initialisé pour le tri d'images")
image_analyser_llm = Pixtral12b()
logger.info("LLM Pixtral12b initialisé pour l'analyse d'images")
report_generator_llm = MistralMedium()
logger.info("LLM MistralMedium initialisé pour la génération de rapports")
llm_init_time = time.time() - start_time
print(f"Tous les modèles LLM ont été initialisés en {llm_init_time:.2f} secondes")
# Création des agents
print("Création des agents...")
ticket_agent = AgentTicketAnalyser(json_llm)
image_sorter = AgentImageSorter(image_sorter_llm)
image_analyser = AgentImageAnalyser(image_analyser_llm)
report_generator = AgentReportGenerator(report_generator_llm)
print("Tous les agents ont été créés")
# Initialisation de l'orchestrateur avec les agents
logger.info("Initialisation de l'orchestrateur")
print("Initialisation de l'orchestrateur")
orchestrator = Orchestrator(
output_dir="output/",
ticket_agent=ticket_agent,
image_sorter=image_sorter,
image_analyser=image_analyser,
report_generator=report_generator
)
# Vérification du ticket spécifique si fourni
specific_ticket_path = None
if ticket_id:
target_ticket = f"ticket_{ticket_id}"
specific_ticket_path = os.path.join("output", target_ticket)
if not os.path.exists(specific_ticket_path):
logger.error(f"Le ticket {target_ticket} n'existe pas")
print(f"ERREUR: Le ticket {target_ticket} n'existe pas")
return
logger.info(f"Ticket spécifique à traiter: {specific_ticket_path}")
print(f"Ticket spécifique à traiter: {target_ticket}")
# Exécution de l'orchestrateur
total_start_time = time.time()
logger.info("Début de l'exécution de l'orchestrateur")
print("Début de l'exécution de l'orchestrateur")
try:
orchestrator.executer(ticket_id)
# Vérifier le rapport généré et afficher un résumé
if ticket_id:
# Chercher le rapport Markdown le plus récent
ticket_dir = os.path.join("output", f"ticket_{ticket_id}")
latest_md = None
for extraction in os.listdir(ticket_dir):
extraction_path = os.path.join(ticket_dir, extraction)
if os.path.isdir(extraction_path):
rapports_dir = os.path.join(extraction_path, f"{ticket_id}_rapports", f"{ticket_id}")
if os.path.exists(rapports_dir):
md_files = [f for f in os.listdir(rapports_dir) if f.endswith('.md')]
if md_files:
md_files.sort(reverse=True) # Le plus récent en premier
latest_md = os.path.join(rapports_dir, md_files[0])
break
if latest_md:
print(f"\nVérification du rapport: {latest_md}")
try:
with open(latest_md, 'r', encoding='utf-8') as f:
content = f.read()
# Vérifier si le tableau des échanges est présent
has_table = "| Date | " in content
has_details = "## Détails des analyses" in content
print(f"- Tableau des échanges: {'Présent' if has_table else 'MANQUANT'}")
print(f"- Détails des analyses: {'Présent' if has_details else 'MANQUANT'}")
if not has_table:
print("\nATTENTION: Le tableau des échanges client/support est manquant!")
print("Vérifiez le system prompt de l'agent de rapport et la transmission des données.")
except Exception as e:
print(f"Erreur lors de la vérification du rapport: {e}")
except Exception as e:
logger.error(f"Erreur lors de l'exécution de l'orchestrateur: {str(e)}")
print(f"ERREUR: {str(e)}")
traceback.print_exc()
total_time = time.time() - total_start_time
logger.info(f"Fin de l'exécution de l'orchestrateur (durée: {total_time:.2f} secondes)")
print(f"Fin de l'exécution de l'orchestrateur (durée: {total_time:.2f} secondes)")
if __name__ == "__main__":
print("Démarrage du test de l'orchestrateur")
# Vérifier si un ID de ticket est passé en argument
ticket_id = None
if len(sys.argv) > 1:
ticket_id = sys.argv[1]
print(f"ID de ticket fourni en argument: {ticket_id}")
test_orchestrator(ticket_id)
print("Test terminé")

View File

@ -1,180 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script de test pour exécuter l'orchestrateur sur un ticket spécifique.
Utilisation: python test_orchestrator.py [code_ticket]
Exemple: python test_orchestrator.py T0101
"""
import os
import sys
import time
import logging
import traceback
from datetime import datetime
# Import des agents
from agents.mistral_medium.agent_ticket_analyser import AgentTicketAnalyser
from agents.pixtral12b.agent_image_sorter import AgentImageSorter
from agents.pixtral12b.agent_image_analyser import AgentImageAnalyser
from agents.mistral_medium.agent_report_generator import AgentReportGenerator
# Import des modèles LLM
from llm_classes.mistral_medium import MistralMedium
from llm_classes.pixtral_12b import Pixtral12b
# Import de l'orchestrateur
from orchestrator import Orchestrator
# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
filename='test_orchestrator.log', filemode='w')
logger = logging.getLogger("TestOrchestrator")
def test_orchestrator(ticket_id=None):
"""
Exécute l'orchestrateur avec les agents définis
Args:
ticket_id: Identifiant du ticket à traiter (optionnel)
"""
# Vérifier que le dossier output existe
if not os.path.exists("output/"):
os.makedirs("output/")
logger.warning("Le dossier output/ n'existait pas et a été créé")
print("ATTENTION: Le dossier output/ n'existait pas et a été créé")
# Vérifier le contenu du dossier output
tickets = [d for d in os.listdir("output/") if d.startswith("ticket_") and os.path.isdir(os.path.join("output/", d))]
logger.info(f"Tickets trouvés dans output/: {len(tickets)}")
print(f"Tickets existants dans output/: {len(tickets)}")
if len(tickets) == 0:
logger.error("Aucun ticket trouvé dans le dossier output/")
print("ERREUR: Aucun ticket trouvé dans le dossier output/")
return
# Initialisation des LLM
print("Initialisation des modèles LLM...")
start_time = time.time()
# Utilisation de Mistral Medium pour l'analyse JSON et la génération de rapports
json_llm = MistralMedium()
logger.info("LLM MistralMedium initialisé pour l'analyse JSON")
# Utilisation de Pixtral12b pour le tri et l'analyse d'images
image_sorter_llm = Pixtral12b()
logger.info("LLM Pixtral12b initialisé pour le tri d'images")
image_analyser_llm = Pixtral12b()
logger.info("LLM Pixtral12b initialisé pour l'analyse d'images")
report_generator_llm = MistralMedium()
logger.info("LLM MistralMedium initialisé pour la génération de rapports")
llm_init_time = time.time() - start_time
print(f"Tous les modèles LLM ont été initialisés en {llm_init_time:.2f} secondes")
# Création des agents
print("Création des agents...")
ticket_agent = AgentTicketAnalyser(json_llm)
image_sorter = AgentImageSorter(image_sorter_llm)
image_analyser = AgentImageAnalyser(image_analyser_llm)
report_generator = AgentReportGenerator(report_generator_llm)
print("Tous les agents ont été créés")
# Initialisation de l'orchestrateur avec les agents
logger.info("Initialisation de l'orchestrateur")
print("Initialisation de l'orchestrateur")
orchestrator = Orchestrator(
output_dir="output/",
ticket_agent=ticket_agent,
image_sorter=image_sorter,
image_analyser=image_analyser,
report_generator=report_generator
)
# Vérification du ticket spécifique si fourni
specific_ticket_path = None
if ticket_id:
target_ticket = f"ticket_{ticket_id}"
specific_ticket_path = os.path.join("output", target_ticket)
if not os.path.exists(specific_ticket_path):
logger.error(f"Le ticket {target_ticket} n'existe pas")
print(f"ERREUR: Le ticket {target_ticket} n'existe pas")
return
logger.info(f"Ticket spécifique à traiter: {specific_ticket_path}")
print(f"Ticket spécifique à traiter: {target_ticket}")
# Exécution de l'orchestrateur
total_start_time = time.time()
logger.info("Début de l'exécution de l'orchestrateur")
print("Début de l'exécution de l'orchestrateur")
try:
orchestrator.executer(ticket_id)
# Vérifier le rapport généré et afficher un résumé
if ticket_id:
# Chercher le rapport Markdown le plus récent
ticket_dir = os.path.join("output", f"ticket_{ticket_id}")
latest_md = None
for extraction in os.listdir(ticket_dir):
extraction_path = os.path.join(ticket_dir, extraction)
if os.path.isdir(extraction_path):
rapports_dir = os.path.join(extraction_path, f"{ticket_id}_rapports", f"{ticket_id}")
if os.path.exists(rapports_dir):
md_files = [f for f in os.listdir(rapports_dir) if f.endswith('.md')]
if md_files:
md_files.sort(reverse=True) # Le plus récent en premier
latest_md = os.path.join(rapports_dir, md_files[0])
break
if latest_md:
print(f"\nVérification du rapport: {latest_md}")
try:
with open(latest_md, 'r', encoding='utf-8') as f:
content = f.read()
# Vérifier si le tableau des échanges est présent
has_table = "| Date | " in content
has_details = "## Détails des analyses" in content
print(f"- Tableau des échanges: {'Présent' if has_table else 'MANQUANT'}")
print(f"- Détails des analyses: {'Présent' if has_details else 'MANQUANT'}")
if not has_table:
print("\nATTENTION: Le tableau des échanges client/support est manquant!")
print("Vérifiez le system prompt de l'agent de rapport et la transmission des données.")
except Exception as e:
print(f"Erreur lors de la vérification du rapport: {e}")
except Exception as e:
logger.error(f"Erreur lors de l'exécution de l'orchestrateur: {str(e)}")
print(f"ERREUR: {str(e)}")
traceback.print_exc()
total_time = time.time() - total_start_time
logger.info(f"Fin de l'exécution de l'orchestrateur (durée: {total_time:.2f} secondes)")
print(f"Fin de l'exécution de l'orchestrateur (durée: {total_time:.2f} secondes)")
if __name__ == "__main__":
print("Démarrage du test de l'orchestrateur")
# Vérifier si un ID de ticket est passé en argument
ticket_id = None
if len(sys.argv) > 1:
ticket_id = sys.argv[1]
print(f"ID de ticket fourni en argument: {ticket_id}")
test_orchestrator(ticket_id)
print("Test terminé")

View File

@ -1,190 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script de test pour exécuter l'orchestrateur sur un ticket spécifique
avec l'agent Qwen spécialisé.
Utilisation: python test_orchestrator_qwen_specialized.py [code_ticket]
Exemple: python test_orchestrator_qwen_specialized.py T9656
"""
import os
import sys
import time
import logging
import traceback
from datetime import datetime
# S'assurer que le répertoire racine est dans le sys.path
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
sys.path.insert(0, current_dir)
# Import des agents - utilisation des agents depuis le dossier agents
from agents.qwen2_5.agent_ticket_analyser import AgentTicketAnalyser
from agents.pixtral_large.agent_image_sorter import AgentImageSorter
from agents.pixtral_large.agent_image_analyser import AgentImageAnalyser
from agents.qwen2_5.agent_report_generator import AgentReportGenerator
# Import des modèles LLM
from llm_classes.pixtral_large import PixtralLarge
from llm_classes.qwen2_5 import Qwen2_5 # Import de la nouvelle classe Qwen2_5
# Import de l'orchestrateur
from orchestrator import Orchestrator
# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
filename='test_orchestrator_qwen_specialized.log', filemode='w')
logger = logging.getLogger("TestOrchestratorQwenSpecialized")
def test_orchestrator(ticket_id=None):
"""
Exécute l'orchestrateur avec le modèle Qwen 2.5
Args:
ticket_id: Identifiant du ticket à traiter (optionnel)
"""
# Vérifier que le dossier output existe
if not os.path.exists("output/"):
os.makedirs("output/")
logger.warning("Le dossier output/ n'existait pas et a été créé")
print("ATTENTION: Le dossier output/ n'existait pas et a été créé")
# Vérifier le contenu du dossier output
tickets = [d for d in os.listdir("output/") if d.startswith("ticket_") and os.path.isdir(os.path.join("output/", d))]
logger.info(f"Tickets trouvés dans output/: {len(tickets)}")
print(f"Tickets existants dans output/: {len(tickets)}")
if len(tickets) == 0:
logger.error("Aucun ticket trouvé dans le dossier output/")
print("ERREUR: Aucun ticket trouvé dans le dossier output/")
return
# Initialisation des LLM
print("Initialisation des modèles LLM...")
start_time = time.time()
# Utilisation de MistralMedium pour l'analyse des tickets
ticket_llm = Qwen2_5()
logger.info("LLM MistralMedium initialisé pour l'analyse des tickets")
# Utilisation de Pixtral12b pour le tri et l'analyse d'images
image_sorter_llm = PixtralLarge()
logger.info("LLM Pixtral12b initialisé pour le tri d'images")
image_analyser_llm = PixtralLarge()
logger.info("LLM PixtralLarge initialisé pour l'analyse d'images")
# Utilisation de Qwen2_5 pour la génération des rapports
qwen_llm = Qwen2_5()
qwen_llm.configurer(temperature=0.2, top_p=0.9, max_tokens=10000)
logger.info("LLM Qwen2_5 initialisé pour la génération des rapports")
llm_init_time = time.time() - start_time
print(f"Tous les modèles LLM ont été initialisés en {llm_init_time:.2f} secondes")
# Création des agents
print("Création des agents...")
ticket_agent = AgentTicketAnalyser(ticket_llm)
image_sorter = AgentImageSorter(image_sorter_llm)
image_analyser = AgentImageAnalyser(image_analyser_llm)
# Utilisation de l'agent de rapport avec Qwen
report_generator = AgentReportGenerator(qwen_llm)
print("Tous les agents ont été créés")
# Initialisation de l'orchestrateur avec les agents
logger.info("Initialisation de l'orchestrateur")
print("Initialisation de l'orchestrateur")
orchestrator = Orchestrator(
output_dir="output/",
ticket_agent=ticket_agent,
image_sorter=image_sorter,
image_analyser=image_analyser,
report_generator=report_generator
)
# Vérification du ticket spécifique si fourni
specific_ticket_path = None
if ticket_id:
target_ticket = f"ticket_{ticket_id}"
specific_ticket_path = os.path.join("output", target_ticket)
if not os.path.exists(specific_ticket_path):
logger.error(f"Le ticket {target_ticket} n'existe pas")
print(f"ERREUR: Le ticket {target_ticket} n'existe pas")
return
logger.info(f"Ticket spécifique à traiter: {specific_ticket_path}")
print(f"Ticket spécifique à traiter: {target_ticket}")
# Exécution de l'orchestrateur
total_start_time = time.time()
logger.info("Début de l'exécution de l'orchestrateur")
print("Début de l'exécution de l'orchestrateur")
try:
orchestrator.executer(ticket_id)
# Vérifier le rapport généré et afficher un résumé
if ticket_id:
# Chercher le rapport Markdown le plus récent
ticket_dir = os.path.join("output", f"ticket_{ticket_id}")
latest_md = None
for extraction in os.listdir(ticket_dir):
extraction_path = os.path.join(ticket_dir, extraction)
if os.path.isdir(extraction_path):
rapports_dir = os.path.join(extraction_path, f"{ticket_id}_rapports", f"{ticket_id}")
if os.path.exists(rapports_dir):
md_files = [f for f in os.listdir(rapports_dir) if f.endswith('.md')]
if md_files:
md_files.sort(reverse=True) # Le plus récent en premier
latest_md = os.path.join(rapports_dir, md_files[0])
break
if latest_md:
print(f"\nVérification du rapport: {latest_md}")
try:
with open(latest_md, 'r', encoding='utf-8') as f:
content = f.read()
# Vérifier si le tableau des échanges est présent
has_table = "| Date | " in content
has_details = "## Détails des analyses" in content
print(f"- Tableau des échanges: {'Présent' if has_table else 'MANQUANT'}")
print(f"- Détails des analyses: {'Présent' if has_details else 'MANQUANT'}")
if not has_table:
print("\nATTENTION: Le tableau des échanges client/support est manquant!")
print("Vérifiez le system prompt de l'agent de rapport et la transmission des données.")
except Exception as e:
print(f"Erreur lors de la vérification du rapport: {e}")
except Exception as e:
logger.error(f"Erreur lors de l'exécution de l'orchestrateur: {str(e)}")
print(f"ERREUR: {str(e)}")
traceback.print_exc()
total_time = time.time() - total_start_time
logger.info(f"Fin de l'exécution de l'orchestrateur (durée: {total_time:.2f} secondes)")
print(f"Fin de l'exécution de l'orchestrateur (durée: {total_time:.2f} secondes)")
if __name__ == "__main__":
print("Démarrage du test de l'orchestrateur avec Qwen 2.5")
# Vérifier si un ID de ticket est passé en argument
ticket_id = None
if len(sys.argv) > 1:
ticket_id = sys.argv[1]
print(f"ID de ticket fourni en argument: {ticket_id}")
test_orchestrator(ticket_id)
print("Test terminé")

View File

@ -0,0 +1,674 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script pour tester la chaîne d'analyse complète avec priorité au tri d'images:
1. Tri des images
2. Analyse du ticket
3. Analyse des images pertinentes
4. Génération d'un rapport final
"""
import os
import sys
import argparse
import json
import glob
from typing import Dict, Any, Optional, List
from llm_classes.mistral_large import MistralLarge
from llm_classes.pixtral_large import PixtralLarge
from agents.mistral_large.agent_ticket_analyser import AgentTicketAnalyser
from agents.pixtral_large.agent_image_sorter import AgentImageSorter
from agents.pixtral_large.agent_image_analyser import AgentImageAnalyser
from agents.mistral_large.agent_report_generator import AgentReportGenerator
from utils.image_dedup import filtrer_images_uniques
def get_ticket_report_file(ticket_id: str, output_dir: str) -> Optional[str]:
"""
Récupère le fichier de rapport du ticket dans le répertoire codeticket_rapports.
Args:
ticket_id: ID du ticket (ex: T1234)
output_dir: Répertoire de base contenant les tickets
Returns:
Chemin vers le fichier de rapport JSON, ou None si non trouvé
"""
# Construire le chemin vers le répertoire des rapports
ticket_path = os.path.join(output_dir, f"ticket_{ticket_id}")
# Chercher le répertoire d'extraction le plus récent
extractions = [d for d in os.listdir(ticket_path) if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(ticket_id)]
if not extractions:
return None
# Trier par ordre décroissant pour avoir la plus récente en premier
extractions.sort(reverse=True)
latest_extraction = extractions[0]
# Construire le chemin vers le répertoire des rapports
rapport_dir = os.path.join(ticket_path, latest_extraction, f"{ticket_id}_rapports")
rapport_file = os.path.join(rapport_dir, f"{ticket_id}_rapport.json")
if os.path.exists(rapport_file):
return rapport_file
return None
def get_images_in_extraction(extraction_path: str) -> List[str]:
"""
Récupère toutes les images dans un répertoire d'extraction
Args:
extraction_path: Chemin vers le répertoire d'extraction
Returns:
Liste des chemins d'accès aux images
"""
# Extensions d'images courantes
image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.gif', '*.webp']
# Liste pour stocker les chemins d'images
images = []
# Chercher dans le répertoire principal
for extension in image_extensions:
pattern = os.path.join(extraction_path, extension)
images.extend(glob.glob(pattern))
# Chercher dans le sous-répertoire "attachments"
attachments_path = os.path.join(extraction_path, "attachments")
if os.path.exists(attachments_path) and os.path.isdir(attachments_path):
for extension in image_extensions:
pattern = os.path.join(attachments_path, extension)
images.extend(glob.glob(pattern))
# Chercher dans les sous-répertoires de "attachments" (si des ID sont utilisés)
for subdir in os.listdir(attachments_path):
subdir_path = os.path.join(attachments_path, subdir)
if os.path.isdir(subdir_path):
for extension in image_extensions:
pattern = os.path.join(subdir_path, extension)
images.extend(glob.glob(pattern))
return images
def main():
# Configuration de l'analyseur d'arguments
parser = argparse.ArgumentParser(description="Tester la chaîne d'analyse avec priorité au tri d'images.")
parser.add_argument("ticket_id", help="ID du ticket à analyser (ex: T1234)")
parser.add_argument("--output_dir", default="output", help="Répertoire de sortie contenant les tickets")
parser.add_argument("--ticket_file", help="Chemin spécifique vers un fichier JSON du ticket à analyser")
parser.add_argument("--no-dedup", action="store_true", help="Désactiver le préfiltrage des doublons")
parser.add_argument("--seuil", type=int, default=5, help="Seuil de similarité pour détecter les doublons (0-10, défaut=5)")
parser.add_argument("--save_results", action="store_true", help="Sauvegarder les résultats dans un fichier JSON")
parser.add_argument("--image", help="Chemin spécifique vers une image à analyser")
parser.add_argument("--show-analysis", action="store_true", help="Afficher l'analyse du ticket et des images")
parser.add_argument("--debug", action="store_true", help="Afficher des informations de débogage supplémentaires")
parser.add_argument("--generate-report", action="store_true", help="Générer un rapport final à partir des analyses")
parser.add_argument("--interactive", action="store_true", help="Utiliser le menu interactif")
parser.add_argument("--mode", type=int, choices=[1, 2, 3, 4],
help="Mode d'analyse: 1=Tri uniquement, 2=Tri+Ticket, 3=Tri+Ticket+Images, 4=Analyse complète avec rapport")
# Analyser les arguments
args = parser.parse_args()
# Mode interactif si demandé
if args.interactive:
args = _interactive_menu(args)
# Si un mode est spécifié, configurer les options correspondantes
if args.mode:
if args.mode >= 1: # Tous les modes incluent le tri
pass # Tri toujours activé
if args.mode >= 2: # Modes 2, 3, 4 incluent l'analyse de ticket
pass # Analyse de ticket toujours activée
if args.mode >= 3: # Modes 3, 4 incluent l'analyse d'images
pass # Analyse d'images toujours activée
if args.mode >= 4: # Mode 4 inclut la génération de rapport
args.generate_report = True
# Construire le chemin vers le ticket
ticket_path = os.path.join(args.output_dir, f"ticket_{args.ticket_id}")
# Vérifier que le répertoire du ticket existe
if not os.path.exists(ticket_path):
print(f"ERREUR: Le ticket {args.ticket_id} n'existe pas dans {args.output_dir}")
return 1
# Rechercher la dernière extraction (la plus récente)
extractions = [d for d in os.listdir(ticket_path) if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(args.ticket_id)]
if not extractions:
print(f"ERREUR: Aucune extraction trouvée pour le ticket {args.ticket_id}")
return 1
# Trier par ordre décroissant pour avoir la plus récente en premier
extractions.sort(reverse=True)
latest_extraction = extractions[0]
extraction_path = os.path.join(ticket_path, latest_extraction)
print(f"Utilisation de l'extraction: {latest_extraction}")
# ============ TRAITEMENT D'UNE SEULE IMAGE ============
if args.image:
# 1. Initialiser uniquement l'agent de tri d'images
try:
print("Initialisation du modèle Pixtral-Large pour le tri...")
model_tri = PixtralLarge()
# Configuration explicite du modèle
model_tri.configurer(temperature=0.2, top_p=0.8, max_tokens=300)
image_sorter = AgentImageSorter(model_tri)
print("Agent de tri d'images initialisé avec succès")
except Exception as e:
print(f"ERREUR: Impossible d'initialiser le modèle de tri: {str(e)}")
return 1
# 2. Trier l'image
image_path = args.image
if not os.path.exists(image_path):
print(f"ERREUR: L'image spécifiée n'existe pas: {image_path}")
return 1
print(f"Analyse de l'image: {os.path.basename(image_path)}")
resultat = image_sorter.executer(image_path)
# Afficher le résultat du tri
print("\nRésultat de l'analyse:")
print(f"Pertinence: {'OUI' if resultat.get('is_relevant', False) else 'NON'}")
print(f"Raison: {resultat.get('reason', 'Non spécifiée')}")
# Si l'image est pertinente, on peut procéder à l'analyse du ticket puis à l'analyse de l'image
if resultat.get('is_relevant', False):
# 3. Charger et analyser le ticket
ticket_data = _charger_ticket_data(args)
if not ticket_data:
return 1
# 4. Initialiser l'agent d'analyse de tickets et exécuter l'analyse
try:
print("Initialisation du modèle Mistral Large...")
text_model = MistralLarge()
ticket_agent = AgentTicketAnalyser(text_model)
print("Agent d'analyse de tickets initialisé avec succès")
except Exception as e:
print(f"ERREUR: Impossible d'initialiser le modèle d'analyse de tickets: {str(e)}")
return 1
# Exécuter l'analyse du ticket
print(f"\nAnalyse du ticket {ticket_data.get('code', args.ticket_id)} en cours...")
ticket_analysis = ticket_agent.executer(ticket_data)
print(f"Analyse du ticket terminée: {len(ticket_analysis)} caractères")
if args.show_analysis:
print("\nRésultat de l'analyse du ticket:")
print(ticket_analysis)
# 5. Initialiser l'agent d'analyse d'images et exécuter l'analyse
try:
print("Initialisation du modèle Pixtral-Large pour l'analyse d'images...")
model_analyse = PixtralLarge()
# Configuration explicite du modèle
model_analyse.configurer(temperature=0.2, top_p=0.9, max_tokens=3000)
image_analyser = AgentImageAnalyser(model_analyse)
print("Agent d'analyse d'images initialisé avec succès")
except Exception as e:
print(f"ERREUR: Impossible d'initialiser le modèle d'analyse d'images: {str(e)}")
return 1
# Analyser l'image avec le contexte du ticket
print(f"\nAnalyse approfondie de l'image: {os.path.basename(image_path)}")
analysis_result = image_analyser.executer(image_path, contexte=ticket_analysis)
if "error" in analysis_result and analysis_result.get("error", False):
print(f" => Erreur: {analysis_result.get('analyse', '')}")
else:
analyse_text = analysis_result.get("analyse", "")
print(f" => Analyse terminée: {len(analyse_text)} caractères")
if args.show_analysis:
print("\nRésultat de l'analyse d'image:")
print(analyse_text)
else:
print(f" => Début de l'analyse: {analyse_text[:100]}...")
# 6. Générer un rapport final si demandé
if args.generate_report:
try:
print("\nInitialisation de l'agent de génération de rapport...")
report_model = MistralLarge()
report_generator = AgentReportGenerator(report_model)
print("Agent de génération de rapport initialisé avec succès")
# Préparer les données pour le rapport
rapport_data = {
"ticket_id": args.ticket_id,
"ticket_data": ticket_data,
"ticket_analyse": ticket_analysis,
"analyse_images": {
image_path: {
"sorting": resultat,
"analysis": analysis_result
}
}
}
# Générer le rapport
print("\nGénération du rapport final...")
rapport_final = report_generator.executer(rapport_data)
print(f"Rapport généré: {len(rapport_final)} caractères")
if args.show_analysis:
print("\nRapport final:")
print(rapport_final)
else:
print(f"Début du rapport: {rapport_final[:200]}...")
except Exception as e:
print(f"ERREUR: Impossible de générer le rapport: {str(e)}")
return 0
# ============ TRAITEMENT COMPLET DU TICKET ============
# ÉTAPE 1: Récupérer et trier les images avec un agent isolé
# Initialiser uniquement l'agent de tri d'images
try:
print("Initialisation du modèle Pixtral-Large pour le tri...")
model_tri = PixtralLarge()
# Configuration explicite du modèle
model_tri.configurer(temperature=0.2, top_p=0.8, max_tokens=300)
image_sorter = AgentImageSorter(model_tri)
print("Agent de tri d'images initialisé avec succès")
except Exception as e:
print(f"ERREUR: Impossible d'initialiser le modèle de tri: {str(e)}")
return 1
print("\nRécupération et tri des images...")
all_images = get_images_in_extraction(extraction_path)
if not all_images:
print(f"Aucune image trouvée dans l'extraction {latest_extraction}")
return 1
print(f"Nombre d'images trouvées: {len(all_images)}")
# Appliquer le préfiltrage de doublons si activé
if not args.no_dedup:
images_avant = len(all_images)
all_images = filtrer_images_uniques(all_images, seuil_hamming=args.seuil, ticket_id=args.ticket_id)
images_apres = len(all_images)
if images_avant > images_apres:
print(f"Préfiltrage des doublons: {images_avant}{images_apres} images ({images_avant - images_apres} doublons supprimés)")
else:
print("Préfiltrage terminé: aucun doublon détecté")
# Trier les images avec l'agent dédié
relevant_images = []
image_sorting_results = {}
# Analyser chaque image - exactement comme dans test_image_sorter
results = []
for img_path in all_images:
img_name = os.path.basename(img_path)
print(f"\nAnalyse de l'image: {img_name}")
if args.debug:
print(f"DEBUG: Chemin complet de l'image: {img_path}")
print(f"DEBUG: L'image existe: {os.path.exists(img_path)}")
print(f"DEBUG: L'image est accessible: {os.access(img_path, os.R_OK)}")
resultat = image_sorter.executer(img_path)
results.append(resultat)
is_relevant = resultat.get("is_relevant", False)
image_sorting_results[img_path] = resultat
# Afficher les résultats comme dans test_image_sorter
print(f"Pertinence: {'OUI' if is_relevant else 'NON'}")
print(f"Raison: {resultat.get('reason', 'Non spécifiée')}")
if is_relevant:
relevant_images.append(img_path)
# Afficher un résumé à la fin comme dans test_image_sorter
pertinentes = sum(1 for r in results if r.get('is_relevant', False))
print(f"\nRésumé: {pertinentes}/{len(results)} images pertinentes")
# Préparer les résultats pour les modes limités
combined_results = {
"ticket_id": args.ticket_id,
"images_total": len(all_images),
"images_relevant": len(relevant_images),
"analyse_images": {}
}
# Ajouter les résultats de tri
for img_path in all_images:
img_name = os.path.basename(img_path)
combined_results["analyse_images"][img_path] = {
"path": img_path,
"sorting": image_sorting_results.get(img_path, {})
}
# Si on est en mode 1 (tri uniquement), s'arrêter ici
if args.mode == 1:
print("\n=== ANALYSE DE TRI TERMINÉE ===")
print(f"Ticket: {args.ticket_id}")
print(f"Images totales: {len(all_images)}")
print(f"Images pertinentes: {len(relevant_images)}")
# Sauvegarder les résultats si demandé
if args.save_results:
save_dir = os.path.join("results", args.ticket_id)
os.makedirs(save_dir, exist_ok=True)
save_file = os.path.join(save_dir, f"tri_images_{args.ticket_id}.json")
with open(save_file, "w", encoding="utf-8") as f:
json.dump(combined_results, f, ensure_ascii=False, indent=2)
print(f"\nRésultats du tri sauvegardés dans: {save_file}")
# Libérer les ressources de l'agent de tri
del image_sorter
del model_tri
return 0
# Libérer les ressources de l'agent de tri
del image_sorter
del model_tri
# ÉTAPE 2: Charger et analyser le ticket avec un agent isolé
# Initialiser l'agent d'analyse de tickets
try:
print("Initialisation du modèle Mistral Large...")
text_model = MistralLarge()
ticket_agent = AgentTicketAnalyser(text_model)
print("Agent d'analyse de tickets initialisé avec succès")
except Exception as e:
print(f"ERREUR: Impossible d'initialiser le modèle d'analyse de tickets: {str(e)}")
return 1
ticket_data = _charger_ticket_data(args)
if not ticket_data:
return 1
# Exécuter l'analyse du ticket - Format comme dans test_agent_ticket_analyser_mistral_large.py
print(f"\nAnalyse du ticket {ticket_data.get('code', args.ticket_id)} en cours...")
ticket_analysis = ticket_agent.executer(ticket_data)
print(f"Analyse du ticket terminée: {len(ticket_analysis)} caractères")
if args.show_analysis:
print("\nRésultat de l'analyse:")
print(ticket_analysis)
# Ajouter l'analyse du ticket aux résultats
combined_results["ticket_data"] = ticket_data
combined_results["ticket_analyse"] = ticket_analysis
# Si on est en mode 2 (tri + ticket), s'arrêter ici
if args.mode == 2:
print("\n=== ANALYSE DE TICKET TERMINÉE ===")
print(f"Ticket: {args.ticket_id}")
print(f"Images totales: {len(all_images)}")
print(f"Images pertinentes: {len(relevant_images)}")
print(f"Analyse du ticket: {len(ticket_analysis)} caractères")
# Sauvegarder les résultats si demandé
if args.save_results:
save_dir = os.path.join("results", args.ticket_id)
os.makedirs(save_dir, exist_ok=True)
save_file = os.path.join(save_dir, f"tri_ticket_{args.ticket_id}.json")
with open(save_file, "w", encoding="utf-8") as f:
json.dump(combined_results, f, ensure_ascii=False, indent=2)
print(f"\nRésultats du tri et de l'analyse de ticket sauvegardés dans: {save_file}")
# Libérer les ressources de l'agent de tickets
del ticket_agent
del text_model
return 0
# Libérer les ressources de l'agent de tickets
del ticket_agent
del text_model
# ÉTAPE 3: Analyser les images pertinentes avec un agent isolé
image_analysis_results = {}
if relevant_images:
# Initialiser l'agent d'analyse d'images
try:
print("Initialisation du modèle Pixtral-Large pour l'analyse d'images...")
model_analyse = PixtralLarge()
# Configuration explicite du modèle
model_analyse.configurer(temperature=0.2, top_p=0.9, max_tokens=3000)
image_analyser = AgentImageAnalyser(model_analyse)
print("Agent d'analyse d'images initialisé avec succès")
except Exception as e:
print(f"ERREUR: Impossible d'initialiser le modèle d'analyse d'images: {str(e)}")
return 1
print("\nAnalyse des images pertinentes avec contexte du ticket...")
for img_path in relevant_images:
img_name = os.path.basename(img_path)
print(f"\nAnalyse de l'image: {img_name}")
try:
analysis_result = image_analyser.executer(img_path, contexte=ticket_analysis)
image_analysis_results[img_path] = analysis_result
if "error" in analysis_result and analysis_result.get("error", False):
print(f" => Erreur: {analysis_result.get('analyse', '')}")
else:
analyse_text = analysis_result.get("analyse", "")
print(f" => Analyse terminée: {len(analyse_text)} caractères")
if args.show_analysis:
print("\nContenu de l'analyse:")
print(analyse_text)
else:
print(f" => Début de l'analyse: {analyse_text[:100]}...")
except Exception as e:
print(f"ERREUR: Impossible d'analyser l'image {img_name}: {str(e)}")
image_analysis_results[img_path] = {"error": True, "analyse": f"ERREUR: {str(e)}"}
else:
print("Aucune image pertinente à analyser")
# Ajout des résultats d'analyse d'images
combined_results["images_analysed"] = len(image_analysis_results)
# Mise à jour des analyses d'images
for img_path in all_images:
if img_path in image_analysis_results:
combined_results["analyse_images"][img_path]["analysis"] = image_analysis_results[img_path]
# Libérer les ressources de l'agent d'analyse d'images
if relevant_images:
del image_analyser
del model_analyse
# Si on est en mode 3 (tri + ticket + images), s'arrêter ici
if args.mode == 3:
print("\n=== ANALYSE D'IMAGES TERMINÉE ===")
print(f"Ticket: {args.ticket_id}")
print(f"Images totales: {len(all_images)}")
print(f"Images pertinentes: {len(relevant_images)}")
print(f"Analyse du ticket: {len(ticket_analysis)} caractères")
print(f"Images analysées: {len(image_analysis_results)}")
# Sauvegarder les résultats si demandé
if args.save_results:
save_dir = os.path.join("results", args.ticket_id)
os.makedirs(save_dir, exist_ok=True)
save_file = os.path.join(save_dir, f"tri_ticket_images_{args.ticket_id}.json")
with open(save_file, "w", encoding="utf-8") as f:
json.dump(combined_results, f, ensure_ascii=False, indent=2)
print(f"\nRésultats complets des analyses sauvegardés dans: {save_file}")
return 0
# ÉTAPE 4: Générer un rapport final si en mode 4 ou avec l'option --generate-report
if args.mode == 4 or args.generate_report:
try:
print("\nInitialisation de l'agent de génération de rapport...")
report_model = MistralLarge()
report_generator = AgentReportGenerator(report_model)
print("Agent de génération de rapport initialisé avec succès")
# Préparer les données pour le rapport
rapport_data = {
"ticket_id": args.ticket_id,
"ticket_data": ticket_data,
"ticket_analyse": ticket_analysis,
"analyse_images": combined_results["analyse_images"]
}
# Générer le rapport
print("\nGénération du rapport final...")
rapport_final = report_generator.executer(rapport_data)
print(f"Rapport généré: {len(rapport_final)} caractères")
# Ajouter le rapport final aux résultats combinés
combined_results["rapport_final"] = rapport_final
if args.show_analysis:
print("\nRapport final:")
print(rapport_final)
else:
print(f"Début du rapport: {rapport_final[:200]}...")
except Exception as e:
print(f"ERREUR: Impossible de générer le rapport: {str(e)}")
# Sauvegarder les résultats si demandé
if args.save_results:
save_dir = os.path.join("results", args.ticket_id)
os.makedirs(save_dir, exist_ok=True)
save_file = os.path.join(save_dir, f"complete_analysis_{args.ticket_id}.json")
with open(save_file, "w", encoding="utf-8") as f:
json.dump(combined_results, f, ensure_ascii=False, indent=2)
print(f"\nRésultats complets sauvegardés dans: {save_file}")
# Afficher un résumé final comme dans le workflow d'analyse d'images
print("\n=== RÉSUMÉ DE L'ANALYSE COMPLÈTE ===")
print(f"Ticket: {args.ticket_id}")
print(f"Images totales: {len(all_images)}")
print(f"Images pertinentes: {len(relevant_images)}")
print(f"Analyse du ticket: {len(ticket_analysis)} caractères")
print(f"Images analysées: {len(image_analysis_results)}")
if args.generate_report or args.mode == 4:
print(f"Rapport final généré: {len(combined_results.get('rapport_final', ''))} caractères")
return 0
def _interactive_menu(args):
"""
Affiche un menu interactif pour choisir le mode d'analyse
"""
print("\n=== MENU D'ANALYSE ===")
print("Ticket à analyser: " + args.ticket_id)
print("1. Tri d'images uniquement")
print("2. Tri d'images + Analyse du ticket")
print("3. Tri d'images + Analyse du ticket + Analyse approfondie des images")
print("4. Analyse complète avec génération de rapport")
print("5. Analyse d'une seule image")
print("0. Quitter")
while True:
try:
choice = int(input("\nVotre choix (0-5): "))
if 0 <= choice <= 5:
break
else:
print("Veuillez entrer un chiffre entre 0 et 5.")
except ValueError:
print("Veuillez entrer un chiffre valide.")
if choice == 0:
print("Au revoir!")
sys.exit(0)
if choice == 5:
# Mode analyse d'une seule image
image_path = input("Chemin de l'image à analyser: ")
if not os.path.exists(image_path):
print(f"ERREUR: L'image spécifiée n'existe pas: {image_path}")
sys.exit(1)
args.image = image_path
else:
# Configurer le mode d'analyse
args.mode = choice
# Paramètres supplémentaires
if input("Afficher les analyses détaillées? (o/n): ").lower().startswith('o'):
args.show_analysis = True
if input("Sauvegarder les résultats? (o/n): ").lower().startswith('o'):
args.save_results = True
if input("Désactiver le préfiltrage des doublons? (o/n): ").lower().startswith('o'):
args.no_dedup = True
if args.no_dedup is False:
try:
seuil = int(input("Seuil de similarité pour détecter les doublons (0-10, défaut=5): "))
if 0 <= seuil <= 10:
args.seuil = seuil
except ValueError:
print("Valeur invalide, utilisation du seuil par défaut: 5")
return args
def _charger_ticket_data(args):
"""
Charge les données du ticket à partir d'un fichier spécifié ou du fichier de rapport par défaut.
"""
ticket_data = {}
if args.ticket_file:
if not os.path.exists(args.ticket_file):
print(f"ERREUR: Le fichier de ticket spécifié n'existe pas: {args.ticket_file}")
return None
try:
with open(args.ticket_file, "r", encoding="utf-8") as f:
ticket_data = json.load(f)
print(f"Fichier de ticket chargé: {args.ticket_file}")
except json.JSONDecodeError:
print(f"ERREUR: Le fichier {args.ticket_file} n'est pas un JSON valide")
return None
else:
# Chercher le fichier de rapport du ticket
rapport_file = get_ticket_report_file(args.ticket_id, args.output_dir)
if not rapport_file:
print(f"ERREUR: Aucun fichier de rapport trouvé pour le ticket {args.ticket_id}")
return None
print(f"Fichier de rapport trouvé: {rapport_file}")
try:
with open(rapport_file, "r", encoding="utf-8") as f:
ticket_data = json.load(f)
print(f"Fichier de rapport chargé: {rapport_file}")
except json.JSONDecodeError:
print(f"ERREUR: Le fichier {rapport_file} n'est pas un JSON valide")
return None
# Vérifier que les données du ticket contiennent un code
if "code" not in ticket_data:
ticket_data["code"] = args.ticket_id
print(f"Code de ticket ajouté: {args.ticket_id}")
return ticket_data
if __name__ == "__main__":
sys.exit(main())

98
tests/README.md Normal file
View File

@ -0,0 +1,98 @@
# Tests modulaires pour l'analyse de tickets
Ce répertoire contient une architecture de tests modulaire pour l'analyse de tickets avec différents modèles LLM.
## Structure
- `common/` : Utilitaires communs et factories
- `ticket_utils.py` : Fonctions pour manipuler les tickets et leurs ressources
- `llm_factory.py` : Factory pour initialiser les modèles LLM
- `agent_factory.py` : Factory pour initialiser les agents
- `workflows/` : Workflows de test complets
- `ticket_analyser_workflow.py` : Workflow pour l'analyse de tickets
- `image_analyser_workflow.py` : Workflow pour l'analyse d'images
- `complete_analysis_workflow.py` : Workflow complet avec génération de rapport
- `compare_models.py` : Script pour comparer plusieurs modèles sur le même ticket
## Utilisation
### Test d'analyse de ticket
Pour tester l'analyse d'un ticket avec un modèle spécifique :
```bash
python -m tests.workflows.ticket_analyser_workflow T1234 --model mistral_large --save
```
Options :
- `--model` : Modèle LLM à utiliser (défaut: mistral_large)
- `--output_dir` : Répertoire des tickets (défaut: output)
- `--verbose` : Mode verbeux
- `--save` : Sauvegarder les résultats dans un fichier JSON
### Test d'analyse d'images
Pour tester l'analyse d'images avec un modèle spécifique :
```bash
python -m tests.workflows.image_analyser_workflow T1234 output mistral_large pixtral_large
```
Arguments :
1. `ticket_id` : ID du ticket à analyser
2. `output_dir` : Répertoire des tickets (optionnel)
3. `text_model` : Modèle pour l'analyse de ticket (optionnel)
4. `vision_model` : Modèle pour l'analyse d'images (optionnel)
### Workflow complet avec génération de rapport
Pour exécuter le workflow complet (analyse de ticket, tri d'images, analyse d'images et génération de rapport) :
```bash
python -m tests.workflows.complete_analysis_workflow T1234 --text_model mistral_large --vision_model pixtral_large
```
Options :
- `--text_model` : Modèle LLM à utiliser pour l'analyse de ticket et la génération de rapport (défaut: mistral_large)
- `--vision_model` : Modèle LLM à utiliser pour l'analyse d'images (défaut: pixtral_large)
- `--output_dir` : Répertoire des tickets (défaut: output)
- `--no-report` : Ne pas générer de rapport final
- `--no-save` : Ne pas sauvegarder les résultats intermédiaires
- `--verbose` : Mode verbeux
### Comparaison de modèles
Pour comparer plusieurs modèles sur le même ticket :
```bash
python -m tests.compare_models T1234 --type both --save
```
Options :
- `--type` : Type de modèles à comparer (text, vision ou both)
- `--text-models` : Liste des modèles texte à comparer
- `--vision-models` : Liste des modèles vision à comparer
- `--text-model` : Modèle texte à utiliser pour les tests vision
- `--output-dir` : Répertoire des tickets
- `--save` : Sauvegarder les résultats
- `--verbose` : Mode verbeux
Exemple pour comparer des modèles spécifiques :
```bash
python -m tests.compare_models T1234 --type both --text-models mistral_large mistral_medium --vision-models pixtral_large llama_vision --save
```
## Ajout d'un nouveau modèle LLM
1. Créez une classe pour le nouveau modèle dans `llm_classes/`
2. Ajoutez le modèle à `LLM_CLASSES` dans `tests/common/llm_factory.py`
3. Ajoutez le modèle à `TEXT_MODELS` ou `VISION_MODELS` selon ses capacités
## Ajout d'un nouveau workflow de test
1. Créez un nouveau fichier dans `tests/workflows/`
2. Implémentez une fonction `execute_workflow()` qui prend en charge le workflow
3. Exportez la fonction dans `tests/workflows/__init__.py`

View File

@ -1,3 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Factory pour les agents LLM utilisés dans les tests.
Centralise l'initialisation des agents pour éviter la duplication de code.

View File

@ -1,3 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Factory pour les modèles LLM utilisés dans les tests.
Centralise l'initialisation des modèles pour éviter la duplication de code.

View File

@ -1,5 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Utilitaires pour la gestion des tickets dans les tests.
Utilitaires pour manipuler les tickets et leurs ressources.
Centralise les fonctions de recherche et de chargement des tickets, rapports et images.
"""
import os
@ -44,14 +48,9 @@ def get_ticket_info(ticket_id: str, output_dir: str = "output") -> Dict[str, Any
attachments_path = os.path.join(extraction_path, "attachments")
has_attachments = os.path.exists(attachments_path)
if has_attachments:
attachments = [f for f in os.listdir(attachments_path)
if os.path.isfile(os.path.join(attachments_path, f))]
images = [f for f in attachments
if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
else:
attachments = []
images = []
# Vérifier le répertoire des rapports
rapport_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
has_reports = os.path.exists(rapport_dir)
return {
"ticket_id": ticket_id,
@ -60,13 +59,48 @@ def get_ticket_info(ticket_id: str, output_dir: str = "output") -> Dict[str, Any
"extraction_path": extraction_path,
"has_attachments": has_attachments,
"attachments_path": attachments_path if has_attachments else None,
"attachments_count": len(attachments),
"images_count": len(images)
"has_reports": has_reports,
"reports_path": rapport_dir if has_reports else None
}
def get_ticket_report_file(ticket_id: str, output_dir: str = "output") -> Optional[str]:
"""
Récupère le fichier de rapport du ticket dans le répertoire ticket_rapports.
(Extrait de test_agent_ticket_analyser_mistral_large.py)
Args:
ticket_id: ID du ticket (ex: T1234)
output_dir: Répertoire de base contenant les tickets
Returns:
Chemin vers le fichier de rapport JSON, ou None si non trouvé
"""
try:
ticket_info = get_ticket_info(ticket_id, output_dir)
# Vérifier si le répertoire des rapports existe
rapport_dir = ticket_info.get("reports_path")
if not rapport_dir:
return None
# Chercher un fichier JSON de rapport
rapport_file = os.path.join(rapport_dir, f"{ticket_id}_rapport.json")
if os.path.exists(rapport_file):
return rapport_file
# Si le fichier spécifique n'existe pas, chercher n'importe quel JSON
for file in os.listdir(rapport_dir):
if file.lower().endswith('.json'):
return os.path.join(rapport_dir, file)
return None
except FileNotFoundError:
return None
def get_ticket_json(ticket_id: str, output_dir: str = "output") -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
"""
Récupère le fichier JSON d'un ticket et son contenu.
(Adapté de test_analyse_image_large.py et test_agent_ticket_analyser_mistral_large.py)
Args:
ticket_id: ID du ticket (ex: T1234)
@ -77,49 +111,44 @@ def get_ticket_json(ticket_id: str, output_dir: str = "output") -> Tuple[Optiona
"""
try:
ticket_info = get_ticket_info(ticket_id, output_dir)
extraction_path = ticket_info["extraction_path"]
# Stratégie 1: Chercher dans le répertoire des rapports
json_file = get_ticket_report_file(ticket_id, output_dir)
# Stratégie 2: Si pas trouvé, chercher directement dans l'extraction
if not json_file:
for file in os.listdir(extraction_path):
if file.lower().endswith('.json') and ticket_id in file:
json_file = os.path.join(extraction_path, file)
break
if not json_file:
logger.warning(f"Aucun fichier JSON trouvé pour le ticket {ticket_id}")
return None, None
# Charger le contenu du JSON
try:
with open(json_file, 'r', encoding='utf-8') as f:
json_data = json.load(f)
# Ajouter le code du ticket si absent
if "code" not in json_data:
json_data["code"] = ticket_id
return json_file, json_data
except Exception as e:
logger.error(f"Erreur lors du chargement du JSON: {e}")
return json_file, None
except FileNotFoundError:
logger.error(f"Ticket {ticket_id} non trouvé")
return None, None
extraction_path = ticket_info["extraction_path"]
rapport_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
# Chercher dans le répertoire des rapports
json_file = None
if os.path.exists(rapport_dir):
for file in os.listdir(rapport_dir):
if file.lower().endswith('.json'):
json_file = os.path.join(rapport_dir, file)
break
# Si pas trouvé, chercher directement dans l'extraction
if not json_file:
for file in os.listdir(extraction_path):
if file.lower().endswith('.json') and ticket_id in file:
json_file = os.path.join(extraction_path, file)
break
if not json_file:
logger.warning(f"Aucun fichier JSON trouvé pour le ticket {ticket_id}")
return None, None
# Charger le contenu du JSON
try:
with open(json_file, 'r', encoding='utf-8') as f:
json_data = json.load(f)
# Ajouter le code du ticket si absent
if "code" not in json_data:
json_data["code"] = ticket_id
return json_file, json_data
except Exception as e:
logger.error(f"Erreur lors du chargement du JSON: {e}")
return json_file, None
def get_ticket_images(ticket_id: str, output_dir: str = "output", filter_duplicates: bool = True) -> List[str]:
"""
Récupère la liste des images d'un ticket.
(Adapté de test_analyse_image_large.py)
Args:
ticket_id: ID du ticket (ex: T1234)
@ -131,28 +160,32 @@ def get_ticket_images(ticket_id: str, output_dir: str = "output", filter_duplica
"""
try:
ticket_info = get_ticket_info(ticket_id, output_dir)
if not ticket_info["has_attachments"]:
return []
attachments_path = ticket_info["attachments_path"]
# Récupérer toutes les images
images = [f for f in os.listdir(attachments_path)
if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
if not images:
return []
image_paths = [os.path.join(attachments_path, img) for img in images]
# Filtrer les doublons si demandé
if filter_duplicates:
try:
from utils.image_dedup import filtrer_images_uniques
return filtrer_images_uniques(image_paths)
except ImportError:
logger.warning("Module utils.image_dedup non disponible, pas de filtrage des doublons")
return image_paths
else:
return image_paths
except FileNotFoundError:
logger.error(f"Ticket {ticket_id} non trouvé")
return []
if not ticket_info["has_attachments"]:
return []
attachments_path = ticket_info["attachments_path"]
# Récupérer les images
images = [f for f in os.listdir(attachments_path)
if os.path.isfile(os.path.join(attachments_path, f)) and
f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
image_paths = [os.path.join(attachments_path, img) for img in images]
# Filtrer les doublons si demandé
if filter_duplicates and image_paths:
try:
from utils.image_dedup import filtrer_images_uniques
image_paths = filtrer_images_uniques(image_paths)
except ImportError:
logger.warning("Module utils.image_dedup non disponible, pas de filtrage des doublons")
return image_paths
return []

253
tests/compare_models.py Normal file
View File

@ -0,0 +1,253 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script pour comparer plusieurs modèles LLM sur le même ticket.
Permet d'évaluer rapidement les performances de différents modèles.
"""
import os
import sys
import argparse
import json
import time
import logging
from typing import Dict, Any, List, Optional
from tests.common.llm_factory import TEXT_MODELS, VISION_MODELS
from tests.workflows import execute_ticket_analysis, execute_image_analysis
def setup_logging(verbose: bool = False, log_file: Optional[str] = None):
"""
Configure le système de logging.
Args:
verbose: Si True, active le mode verbeux (DEBUG)
log_file: Nom du fichier de log (si None, pas de log dans un fichier)
"""
log_level = logging.DEBUG if verbose else logging.INFO
handlers: List[logging.Handler] = [logging.StreamHandler()]
if log_file:
handlers.append(logging.FileHandler(log_file))
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=handlers
)
def compare_text_models(ticket_id: str, models: List[str], output_dir: str = "output") -> Dict[str, Any]:
"""
Compare plusieurs modèles de texte sur le même ticket.
Args:
ticket_id: ID du ticket à analyser
models: Liste des modèles à comparer
output_dir: Répertoire contenant les tickets
Returns:
Dictionnaire avec les résultats pour chaque modèle
"""
results = {
"ticket_id": ticket_id,
"type": "text",
"models": {},
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}
print(f"\n=== Comparaison de modèles texte pour le ticket {ticket_id} ===\n")
for model in models:
print(f"Test du modèle: {model}")
start_time = time.time()
try:
workflow_results = execute_ticket_analysis(ticket_id, output_dir, model)
execution_time = time.time() - start_time
if "error" in workflow_results:
print(f" ❌ Échec pour {model}: {workflow_results['error']}")
results["models"][model] = {
"success": False,
"error": workflow_results["error"],
"stage": workflow_results.get("stage", "unknown"),
"execution_time": execution_time
}
else:
analysis = workflow_results["analysis"]
print(f" ✅ Succès pour {model} en {execution_time:.2f} sec: {len(analysis) if analysis else 0} caractères")
results["models"][model] = {
"success": True,
"execution_time": execution_time,
"analysis_length": len(analysis) if analysis else 0,
"analysis": analysis
}
except Exception as e:
execution_time = time.time() - start_time
print(f" ❌ Erreur pour {model}: {e}")
results["models"][model] = {
"success": False,
"error": str(e),
"execution_time": execution_time
}
return results
def compare_vision_models(ticket_id: str, models: List[str], text_model: str = "mistral_large", output_dir: str = "output") -> Dict[str, Any]:
"""
Compare plusieurs modèles de vision sur le même ticket.
Args:
ticket_id: ID du ticket à analyser
models: Liste des modèles de vision à comparer
text_model: Modèle de texte à utiliser pour l'analyse du contexte
output_dir: Répertoire contenant les tickets
Returns:
Dictionnaire avec les résultats pour chaque modèle
"""
results = {
"ticket_id": ticket_id,
"type": "vision",
"text_model": text_model,
"models": {},
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}
print(f"\n=== Comparaison de modèles vision pour le ticket {ticket_id} (texte: {text_model}) ===\n")
for model in models:
print(f"Test du modèle: {model}")
start_time = time.time()
try:
workflow_results = execute_image_analysis(ticket_id, output_dir, text_model, model)
execution_time = time.time() - start_time
if "error" in workflow_results:
print(f" ❌ Échec pour {model}: {workflow_results['error']}")
results["models"][model] = {
"success": False,
"error": workflow_results["error"],
"stage": workflow_results.get("stage", "unknown"),
"execution_time": execution_time
}
else:
images_analysed = len(workflow_results["analysis_results"])
print(f" ✅ Succès pour {model} en {execution_time:.2f} sec: {images_analysed} images analysées")
results["models"][model] = {
"success": True,
"execution_time": execution_time,
"images_total": workflow_results["images_count"],
"images_relevant": len(workflow_results["relevant_images"]),
"images_analysed": images_analysed,
"analysis_results": workflow_results["analysis_results"]
}
except Exception as e:
execution_time = time.time() - start_time
print(f" ❌ Erreur pour {model}: {e}")
results["models"][model] = {
"success": False,
"error": str(e),
"execution_time": execution_time
}
return results
def print_comparison_table(results: Dict[str, Any]):
"""
Affiche un tableau de comparaison des modèles.
Args:
results: Résultats de la comparaison
"""
if results["type"] == "text":
print("\n=== Résultats de la comparaison des modèles texte ===")
print(f"Ticket: {results['ticket_id']}")
print(f"\n{'Modèle':<15} | {'Statut':<10} | {'Temps (s)':<10} | {'Taille analyse':<15}")
print("-" * 60)
for model, data in results["models"].items():
status = "✅ Succès" if data.get("success", False) else "❌ Échec"
time_str = f"{data.get('execution_time', 0):.2f}"
length = data.get("analysis_length", "N/A")
print(f"{model:<15} | {status:<10} | {time_str:<10} | {length:<15}")
elif results["type"] == "vision":
print("\n=== Résultats de la comparaison des modèles vision ===")
print(f"Ticket: {results['ticket_id']}")
print(f"Modèle texte: {results['text_model']}")
print(f"\n{'Modèle':<15} | {'Statut':<10} | {'Temps (s)':<10} | {'Images analysées':<20}")
print("-" * 65)
for model, data in results["models"].items():
status = "✅ Succès" if data.get("success", False) else "❌ Échec"
time_str = f"{data.get('execution_time', 0):.2f}"
if data.get("success", False):
images = f"{data.get('images_analysed', 0)}/{data.get('images_relevant', 0)}/{data.get('images_total', 0)}"
else:
images = "N/A"
print(f"{model:<15} | {status:<10} | {time_str:<10} | {images:<20}")
def main():
parser = argparse.ArgumentParser(description="Comparer plusieurs modèles LLM sur le même ticket")
parser.add_argument("ticket_id", help="ID du ticket à analyser")
parser.add_argument("--type", choices=["text", "vision", "both"], default="text",
help="Type de modèles à comparer")
parser.add_argument("--text-models", nargs="+", choices=TEXT_MODELS,
help="Modèles texte à comparer (par défaut: tous)")
parser.add_argument("--vision-models", nargs="+", choices=VISION_MODELS,
help="Modèles vision à comparer (par défaut: tous)")
parser.add_argument("--text-model", choices=TEXT_MODELS, default="mistral_large",
help="Modèle texte à utiliser pour les tests vision")
parser.add_argument("--output-dir", default="output", help="Répertoire des tickets")
parser.add_argument("--save", action="store_true", help="Sauvegarder les résultats")
parser.add_argument("--verbose", "-v", action="store_true", help="Mode verbeux")
args = parser.parse_args()
# Configuration du logging
setup_logging(args.verbose, f"compare_models_{args.ticket_id}.log")
# Sélection des modèles à comparer
text_models = args.text_models if args.text_models else TEXT_MODELS
vision_models = args.vision_models if args.vision_models else VISION_MODELS
# Comparaison des modèles
results = {}
if args.type in ["text", "both"]:
text_results = compare_text_models(args.ticket_id, text_models, args.output_dir)
results["text"] = text_results
print_comparison_table(text_results)
if args.save:
save_dir = os.path.join("results", "comparisons", args.ticket_id)
os.makedirs(save_dir, exist_ok=True)
save_file = os.path.join(save_dir, f"text_comparison_{int(time.time())}.json")
with open(save_file, "w", encoding="utf-8") as f:
json.dump(text_results, f, ensure_ascii=False, indent=2)
print(f"Résultats sauvegardés dans: {save_file}")
if args.type in ["vision", "both"]:
vision_results = compare_vision_models(args.ticket_id, vision_models, args.text_model, args.output_dir)
results["vision"] = vision_results
print_comparison_table(vision_results)
if args.save:
save_dir = os.path.join("results", "comparisons", args.ticket_id)
os.makedirs(save_dir, exist_ok=True)
save_file = os.path.join(save_dir, f"vision_comparison_{int(time.time())}.json")
with open(save_file, "w", encoding="utf-8") as f:
json.dump(vision_results, f, ensure_ascii=False, indent=2)
print(f"Résultats sauvegardés dans: {save_file}")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -1,4 +1,13 @@
"""
Modules de test pour les workflows complets ou partiels.
Ce paquet contient les tests de différentes séquences d'agents.
"""
from .ticket_analyser_workflow import execute_workflow as execute_ticket_analysis
from .image_analyser_workflow import execute_workflow as execute_image_analysis
from .complete_analysis_workflow import execute_workflow as execute_complete_analysis
"""
Workflows de test pour l'application.
Ce package contient des workflows complets pour tester les différentes fonctionnalités.
"""

View File

@ -0,0 +1,347 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Workflow complet d'analyse de ticket avec génération de rapport.
Ce module implémente un workflow qui combine tous les agents :
1. Analyse du ticket
2. Tri des images
3. Analyse des images pertinentes
4. Génération d'un rapport final
"""
import os
import sys
import json
import logging
import time
import argparse
from typing import Dict, Any, Optional, List
# Import des factories
from tests.common.agent_factory import (
create_ticket_analyser,
create_image_sorter,
create_image_analyser,
create_report_generator
)
from tests.common.llm_factory import create_llm
from tests.common.ticket_utils import get_ticket_json, get_ticket_images
logger = logging.getLogger("CompleteAnalysisWorkflow")
def execute_workflow(
ticket_id: str,
output_dir: str = "output",
text_model: str = "mistral_large",
vision_model: str = "pixtral_large",
generate_report: bool = True,
save_results: bool = True
) -> Dict[str, Any]:
"""
Exécute le workflow complet d'analyse de ticket et génération de rapport.
Args:
ticket_id: Identifiant du ticket
output_dir: Répertoire contenant les tickets
text_model: Nom du modèle à utiliser pour l'analyse de ticket et la génération de rapport
vision_model: Nom du modèle à utiliser pour l'analyse d'image
generate_report: Si True, génère un rapport final
save_results: Si True, sauvegarde les résultats intermédiaires
Returns:
Dict avec les résultats du workflow
"""
workflow_start = time.time()
logger.info(f"Démarrage du workflow complet pour le ticket {ticket_id}")
print(f"Démarrage du workflow complet pour le ticket {ticket_id}")
# ÉTAPE 1: Initialiser les modèles LLM
try:
print("Initialisation des modèles LLM...")
text_llm = create_llm(text_model)
vision_llm = create_llm(vision_model)
print(f"Modèles initialisés: {text_model}, {vision_model}")
except Exception as e:
logger.error(f"Erreur lors de l'initialisation des modèles: {e}")
print(f"ERREUR: Impossible d'initialiser les modèles: {e}")
return {"error": str(e), "stage": "init_llm"}
# ÉTAPE 2: Initialiser les agents
try:
print("Création des agents...")
ticket_agent = create_ticket_analyser(text_llm)
image_sorter = create_image_sorter(vision_llm)
image_analyser = create_image_analyser(vision_llm)
report_generator = create_report_generator(text_llm) if generate_report else None
print("Agents créés avec succès")
except Exception as e:
logger.error(f"Erreur lors de la création des agents: {e}")
print(f"ERREUR: Impossible de créer les agents: {e}")
return {"error": str(e), "stage": "init_agents"}
# ÉTAPE 3: Charger les données du ticket
try:
print("Chargement des données du ticket...")
json_file, ticket_data = get_ticket_json(ticket_id, output_dir)
if not json_file or not ticket_data:
error_msg = f"Impossible de charger les données du ticket {ticket_id}"
logger.error(error_msg)
print(f"ERREUR: {error_msg}")
return {"error": error_msg, "stage": "load_ticket"}
print(f"Données du ticket chargées: {json_file}")
except Exception as e:
logger.error(f"Erreur lors du chargement du ticket: {e}")
print(f"ERREUR: Impossible de charger le ticket: {e}")
return {"error": str(e), "stage": "load_ticket"}
# ÉTAPE 4: Analyser le ticket
try:
print("Analyse du ticket en cours...")
ticket_analysis = ticket_agent.executer(ticket_data)
print(f"Analyse du ticket terminée: {len(ticket_analysis) if ticket_analysis else 0} caractères")
# Sauvegarder l'analyse si demandé
if save_results:
save_dir = os.path.join("results", ticket_id)
os.makedirs(save_dir, exist_ok=True)
save_file = os.path.join(save_dir, f"ticket_analysis_{text_model}.json")
with open(save_file, "w", encoding="utf-8") as f:
json.dump({
"ticket_id": ticket_id,
"model": text_model,
"analysis": ticket_analysis,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}, f, ensure_ascii=False, indent=2)
print(f"Analyse du ticket sauvegardée: {save_file}")
except Exception as e:
logger.error(f"Erreur lors de l'analyse du ticket: {e}")
print(f"ERREUR: Impossible d'analyser le ticket: {e}")
return {"error": str(e), "stage": "analyse_ticket", "ticket_data": ticket_data}
# ÉTAPE 5: Récupérer et trier les images
relevant_images = []
image_sorting_results = {}
all_images = []
try:
print("Récupération et tri des images...")
all_images = get_ticket_images(ticket_id, output_dir)
if not all_images:
print("Aucune image trouvée pour ce ticket")
else:
print(f"Images trouvées: {len(all_images)}")
# Trier les images
for img_path in all_images:
print(f"Évaluation de l'image: {os.path.basename(img_path)}")
try:
sorting_result = image_sorter.executer(img_path)
is_relevant = sorting_result.get("is_relevant", False)
reason = sorting_result.get("reason", "")
image_sorting_results[img_path] = sorting_result
if is_relevant:
print(f" => Pertinente: {reason[:50]}...")
relevant_images.append(img_path)
else:
print(f" => Non pertinente: {reason[:50]}...")
except Exception as e:
logger.error(f"Erreur lors du tri de l'image {img_path}: {e}")
print(f"ERREUR: Impossible de trier l'image {img_path}: {e}")
image_sorting_results[img_path] = {"error": str(e), "is_relevant": False}
print(f"Images pertinentes: {len(relevant_images)}/{len(all_images)}")
# Sauvegarder les résultats du tri si demandé
if save_results and image_sorting_results:
save_dir = os.path.join("results", ticket_id)
os.makedirs(save_dir, exist_ok=True)
save_file = os.path.join(save_dir, f"image_sorting_{vision_model}.json")
with open(save_file, "w", encoding="utf-8") as f:
json.dump({
"ticket_id": ticket_id,
"model": vision_model,
"images_total": len(all_images),
"images_relevant": len(relevant_images),
"results": {os.path.basename(k): v for k, v in image_sorting_results.items()},
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}, f, ensure_ascii=False, indent=2)
print(f"Résultats du tri d'images sauvegardés: {save_file}")
except Exception as e:
logger.error(f"Erreur lors de la récupération/tri des images: {e}")
print(f"ERREUR: Impossible de récupérer/trier les images: {e}")
if generate_report:
# On continue quand même pour générer le rapport, même sans images
print("On continue sans images...")
else:
return {
"error": str(e),
"stage": "sort_images",
"ticket_id": ticket_id,
"ticket_data": ticket_data,
"ticket_analysis": ticket_analysis
}
# ÉTAPE 6: Analyser les images pertinentes avec contexte
image_analysis_results = {}
if relevant_images:
print("Analyse des images pertinentes avec contexte...")
for img_path in relevant_images:
img_name = os.path.basename(img_path)
print(f"Analyse de l'image: {img_name}")
try:
analysis_result = image_analyser.executer(img_path, contexte=ticket_analysis)
image_analysis_results[img_path] = analysis_result
print(f" => Analyse terminée: {len(analysis_result.get('analyse', '')) if analysis_result else 0} caractères")
except Exception as e:
logger.error(f"Erreur lors de l'analyse de l'image {img_path}: {e}")
print(f"ERREUR: Impossible d'analyser l'image {img_path}: {e}")
image_analysis_results[img_path] = {"error": str(e)}
# Sauvegarder les analyses d'images si demandé
if save_results and image_analysis_results:
save_dir = os.path.join("results", ticket_id)
os.makedirs(save_dir, exist_ok=True)
save_file = os.path.join(save_dir, f"image_analysis_{vision_model}.json")
with open(save_file, "w", encoding="utf-8") as f:
json.dump({
"ticket_id": ticket_id,
"model": vision_model,
"images_analysed": len(image_analysis_results),
"results": {os.path.basename(k): v for k, v in image_analysis_results.items()},
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}, f, ensure_ascii=False, indent=2)
print(f"Analyses d'images sauvegardées: {save_file}")
else:
print("Aucune image pertinente à analyser")
# ÉTAPE 7: Générer le rapport final
report_file = None
report_content = None
if generate_report and report_generator is not None:
print("Génération du rapport final...")
try:
# Préparer les données pour le rapport
rapport_data = {
"ticket_id": ticket_id,
"ticket_data": ticket_data,
"ticket_analyse": ticket_analysis,
"analyse_images": {}
}
# Ajouter les résultats d'analyse d'images
for img_path, analysis in image_analysis_results.items():
sorting_result = image_sorting_results.get(img_path, {})
rapport_data["analyse_images"][img_path] = {
"sorting": sorting_result,
"analysis": analysis
}
# Définir le répertoire de destination pour le rapport
reports_dir = os.path.join("reports", ticket_id)
os.makedirs(reports_dir, exist_ok=True)
# Générer le rapport
report_result = report_generator.executer(rapport_data, reports_dir)
# Vérifier si le rapport a été généré avec succès
if report_result and isinstance(report_result, dict):
report_file = report_result.get("report_file")
report_content = report_result.get("content")
print(f"Rapport généré avec succès: {report_file}")
else:
print("Rapport généré mais le format de retour n'est pas standard")
except Exception as e:
logger.error(f"Erreur lors de la génération du rapport: {e}")
print(f"ERREUR: Impossible de générer le rapport: {e}")
elif generate_report:
logger.warning("Génération de rapport demandée mais l'agent n'a pas été initialisé correctement")
print("ATTENTION: Impossible de générer le rapport (agent non initialisé)")
# Calcul du temps total d'exécution
workflow_time = time.time() - workflow_start
print(f"Workflow complet terminé en {workflow_time:.2f} secondes")
# Retourner les résultats
return {
"ticket_id": ticket_id,
"ticket_analysis": ticket_analysis,
"images_count": len(all_images),
"relevant_images": len(relevant_images),
"images_analysed": len(image_analysis_results),
"report_file": report_file,
"execution_time": workflow_time
}
def main():
# Configuration de l'analyseur d'arguments
parser = argparse.ArgumentParser(description="Exécuter le workflow complet d'analyse de ticket")
parser.add_argument("ticket_id", help="ID du ticket à analyser (ex: T1234)")
parser.add_argument("--output_dir", default="output", help="Répertoire des tickets (défaut: output)")
parser.add_argument("--text_model", default="mistral_large", help="Modèle texte à utiliser (défaut: mistral_large)")
parser.add_argument("--vision_model", default="pixtral_large", help="Modèle vision à utiliser (défaut: pixtral_large)")
parser.add_argument("--no-report", action="store_true", help="Ne pas générer de rapport final")
parser.add_argument("--no-save", action="store_true", help="Ne pas sauvegarder les résultats intermédiaires")
parser.add_argument("--verbose", "-v", action="store_true", help="Mode verbeux")
# Analyser les arguments
args = parser.parse_args()
# Configurer le logging
log_level = logging.DEBUG if args.verbose else logging.INFO
log_file = f"complete_workflow_{args.ticket_id}.log"
handlers: List[logging.Handler] = [logging.StreamHandler()]
if log_file:
handlers.append(logging.FileHandler(log_file))
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=handlers
)
# Exécuter le workflow
results = execute_workflow(
ticket_id=args.ticket_id,
output_dir=args.output_dir,
text_model=args.text_model,
vision_model=args.vision_model,
generate_report=not args.no_report,
save_results=not args.no_save
)
# Afficher un résumé des résultats
if "error" in results:
print(f"\nErreur lors de l'exécution du workflow: {results['error']}")
print(f"Étape en échec: {results.get('stage', 'inconnue')}")
return 1
print("\n=== Résumé du workflow complet ===")
print(f"Ticket: {results['ticket_id']}")
print(f"Temps d'exécution total: {results['execution_time']:.2f} secondes")
print(f"Analyse du ticket: {len(results['ticket_analysis']) if results.get('ticket_analysis') else 0} caractères")
print(f"Images trouvées: {results['images_count']}")
print(f"Images pertinentes: {results['relevant_images']}")
print(f"Images analysées: {results['images_analysed']}")
if results.get("report_file"):
print(f"Rapport généré: {results['report_file']}")
elif not args.no_report:
print("Rapport: Non généré (échec ou aucune donnée disponible)")
else:
print("Rapport: Désactivé")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -1,3 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Workflow d'analyse d'images avec contexte.
Ce module implémente le workflow suivant:

View File

@ -0,0 +1,162 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Workflow d'analyse de tickets.
Ce module implémente le workflow pour analyser un ticket avec l'agent AgentTicketAnalyser.
"""
import os
import sys
import argparse
import json
import logging
import time
from typing import Dict, Any, Optional
# Import des factories
from tests.common.agent_factory import create_ticket_analyser
from tests.common.llm_factory import create_llm
from tests.common.ticket_utils import get_ticket_json, get_ticket_report_file
logger = logging.getLogger("TicketAnalyserWorkflow")
def execute_workflow(ticket_id: str, output_dir: str = "output", model: str = "mistral_large") -> Dict[str, Any]:
"""
Exécute le workflow d'analyse de ticket.
Args:
ticket_id: Identifiant du ticket
output_dir: Répertoire contenant les tickets
model: Nom du modèle à utiliser pour l'analyse
Returns:
Dict avec les résultats du workflow
"""
logger.info(f"Démarrage du workflow pour le ticket {ticket_id}")
print(f"Démarrage du workflow pour le ticket {ticket_id}")
workflow_start = time.time()
# ÉTAPE 1: Initialiser le modèle LLM
try:
print(f"Initialisation du modèle {model}...")
llm = create_llm(model)
print(f"Modèle {model} initialisé avec succès")
except Exception as e:
logger.error(f"Erreur lors de l'initialisation du modèle: {e}")
print(f"ERREUR: Impossible d'initialiser le modèle: {e}")
return {"error": str(e), "stage": "init_llm"}
# ÉTAPE 2: Initialiser l'agent
try:
print("Création de l'agent d'analyse de tickets...")
agent = create_ticket_analyser(llm)
print("Agent créé avec succès")
except Exception as e:
logger.error(f"Erreur lors de la création de l'agent: {e}")
print(f"ERREUR: Impossible de créer l'agent: {e}")
return {"error": str(e), "stage": "init_agent"}
# ÉTAPE 3: Charger les données du ticket
try:
print("Chargement des données du ticket...")
json_file, ticket_data = get_ticket_json(ticket_id, output_dir)
if not json_file or not ticket_data:
error_msg = f"Impossible de charger les données du ticket {ticket_id}"
logger.error(error_msg)
print(f"ERREUR: {error_msg}")
return {"error": error_msg, "stage": "load_ticket"}
print(f"Données du ticket chargées: {json_file}")
except Exception as e:
logger.error(f"Erreur lors du chargement du ticket: {e}")
print(f"ERREUR: Impossible de charger le ticket: {e}")
return {"error": str(e), "stage": "load_ticket"}
# ÉTAPE 4: Analyser le ticket
try:
print("Analyse du ticket en cours...")
analysis = agent.executer(ticket_data)
print(f"Analyse du ticket terminée: {len(analysis) if analysis else 0} caractères")
except Exception as e:
logger.error(f"Erreur lors de l'analyse du ticket: {e}")
print(f"ERREUR: Impossible d'analyser le ticket: {e}")
return {"error": str(e), "stage": "analyse_ticket", "ticket_data": ticket_data}
# Calcul du temps total d'exécution
workflow_time = time.time() - workflow_start
print(f"Workflow terminé en {workflow_time:.2f} secondes")
# Retourner les résultats
return {
"ticket_id": ticket_id,
"model": model,
"ticket_data": ticket_data,
"analysis": analysis,
"execution_time": workflow_time
}
def main():
# Configuration de l'analyseur d'arguments
parser = argparse.ArgumentParser(description="Exécuter le workflow d'analyse de tickets")
parser.add_argument("ticket_id", help="ID du ticket à analyser (ex: T1234)")
parser.add_argument("--output_dir", default="output", help="Répertoire de sortie contenant les tickets")
parser.add_argument("--model", default="mistral_large", help="Modèle LLM à utiliser")
parser.add_argument("--verbose", "-v", action="store_true", help="Mode verbeux")
parser.add_argument("--save", action="store_true", help="Sauvegarder les résultats dans un fichier JSON")
# Analyser les arguments
args = parser.parse_args()
# Configurer le logging
log_level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f"ticket_analyser_{args.ticket_id}.log"),
logging.StreamHandler()
]
)
# Exécuter le workflow
results = execute_workflow(args.ticket_id, args.output_dir, args.model)
# Afficher un résumé des résultats
if "error" in results:
print(f"\nErreur lors de l'exécution du workflow: {results['error']}")
print(f"Étape en échec: {results.get('stage', 'inconnue')}")
return 1
print("\nRésumé du workflow:")
print(f"Ticket: {results['ticket_id']}")
print(f"Modèle: {results['model']}")
print(f"Taille de l'analyse: {len(results['analysis']) if results['analysis'] else 0} caractères")
print(f"Temps d'exécution: {results['execution_time']:.2f} secondes")
# Sauvegarder les résultats si demandé
if args.save:
results_dir = os.path.join("results", args.ticket_id)
os.makedirs(results_dir, exist_ok=True)
results_file = os.path.join(results_dir, f"ticket_analysis_{args.model}.json")
save_results = {
"ticket_id": results["ticket_id"],
"model": results["model"],
"analysis": results["analysis"],
"execution_time": results["execution_time"],
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
}
with open(results_file, "w", encoding="utf-8") as f:
json.dump(save_results, f, ensure_ascii=False, indent=2)
print(f"Résultats sauvegardés dans: {results_file}")
return 0
if __name__ == "__main__":
sys.exit(main())