mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-13 09:06:51 +01:00
354 lines
16 KiB
Python
354 lines
16 KiB
Python
import os
|
|
import json
|
|
import logging
|
|
import time
|
|
import traceback
|
|
from typing import List, Dict, Any, Optional, Tuple, Union, Type
|
|
from agents.base_agent import BaseAgent
|
|
from loaders.ticket_data_loader import TicketDataLoader
|
|
from utils.image_dedup import filtrer_images_uniques
|
|
|
|
# Configuration globale du logging
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s',
|
|
filename='orchestrator.log', filemode='w')
|
|
logger = logging.getLogger("Orchestrator")
|
|
|
|
class Orchestrator:
|
|
def __init__(self,
|
|
output_dir: str = "output/",
|
|
ticket_agent: Optional[BaseAgent] = None,
|
|
image_sorter: Optional[BaseAgent] = None,
|
|
image_analyser: Optional[BaseAgent] = None,
|
|
report_generator: Optional[BaseAgent] = None,
|
|
config: Optional[Dict[str, Any]] = None):
|
|
"""
|
|
Initialise l'orchestrateur avec les agents nécessaires et la configuration.
|
|
|
|
Args:
|
|
output_dir: Répertoire de sortie pour les tickets
|
|
ticket_agent: Agent d'analyse de tickets
|
|
image_sorter: Agent de tri d'images
|
|
image_analyser: Agent d'analyse d'images
|
|
report_generator: Agent de génération de rapports
|
|
config: Configuration supplémentaire pour l'orchestrateur
|
|
"""
|
|
self.output_dir = output_dir
|
|
self.ticket_agent = ticket_agent
|
|
self.image_sorter = image_sorter
|
|
self.image_analyser = image_analyser
|
|
self.report_generator = report_generator
|
|
self.ticket_loader = TicketDataLoader()
|
|
|
|
# Configuration par défaut
|
|
self.config = {
|
|
"dedup_enabled": True,
|
|
"dedup_threshold": 5,
|
|
"save_results": True,
|
|
"debug_mode": False,
|
|
"reports_dir": "reports"
|
|
}
|
|
|
|
# Mise à jour avec la configuration fournie
|
|
if config:
|
|
self.config.update(config)
|
|
|
|
logger.info(f"Orchestrator initialisé avec la configuration: {self.config}")
|
|
|
|
def trouver_rapport(self, extraction_path: str, ticket_id: str) -> Optional[str]:
|
|
"""
|
|
Trouve le rapport JSON associé à un ticket dans un chemin d'extraction.
|
|
|
|
Args:
|
|
extraction_path: Chemin de l'extraction
|
|
ticket_id: ID du ticket
|
|
|
|
Returns:
|
|
Chemin du rapport trouvé ou None
|
|
"""
|
|
return self.ticket_loader.trouver_ticket(extraction_path, ticket_id)
|
|
|
|
def _preparer_donnees_ticket(self, json_path: Optional[str]) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Charge et prépare les données du ticket à partir d'un fichier JSON.
|
|
|
|
Args:
|
|
json_path: Chemin du fichier JSON
|
|
|
|
Returns:
|
|
Données du ticket chargées ou None en cas d'erreur
|
|
"""
|
|
if not json_path:
|
|
return None
|
|
try:
|
|
ticket_data = self.ticket_loader.charger(json_path)
|
|
logger.info(f"Données du ticket chargées depuis {json_path}")
|
|
return ticket_data
|
|
except Exception as e:
|
|
logger.error(f"Erreur chargement JSON: {e}")
|
|
return None
|
|
|
|
def executer(self, ticket_specifique: Optional[str] = None) -> None:
|
|
"""
|
|
Exécute l'analyse sur un ticket spécifique ou sur tous les tickets disponibles.
|
|
|
|
Args:
|
|
ticket_specifique: ID du ticket spécifique à analyser
|
|
"""
|
|
tickets = ([f"ticket_{ticket_specifique}"] if ticket_specifique else self._detecter_tickets())
|
|
|
|
if not tickets:
|
|
logger.warning("Aucun ticket à traiter trouvé.")
|
|
return
|
|
|
|
logger.info(f"Tickets à traiter: {tickets}")
|
|
|
|
for ticket in tickets:
|
|
try:
|
|
self.traiter_ticket(os.path.join(self.output_dir, ticket))
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du traitement du ticket {ticket}: {e}")
|
|
if self.config.get("debug_mode"):
|
|
logger.error(traceback.format_exc())
|
|
|
|
def traiter_ticket(self, ticket_path: str) -> None:
|
|
"""
|
|
Traite un ticket en exécutant tous les agents configurés.
|
|
|
|
Args:
|
|
ticket_path: Chemin du répertoire du ticket
|
|
"""
|
|
ticket_id = os.path.basename(ticket_path).replace("ticket_", "")
|
|
logger.info(f"Traitement du ticket {ticket_id}")
|
|
|
|
# Trouver l'extraction la plus récente
|
|
extractions = self._trouver_extractions(ticket_path, ticket_id)
|
|
if not extractions:
|
|
logger.warning(f"Aucune extraction trouvée pour le ticket {ticket_id}")
|
|
return
|
|
|
|
extraction_path = extractions[0]
|
|
logger.info(f"Utilisation de l'extraction: {os.path.basename(extraction_path)}")
|
|
|
|
# Créer les répertoires nécessaires
|
|
attachments_dir = os.path.join(extraction_path, "attachments")
|
|
rapports_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
|
|
os.makedirs(rapports_dir, exist_ok=True)
|
|
|
|
# Charger les données du ticket
|
|
json_path = self.trouver_rapport(extraction_path, ticket_id)
|
|
ticket_data = self._preparer_donnees_ticket(json_path)
|
|
if not ticket_data:
|
|
logger.error(f"Impossible de charger les données du ticket {ticket_id}")
|
|
return
|
|
|
|
# Étape 1: Analyser le ticket (si l'agent est disponible)
|
|
ticket_analysis = None
|
|
if self.ticket_agent:
|
|
logger.info(f"Exécution de l'agent d'analyse de ticket pour {ticket_id}")
|
|
try:
|
|
ticket_analysis = self.ticket_agent.executer(ticket_data)
|
|
logger.info(f"Analyse du ticket terminée: {len(ticket_analysis) if ticket_analysis else 0} caractères")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de l'analyse du ticket {ticket_id}: {e}")
|
|
if self.config.get("debug_mode"):
|
|
logger.error(traceback.format_exc())
|
|
|
|
# Étape 2: Trier et analyser les images
|
|
relevant_images, images_analyses = [], {}
|
|
if os.path.exists(attachments_dir) and (self.image_sorter or self.image_analyser):
|
|
logger.info(f"Traitement des images pour le ticket {ticket_id}")
|
|
|
|
# Récupérer toutes les images
|
|
images = self._trouver_images_dans_dossier(attachments_dir)
|
|
logger.info(f"Images trouvées: {len(images)}")
|
|
|
|
# Dédupliquer les images si la fonctionnalité est activée
|
|
if self.config.get("dedup_enabled", True):
|
|
images_avant = len(images)
|
|
images = filtrer_images_uniques(
|
|
images,
|
|
seuil_hamming=self.config.get("dedup_threshold", 5),
|
|
ticket_id=ticket_id
|
|
)
|
|
logger.info(f"Dédoublonnage: {images_avant} → {len(images)} images")
|
|
|
|
# Trier les images
|
|
if self.image_sorter:
|
|
for path in images:
|
|
try:
|
|
result_tri = self.image_sorter.executer(path)
|
|
is_relevant = result_tri.get("is_relevant", True)
|
|
images_analyses[path] = {"sorting": result_tri, "analysis": None}
|
|
|
|
if is_relevant:
|
|
relevant_images.append(path)
|
|
logger.info(f"Image pertinente: {os.path.basename(path)}")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du tri de l'image {os.path.basename(path)}: {e}")
|
|
if self.config.get("debug_mode"):
|
|
logger.error(traceback.format_exc())
|
|
|
|
# Sauvegarder les résultats accumulés après le traitement de toutes les images
|
|
if hasattr(self.image_sorter, "sauvegarder_resultats"):
|
|
try:
|
|
# Ignorer le type pour éviter l'erreur de linter
|
|
sorter_agent = self.image_sorter # type: ignore
|
|
sorter_agent.sauvegarder_resultats()
|
|
logger.info(f"Résultats de tri d'images sauvegardés pour le ticket {ticket_id}")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la sauvegarde des résultats de tri d'images: {e}")
|
|
if self.config.get("debug_mode"):
|
|
logger.error(traceback.format_exc())
|
|
else:
|
|
# Sans agent de tri, toutes les images sont considérées comme pertinentes
|
|
relevant_images = images
|
|
for path in images:
|
|
images_analyses[path] = {
|
|
"sorting": {"is_relevant": True, "reason": "pas d'agent de tri configuré"},
|
|
"analysis": None
|
|
}
|
|
|
|
logger.info(f"Images pertinentes: {len(relevant_images)}/{len(images)}")
|
|
|
|
# Analyser les images pertinentes
|
|
if self.image_analyser and ticket_analysis:
|
|
for path in relevant_images:
|
|
try:
|
|
logger.info(f"Analyse de l'image: {os.path.basename(path)}")
|
|
result = self.image_analyser.executer(path, contexte=ticket_analysis)
|
|
images_analyses[path]["analysis"] = result
|
|
|
|
if result and "error" not in result:
|
|
logger.info(f"Analyse terminée pour {os.path.basename(path)}")
|
|
else:
|
|
logger.warning(f"Analyse incomplète pour {os.path.basename(path)}")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de l'analyse de l'image {os.path.basename(path)}: {e}")
|
|
if self.config.get("debug_mode"):
|
|
logger.error(traceback.format_exc())
|
|
|
|
# Étape 3: Générer le rapport final
|
|
if self.report_generator and ticket_analysis:
|
|
try:
|
|
logger.info(f"Génération du rapport final pour le ticket {ticket_id}")
|
|
|
|
rapport_data = {
|
|
"ticket_id": ticket_id,
|
|
"ticket_data": ticket_data,
|
|
"ticket_analyse": ticket_analysis,
|
|
"analyse_images": images_analyses
|
|
}
|
|
|
|
# Créer le répertoire pour les rapports si nécessaire
|
|
reports_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", self.config.get("reports_dir", "reports")))
|
|
dest_dir = os.path.join(reports_root, ticket_id)
|
|
os.makedirs(dest_dir, exist_ok=True)
|
|
|
|
# Générer le rapport
|
|
rapport_final = self.report_generator.executer(rapport_data)
|
|
|
|
# Sauvegarder le rapport
|
|
if self.config.get("save_results", True):
|
|
rapport_path = os.path.join(dest_dir, f"rapport_final_{ticket_id}.json")
|
|
with open(rapport_path, "w", encoding="utf-8") as f:
|
|
json.dump(
|
|
{
|
|
"ticket_id": ticket_id,
|
|
"rapport": rapport_final,
|
|
"metadata": {
|
|
"images_total": len(images_analyses),
|
|
"images_pertinentes": len(relevant_images),
|
|
"images_analysees": sum(1 for img in images_analyses.values() if img.get("analysis")),
|
|
"timestamp": time.strftime("%Y%m%d_%H%M%S")
|
|
}
|
|
},
|
|
f,
|
|
ensure_ascii=False,
|
|
indent=2
|
|
)
|
|
logger.info(f"Rapport final sauvegardé dans {rapport_path}")
|
|
|
|
# Sauvegarder également une version texte du rapport pour faciliter la lecture
|
|
rapport_txt_path = os.path.join(dest_dir, f"rapport_final_{ticket_id}.txt")
|
|
with open(rapport_txt_path, "w", encoding="utf-8") as f:
|
|
f.write(f"RAPPORT D'ANALYSE DU TICKET {ticket_id}\n")
|
|
f.write("="*50 + "\n\n")
|
|
f.write(rapport_final)
|
|
logger.info(f"Version texte du rapport sauvegardée dans {rapport_txt_path}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la génération du rapport pour {ticket_id}: {e}")
|
|
if self.config.get("debug_mode"):
|
|
logger.error(traceback.format_exc())
|
|
|
|
logger.info(f"Traitement du ticket {ticket_id} terminé")
|
|
|
|
def _detecter_tickets(self) -> List[str]:
|
|
"""
|
|
Détecte tous les tickets disponibles dans le répertoire de sortie.
|
|
|
|
Returns:
|
|
Liste des noms de répertoires de tickets
|
|
"""
|
|
if not os.path.exists(self.output_dir):
|
|
logger.warning(f"Le répertoire de sortie {self.output_dir} n'existe pas")
|
|
return []
|
|
|
|
return [d for d in os.listdir(self.output_dir)
|
|
if os.path.isdir(os.path.join(self.output_dir, d))
|
|
and d.startswith("ticket_")]
|
|
|
|
def _trouver_extractions(self, ticket_path: str, ticket_id: str) -> List[str]:
|
|
"""
|
|
Trouve toutes les extractions pour un ticket et les trie par date (la plus récente en premier).
|
|
|
|
Args:
|
|
ticket_path: Chemin du répertoire du ticket
|
|
ticket_id: ID du ticket
|
|
|
|
Returns:
|
|
Liste des chemins d'extraction triés par date (la plus récente en premier)
|
|
"""
|
|
if not os.path.exists(ticket_path):
|
|
return []
|
|
|
|
extractions = [
|
|
os.path.join(ticket_path, d) for d in os.listdir(ticket_path)
|
|
if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(ticket_id)
|
|
]
|
|
|
|
# Trier par date de modification (plus récente en premier)
|
|
extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True)
|
|
|
|
return extractions
|
|
|
|
def _trouver_images_dans_dossier(self, dossier: str) -> List[str]:
|
|
"""
|
|
Trouve toutes les images dans un dossier et ses sous-dossiers.
|
|
|
|
Args:
|
|
dossier: Chemin du dossier à explorer
|
|
|
|
Returns:
|
|
Liste des chemins d'images trouvées
|
|
"""
|
|
extensions = ['.jpg', '.jpeg', '.png', '.gif', '.webp']
|
|
images = []
|
|
|
|
# Explorer le dossier principal
|
|
for fichier in os.listdir(dossier):
|
|
fichier_path = os.path.join(dossier, fichier)
|
|
|
|
# Si c'est un fichier avec une extension d'image
|
|
if os.path.isfile(fichier_path) and any(fichier.lower().endswith(ext) for ext in extensions):
|
|
images.append(fichier_path)
|
|
|
|
# Si c'est un dossier, l'explorer récursivement
|
|
elif os.path.isdir(fichier_path):
|
|
for sous_fichier in os.listdir(fichier_path):
|
|
sous_fichier_path = os.path.join(fichier_path, sous_fichier)
|
|
if os.path.isfile(sous_fichier_path) and any(sous_fichier.lower().endswith(ext) for ext in extensions):
|
|
images.append(sous_fichier_path)
|
|
|
|
return images
|