mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-15 19:06:50 +01:00
385 lines
17 KiB
Python
385 lines
17 KiB
Python
from .base_agent import BaseAgent
|
|
import logging
|
|
import os
|
|
from typing import Dict, Any, Tuple
|
|
from PIL import Image
|
|
import base64
|
|
import io
|
|
|
|
logger = logging.getLogger("AgentImageSorter")
|
|
|
|
class AgentImageSorter(BaseAgent):
|
|
"""
|
|
Agent pour trier les images et identifier celles qui sont pertinentes.
|
|
"""
|
|
def __init__(self, llm):
|
|
super().__init__("AgentImageSorter", llm)
|
|
|
|
# Configuration locale de l'agent (remplace AgentConfig)
|
|
self.temperature = 0.2
|
|
self.top_p = 0.8
|
|
self.max_tokens = 300
|
|
self.system_prompt = """Tu es un expert en tri d'images pour le support technique de BRG_Lab.
|
|
Ta mission est de déterminer si une image est pertinente pour le support technique de logiciels.
|
|
|
|
Images PERTINENTES (réponds "oui" ou "pertinent"):
|
|
- Captures d'écran de logiciels ou d'interfaces
|
|
- logo BRG_LAB
|
|
- Référence à "logociel"
|
|
- Messages d'erreur
|
|
- Configurations système
|
|
- Tableaux de bord ou graphiques techniques
|
|
- Fenêtres de diagnostic
|
|
|
|
Images NON PERTINENTES (réponds "non" ou "non pertinent"):
|
|
- Photos personnelles
|
|
- Images marketing/promotionnelles
|
|
- Logos ou images de marque
|
|
- Paysages, personnes ou objets non liés à l'informatique
|
|
|
|
IMPORTANT: Ne commence JAMAIS ta réponse par "Je ne peux pas directement visualiser l'image".
|
|
Si tu ne peux pas analyser l'image, réponds simplement "ERREUR: Impossible d'analyser l'image".
|
|
|
|
Analyse d'abord ce que montre l'image, puis réponds par "oui"/"pertinent" ou "non"/"non pertinent"."""
|
|
|
|
# Appliquer la configuration au LLM
|
|
self._appliquer_config_locale()
|
|
|
|
logger.info("AgentImageSorter initialisé")
|
|
|
|
def _appliquer_config_locale(self) -> None:
|
|
"""
|
|
Applique la configuration locale au modèle LLM.
|
|
"""
|
|
# Appliquer le prompt système
|
|
if hasattr(self.llm, "prompt_system"):
|
|
self.llm.prompt_system = self.system_prompt
|
|
|
|
# Appliquer les paramètres
|
|
if hasattr(self.llm, "configurer"):
|
|
params = {
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens
|
|
}
|
|
|
|
# Ajustements selon le type de modèle
|
|
if "mistral_medium" in self.llm.__class__.__name__.lower():
|
|
params["temperature"] += 0.05
|
|
params["max_tokens"] = 1000
|
|
elif "pixtral" in self.llm.__class__.__name__.lower():
|
|
params["temperature"] -= 0.05
|
|
elif "ollama" in self.llm.__class__.__name__.lower():
|
|
params["temperature"] += 0.1
|
|
params.update({
|
|
"num_ctx": 2048,
|
|
"repeat_penalty": 1.1,
|
|
})
|
|
|
|
self.llm.configurer(**params)
|
|
|
|
def _verifier_image(self, image_path: str) -> bool:
|
|
"""
|
|
Vérifie si l'image existe et est accessible
|
|
|
|
Args:
|
|
image_path: Chemin vers l'image
|
|
|
|
Returns:
|
|
True si l'image existe et est accessible, False sinon
|
|
"""
|
|
try:
|
|
# Vérifier que le fichier existe
|
|
if not os.path.exists(image_path):
|
|
logger.error(f"L'image n'existe pas: {image_path}")
|
|
return False
|
|
|
|
# Vérifier que le fichier est accessible en lecture
|
|
if not os.access(image_path, os.R_OK):
|
|
logger.error(f"L'image n'est pas accessible en lecture: {image_path}")
|
|
return False
|
|
|
|
# Vérifier que le fichier peut être ouvert comme une image
|
|
with Image.open(image_path) as img:
|
|
# Vérifier les dimensions de l'image
|
|
width, height = img.size
|
|
if width <= 0 or height <= 0:
|
|
logger.error(f"Dimensions d'image invalides: {width}x{height}")
|
|
return False
|
|
|
|
logger.info(f"Image vérifiée avec succès: {image_path} ({width}x{height})")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la vérification de l'image {image_path}: {str(e)}")
|
|
return False
|
|
|
|
def _encoder_image_base64(self, image_path: str) -> str:
|
|
"""
|
|
Encode l'image en base64 pour l'inclure directement dans le prompt
|
|
|
|
Args:
|
|
image_path: Chemin vers l'image
|
|
|
|
Returns:
|
|
Chaîne de caractères au format data URI avec l'image encodée en base64
|
|
"""
|
|
try:
|
|
# Ouvrir l'image et la redimensionner si trop grande
|
|
with Image.open(image_path) as img:
|
|
# Redimensionner l'image si elle est trop grande (max 800x800)
|
|
max_size = 800
|
|
if img.width > max_size or img.height > max_size:
|
|
img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
|
|
|
|
# Convertir en RGB si nécessaire (pour les formats comme PNG)
|
|
if img.mode != "RGB":
|
|
img = img.convert("RGB")
|
|
|
|
# Sauvegarder l'image en JPEG dans un buffer mémoire
|
|
buffer = io.BytesIO()
|
|
img.save(buffer, format="JPEG", quality=85)
|
|
buffer.seek(0)
|
|
|
|
# Encoder en base64
|
|
img_base64 = base64.b64encode(buffer.read()).decode("utf-8")
|
|
|
|
# Construire le data URI
|
|
data_uri = f"data:image/jpeg;base64,{img_base64}"
|
|
|
|
return data_uri
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de l'encodage de l'image {image_path}: {str(e)}")
|
|
return ""
|
|
|
|
def executer(self, image_path: str) -> Dict[str, Any]:
|
|
"""
|
|
Évalue si une image est pertinente pour l'analyse d'un ticket technique
|
|
|
|
Args:
|
|
image_path: Chemin vers l'image à analyser
|
|
|
|
Returns:
|
|
Dictionnaire contenant la décision de pertinence, l'analyse et les métadonnées
|
|
"""
|
|
image_name = os.path.basename(image_path)
|
|
logger.info(f"Évaluation de la pertinence de l'image: {image_name}")
|
|
print(f" AgentImageSorter: Évaluation de {image_name}")
|
|
|
|
# Vérifier que l'image existe et est accessible
|
|
if not self._verifier_image(image_path):
|
|
error_message = f"L'image n'est pas accessible ou n'est pas valide: {image_name}"
|
|
logger.error(error_message)
|
|
print(f" ERREUR: {error_message}")
|
|
|
|
return {
|
|
"is_relevant": False,
|
|
"reason": f"Erreur d'accès: {error_message}",
|
|
"raw_response": "",
|
|
"error": True,
|
|
"metadata": {
|
|
"image_path": image_path,
|
|
"image_name": image_name,
|
|
"timestamp": self._get_timestamp(),
|
|
"error": True
|
|
}
|
|
}
|
|
|
|
# Utiliser une référence au fichier image que le modèle peut comprendre
|
|
try:
|
|
# Préparation du prompt
|
|
prompt = f"""Est-ce une image pertinente pour un ticket de support technique?
|
|
Réponds simplement par 'oui' ou 'non' suivi d'une brève explication."""
|
|
|
|
# Utiliser la méthode interroger_avec_image au lieu de interroger
|
|
if hasattr(self.llm, "interroger_avec_image"):
|
|
logger.info(f"Utilisation de la méthode interroger_avec_image pour {image_name}")
|
|
response = self.llm.interroger_avec_image(image_path, prompt)
|
|
else:
|
|
# Fallback vers la méthode standard avec base64 si interroger_avec_image n'existe pas
|
|
logger.warning(f"La méthode interroger_avec_image n'existe pas, utilisation du fallback pour {image_name}")
|
|
img_base64 = self._encoder_image_base64(image_path)
|
|
if img_base64:
|
|
prompt_base64 = f"""Analyse cette image:
|
|
{img_base64}
|
|
|
|
Est-ce une image pertinente pour un ticket de support technique?
|
|
Réponds simplement par 'oui' ou 'non' suivi d'une brève explication."""
|
|
response = self.llm.interroger(prompt_base64)
|
|
else:
|
|
error_message = "Impossible d'encoder l'image en base64"
|
|
logger.error(f"Erreur d'analyse pour {image_name}: {error_message}")
|
|
print(f" ERREUR: {error_message}")
|
|
|
|
return {
|
|
"is_relevant": False,
|
|
"reason": f"Erreur d'analyse: {error_message}",
|
|
"raw_response": "",
|
|
"error": True,
|
|
"metadata": {
|
|
"image_path": image_path,
|
|
"image_name": image_name,
|
|
"timestamp": self._get_timestamp(),
|
|
"error": True
|
|
}
|
|
}
|
|
|
|
# Vérifier si la réponse contient des indications que le modèle ne peut pas analyser l'image
|
|
error_phrases = [
|
|
"je ne peux pas directement visualiser",
|
|
"je n'ai pas accès à l'image",
|
|
"je ne peux pas voir l'image",
|
|
"sans accès direct à l'image",
|
|
"je n'ai pas la possibilité de voir",
|
|
"je ne peux pas accéder directement",
|
|
"erreur: impossible d'analyser l'image"
|
|
]
|
|
|
|
# Vérifier si une des phrases d'erreur est présente dans la réponse
|
|
if any(phrase in response.lower() for phrase in error_phrases):
|
|
logger.warning(f"Le modèle indique qu'il ne peut pas analyser l'image: {image_name}")
|
|
error_message = "Le modèle n'a pas pu analyser l'image correctement"
|
|
logger.error(f"Erreur d'analyse pour {image_name}: {error_message}")
|
|
print(f" ERREUR: {error_message}")
|
|
|
|
# Retourner un résultat d'erreur explicite
|
|
return {
|
|
"is_relevant": False,
|
|
"reason": f"Erreur d'analyse: {error_message}",
|
|
"raw_response": response,
|
|
"error": True,
|
|
"metadata": {
|
|
"image_path": image_path,
|
|
"image_name": image_name,
|
|
"timestamp": self._get_timestamp(),
|
|
"error": True
|
|
}
|
|
}
|
|
|
|
# Analyse de la réponse pour déterminer la pertinence
|
|
is_relevant, reason = self._analyser_reponse(response)
|
|
|
|
logger.info(f"Image {image_name} considérée comme {'pertinente' if is_relevant else 'non pertinente'}")
|
|
print(f" Décision: Image {image_name} {'pertinente' if is_relevant else 'non pertinente'}")
|
|
|
|
# Préparer le résultat
|
|
result = {
|
|
"is_relevant": is_relevant,
|
|
"reason": reason,
|
|
"raw_response": response,
|
|
"metadata": {
|
|
"image_path": image_path,
|
|
"image_name": image_name,
|
|
"timestamp": self._get_timestamp(),
|
|
"model_info": {
|
|
"model": getattr(self.llm, "modele", str(type(self.llm))),
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens
|
|
}
|
|
}
|
|
}
|
|
|
|
# Enregistrer la décision et le raisonnement dans l'historique
|
|
self.ajouter_historique("tri_image",
|
|
{
|
|
"image_path": image_path,
|
|
"prompt": prompt
|
|
},
|
|
{
|
|
"response": response,
|
|
"is_relevant": is_relevant,
|
|
"reason": reason
|
|
})
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de l'analyse de l'image {image_name}: {str(e)}")
|
|
print(f" ERREUR: Impossible d'analyser l'image {image_name}")
|
|
|
|
# Retourner un résultat par défaut en cas d'erreur
|
|
return {
|
|
"is_relevant": False, # Par défaut, considérer non pertinent en cas d'erreur
|
|
"reason": f"Erreur d'analyse: {str(e)}",
|
|
"raw_response": "",
|
|
"error": True,
|
|
"metadata": {
|
|
"image_path": image_path,
|
|
"image_name": image_name,
|
|
"timestamp": self._get_timestamp(),
|
|
"error": True
|
|
}
|
|
}
|
|
|
|
def _analyser_reponse(self, response: str) -> Tuple[bool, str]:
|
|
"""
|
|
Analyse la réponse du LLM pour déterminer la pertinence et extraire le raisonnement
|
|
|
|
Args:
|
|
response: Réponse brute du LLM
|
|
|
|
Returns:
|
|
Tuple (is_relevant, reason) contenant la décision et le raisonnement
|
|
"""
|
|
# Convertir en minuscule pour faciliter la comparaison
|
|
response_lower = response.lower()
|
|
|
|
# Détection directe des réponses négatives en début de texte
|
|
first_line = response_lower.split('\n')[0] if '\n' in response_lower else response_lower[:50]
|
|
starts_with_non = first_line.strip().startswith("non") or first_line.strip().startswith("non.")
|
|
|
|
# Détection explicite d'une réponse négative au début de la réponse
|
|
explicit_negative = starts_with_non or any(neg_start in first_line for neg_start in ["non pertinent", "pas pertinent"])
|
|
|
|
# Détection explicite d'une réponse positive au début de la réponse
|
|
explicit_positive = first_line.strip().startswith("oui") or first_line.strip().startswith("pertinent")
|
|
|
|
# Si une réponse explicite est détectée, l'utiliser directement
|
|
if explicit_negative:
|
|
is_relevant = False
|
|
elif explicit_positive:
|
|
is_relevant = True
|
|
else:
|
|
# Sinon, utiliser l'analyse par mots-clés
|
|
# Mots clés positifs forts
|
|
positive_keywords = ["oui", "pertinent", "pertinente", "utile", "important", "relevante",
|
|
"capture d'écran", "message d'erreur", "interface logicielle",
|
|
"configuration", "technique", "diagnostic"]
|
|
|
|
# Mots clés négatifs forts
|
|
negative_keywords = ["non", "pas pertinent", "non pertinente", "inutile", "irrelevant",
|
|
"photo personnelle", "marketing", "sans rapport", "hors sujet",
|
|
"décorative", "logo"]
|
|
|
|
# Compter les occurrences de mots clés
|
|
positive_count = sum(1 for kw in positive_keywords if kw in response_lower)
|
|
negative_count = sum(1 for kw in negative_keywords if kw in response_lower)
|
|
|
|
# Heuristique de décision basée sur la prépondérance des mots clés
|
|
is_relevant = positive_count > negative_count
|
|
|
|
# Extraire le raisonnement (les dernières phrases de la réponse)
|
|
lines = response.split('\n')
|
|
reason_lines = []
|
|
for line in reversed(lines):
|
|
if line.strip():
|
|
reason_lines.insert(0, line.strip())
|
|
if len(reason_lines) >= 2: # Prendre les 2 dernières lignes non vides
|
|
break
|
|
|
|
reason = " ".join(reason_lines) if reason_lines else "Décision basée sur l'analyse des mots-clés"
|
|
|
|
# Log détaillé de l'analyse
|
|
logger.debug(f"Analyse de la réponse: \n - Réponse brute: {response[:100]}...\n"
|
|
f" - Commence par 'non': {starts_with_non}\n"
|
|
f" - Détection explicite négative: {explicit_negative}\n"
|
|
f" - Détection explicite positive: {explicit_positive}\n"
|
|
f" - Décision finale: {'pertinente' if is_relevant else 'non pertinente'}\n"
|
|
f" - Raison: {reason}")
|
|
|
|
return is_relevant, reason
|
|
|
|
def _get_timestamp(self) -> str:
|
|
"""Retourne un timestamp au format YYYYMMDD_HHMMSS"""
|
|
from datetime import datetime
|
|
return datetime.now().strftime("%Y%m%d_%H%M%S") |