mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-13 10:46:51 +01:00
459 lines
22 KiB
Python
459 lines
22 KiB
Python
import os
|
|
import json
|
|
import logging
|
|
import time
|
|
import traceback
|
|
from typing import List, Dict, Any, Optional, Tuple, cast
|
|
|
|
from agents.base_agent import BaseAgent
|
|
from agents.llama_vision.agent_image_sorter import AgentImageSorter
|
|
from loaders.ticket_data_loader import TicketDataLoader
|
|
from utils.image_dedup import filtrer_images_uniques
|
|
from utils.ocr_utils import extraire_texte
|
|
from utils.translate_utils import fr_to_en, en_to_fr, sauvegarder_ocr_traduction
|
|
|
|
# Configuration du logging
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
filename='orchestrator_llama.log', filemode='w')
|
|
logger = logging.getLogger("OrchestratorLlamaVision")
|
|
|
|
def __init__(self,
|
|
output_dir: str = "output/",
|
|
ticket_agent: Optional[BaseAgent] = None,
|
|
image_sorter: Optional[BaseAgent] = None,
|
|
image_analyser: Optional[BaseAgent] = None,
|
|
report_generator: Optional[BaseAgent] = None,
|
|
vision_ocr: Optional[BaseAgent] = None,
|
|
config: Optional[Dict[str, Any]] = None):
|
|
|
|
self.output_dir = output_dir
|
|
self.ticket_agent = ticket_agent
|
|
self.image_sorter = image_sorter
|
|
self.image_analyser = image_analyser
|
|
self.report_generator = report_generator
|
|
self.vision_ocr = vision_ocr
|
|
self.ticket_loader = TicketDataLoader()
|
|
|
|
self.config = {
|
|
"dedup_enabled": True,
|
|
"dedup_threshold": 5,
|
|
"save_results": True,
|
|
"debug_mode": False,
|
|
"reports_dir": "reports",
|
|
"ocr_enabled": True,
|
|
"ocr_llm_enabled": True,
|
|
"english_only": True,
|
|
"model_name": "llama3-vision-90b-instruct"
|
|
}
|
|
|
|
if config:
|
|
self.config.update(config)
|
|
|
|
# Assurer la cohérence des noms de modèles
|
|
if "model_name" in self.config:
|
|
self.config["model_name"] = self.config["model_name"].replace(".", "-").replace(":", "-").replace("_", "-")
|
|
|
|
logger.info(f"OrchestratorLlamaVision initialisé avec les paramètres: {self.config}")
|
|
|
|
def executer(self, ticket_id: Optional[str] = None):
|
|
ticket_path = os.path.join(self.output_dir, f"ticket_{ticket_id}")
|
|
if not os.path.exists(ticket_path):
|
|
logger.error(f"Le ticket {ticket_id} est introuvable dans {ticket_path}")
|
|
return
|
|
|
|
try:
|
|
self.traiter_ticket(ticket_path)
|
|
except Exception as e:
|
|
logger.error(f"Erreur globale sur le ticket {ticket_id}: {e}")
|
|
if self.config.get("debug_mode"):
|
|
logger.error(traceback.format_exc())
|
|
|
|
def traiter_ticket(self, ticket_path: str):
|
|
ticket_id = os.path.basename(ticket_path).replace("ticket_", "")
|
|
logger.info(f"Traitement du ticket {ticket_id}")
|
|
|
|
extractions = self._trouver_extractions(ticket_path, ticket_id)
|
|
if not extractions:
|
|
logger.warning(f"Aucune extraction trouvée pour le ticket {ticket_id}")
|
|
return
|
|
extraction_path = extractions[0]
|
|
|
|
attachments_dir = os.path.join(extraction_path, "attachments")
|
|
rapport_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
|
|
os.makedirs(rapport_dir, exist_ok=True)
|
|
|
|
# Créer le répertoire pipeline une seule fois
|
|
pipeline_dir = os.path.join(rapport_dir, "pipeline")
|
|
os.makedirs(pipeline_dir, exist_ok=True)
|
|
|
|
# Récupérer le nom du modèle pour le logging
|
|
model_name = self.config.get("model_name", "llama3-vision-90b-instruct")
|
|
# Normaliser pour éviter les problèmes dans les noms de fichiers
|
|
model_name = model_name.replace(".", "-").replace(":", "-").replace("_", "-")
|
|
logger.info(f"Utilisation du modèle: {model_name}")
|
|
|
|
json_path = self.ticket_loader.trouver_ticket(extraction_path, ticket_id)
|
|
ticket_data = self._charger_ticket(json_path)
|
|
if not ticket_data:
|
|
logger.error(f"Impossible de charger les données du ticket {ticket_id}")
|
|
return
|
|
|
|
# Ajouter le chemin du fichier JSON au ticket_data pour faciliter l'extraction du ticket_id
|
|
if json_path:
|
|
ticket_data["file_path"] = json_path
|
|
|
|
# Traduire le contenu du ticket en anglais avant l'analyse
|
|
# et stocker la version originale
|
|
if self.config.get("english_only", True) and ticket_data.get("content"):
|
|
logger.info(f"Traduction du contenu du ticket {ticket_id} en anglais")
|
|
ticket_data["content_original"] = ticket_data["content"]
|
|
|
|
# Vérifier si le contenu est déjà en anglais (détection simple)
|
|
english_indicators = ["the ", "is ", "are ", "what ", "when ", "how ", "why "]
|
|
if any(indicator in ticket_data["content"].lower() for indicator in english_indicators):
|
|
logger.info("Le contenu semble déjà être en anglais, pas de traduction nécessaire")
|
|
ticket_data["is_english"] = True
|
|
ticket_data["content_en"] = ticket_data["content"]
|
|
else:
|
|
# Traduire en anglais
|
|
ticket_data["content_en"] = fr_to_en(ticket_data["content"])
|
|
ticket_data["is_english"] = False
|
|
|
|
logger.info(f"Traduction terminée: {len(ticket_data['content_en'])} caractères")
|
|
|
|
# Étape 1: Analyser le ticket (si l'agent est disponible)
|
|
ticket_analysis = None
|
|
if self.ticket_agent:
|
|
try:
|
|
logger.info(f"Exécution de l'agent d'analyse de ticket pour {ticket_id}")
|
|
ticket_analysis = self.ticket_agent.executer(ticket_data)
|
|
|
|
# Vérifier si l'analyse a été réalisée avec succès
|
|
if ticket_analysis:
|
|
logger.info(f"Analyse du ticket terminée: {len(ticket_analysis.get('response', ''))} caractères")
|
|
|
|
# Si le répertoire des rapports existe mais que le fichier d'analyse n'a pas été créé
|
|
# on force sa création ici
|
|
pipeline_dir = os.path.join(rapport_dir, "pipeline")
|
|
analyse_files = [f for f in os.listdir(pipeline_dir)
|
|
if f.startswith("analyse_ticket_") and f.endswith("_results.json")]
|
|
|
|
if not analyse_files:
|
|
logger.warning("Aucun fichier d'analyse de ticket trouvé, tentative de sauvegarde forcée")
|
|
from agents.utils.pipeline_logger import sauvegarder_donnees
|
|
sauvegarder_donnees(
|
|
ticket_id=ticket_id,
|
|
step_name="analyse_ticket",
|
|
data=ticket_analysis,
|
|
base_dir=rapport_dir,
|
|
is_resultat=True
|
|
)
|
|
else:
|
|
logger.error(f"L'analyse du ticket {ticket_id} n'a pas produit de résultat")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de l'analyse du ticket {ticket_id}: {e}")
|
|
if self.config.get("debug_mode"):
|
|
logger.error(traceback.format_exc())
|
|
|
|
images_analyses, relevant_images = {}, []
|
|
ocr_results = {}
|
|
|
|
if os.path.exists(attachments_dir):
|
|
images = self._lister_images(attachments_dir)
|
|
if self.config.get("dedup_enabled", True):
|
|
images = filtrer_images_uniques(images, seuil_hamming=self.config["dedup_threshold"], ticket_id=ticket_id)
|
|
|
|
# Réaliser l'OCR sur toutes les images avant le tri
|
|
if self.config.get("ocr_enabled", True):
|
|
logger.info(f"Traitement OCR de {len(images)} images")
|
|
for img in images:
|
|
try:
|
|
ocr_fr, langue = extraire_texte(img, lang="auto")
|
|
|
|
# Traduire le texte extrait en anglais pour une meilleure analyse
|
|
ocr_en = fr_to_en(ocr_fr) if ocr_fr else ""
|
|
|
|
# Traduire à nouveau en français pour vérification (optionnel)
|
|
ocr_en_back_fr = en_to_fr(ocr_en) if ocr_en else ""
|
|
|
|
# Sauvegarder les résultats OCR directement dans le répertoire pipeline
|
|
# au lieu de créer un sous-répertoire T11143
|
|
sauvegarder_ocr_traduction(img, ticket_id, ocr_fr, ocr_en, ocr_en_back_fr, base_dir=rapport_dir)
|
|
|
|
# Stocker le résultat de l'OCR pour utilisation ultérieure
|
|
ocr_results[img] = {
|
|
"texte_fr": ocr_fr,
|
|
"texte_en": ocr_en,
|
|
"langue_detectee": langue
|
|
}
|
|
|
|
logger.info(f"OCR terminé pour {os.path.basename(img)}: {len(ocr_fr)} caractères ({langue})")
|
|
except Exception as e:
|
|
logger.warning(f"Erreur OCR pour {os.path.basename(img)}: {e}")
|
|
ocr_results[img] = {"texte_fr": "", "texte_en": "", "langue_detectee": "unknown"}
|
|
|
|
# Traiter toutes les images avec l'agent de tri
|
|
if self.image_sorter:
|
|
logger.info(f"Traitement de {len(images)} images uniques avec l'agent de tri")
|
|
|
|
# Trier toutes les images et collecter les résultats
|
|
for img in images:
|
|
try:
|
|
# Inclure l'OCR avec le chemin de l'image pour aider au tri
|
|
ocr_context = ocr_results.get(img, {"texte_en": ""}).get("texte_en", "")
|
|
result_sort = self.image_sorter.executer(img, ocr_context=ocr_context)
|
|
|
|
# Déterminer si l'image est pertinente
|
|
is_relevant = result_sort.get("is_relevant", True)
|
|
if is_relevant:
|
|
relevant_images.append(img)
|
|
|
|
# Stocker le résultat pour l'analyse ultérieure
|
|
images_analyses[img] = {
|
|
"sorting": result_sort or {"is_relevant": True},
|
|
"analysis": None,
|
|
"ocr": ocr_results.get(img, {})
|
|
}
|
|
except Exception as e:
|
|
logger.warning(f"Erreur tri image {os.path.basename(img)}: {e}")
|
|
if self.config.get("debug_mode"):
|
|
logger.error(traceback.format_exc())
|
|
|
|
# Sauvegarder tous les résultats en une seule fois pour éviter les doublons
|
|
if self.image_sorter and hasattr(self.image_sorter, "sauvegarder_resultats"):
|
|
try:
|
|
# Cast l'agent en AgentImageSorter pour satisfaire le linter
|
|
image_sorter = cast(AgentImageSorter, self.image_sorter)
|
|
# Méthode sauvegarder_resultats améliorée pour accumuler les résultats
|
|
image_sorter.sauvegarder_resultats()
|
|
logger.info(f"Sauvegarde groupée de {len(images)} résultats de tri d'images effectuée")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la sauvegarde groupée des résultats de tri: {e}")
|
|
if self.config.get("debug_mode"):
|
|
logger.error(traceback.format_exc())
|
|
else:
|
|
logger.warning("L'agent de tri d'images ne dispose pas de la méthode sauvegarder_resultats")
|
|
else:
|
|
# Si pas d'agent de tri, considérer toutes les images comme pertinentes
|
|
relevant_images = images.copy()
|
|
for img in images:
|
|
images_analyses[img] = {
|
|
"sorting": {"is_relevant": True},
|
|
"analysis": None,
|
|
"ocr": ocr_results.get(img, {})
|
|
}
|
|
|
|
# Analyser les images pertinentes avec l'agent d'analyse d'images
|
|
if self.image_analyser:
|
|
logger.info(f"Début de l'analyse des images avec {len(relevant_images)} images pertinentes")
|
|
analyses_resultats = [] # Pour accumuler les résultats
|
|
|
|
for img in relevant_images:
|
|
try:
|
|
# Intégrer les résultats de l'OCR dans le contexte
|
|
ocr_info = ocr_results.get(img, {})
|
|
contexte_enrichi = self._enrichir_contexte(
|
|
ticket_analysis if ticket_analysis else {},
|
|
ocr_info
|
|
)
|
|
|
|
logger.info(f"Analyse de l'image: {os.path.basename(img)}")
|
|
result = self.image_analyser.executer(img, contexte=contexte_enrichi)
|
|
|
|
if result:
|
|
images_analyses[img]["analysis"] = result
|
|
analyses_resultats.append(result)
|
|
logger.info(f"Analyse terminée pour {os.path.basename(img)}")
|
|
else:
|
|
logger.warning(f"Pas de résultat d'analyse pour {os.path.basename(img)}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur analyse image {os.path.basename(img)}: {e}")
|
|
if self.config.get("debug_mode"):
|
|
logger.error(traceback.format_exc())
|
|
|
|
# Sauvegarder les résultats via la méthode sauvegarder_resultats si disponible
|
|
if hasattr(self.image_analyser, "sauvegarder_resultats"):
|
|
try:
|
|
self.image_analyser.sauvegarder_resultats()
|
|
logger.info(f"Sauvegarde des résultats d'analyse d'images via sauvegarder_resultats")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la sauvegarde des résultats d'analyse d'images: {e}")
|
|
if self.config.get("debug_mode"):
|
|
logger.error(traceback.format_exc())
|
|
else:
|
|
# Fallback vers sauvegarder_donnees si sauvegarder_resultats n'est pas disponible
|
|
try:
|
|
from agents.utils.pipeline_logger import sauvegarder_donnees
|
|
sauvegarder_donnees(
|
|
ticket_id=ticket_id,
|
|
step_name="analyse_image",
|
|
data=analyses_resultats,
|
|
base_dir=rapport_dir,
|
|
is_resultat=True
|
|
)
|
|
logger.info(f"Sauvegarde de {len(analyses_resultats)} analyses d'images effectuée via sauvegarder_donnees")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la sauvegarde des analyses d'images: {e}")
|
|
if self.config.get("debug_mode"):
|
|
logger.error(traceback.format_exc())
|
|
|
|
if self.report_generator and ticket_analysis:
|
|
try:
|
|
# Normaliser le nom du modèle pour éviter les doublons de rapports
|
|
model_name = self.config.get("model_name", "").replace(".", "-").replace(":", "-").replace("_", "-")
|
|
if not model_name:
|
|
model_name = "llama3-vision-90b-instruct"
|
|
|
|
rapport_data = {
|
|
"ticket_id": ticket_id,
|
|
"ticket_data": ticket_data,
|
|
"ticket_analyse": ticket_analysis,
|
|
"analyse_images": images_analyses,
|
|
"metadata": {
|
|
"model_name": model_name
|
|
}
|
|
}
|
|
|
|
# Utiliser directement le répertoire rapports_dir au lieu de recréer un chemin
|
|
rapport_final = self.report_generator.executer(rapport_data)
|
|
|
|
# Le rapport_generator est responsable de sauvegarder les fichiers
|
|
# Nous n'avons pas besoin de dupliquer les sauvegardes ici
|
|
# Les files sont sauvegardés via la fonction sauvegarder_donnees dans le pipeline
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du rapport : {e}")
|
|
if self.config.get("debug_mode"):
|
|
logger.error(traceback.format_exc())
|
|
|
|
logger.info(f"Traitement terminé pour le ticket {ticket_id}")
|
|
|
|
def _enrichir_contexte(self, ticket_analysis: Dict[str, Any], ocr_info: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Enrichit le contexte de l'analyse du ticket avec les informations OCR
|
|
|
|
Args:
|
|
ticket_analysis: Résultat de l'analyse du ticket
|
|
ocr_info: Informations OCR d'une image
|
|
|
|
Returns:
|
|
Contexte enrichi
|
|
"""
|
|
if not isinstance(ticket_analysis, dict):
|
|
return ticket_analysis
|
|
|
|
# Créer une copie pour ne pas modifier l'original
|
|
contexte_enrichi = ticket_analysis.copy()
|
|
|
|
# Ajouter les informations OCR
|
|
if ocr_info:
|
|
contexte_enrichi["ocr_info"] = ocr_info
|
|
|
|
# Utiliser la version anglaise du texte OCR pour l'analyse
|
|
if self.config.get("english_only") and ocr_info.get("texte_en"):
|
|
contexte_enrichi["ocr_text"] = ocr_info["texte_en"]
|
|
else:
|
|
contexte_enrichi["ocr_text"] = ocr_info.get("texte_fr", "")
|
|
|
|
return contexte_enrichi
|
|
|
|
def _charger_ticket(self, json_path: Optional[str]) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Charge et prépare les données du ticket à partir d'un fichier JSON.
|
|
|
|
Args:
|
|
json_path: Chemin du fichier JSON
|
|
|
|
Returns:
|
|
Données du ticket chargées ou None en cas d'erreur
|
|
"""
|
|
if not json_path:
|
|
logger.warning("Aucun chemin JSON fourni")
|
|
return None
|
|
try:
|
|
ticket_data = self.ticket_loader.charger(json_path)
|
|
|
|
# Préparer le contenu du ticket à partir des messages
|
|
messages = ticket_data.get("messages", [])
|
|
contenu = []
|
|
|
|
# Ajouter le titre/description
|
|
if "name" in ticket_data:
|
|
contenu.append(f"TITRE: {ticket_data['name']}")
|
|
if "description" in ticket_data and ticket_data["description"] != "*Contenu non extractible*":
|
|
contenu.append(f"DESCRIPTION: {ticket_data['description']}")
|
|
|
|
# Ajouter chaque message
|
|
for msg in messages:
|
|
auteur = msg.get("author_id", "Inconnu")
|
|
date = msg.get("date", "")
|
|
msg_type = msg.get("message_type", "")
|
|
content = msg.get("content", "").strip()
|
|
|
|
if content:
|
|
contenu.append(f"\n[{date}] {auteur} ({msg_type}):")
|
|
contenu.append(content)
|
|
|
|
# Ajouter le contenu formaté au ticket_data
|
|
ticket_data["content"] = "\n".join(contenu)
|
|
|
|
logger.info(f"Données du ticket chargées depuis {json_path} avec {len(messages)} messages")
|
|
return ticket_data
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur chargement JSON : {e}")
|
|
return None
|
|
|
|
def _trouver_extractions(self, ticket_path: str, ticket_id: str) -> List[str]:
|
|
return sorted(
|
|
[os.path.join(ticket_path, d) for d in os.listdir(ticket_path)
|
|
if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(ticket_id)],
|
|
key=lambda x: os.path.getmtime(x),
|
|
reverse=True
|
|
)
|
|
|
|
def _lister_images(self, dossier: str) -> List[str]:
|
|
"""
|
|
Liste toutes les images dans un dossier avec une reconnaissance étendue
|
|
des formats d'images.
|
|
|
|
Args:
|
|
dossier: Dossier contenant les images à analyser
|
|
|
|
Returns:
|
|
Liste des chemins d'images trouvées
|
|
"""
|
|
# Liste étendue des extensions d'images courantes
|
|
extensions = ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.tiff', '.tif']
|
|
images = []
|
|
|
|
# Parcourir le dossier pour trouver toutes les images
|
|
if os.path.exists(dossier):
|
|
for racine, _, fichiers in os.walk(dossier):
|
|
for f in fichiers:
|
|
# Vérifier l'extension du fichier (non sensible à la casse)
|
|
if any(f.lower().endswith(ext) for ext in extensions):
|
|
chemin_complet = os.path.join(racine, f)
|
|
# Vérifier que le fichier est bien une image valide et accessible
|
|
try:
|
|
from PIL import Image
|
|
with Image.open(chemin_complet) as img:
|
|
# S'assurer que c'est bien une image en vérifiant ses dimensions
|
|
width, height = img.size
|
|
if width > 0 and height > 0:
|
|
images.append(chemin_complet)
|
|
except Exception as e:
|
|
logger.warning(f"Image ignorée {f}: {str(e)}")
|
|
|
|
if not images:
|
|
logger.warning(f"Aucune image trouvée dans {dossier}")
|
|
else:
|
|
logger.info(f"{len(images)} images trouvées dans {dossier}")
|
|
|
|
return images
|
|
|
|
|
|
|
|
|
|
|