llm_ticket3/orchestrator_llama.py
2025-04-25 16:59:45 +02:00

1125 lines
60 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import logging
import time
import traceback
from typing import List, Dict, Any, Optional, Tuple, cast
from agents.base_agent import BaseAgent
from agents.llama_vision.agent_image_sorter import AgentImageSorter
from loaders.ticket_data_loader import TicketDataLoader
from utils.image_dedup import filtrer_images_uniques
from utils.ocr_utils import extraire_texte
from utils.translate_utils import fr_to_en, en_to_fr, sauvegarder_ocr_traduction
# Configuration du logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
filename='orchestrator_llama.log', filemode='w')
logger = logging.getLogger("OrchestratorLlamaVision")
class OrchestratorLlamaVision:
"""Orchestrateur pour l'analyse des tickets avec llama_vision."""
def __init__(self,
output_dir: str = "output/",
ticket_agent: Optional[BaseAgent] = None,
image_sorter: Optional[BaseAgent] = None,
image_analyser: Optional[BaseAgent] = None,
report_generator: Optional[BaseAgent] = None,
vision_ocr: Optional[BaseAgent] = None,
config: Optional[Dict[str, Any]] = None):
"""Initialisation de l'orchestrateur."""
self.output_dir = output_dir
self.ticket_agent = ticket_agent
self.image_sorter = image_sorter
self.image_analyser = image_analyser
self.report_generator = report_generator
self.vision_ocr = vision_ocr
self.ticket_loader = TicketDataLoader()
self.config = {
"dedup_enabled": True,
"dedup_threshold": 5,
"save_results": True,
"debug_mode": False,
"reports_dir": "reports",
"ocr_enabled": True,
"ocr_llm_enabled": True,
"english_only": True,
"model_name": "llama3-vision-90b-instruct" # Nom du modèle par défaut
}
if config:
self.config.update(config)
# Assurer la cohérence des noms de modèles
if "model_name" in self.config:
self.config["model_name"] = self.config["model_name"].replace(".", "-").replace(":", "-").replace("_", "-")
logger.info(f"OrchestratorLlamaVision initialisé avec les paramètres: {self.config}")
def executer(self, ticket_id: Optional[str] = None):
ticket_path = os.path.join(self.output_dir, f"ticket_{ticket_id}")
if not os.path.exists(ticket_path):
logger.error(f"Le ticket {ticket_id} est introuvable dans {ticket_path}")
return
try:
self.traiter_ticket(ticket_path)
except Exception as e:
logger.error(f"Erreur globale sur le ticket {ticket_id}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
def executer_ticket(self, ticket_id: str, ocr_enabled: bool = True, ocr_avance_enabled: bool = True,
image_triage_enabled: bool = True, image_analyse_enabled: bool = True,
report_generation_enabled: bool = True) -> Dict[str, Any]:
"""
Exécute l'analyse d'un ticket avec des options pour activer/désactiver spécifiquement
chaque étape du processus d'analyse.
Args:
ticket_id: Identifiant du ticket à analyser
ocr_enabled: Activer l'OCR standard sur les images
ocr_avance_enabled: Activer l'OCR avancé via LLM sur les images
image_triage_enabled: Activer le tri des images pertinentes
image_analyse_enabled: Activer l'analyse des images
report_generation_enabled: Activer la génération du rapport final
Returns:
Dictionnaire contenant les résultats du traitement
"""
logger.info(f"=== Exécution du ticket {ticket_id} ===")
logger.info(f"Options: OCR={ocr_enabled}, OCR_Avancé={ocr_avance_enabled}, "
f"Tri_Images={image_triage_enabled}, Analyse_Images={image_analyse_enabled}, "
f"Rapport={report_generation_enabled}")
# Affichage dans le terminal pour suivi
print(f"=== Exécution du ticket {ticket_id} ===")
print(f"Options activées: OCR={ocr_enabled}, OCR_Avancé={ocr_avance_enabled}, "
f"Tri_Images={image_triage_enabled}, Analyse_Images={image_analyse_enabled}, "
f"Rapport={report_generation_enabled}")
ticket_path = os.path.join(self.output_dir, f"ticket_{ticket_id}")
if not os.path.exists(ticket_path):
logger.error(f"Le ticket {ticket_id} est introuvable dans {ticket_path}")
print(f"⚠️ Erreur: Le ticket {ticket_id} est introuvable")
return {"error": f"Ticket {ticket_id} introuvable"}
# Sauvegarder les configurations originales
original_ocr_enabled = self.config.get("ocr_enabled", True)
original_ocr_llm_enabled = self.config.get("ocr_llm_enabled", True)
# Mettre à jour temporairement les configurations
self.config["ocr_enabled"] = ocr_enabled
self.config["ocr_llm_enabled"] = ocr_avance_enabled
# Variables pour collecter tous les résultats
resultats = {
"ticket_analysis": None,
"image_triage": {},
"image_analysis": {},
"ocr_standard": {},
"ocr_advanced": {},
"report": None
}
try:
extractions = self._trouver_extractions(ticket_path, ticket_id)
if not extractions:
logger.warning(f"Aucune extraction trouvée pour le ticket {ticket_id}")
print(f"⚠️ Aucune extraction trouvée pour le ticket {ticket_id}")
return {"error": "Aucune extraction trouvée"}
extraction_path = extractions[0]
attachments_dir = os.path.join(extraction_path, "attachments")
rapport_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
os.makedirs(rapport_dir, exist_ok=True)
# Créer le répertoire pipeline une seule fois
pipeline_dir = os.path.join(rapport_dir, "pipeline")
os.makedirs(pipeline_dir, exist_ok=True)
# Récupérer le nom du modèle pour le logging
model_name = self.config.get("model_name", "llama3-vision-90b-instruct")
# Normaliser pour éviter les problèmes dans les noms de fichiers
model_name = model_name.replace(".", "-").replace(":", "-").replace("_", "-")
logger.info(f"Utilisation du modèle: {model_name}")
print(f"Modèle utilisé: {model_name}")
json_path = self.ticket_loader.trouver_ticket(extraction_path, ticket_id)
ticket_data = self._charger_ticket(json_path)
if not ticket_data:
logger.error(f"Impossible de charger les données du ticket {ticket_id}")
print(f"⚠️ Impossible de charger les données du ticket {ticket_id}")
return {"error": "Impossible de charger les données du ticket"}
# Ajouter le chemin du fichier JSON au ticket_data pour faciliter l'extraction du ticket_id
if json_path:
ticket_data["file_path"] = json_path
# Traduire le contenu du ticket en anglais avant l'analyse
if self.config.get("english_only", True) and ticket_data.get("content"):
logger.info(f"[TRADUCTION] Début de traduction du contenu du ticket {ticket_id} (FR → EN)")
ticket_data["content_original"] = ticket_data["content"]
# Vérifier si déjà en anglais
english_indicators = ["the ", "is ", "are ", "what ", "when ", "how ", "why "]
if any(indicator in ticket_data["content"].lower() for indicator in english_indicators):
logger.info("[TRADUCTION] Le contenu semble déjà être en anglais")
ticket_data["is_english"] = True
ticket_data["content_en"] = ticket_data["content"]
else:
logger.info("[TRADUCTION] Traduction du contenu en anglais")
print("Traduction du contenu du ticket en anglais...")
ticket_data["content_en"] = fr_to_en(ticket_data["content"])
ticket_data["is_english"] = False
# Étape 1: Analyser le ticket
if self.ticket_agent:
logger.info(f"Exécution de l'agent d'analyse de ticket pour {ticket_id}")
print(f"1⃣ Analyse du ticket {ticket_id}...")
try:
ticket_analysis = self.ticket_agent.executer(ticket_data)
resultats["ticket_analysis"] = ticket_analysis
# Sauvegarde des résultats d'analyse de ticket
if ticket_analysis:
from agents.utils.pipeline_logger import sauvegarder_donnees
sauvegarder_donnees(
ticket_id=ticket_id,
step_name="analyse_ticket",
data=ticket_analysis,
base_dir=rapport_dir,
is_resultat=True
)
logger.info(f"Analyse du ticket terminée et sauvegardée")
print(f"✅ Analyse du ticket terminée")
else:
logger.error(f"L'analyse du ticket n'a pas produit de résultat")
print(f"❌ L'analyse du ticket n'a pas produit de résultat")
except Exception as e:
logger.error(f"Erreur lors de l'analyse du ticket: {e}")
print(f"❌ Erreur lors de l'analyse du ticket: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
# Lister et filtrer les images
images = []
if os.path.exists(attachments_dir):
images = self._lister_images(attachments_dir)
if self.config.get("dedup_enabled", True):
images = filtrer_images_uniques(images, seuil_hamming=self.config["dedup_threshold"], ticket_id=ticket_id)
# OCR standard sur les images
ocr_results = {}
if ocr_enabled and images:
logger.info(f"Traitement OCR standard de {len(images)} images")
print(f"2⃣ OCR standard sur {len(images)} images...")
for img in images:
try:
ocr_fr, langue = extraire_texte(img, lang="auto")
# Traduire le texte extrait en anglais
ocr_en = fr_to_en(ocr_fr) if ocr_fr else ""
# Sauvegarder les résultats OCR
sauvegarder_ocr_traduction(img, ticket_id, ocr_fr, ocr_en, "", base_dir=rapport_dir)
# Stocker le résultat pour utilisation ultérieure
ocr_results[img] = {
"texte_fr": ocr_fr,
"texte_en": ocr_en,
"langue_detectee": langue
}
resultats["ocr_standard"][img] = ocr_results[img]
logger.info(f"OCR terminé pour {os.path.basename(img)}: {len(ocr_fr)} caractères ({langue})")
print(f" • OCR terminé: {os.path.basename(img)} ({len(ocr_fr)} caractères)")
except Exception as e:
logger.warning(f"Erreur OCR pour {os.path.basename(img)}: {e}")
print(f" ⚠️ Erreur OCR pour {os.path.basename(img)}")
print(f"✅ OCR standard terminé")
# Triage des images
relevant_images = []
images_analyses = {}
if image_triage_enabled and self.image_sorter and images:
logger.info(f"Tri de {len(images)} images")
print(f"3⃣ Tri de {len(images)} images...")
for img in images:
try:
ocr_context = ocr_results.get(img, {"texte_en": ""}).get("texte_en", "")
result_sort = self.image_sorter.executer(img, ocr_context=ocr_context)
# Vérifier si l'image est pertinente
is_relevant = result_sort.get("is_relevant", True)
if is_relevant:
relevant_images.append(img)
# Stocker les résultats
images_analyses[img] = {
"sorting": result_sort or {"is_relevant": True},
"analysis": None,
"ocr": ocr_results.get(img, {})
}
resultats["image_triage"][img] = result_sort
print(f" • Image {os.path.basename(img)}: {'Pertinente ✅' if is_relevant else 'Non pertinente ❌'}")
except Exception as e:
logger.warning(f"Erreur tri image {os.path.basename(img)}: {e}")
print(f" ⚠️ Erreur lors du tri de {os.path.basename(img)}")
# Sauvegarder les résultats de tri
if hasattr(self.image_sorter, "sauvegarder_resultats"):
try:
image_sorter = cast(AgentImageSorter, self.image_sorter)
image_sorter.sauvegarder_resultats()
logger.info(f"Résultats de tri sauvegardés")
print(f"✅ Résultats de tri sauvegardés ({len(relevant_images)} images pertinentes sur {len(images)})")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des résultats de tri: {e}")
print(f"⚠️ Erreur lors de la sauvegarde des résultats de tri")
else:
# Si pas de tri, toutes les images sont considérées pertinentes
relevant_images = images.copy()
for img in images:
images_analyses[img] = {
"sorting": {"is_relevant": True},
"analysis": None,
"ocr": ocr_results.get(img, {})
}
print(f" Tri des images désactivé, toutes les images ({len(images)}) sont considérées pertinentes")
# OCR avancé sur les images pertinentes
ocr_llm_results = {}
if ocr_avance_enabled and self.vision_ocr and relevant_images:
logger.info(f"OCR avancé sur {len(relevant_images)} images pertinentes")
print(f"4⃣ OCR avancé (LLM) sur {len(relevant_images)} images pertinentes...")
for img in relevant_images:
try:
ocr_baseline = ocr_results.get(img, {}).get("texte_en", "")
ocr_baseline_len = len(ocr_baseline)
# Exécuter l'OCR avancé via LLM
logger.info(f"[AGENT] OCR avancé sur l'image: {os.path.basename(img)}")
print(f" • Exécution de l'OCR avancé sur {os.path.basename(img)}...")
ocr_result = self.vision_ocr.executer(img, ocr_baseline=ocr_baseline)
if ocr_result:
ocr_llm_results[img] = ocr_result
resultats["ocr_advanced"][img] = ocr_result
# Mettre à jour les informations d'OCR dans images_analyses
if img in images_analyses:
images_analyses[img]["ocr_llm"] = ocr_result
extracted_text_len = len(ocr_result.get('extracted_text', ''))
logger.info(f"OCR avancé terminé pour {os.path.basename(img)}: {extracted_text_len} caractères")
print(f" ✅ OCR avancé terminé: {os.path.basename(img)} ({extracted_text_len} caractères)")
# Comparer avec l'OCR standard pour voir l'amélioration
if ocr_baseline_len > 0:
improvement = int((extracted_text_len - ocr_baseline_len) / ocr_baseline_len * 100)
logger.info(f"Amélioration OCR pour {os.path.basename(img)}: {improvement}% (Standard: {ocr_baseline_len}, LLM: {extracted_text_len})")
print(f" ↳ Amélioration: {improvement}% (Standard: {ocr_baseline_len}, LLM: {extracted_text_len})")
else:
logger.warning(f"Pas de résultat OCR avancé pour {os.path.basename(img)}")
print(f" ❌ Pas de résultat OCR avancé pour {os.path.basename(img)}")
except Exception as e:
logger.error(f"Erreur OCR avancé pour {os.path.basename(img)}: {e}")
print(f" ❌ Erreur OCR avancé pour {os.path.basename(img)}: {e}")
print(f"✅ OCR avancé terminé pour toutes les images pertinentes")
# Analyse des images pertinentes
if image_analyse_enabled and self.image_analyser and relevant_images:
logger.info(f"Début de l'analyse de {len(relevant_images)} images pertinentes")
print(f"5⃣ Analyse de {len(relevant_images)} images pertinentes...")
analyses_resultats = []
for img in relevant_images:
try:
# Préparer le contexte enrichi
ocr_info = ocr_results.get(img, {})
ocr_llm = ocr_llm_results.get(img, {})
contexte_enrichi = self._enrichir_contexte(
resultats["ticket_analysis"] if resultats["ticket_analysis"] else {},
ocr_info,
ocr_llm
)
# Analyser l'image
logger.info(f"Analyse de l'image: {os.path.basename(img)}")
print(f" • Analyse de l'image {os.path.basename(img)}...")
# Afficher les types de contexte disponibles pour cette image
has_ocr = bool(ocr_info and (ocr_info.get("texte_en") or ocr_info.get("texte_fr")))
has_ocr_llm = bool(ocr_llm and ocr_llm.get("extracted_text"))
has_ticket = bool(resultats["ticket_analysis"])
logger.debug(f"Contexte pour {os.path.basename(img)}: Ticket={has_ticket}, OCR={has_ocr}, OCR_LLM={has_ocr_llm}")
result = self.image_analyser.executer(img, contexte=contexte_enrichi)
if result:
images_analyses[img]["analysis"] = result
analyses_resultats.append(result)
resultats["image_analysis"][img] = result
logger.info(f"Analyse terminée pour {os.path.basename(img)}")
print(f" ✅ Analyse terminée pour {os.path.basename(img)}")
else:
logger.warning(f"Pas de résultat d'analyse pour {os.path.basename(img)}")
print(f" ❌ Pas de résultat d'analyse pour {os.path.basename(img)}")
except Exception as e:
logger.error(f"Erreur analyse image {os.path.basename(img)}: {e}")
print(f" ❌ Erreur lors de l'analyse de {os.path.basename(img)}: {e}")
# Sauvegarder les résultats d'analyse
if hasattr(self.image_analyser, "sauvegarder_resultats"):
try:
self.image_analyser.sauvegarder_resultats()
logger.info(f"Résultats d'analyse d'images sauvegardés")
print(f"✅ Résultats d'analyse d'images sauvegardés")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des résultats d'analyse: {e}")
print(f"⚠️ Erreur lors de la sauvegarde des résultats d'analyse")
else:
try:
from agents.utils.pipeline_logger import sauvegarder_donnees
sauvegarder_donnees(
ticket_id=ticket_id,
step_name="analyse_image",
data=analyses_resultats,
base_dir=rapport_dir,
is_resultat=True
)
logger.info(f"Analyse d'images sauvegardée via sauvegarder_donnees")
print(f"✅ Analyse d'images sauvegardée via sauvegarder_donnees")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des analyses d'images: {e}")
print(f"⚠️ Erreur lors de la sauvegarde des analyses d'images")
# Génération du rapport final
if report_generation_enabled and self.report_generator and resultats["ticket_analysis"]:
try:
# Normaliser le nom du modèle pour éviter les doublons de rapports
model_name = self.config.get("model_name", "").replace(".", "-").replace(":", "-").replace("_", "-")
if not model_name:
model_name = "llama3-vision-90b-instruct"
rapport_data = {
"ticket_id": ticket_id,
"ticket_data": ticket_data,
"ticket_analyse": resultats["ticket_analysis"],
"analyse_images": images_analyses,
"metadata": {
"model_name": model_name
}
}
logger.info(f"Génération du rapport final pour le ticket {ticket_id}")
print(f"6⃣ Génération du rapport final pour le ticket {ticket_id}...")
rapport_final = self.report_generator.executer(rapport_data)
resultats["report"] = rapport_final
logger.info(f"Rapport généré avec succès")
print(f"✅ Rapport généré avec succès")
except Exception as e:
logger.error(f"Erreur lors de la génération du rapport: {e}")
print(f"❌ Erreur lors de la génération du rapport: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
# Sauvegarder tous les résultats dans des fichiers JSON et TXT
if self.config.get("save_results", True):
try:
# Créer des dossiers pour chaque type de résultat
results_dir = os.path.join(rapport_dir, "results")
os.makedirs(results_dir, exist_ok=True)
print(f"7⃣ Sauvegarde des résultats...")
# Sauvegarder les résultats d'analyse de ticket
if resultats["ticket_analysis"]:
self._sauvegarder_resultat(
resultats["ticket_analysis"],
os.path.join(results_dir, f"analyse_ticket_{model_name}")
)
# Sauvegarder les résultats de tri d'images
if resultats["image_triage"]:
self._sauvegarder_resultat(
resultats["image_triage"],
os.path.join(results_dir, f"tri_image_{model_name}")
)
# Sauvegarder les résultats d'analyse d'images
if resultats["image_analysis"]:
self._sauvegarder_resultat(
resultats["image_analysis"],
os.path.join(results_dir, f"analyse_image_{model_name}")
)
# Sauvegarder les résultats OCR standard
if resultats["ocr_standard"]:
self._sauvegarder_resultat(
resultats["ocr_standard"],
os.path.join(results_dir, f"ocr_standard_{model_name}")
)
# Sauvegarder les résultats OCR avancé
if resultats["ocr_advanced"]:
self._sauvegarder_resultat(
resultats["ocr_advanced"],
os.path.join(results_dir, f"ocr_avance_{model_name}")
)
# Sauvegarder le rapport final
if resultats["report"]:
self._sauvegarder_resultat(
resultats["report"],
os.path.join(results_dir, f"rapport_{model_name}")
)
logger.info(f"Tous les résultats ont été sauvegardés dans {results_dir}")
print(f"✅ Tous les résultats ont été sauvegardés dans {results_dir}")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des résultats finaux: {e}")
print(f"⚠️ Erreur lors de la sauvegarde des résultats finaux: {e}")
logger.info(f"=== Traitement du ticket {ticket_id} terminé ===")
print(f"=== Traitement du ticket {ticket_id} terminé ===")
except Exception as e:
logger.error(f"Erreur globale sur le ticket {ticket_id}: {e}")
print(f"❌ Erreur globale sur le ticket {ticket_id}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
resultats["error"] = str(e)
finally:
# Restaurer les configurations originales
self.config["ocr_enabled"] = original_ocr_enabled
self.config["ocr_llm_enabled"] = original_ocr_llm_enabled
return resultats
def _sauvegarder_resultat(self, data: Any, chemin_base: str) -> None:
"""
Sauvegarde des données dans des fichiers JSON et TXT.
Args:
data: Données à sauvegarder
chemin_base: Chemin de base pour les fichiers (sans extension)
"""
try:
# Extraire le nom de fichier pour le logging
fichier_nom = os.path.basename(chemin_base)
# Extraire le type de données et le nom du modèle depuis le chemin
type_donnees = "inconnu"
modele_nom = "inconnu"
for prefix in ["analyse_ticket_", "tri_image_", "analyse_image_", "ocr_standard_", "ocr_avance_", "rapport_"]:
if fichier_nom.startswith(prefix):
type_donnees = prefix.replace("_", "")
modele_nom = fichier_nom[len(prefix):]
break
logger.info(f"Sauvegarde des données '{type_donnees}' avec modèle '{modele_nom}'")
# Vérifier si le répertoire existe, sinon le créer
repertoire = os.path.dirname(chemin_base)
if not os.path.exists(repertoire):
os.makedirs(repertoire, exist_ok=True)
logger.debug(f"Création du répertoire pour la sauvegarde: {repertoire}")
# Estimer la taille des données pour le logging
data_size = 0
if isinstance(data, dict):
data_size = len(data)
elif isinstance(data, list):
data_size = len(data)
# Sauvegarder au format JSON
json_path = f"{chemin_base}_results.json"
with open(json_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
# Sauvegarder au format TXT pour lisibilité humaine
txt_path = f"{chemin_base}_results.txt"
with open(txt_path, "w", encoding="utf-8") as f:
f.write(json.dumps(data, ensure_ascii=False, indent=2))
logger.info(f"Données sauvegardées avec succès: {type_donnees}_{modele_nom} ({data_size} éléments)")
logger.debug(f"Fichiers créés: {json_path} et {txt_path}")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des données: {e}")
logger.error(f"Chemin de base: {chemin_base}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
def traiter_ticket(self, ticket_path: str):
ticket_id = os.path.basename(ticket_path).replace("ticket_", "")
logger.info(f"Traitement du ticket {ticket_id}")
extractions = self._trouver_extractions(ticket_path, ticket_id)
if not extractions:
logger.warning(f"Aucune extraction trouvée pour le ticket {ticket_id}")
return
extraction_path = extractions[0]
attachments_dir = os.path.join(extraction_path, "attachments")
rapport_dir = os.path.join(extraction_path, f"{ticket_id}_rapports")
os.makedirs(rapport_dir, exist_ok=True)
# Créer le répertoire pipeline une seule fois
pipeline_dir = os.path.join(rapport_dir, "pipeline")
os.makedirs(pipeline_dir, exist_ok=True)
# Récupérer le nom du modèle pour le logging
model_name = self.config.get("model_name", "llama3-vision-90b-instruct")
# Normaliser pour éviter les problèmes dans les noms de fichiers
model_name = model_name.replace(".", "-").replace(":", "-").replace("_", "-")
logger.info(f"Utilisation du modèle: {model_name}")
json_path = self.ticket_loader.trouver_ticket(extraction_path, ticket_id)
ticket_data = self._charger_ticket(json_path)
if not ticket_data:
logger.error(f"Impossible de charger les données du ticket {ticket_id}")
return
# Ajouter le chemin du fichier JSON au ticket_data pour faciliter l'extraction du ticket_id
if json_path:
ticket_data["file_path"] = json_path
# Traduire le contenu du ticket en anglais avant l'analyse
# et stocker la version originale
if self.config.get("english_only", True) and ticket_data.get("content"):
logger.info(f"[TRADUCTION] Début de traduction du contenu du ticket {ticket_id} (FR → EN)")
logger.info(f"[TRADUCTION] Taille du contenu original: {len(ticket_data['content'])} caractères")
ticket_data["content_original"] = ticket_data["content"]
# Vérifier si le contenu est déjà en anglais (détection simple)
english_indicators = ["the ", "is ", "are ", "what ", "when ", "how ", "why "]
if any(indicator in ticket_data["content"].lower() for indicator in english_indicators):
logger.info("[TRADUCTION] Le contenu semble déjà être en anglais, pas de traduction nécessaire")
ticket_data["is_english"] = True
ticket_data["content_en"] = ticket_data["content"]
else:
# Traduire en anglais
logger.info("[TRADUCTION] Traduction du contenu original en anglais via fr_to_en")
ticket_data["content_en"] = fr_to_en(ticket_data["content"])
ticket_data["is_english"] = False
logger.info(f"[TRADUCTION] Traduction terminée: {len(ticket_data['content_en'])} caractères")
logger.info(f"[TRADUCTION] La clé 'content_en' sera utilisée par les agents pour l'analyse en anglais")
# Étape 1: Analyser le ticket (si l'agent est disponible)
ticket_analysis = None
if self.ticket_agent:
try:
logger.info(f"Exécution de l'agent d'analyse de ticket pour {ticket_id}")
ticket_analysis = self.ticket_agent.executer(ticket_data)
# Vérifier si l'analyse a été réalisée avec succès
if ticket_analysis:
logger.info(f"Analyse du ticket terminée: {len(ticket_analysis.get('response', ''))} caractères")
# Si le répertoire des rapports existe mais que le fichier d'analyse n'a pas été créé
# on force sa création ici
pipeline_dir = os.path.join(rapport_dir, "pipeline")
analyse_files = [f for f in os.listdir(pipeline_dir)
if f.startswith("analyse_ticket_") and f.endswith("_results.json")]
if not analyse_files:
logger.warning("Aucun fichier d'analyse de ticket trouvé, tentative de sauvegarde forcée")
from agents.utils.pipeline_logger import sauvegarder_donnees
sauvegarder_donnees(
ticket_id=ticket_id,
step_name="analyse_ticket",
data=ticket_analysis,
base_dir=rapport_dir,
is_resultat=True
)
else:
logger.error(f"L'analyse du ticket {ticket_id} n'a pas produit de résultat")
except Exception as e:
logger.error(f"Erreur lors de l'analyse du ticket {ticket_id}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
images_analyses, relevant_images = {}, []
ocr_results = {}
if os.path.exists(attachments_dir):
images = self._lister_images(attachments_dir)
if self.config.get("dedup_enabled", True):
images = filtrer_images_uniques(images, seuil_hamming=self.config["dedup_threshold"], ticket_id=ticket_id)
# Réaliser l'OCR sur toutes les images avant le tri
if self.config.get("ocr_enabled", True):
logger.info(f"Traitement OCR de {len(images)} images")
for img in images:
try:
ocr_fr, langue = extraire_texte(img, lang="auto")
# Traduire le texte extrait en anglais pour une meilleure analyse
logger.info(f"[TRADUCTION] Traduction OCR pour {os.path.basename(img)} (FR → EN)")
logger.info(f"[TRADUCTION] Texte OCR original (FR): {len(ocr_fr)} caractères")
ocr_en = fr_to_en(ocr_fr) if ocr_fr else ""
logger.info(f"[TRADUCTION] Texte OCR traduit (EN): {len(ocr_en)} caractères")
# Traduire à nouveau en français pour vérification (optionnel)
logger.info(f"[TRADUCTION] Traduction OCR inverse pour validation (EN → FR)")
ocr_en_back_fr = en_to_fr(ocr_en) if ocr_en else ""
logger.info(f"[TRADUCTION] Texte OCR retraduit (FR): {len(ocr_en_back_fr)} caractères")
# Sauvegarder les résultats OCR directement dans le répertoire pipeline
# au lieu de créer un sous-répertoire T11143
sauvegarder_ocr_traduction(img, ticket_id, ocr_fr, ocr_en, ocr_en_back_fr, base_dir=rapport_dir)
# Stocker le résultat de l'OCR pour utilisation ultérieure
ocr_results[img] = {
"texte_fr": ocr_fr,
"texte_en": ocr_en,
"langue_detectee": langue
}
logger.info(f"OCR terminé pour {os.path.basename(img)}: {len(ocr_fr)} caractères ({langue})")
except Exception as e:
logger.warning(f"Erreur OCR pour {os.path.basename(img)}: {e}")
ocr_results[img] = {"texte_fr": "", "texte_en": "", "langue_detectee": "unknown"}
# Traiter toutes les images avec l'agent de tri
if self.image_sorter:
logger.info(f"Traitement de {len(images)} images uniques avec l'agent de tri")
# Trier toutes les images et collecter les résultats
for img in images:
try:
# Inclure l'OCR avec le chemin de l'image pour aider au tri
ocr_context = ocr_results.get(img, {"texte_en": ""}).get("texte_en", "")
logger.info(f"[AGENT] Transmission à l'agent de tri: image={os.path.basename(img)}, OCR EN={len(ocr_context)} caractères")
result_sort = self.image_sorter.executer(img, ocr_context=ocr_context)
# Déterminer si l'image est pertinente
is_relevant = result_sort.get("is_relevant", True)
if is_relevant:
relevant_images.append(img)
# Stocker le résultat pour l'analyse ultérieure
images_analyses[img] = {
"sorting": result_sort or {"is_relevant": True},
"analysis": None,
"ocr": ocr_results.get(img, {})
}
except Exception as e:
logger.warning(f"Erreur tri image {os.path.basename(img)}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
# Sauvegarder tous les résultats en une seule fois pour éviter les doublons
if self.image_sorter and hasattr(self.image_sorter, "sauvegarder_resultats"):
try:
# Cast l'agent en AgentImageSorter pour satisfaire le linter
image_sorter = cast(AgentImageSorter, self.image_sorter)
# Méthode sauvegarder_resultats améliorée pour accumuler les résultats
image_sorter.sauvegarder_resultats()
logger.info(f"Sauvegarde groupée de {len(images)} résultats de tri d'images effectuée")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde groupée des résultats de tri: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
else:
logger.warning("L'agent de tri d'images ne dispose pas de la méthode sauvegarder_resultats")
else:
# Si pas d'agent de tri, considérer toutes les images comme pertinentes
relevant_images = images.copy()
for img in images:
images_analyses[img] = {
"sorting": {"is_relevant": True},
"analysis": None,
"ocr": ocr_results.get(img, {})
}
# Traiter les images pertinentes avec l'agent d'OCR avancé (vision_ocr)
ocr_llm_results = {}
if self.vision_ocr and self.config.get("ocr_llm_enabled", True) and relevant_images:
logger.info(f"Traitement OCR avancé avec LLM pour {len(relevant_images)} images pertinentes")
for img in relevant_images:
try:
# Récupérer le texte OCR standard à titre de référence
ocr_baseline = ocr_results.get(img, {}).get("texte_en", "")
# Exécuter l'OCR avancé via LLM
logger.info(f"[AGENT] OCR avancé sur l'image: {os.path.basename(img)}")
ocr_result = self.vision_ocr.executer(img, ocr_baseline=ocr_baseline)
if ocr_result:
# Stocker le résultat pour l'enrichissement du contexte
ocr_llm_results[img] = ocr_result
# Mettre à jour les informations d'OCR dans images_analyses
if img in images_analyses:
images_analyses[img]["ocr_llm"] = ocr_result
logger.info(f"OCR avancé terminé pour {os.path.basename(img)}: {len(ocr_result.get('extracted_text', ''))} caractères")
else:
logger.warning(f"Pas de résultat OCR avancé pour {os.path.basename(img)}")
except Exception as e:
logger.error(f"Erreur OCR avancé pour {os.path.basename(img)}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
# Analyser les images pertinentes avec l'agent d'analyse d'images
if self.image_analyser:
logger.info(f"Début de l'analyse des images avec {len(relevant_images)} images pertinentes")
analyses_resultats = [] # Pour accumuler les résultats
for img in relevant_images:
try:
# Préparer le contexte enrichi avec OCR standard et OCR LLM
ocr_info = ocr_results.get(img, {})
ocr_llm = ocr_llm_results.get(img, {})
contexte_enrichi = self._enrichir_contexte(
ticket_analysis if ticket_analysis else {},
ocr_info,
ocr_llm
)
logger.info(f"[AGENT] Analyse de l'image: {os.path.basename(img)}")
ocr_llm_len = len(contexte_enrichi.get("ocr_llm", ""))
logger.info(f"[AGENT] Contexte transmis: ticket_analysis={bool(ticket_analysis)}, OCR_FR={len(ocr_info.get('texte_fr', ''))}, OCR_EN={len(ocr_info.get('texte_en', ''))}, OCR_LLM={ocr_llm_len}")
result = self.image_analyser.executer(img, contexte=contexte_enrichi)
if result:
images_analyses[img]["analysis"] = result
analyses_resultats.append(result)
logger.info(f"Analyse terminée pour {os.path.basename(img)}")
else:
logger.warning(f"Pas de résultat d'analyse pour {os.path.basename(img)}")
except Exception as e:
logger.error(f"Erreur analyse image {os.path.basename(img)}: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
# Sauvegarder les résultats via la méthode sauvegarder_resultats si disponible
if hasattr(self.image_analyser, "sauvegarder_resultats"):
try:
self.image_analyser.sauvegarder_resultats()
logger.info(f"Sauvegarde des résultats d'analyse d'images via sauvegarder_resultats")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des résultats d'analyse d'images: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
else:
# Fallback vers sauvegarder_donnees si sauvegarder_resultats n'est pas disponible
try:
from agents.utils.pipeline_logger import sauvegarder_donnees
sauvegarder_donnees(
ticket_id=ticket_id,
step_name="analyse_image",
data=analyses_resultats,
base_dir=rapport_dir,
is_resultat=True
)
logger.info(f"Sauvegarde de {len(analyses_resultats)} analyses d'images effectuée via sauvegarder_donnees")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des analyses d'images: {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
if self.report_generator and ticket_analysis:
try:
# Normaliser le nom du modèle pour éviter les doublons de rapports
model_name = self.config.get("model_name", "").replace(".", "-").replace(":", "-").replace("_", "-")
if not model_name:
model_name = "llama3-vision-90b-instruct"
rapport_data = {
"ticket_id": ticket_id,
"ticket_data": ticket_data,
"ticket_analyse": ticket_analysis,
"analyse_images": images_analyses,
"metadata": {
"model_name": model_name
}
}
logger.info(f"[AGENT] Transmission au générateur de rapport: ticket_id={ticket_id}, ticket_analyse={bool(ticket_analysis)}, images_analysées={len(images_analyses)}")
logger.info(f"[AGENT] Données du ticket transmises: originales (FR) et traduites (EN)")
# Utiliser directement le répertoire rapports_dir au lieu de recréer un chemin
rapport_final = self.report_generator.executer(rapport_data)
# Le rapport_generator est responsable de sauvegarder les fichiers
# Nous n'avons pas besoin de dupliquer les sauvegardes ici
# Les files sont sauvegardés via la fonction sauvegarder_donnees dans le pipeline
except Exception as e:
logger.error(f"Erreur lors du rapport : {e}")
if self.config.get("debug_mode"):
logger.error(traceback.format_exc())
logger.info(f"Traitement terminé pour le ticket {ticket_id}")
def _enrichir_contexte(self, ticket_analyse: Dict[str, Any], ocr_info: Dict[str, Any],
ocr_llm: Dict[str, Any]) -> Dict[str, Any]:
"""
Enrichit le contexte pour l'analyse d'image avec les informations du ticket et OCR.
Args:
ticket_analyse: Résultat de l'analyse du ticket
ocr_info: Résultat OCR standard
ocr_llm: Résultat OCR avancé
Returns:
Dictionnaire contenant le contexte enrichi
"""
contexte = {}
# Extraire les informations pertinentes de l'analyse du ticket
if ticket_analyse:
# Copier certaines informations clés sans tout inclure
for clé in ["ticket_id", "issue_summary", "client_info", "category", "priority"]:
if clé in ticket_analyse:
contexte[f"ticket_{clé}"] = ticket_analyse[clé]
# Inclure les problèmes identifiés s'ils existent
if "identified_issues" in ticket_analyse:
contexte["ticket_issues"] = ticket_analyse["identified_issues"]
# Ajouter le texte OCR standard
if ocr_info:
if "texte_en" in ocr_info and ocr_info["texte_en"]:
contexte["ocr_text_en"] = ocr_info["texte_en"]
if "texte_fr" in ocr_info and ocr_info["texte_fr"]:
contexte["ocr_text_fr"] = ocr_info["texte_fr"]
if "langue_detectee" in ocr_info:
contexte["ocr_detected_language"] = ocr_info["langue_detectee"]
# Ajouter le texte OCR avancé (LLM)
if ocr_llm:
# Vérifier si OCR LLM contient directement du texte ou une structure
if isinstance(ocr_llm, str):
contexte["ocr_llm_text"] = ocr_llm
else:
# Chercher des champs spécifiques dans le résultat OCR LLM
for field in ["text", "text_content", "extracted_text", "content"]:
if field in ocr_llm and ocr_llm[field]:
contexte["ocr_llm_text"] = ocr_llm[field]
break
# Si on a un résultat structuré, l'inclure aussi
if "structured_content" in ocr_llm:
contexte["ocr_llm_structured"] = ocr_llm["structured_content"]
logger.debug(f"Contexte enrichi créé avec {len(contexte)} éléments")
return contexte
def _charger_ticket(self, json_path: Optional[str]) -> Optional[Dict[str, Any]]:
"""
Charge et prépare les données du ticket à partir d'un fichier JSON.
Args:
json_path: Chemin du fichier JSON
Returns:
Données du ticket chargées ou None en cas d'erreur
"""
if not json_path:
logger.warning("Aucun chemin JSON fourni")
return None
try:
ticket_data = self.ticket_loader.charger(json_path)
# Préparer le contenu du ticket à partir des messages
messages = ticket_data.get("messages", [])
contenu = []
# Ajouter le titre/description
if "name" in ticket_data:
contenu.append(f"TITRE: {ticket_data['name']}")
if "description" in ticket_data and ticket_data["description"] != "*Contenu non extractible*":
contenu.append(f"DESCRIPTION: {ticket_data['description']}")
# Ajouter chaque message
for msg in messages:
auteur = msg.get("author_id", "Inconnu")
date = msg.get("date", "")
msg_type = msg.get("message_type", "")
content = msg.get("content", "").strip()
if content:
contenu.append(f"\n[{date}] {auteur} ({msg_type}):")
contenu.append(content)
# Ajouter le contenu formaté au ticket_data
ticket_data["content"] = "\n".join(contenu)
logger.info(f"Données du ticket chargées depuis {json_path} avec {len(messages)} messages")
return ticket_data
except Exception as e:
logger.error(f"Erreur chargement JSON : {e}")
return None
def _trouver_extractions(self, ticket_path: str, ticket_id: str) -> List[str]:
return sorted(
[os.path.join(ticket_path, d) for d in os.listdir(ticket_path)
if os.path.isdir(os.path.join(ticket_path, d)) and d.startswith(ticket_id)],
key=lambda x: os.path.getmtime(x),
reverse=True
)
def _lister_images(self, dossier: str) -> List[str]:
"""
Liste toutes les images dans un dossier avec une reconnaissance étendue
des formats d'images.
Args:
dossier: Dossier contenant les images à analyser
Returns:
Liste des chemins d'images trouvées
"""
# Liste étendue des extensions d'images courantes
extensions = ['.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp', '.tiff', '.tif']
images = []
# Parcourir le dossier pour trouver toutes les images
if os.path.exists(dossier):
for racine, _, fichiers in os.walk(dossier):
for f in fichiers:
# Vérifier l'extension du fichier (non sensible à la casse)
if any(f.lower().endswith(ext) for ext in extensions):
chemin_complet = os.path.join(racine, f)
# Vérifier que le fichier est bien une image valide et accessible
try:
from PIL import Image
with Image.open(chemin_complet) as img:
# S'assurer que c'est bien une image en vérifiant ses dimensions
width, height = img.size
if width > 0 and height > 0:
images.append(chemin_complet)
except Exception as e:
logger.warning(f"Image ignorée {f}: {str(e)}")
if not images:
logger.warning(f"Aucune image trouvée dans {dossier}")
else:
logger.info(f"{len(images)} images trouvées dans {dossier}")
return images
def _sauvegarder_resultats(self, resultats_analyses_tickets, resultats_analyses_images, resultats_tri_images=None,
ticket_id=None, resultats_ocr=None, resultats_ocr_llm=None, resultats_rapports=None):
"""
Sauvegarde les différents résultats générés pendant l'analyse du ticket
Args:
resultats_analyses_tickets: Résultats de l'analyse du ticket
resultats_analyses_images: Résultats de l'analyse des images
resultats_tri_images: Résultats du tri des images
ticket_id: ID du ticket
resultats_ocr: Résultats OCR standard
resultats_ocr_llm: Résultats OCR avancé
resultats_rapports: Résultats de la génération de rapports
"""
logger.info("Sauvegarde des résultats")
os.makedirs(self.output_dir, exist_ok=True)
# Sauvegarde de l'analyse du ticket
if resultats_analyses_tickets:
chemin_fichier = os.path.join(self.output_dir, f"analyse_ticket{self._format_fichier(ticket_id)}")
try:
with open(chemin_fichier + ".json", "w") as file:
json.dump(resultats_analyses_tickets, file, indent=2)
with open(chemin_fichier + ".txt", "w") as file:
file.write(json.dumps(resultats_analyses_tickets, indent=2))
logger.debug(f"Analyse du ticket sauvegardée dans {chemin_fichier}(.json/.txt)")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde de l'analyse du ticket: {e}")
# Sauvegarde de l'analyse des images
if resultats_analyses_images:
chemin_fichier = os.path.join(self.output_dir, f"analyse_image{self._format_fichier(ticket_id)}")
try:
with open(chemin_fichier + ".json", "w") as file:
json.dump(resultats_analyses_images, file, indent=2)
with open(chemin_fichier + ".txt", "w") as file:
file.write(json.dumps(resultats_analyses_images, indent=2))
logger.debug(f"Analyse des images sauvegardée dans {chemin_fichier}(.json/.txt)")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde de l'analyse des images: {e}")
# Sauvegarde du tri des images
if resultats_tri_images:
chemin_fichier = os.path.join(self.output_dir, f"tri_image{self._format_fichier(ticket_id)}")
try:
with open(chemin_fichier + ".json", "w") as file:
json.dump(resultats_tri_images, file, indent=2)
with open(chemin_fichier + ".txt", "w") as file:
file.write(json.dumps(resultats_tri_images, indent=2))
logger.debug(f"Tri des images sauvegardé dans {chemin_fichier}(.json/.txt)")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde du tri des images: {e}")
# Sauvegarde des résultats OCR standard
if resultats_ocr:
chemin_fichier = os.path.join(self.output_dir, f"ocr{self._format_fichier(ticket_id)}")
try:
with open(chemin_fichier + ".json", "w") as file:
json.dump(resultats_ocr, file, indent=2)
with open(chemin_fichier + ".txt", "w") as file:
file.write(json.dumps(resultats_ocr, indent=2))
logger.debug(f"Résultats OCR sauvegardés dans {chemin_fichier}(.json/.txt)")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des résultats OCR: {e}")
# Sauvegarde des résultats OCR avancé
if resultats_ocr_llm:
chemin_fichier = os.path.join(self.output_dir, f"ocr_llm{self._format_fichier(ticket_id)}")
try:
with open(chemin_fichier + ".json", "w") as file:
json.dump(resultats_ocr_llm, file, indent=2)
with open(chemin_fichier + ".txt", "w") as file:
file.write(json.dumps(resultats_ocr_llm, indent=2))
logger.debug(f"Résultats OCR avancé sauvegardés dans {chemin_fichier}(.json/.txt)")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des résultats OCR avancé: {e}")
# Sauvegarde des rapports
if resultats_rapports:
chemin_fichier = os.path.join(self.output_dir, f"rapport{self._format_fichier(ticket_id)}")
try:
with open(chemin_fichier + ".json", "w") as file:
json.dump(resultats_rapports, file, indent=2)
with open(chemin_fichier + ".txt", "w") as file:
file.write(json.dumps(resultats_rapports, indent=2))
logger.debug(f"Rapport sauvegardé dans {chemin_fichier}(.json/.txt)")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde du rapport: {e}")
def _format_fichier(self, ticket_id):
return f"_{ticket_id}" if ticket_id else ""