llm_ticket3/orchestrator_llama.py
2025-04-25 16:02:24 +02:00

475 lines
24 KiB
Python

import os
import json
import logging
import time
import traceback
from typing import List, Dict, Any, Optional, Tuple, cast
from agents.base_agent import BaseAgent
from agents.llama_vision.agent_image_sorter import AgentImageSorter
from loaders.ticket_data_loader import TicketDataLoader
from utils.image_dedup import filtrer_images_uniques
from utils.ocr_utils import extraire_texte
from utils.translate_utils import fr_to_en, en_to_fr, sauvegarder_ocr_traduction
# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='orchestrator_llama.log', filemode='w')
logger = logging.getLogger("OrchestratorLlamaVision")
class OrchestratorLlamaVision:
"""Orchestrateur pour l'analyse des tickets avec llama_vision."""
def __init__(self,
output_dir: str = "output/",
ticket_agent: Optional[BaseAgent] = None,
image_sorter: Optional[BaseAgent] = None,
image_analyser: Optional[BaseAgent] = None,
report_generator: Optional[BaseAgent] = None,
config: Optional[Dict[str, Any]] = None):
"""Initialisation de l'orchestrateur."""
self.output_dir = output_dir
self.ticket_agent = ticket_agent
self.image_sorter = image_sorter
self.image_analyser = image_analyser
self.report_generator = report_generator
self.ticket_loader = TicketDataLoader()
self.config = {
"dedup_enabled": True,
"dedup_threshold": 5,
"save_results": True,
"debug_mode": False,
"reports_dir": "reports",
"ocr_enabled": True,
"english_only": True,
"model_name": "llama3-vision-90b-instruct" # Nom du modèle par défaut
}
if config:
self.config.update(config)
# Assurer la cohérence des noms de modèles
if "model_name" in self.config:
self.config["model_name"] = self.config["model_name"].replace(".", "-").replace(":", "-").replace("_", "-")
logger.info(f"OrchestratorLlamaVision initialisé avec les paramètres: {self.config}")
def executer(self, ticket_id: Optional[str] = None):
ticket_path = os.path.join(self.output_dir, f"ticket_{ticket_id}")
if not os.path.exists(ticket_path):
logger.error(f"Le ticket {ticket_id} est introuvable dans {ticket_path}")
return
try:
self.traiter_ticket(ticket_path)
except Exception as e:
logger.error(f"Erreur globale sur le ticket {ticket_id}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
def traiter_ticket(self, ticket_path: str):
ticket_id = os.path.basename(ticket_path).replace("ticket_", "")
logger.info(f"Traitement du ticket {ticket_id}")
extractions = self._trouver_extractions(ticket_path, ticket_id)
if not extractions:
logger.warning(f"Aucune extraction trouvée pour le ticket {ticket_id}")
return
extraction_path = extractions[0]
attachments_dir = os.path.join(extraction_path, "attachments")
rapport_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
os.makedirs(rapport_dir, exist_ok=True)
# Créer le répertoire pipeline une seule fois
pipeline_dir = os.path.join(rapport_dir, "pipeline")
os.makedirs(pipeline_dir, exist_ok=True)
# Récupérer le nom du modèle pour le logging
model_name = self.config.get("model_name", "llama3-vision-90b-instruct")
# Normaliser pour éviter les problèmes dans les noms de fichiers
model_name = model_name.replace(".", "-").replace(":", "-").replace("_", "-")
logger.info(f"Utilisation du modèle: {model_name}")
json_path = self.ticket_loader.trouver_ticket(extraction_path, ticket_id)
ticket_data = self._charger_ticket(json_path)
if not ticket_data:
logger.error(f"Impossible de charger les données du ticket {ticket_id}")
return
# Ajouter le chemin du fichier JSON au ticket_data pour faciliter l'extraction du ticket_id
if json_path:
ticket_data["file_path"] = json_path
# Traduire le contenu du ticket en anglais avant l'analyse
# et stocker la version originale
if self.config.get("english_only", True) and ticket_data.get("content"):
logger.info(f"[TRADUCTION] Début de traduction du contenu du ticket {ticket_id} (FR → EN)")
logger.info(f"[TRADUCTION] Taille du contenu original: {len(ticket_data['content'])} caractères")
ticket_data["content_original"] = ticket_data["content"]
# Vérifier si le contenu est déjà en anglais (détection simple)
english_indicators = ["the ", "is ", "are ", "what ", "when ", "how ", "why "]
if any(indicator in ticket_data["content"].lower() for indicator in english_indicators):
logger.info("[TRADUCTION] Le contenu semble déjà être en anglais, pas de traduction nécessaire")
ticket_data["is_english"] = True
ticket_data["content_en"] = ticket_data["content"]
else:
# Traduire en anglais
logger.info("[TRADUCTION] Traduction du contenu original en anglais via fr_to_en")
ticket_data["content_en"] = fr_to_en(ticket_data["content"])
ticket_data["is_english"] = False
logger.info(f"[TRADUCTION] Traduction terminée: {len(ticket_data['content_en'])} caractères")
logger.info(f"[TRADUCTION] La clé 'content_en' sera utilisée par les agents pour l'analyse en anglais")
# Étape 1: Analyser le ticket (si l'agent est disponible)
ticket_analysis = None
if self.ticket_agent:
try:
logger.info(f"Exécution de l'agent d'analyse de ticket pour {ticket_id}")
ticket_analysis = self.ticket_agent.executer(ticket_data)
# Vérifier si l'analyse a été réalisée avec succès
if ticket_analysis:
logger.info(f"Analyse du ticket terminée: {len(ticket_analysis.get('response', ''))} caractères")
# Si le répertoire des rapports existe mais que le fichier d'analyse n'a pas été créé
# on force sa création ici
pipeline_dir = os.path.join(rapport_dir, "pipeline")
analyse_files = [f for f in os.listdir(pipeline_dir)
if f.startswith("analyse_ticket_") and f.endswith("_results.json")]
if not analyse_files:
logger.warning("Aucun fichier d'analyse de ticket trouvé, tentative de sauvegarde forcée")
from agents.utils.pipeline_logger import sauvegarder_donnees
sauvegarder_donnees(
ticket_id=ticket_id,
step_name="analyse_ticket",
data=ticket_analysis,
base_dir=rapport_dir,
is_resultat=True
)
else:
logger.error(f"L'analyse du ticket {ticket_id} n'a pas produit de résultat")
except Exception as e:
logger.error(f"Erreur lors de l'analyse du ticket {ticket_id}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
images_analyses, relevant_images = {}, []
ocr_results = {}
if os.path.exists(attachments_dir):
images = self._lister_images(attachments_dir)
if self.config.get("dedup_enabled", True):
images = filtrer_images_uniques(images, seuil_hamming=self.config["dedup_threshold"], ticket_id=ticket_id)
# Réaliser l'OCR sur toutes les images avant le tri
if self.config.get("ocr_enabled", True):
logger.info(f"Traitement OCR de {len(images)} images")
for img in images:
try:
ocr_fr, langue = extraire_texte(img, lang="auto")
# Traduire le texte extrait en anglais pour une meilleure analyse
logger.info(f"[TRADUCTION] Traduction OCR pour {os.path.basename(img)} (FR → EN)")
logger.info(f"[TRADUCTION] Texte OCR original (FR): {len(ocr_fr)} caractères")
ocr_en = fr_to_en(ocr_fr) if ocr_fr else ""
logger.info(f"[TRADUCTION] Texte OCR traduit (EN): {len(ocr_en)} caractères")
# Traduire à nouveau en français pour vérification (optionnel)
logger.info(f"[TRADUCTION] Traduction OCR inverse pour validation (EN → FR)")
ocr_en_back_fr = en_to_fr(ocr_en) if ocr_en else ""
logger.info(f"[TRADUCTION] Texte OCR retraduit (FR): {len(ocr_en_back_fr)} caractères")
# Sauvegarder les résultats OCR directement dans le répertoire pipeline
# au lieu de créer un sous-répertoire T11143
sauvegarder_ocr_traduction(img, ticket_id, ocr_fr, ocr_en, ocr_en_back_fr, base_dir=rapport_dir)
# Stocker le résultat de l'OCR pour utilisation ultérieure
ocr_results[img] = {
"texte_fr": ocr_fr,
"texte_en": ocr_en,
"langue_detectee": langue
}
logger.info(f"OCR terminé pour {os.path.basename(img)}: {len(ocr_fr)} caractères ({langue})")
except Exception as e:
logger.warning(f"Erreur OCR pour {os.path.basename(img)}: {e}")
ocr_results[img] = {"texte_fr": "", "texte_en": "", "langue_detectee": "unknown"}
# Traiter toutes les images avec l'agent de tri
if self.image_sorter:
logger.info(f"Traitement de {len(images)} images uniques avec l'agent de tri")
# Trier toutes les images et collecter les résultats
for img in images:
try:
# Inclure l'OCR avec le chemin de l'image pour aider au tri
ocr_context = ocr_results.get(img, {"texte_en": ""}).get("texte_en", "")
logger.info(f"[AGENT] Transmission à l'agent de tri: image={os.path.basename(img)}, OCR EN={len(ocr_context)} caractères")
result_sort = self.image_sorter.executer(img, ocr_context=ocr_context)
# Déterminer si l'image est pertinente
is_relevant = result_sort.get("is_relevant", True)
if is_relevant:
relevant_images.append(img)
# Stocker le résultat pour l'analyse ultérieure
images_analyses[img] = {
"sorting": result_sort or {"is_relevant": True},
"analysis": None,
"ocr": ocr_results.get(img, {})
}
except Exception as e:
logger.warning(f"Erreur tri image {os.path.basename(img)}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
# Sauvegarder tous les résultats en une seule fois pour éviter les doublons
if self.image_sorter and hasattr(self.image_sorter, "sauvegarder_resultats"):
try:
# Cast l'agent en AgentImageSorter pour satisfaire le linter
image_sorter = cast(AgentImageSorter, self.image_sorter)
# Méthode sauvegarder_resultats améliorée pour accumuler les résultats
image_sorter.sauvegarder_resultats()
logger.info(f"Sauvegarde groupée de {len(images)} résultats de tri d'images effectuée")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde groupée des résultats de tri: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
else:
logger.warning("L'agent de tri d'images ne dispose pas de la méthode sauvegarder_resultats")
else:
# Si pas d'agent de tri, considérer toutes les images comme pertinentes
relevant_images = images.copy()
for img in images:
images_analyses[img] = {
"sorting": {"is_relevant": True},
"analysis": None,
"ocr": ocr_results.get(img, {})
}
# Analyser les images pertinentes avec l'agent d'analyse d'images
if self.image_analyser:
logger.info(f"Début de l'analyse des images avec {len(relevant_images)} images pertinentes")
analyses_resultats = [] # Pour accumuler les résultats
for img in relevant_images:
try:
# Intégrer les résultats de l'OCR dans le contexte
ocr_info = ocr_results.get(img, {})
contexte_enrichi = self._enrichir_contexte(
ticket_analysis if ticket_analysis else {},
ocr_info
)
logger.info(f"[AGENT] Analyse de l'image: {os.path.basename(img)}")
logger.info(f"[AGENT] Contexte transmis: ticket_analysis={bool(ticket_analysis)}, OCR_FR={len(ocr_info.get('texte_fr', ''))}, OCR_EN={len(ocr_info.get('texte_en', ''))}")
result = self.image_analyser.executer(img, contexte=contexte_enrichi)
if result:
images_analyses[img]["analysis"] = result
analyses_resultats.append(result)
logger.info(f"Analyse terminée pour {os.path.basename(img)}")
else:
logger.warning(f"Pas de résultat d'analyse pour {os.path.basename(img)}")
except Exception as e:
logger.error(f"Erreur analyse image {os.path.basename(img)}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
# Sauvegarder les résultats via la méthode sauvegarder_resultats si disponible
if hasattr(self.image_analyser, "sauvegarder_resultats"):
try:
self.image_analyser.sauvegarder_resultats()
logger.info(f"Sauvegarde des résultats d'analyse d'images via sauvegarder_resultats")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des résultats d'analyse d'images: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
else:
# Fallback vers sauvegarder_donnees si sauvegarder_resultats n'est pas disponible
try:
from agents.utils.pipeline_logger import sauvegarder_donnees
sauvegarder_donnees(
ticket_id=ticket_id,
step_name="analyse_image",
data=analyses_resultats,
base_dir=rapport_dir,
is_resultat=True
)
logger.info(f"Sauvegarde de {len(analyses_resultats)} analyses d'images effectuée via sauvegarder_donnees")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des analyses d'images: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
if self.report_generator and ticket_analysis:
try:
# Normaliser le nom du modèle pour éviter les doublons de rapports
model_name = self.config.get("model_name", "").replace(".", "-").replace(":", "-").replace("_", "-")
if not model_name:
model_name = "llama3-vision-90b-instruct"
rapport_data = {
"ticket_id": ticket_id,
"ticket_data": ticket_data,
"ticket_analyse": ticket_analysis,
"analyse_images": images_analyses,
"metadata": {
"model_name": model_name
}
}
logger.info(f"[AGENT] Transmission au générateur de rapport: ticket_id={ticket_id}, ticket_analyse={bool(ticket_analysis)}, images_analysées={len(images_analyses)}")
logger.info(f"[AGENT] Données du ticket transmises: originales (FR) et traduites (EN)")
# Utiliser directement le répertoire rapports_dir au lieu de recréer un chemin
rapport_final = self.report_generator.executer(rapport_data)
# Le rapport_generator est responsable de sauvegarder les fichiers
# Nous n'avons pas besoin de dupliquer les sauvegardes ici
# Les files sont sauvegardés via la fonction sauvegarder_donnees dans le pipeline
except Exception as e:
logger.error(f"Erreur lors du rapport : {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
logger.info(f"Traitement terminé pour le ticket {ticket_id}")
def _enrichir_contexte(self, ticket_analysis: Dict[str, Any], ocr_info: Dict[str, Any]) -> Dict[str, Any]:
"""
Enrichit le contexte de l'analyse du ticket avec les informations OCR
Args:
ticket_analysis: Résultat de l'analyse du ticket
ocr_info: Informations OCR d'une image
Returns:
Contexte enrichi
"""
if not isinstance(ticket_analysis, dict):
return ticket_analysis
# Créer une copie pour ne pas modifier l'original
contexte_enrichi = ticket_analysis.copy()
# Ajouter les informations OCR
if ocr_info:
contexte_enrichi["ocr_info"] = ocr_info
# Utiliser la version anglaise du texte OCR pour l'analyse
if self.config.get("english_only") and ocr_info.get("texte_en"):
contexte_enrichi["ocr_text"] = ocr_info["texte_en"]
logger.debug(f"[CONTEXTE] Utilisation du texte OCR en anglais ({len(ocr_info['texte_en'])} caractères) pour enrichir le contexte")
else:
contexte_enrichi["ocr_text"] = ocr_info.get("texte_fr", "")
logger.debug(f"[CONTEXTE] Utilisation du texte OCR en français ({len(ocr_info.get('texte_fr', ''))}) caractères pour enrichir le contexte")
return contexte_enrichi
def _charger_ticket(self, json_path: Optional[str]) -> Optional[Dict[str, Any]]:
"""
Charge et prépare les données du ticket à partir d'un fichier JSON.
Args:
json_path: Chemin du fichier JSON
Returns:
Données du ticket chargées ou None en cas d'erreur
"""
if not json_path:
logger.warning("Aucun chemin JSON fourni")
return None
try:
ticket_data = self.ticket_loader.charger(json_path)
# Préparer le contenu du ticket à partir des messages
messages = ticket_data.get("messages", [])
contenu = []
# Ajouter le titre/description
if "name" in ticket_data:
contenu.append(f"TITRE: {ticket_data['name']}")
if "description" in ticket_data and ticket_data["description"] != "*Contenu non extractible*":
contenu.append(f"DESCRIPTION: {ticket_data['description']}")
# Ajouter chaque message
for msg in messages:
auteur = msg.get("author_id", "Inconnu")
date = msg.get("date", "")
msg_type = msg.get("message_type", "")
content = msg.get("content", "").strip()
if content:
contenu.append(f"\n[{date}] {auteur} ({msg_type}):")
contenu.append(content)
# Ajouter le contenu formaté au ticket_data
ticket_data["content"] = "\n".join(contenu)
logger.info(f"Données du ticket chargées depuis {json_path} avec {len(messages)} messages")
return ticket_data
except Exception as e:
logger.error(f"Erreur chargement JSON : {e}")
return None
def _trouver_extractions(self, ticket_path: str, ticket_id: str) -> List[str]:
return sorted(
[os.path.join(ticket_path, d) for d in os.listdir(ticket_path)
if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(ticket_id)],
key=lambda x: os.path.getmtime(x),
reverse=True
)
def _lister_images(self, dossier: str) -> List[str]:
"""
Liste toutes les images dans un dossier avec une reconnaissance étendue
des formats d'images.
Args:
dossier: Dossier contenant les images à analyser
Returns:
Liste des chemins d'images trouvées
"""
# Liste étendue des extensions d'images courantes
extensions = ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.tiff', '.tif']
images = []
# Parcourir le dossier pour trouver toutes les images
if os.path.exists(dossier):
for racine, _, fichiers in os.walk(dossier):
for f in fichiers:
# Vérifier l'extension du fichier (non sensible à la casse)
if any(f.lower().endswith(ext) for ext in extensions):
chemin_complet = os.path.join(racine, f)
# Vérifier que le fichier est bien une image valide et accessible
try:
from PIL import Image
with Image.open(chemin_complet) as img:
# S'assurer que c'est bien une image en vérifiant ses dimensions
width, height = img.size
if width > 0 and height > 0:
images.append(chemin_complet)
except Exception as e:
logger.warning(f"Image ignorée {f}: {str(e)}")
if not images:
logger.warning(f"Aucune image trouvée dans {dossier}")
else:
logger.info(f"{len(images)} images trouvées dans {dossier}")
return images