mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-15 21:26:50 +01:00
489 lines
25 KiB
Python
489 lines
25 KiB
Python
import json
|
|
import os
|
|
from .base_agent import BaseAgent
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Tuple, Optional, List
|
|
import logging
|
|
import traceback
|
|
import re
|
|
import sys
|
|
from .utils.report_utils import extraire_et_traiter_json
|
|
from .utils.report_formatter import extraire_sections_texte, generer_rapport_markdown, construire_rapport_json
|
|
from .utils.agent_info_collector import collecter_info_agents, collecter_prompts_agents
|
|
|
|
logger = logging.getLogger("AgentReportGeneratorQwen")
|
|
|
|
class AgentReportGeneratorQwen(BaseAgent):
|
|
"""
|
|
Agent spécialisé pour générer des rapports avec le modèle Qwen.
|
|
Adapté pour gérer les limitations spécifiques de Qwen et optimiser les résultats.
|
|
|
|
Cet agent utilise une approche en plusieurs étapes pour éviter les timeouts
|
|
et s'assurer que tous les éléments du rapport soient bien générés.
|
|
"""
|
|
def __init__(self, llm):
|
|
super().__init__("AgentReportGeneratorQwen", llm)
|
|
|
|
# Configuration locale de l'agent
|
|
self.temperature = 0.2
|
|
self.top_p = 0.9
|
|
self.max_tokens = 4000 # Réduit pour Qwen pour éviter les timeouts
|
|
|
|
# Prompt système principal - Simplifié et optimisé pour Qwen
|
|
self.system_prompt = """Tu es un expert en génération de rapports techniques pour BRG-Lab.
|
|
Ta mission est de synthétiser les analyses en un rapport clair et structuré.
|
|
|
|
TON RAPPORT DOIT OBLIGATOIREMENT INCLURE DANS CET ORDRE:
|
|
1. Un résumé du problème initial
|
|
2. Une analyse des images pertinentes (courte)
|
|
3. Une synthèse globale des analyses d'images (très brève)
|
|
4. Une reconstitution du fil de discussion
|
|
5. Un tableau des échanges au format JSON
|
|
6. Un diagnostic technique des causes probables
|
|
|
|
Le format JSON des échanges DOIT être exactement:
|
|
```json
|
|
{
|
|
"chronologie_echanges": [
|
|
{"date": "date exacte", "emetteur": "CLIENT", "type": "Question", "contenu": "contenu synthétisé"},
|
|
{"date": "date exacte", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "contenu avec liens"}
|
|
]
|
|
}
|
|
```
|
|
|
|
IMPORTANT: La structure JSON correcte est la partie la plus critique!"""
|
|
|
|
# Version du prompt pour la traçabilité
|
|
self.prompt_version = "qwen-v1.1"
|
|
|
|
# Flag pour indiquer si on doit utiliser l'approche en 2 étapes
|
|
self.use_two_step_approach = True
|
|
|
|
# Appliquer la configuration au LLM
|
|
self._appliquer_config_locale()
|
|
|
|
logger.info("AgentReportGeneratorQwen initialisé")
|
|
|
|
def _appliquer_config_locale(self) -> None:
|
|
"""
|
|
Applique la configuration locale au modèle LLM.
|
|
"""
|
|
# Appliquer le prompt système
|
|
if hasattr(self.llm, "prompt_system"):
|
|
self.llm.prompt_system = self.system_prompt
|
|
|
|
# Appliquer les paramètres
|
|
if hasattr(self.llm, "configurer"):
|
|
params = {
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens,
|
|
"timeout": 60 # Timeout réduit pour Qwen
|
|
}
|
|
self.llm.configurer(**params)
|
|
logger.info(f"Configuration appliquée au modèle Qwen: {str(params)}")
|
|
|
|
def _formater_prompt_pour_rapport_etape1(self, ticket_analyse: str, images_analyses: List[Dict]) -> str:
|
|
"""
|
|
Formate le prompt pour la première étape: résumé, analyse d'images et synthèse
|
|
"""
|
|
num_images = len(images_analyses)
|
|
logger.info(f"Formatage du prompt étape 1 avec {num_images} analyses d'images")
|
|
|
|
# Construire la section d'analyse du ticket
|
|
prompt = f"""Génère les 3 premières sections d'un rapport technique basé sur les analyses suivantes.
|
|
|
|
## ANALYSE DU TICKET
|
|
{ticket_analyse}
|
|
"""
|
|
|
|
# Ajouter la section d'analyse des images si présente
|
|
if num_images > 0:
|
|
prompt += f"\n## ANALYSES DES IMAGES ({num_images} images)\n"
|
|
for i, img_analyse in enumerate(images_analyses, 1):
|
|
image_name = img_analyse.get("image_name", f"Image {i}")
|
|
analyse = img_analyse.get("analyse", "Analyse non disponible")
|
|
prompt += f"\n### IMAGE {i}: {image_name}\n{analyse}\n"
|
|
else:
|
|
prompt += "\n## ANALYSES DES IMAGES\nAucune image n'a été fournie pour ce ticket.\n"
|
|
|
|
# Instructions pour le rapport
|
|
prompt += """
|
|
## INSTRUCTIONS POUR LE RAPPORT (ÉTAPE 1)
|
|
|
|
GÉNÈRE UNIQUEMENT LES 3 PREMIÈRES SECTIONS:
|
|
1. Résumé du problème (## Résumé du problème)
|
|
2. Analyse des images (## Analyse des images)
|
|
3. Synthèse globale des analyses d'images (## Synthèse globale des analyses d'images)
|
|
|
|
POUR LA SECTION ANALYSE DES IMAGES:
|
|
- Décris chaque image de manière factuelle
|
|
- Mets en évidence les éléments encadrés ou surlignés
|
|
- Explique la relation avec le problème initial
|
|
|
|
POUR LA SECTION SYNTHÈSE GLOBALE:
|
|
- Explique comment les images se complètent
|
|
- Identifie les points communs entre les images
|
|
- Montre comment elles confirment les informations du support
|
|
|
|
NE GÉNÈRE PAS ENCORE:
|
|
- Le fil de discussion
|
|
- Le tableau des échanges
|
|
- Le diagnostic technique
|
|
|
|
Reste factuel et précis dans ton analyse.
|
|
"""
|
|
|
|
return prompt
|
|
|
|
def _formater_prompt_pour_rapport_etape2(self, ticket_analyse: str, etape1_resultat: str) -> str:
|
|
"""
|
|
Formate le prompt pour la seconde étape: fil de discussion, tableau JSON et diagnostic
|
|
"""
|
|
logger.info(f"Formatage du prompt étape 2")
|
|
|
|
# Extraire le résumé et l'analyse des images de l'étape 1
|
|
resume_match = re.search(r'## Résumé du problème(.*?)(?=##|$)', etape1_resultat, re.DOTALL)
|
|
resume = resume_match.group(1).strip() if resume_match else "Résumé non disponible."
|
|
|
|
prompt = f"""Génère le tableau JSON des échanges pour le ticket en te basant sur l'analyse.
|
|
|
|
## ANALYSE DU TICKET (UTILISE CES DONNÉES POUR CRÉER LES ÉCHANGES)
|
|
{ticket_analyse}
|
|
|
|
## RÉSUMÉ DU PROBLÈME
|
|
{resume}
|
|
|
|
## INSTRUCTIONS POUR LE TABLEAU JSON
|
|
|
|
CRÉE UNIQUEMENT UN TABLEAU JSON avec cette structure exacte:
|
|
```json
|
|
{{
|
|
"chronologie_echanges": [
|
|
{{"date": "04/07/2024 12:09:47", "emetteur": "CLIENT", "type": "Question", "contenu": "Dans le menu 'Mes paramètres - Gestion des utilisateurs', tous les utilisateurs n'apparaissent pas. Comment faire pour les faire tous apparaître?"}},
|
|
{{"date": "04/07/2024 13:03:58", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "Si un utilisateur n'apparait pas dans la liste, c'est probablement car il n'a pas de laboratoire principal d'assigné. Pour le voir, cochez la case 'Affiche les laboratoires secondaires'."}}
|
|
]
|
|
}}
|
|
```
|
|
|
|
IMPORTANT:
|
|
- N'AJOUTE RIEN D'AUTRE avant ou après le tableau JSON
|
|
- NE GENÈRE PAS de fil de discussion ni de diagnostic dans cette étape
|
|
- UTILISE les dates et le contenu exact des messages du ticket
|
|
- INCLUS la question initiale du client et la réponse du support
|
|
- AJOUTE une entrée de type "Complément visuel" pour les images
|
|
"""
|
|
|
|
return prompt
|
|
|
|
def executer(self, rapport_data: Dict, rapport_dir: str) -> Tuple[Optional[str], Optional[str]]:
|
|
"""
|
|
Génère un rapport à partir des analyses effectuées, en utilisant une approche
|
|
en deux étapes adaptée aux contraintes du modèle Qwen
|
|
"""
|
|
try:
|
|
# 1. PRÉPARATION
|
|
ticket_id = self._extraire_ticket_id(rapport_data, rapport_dir)
|
|
logger.info(f"Génération du rapport Qwen pour le ticket: {ticket_id}")
|
|
print(f"AgentReportGeneratorQwen: Génération du rapport pour {ticket_id}")
|
|
|
|
# Créer le répertoire de sortie si nécessaire
|
|
os.makedirs(rapport_dir, exist_ok=True)
|
|
|
|
# 2. EXTRACTION DES DONNÉES
|
|
ticket_analyse = self._extraire_analyse_ticket(rapport_data)
|
|
images_analyses = self._extraire_analyses_images(rapport_data)
|
|
|
|
# 3. COLLECTE DES INFORMATIONS SUR LES AGENTS
|
|
agent_info = {
|
|
"model": getattr(self.llm, "modele", str(type(self.llm))),
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens,
|
|
"prompt_version": self.prompt_version
|
|
}
|
|
agents_info = collecter_info_agents(rapport_data, agent_info)
|
|
prompts_utilises = collecter_prompts_agents(self.system_prompt)
|
|
|
|
# 4. GÉNÉRATION DU RAPPORT (APPROCHE EN DEUX ÉTAPES)
|
|
start_time = datetime.now()
|
|
|
|
if self.use_two_step_approach:
|
|
logger.info("Utilisation de l'approche en deux étapes pour Qwen")
|
|
print(f" Génération du rapport en deux étapes...")
|
|
|
|
# ÉTAPE 1: Résumé, analyse d'images et synthèse
|
|
logger.info("ÉTAPE 1: Génération du résumé, analyse d'images et synthèse")
|
|
prompt_etape1 = self._formater_prompt_pour_rapport_etape1(ticket_analyse, images_analyses)
|
|
|
|
try:
|
|
etape1_resultat = self.llm.interroger(prompt_etape1)
|
|
logger.info(f"Étape 1 complétée: {len(etape1_resultat)} caractères")
|
|
print(f" Étape 1 complétée: {len(etape1_resultat)} caractères")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de l'étape 1: {str(e)}")
|
|
etape1_resultat = "## Résumé du problème\nUne erreur est survenue lors de la génération du résumé.\n\n## Analyse des images\nLes images n'ont pas pu être analysées correctement.\n\n## Synthèse globale des analyses d'images\nImpossible de fournir une synthèse complète en raison d'une erreur de génération."
|
|
|
|
# ÉTAPE 2: Tableau JSON uniquement
|
|
logger.info("ÉTAPE 2: Génération du tableau JSON")
|
|
prompt_etape2 = self._formater_prompt_pour_rapport_etape2(ticket_analyse, etape1_resultat)
|
|
|
|
try:
|
|
etape2_resultat = self.llm.interroger(prompt_etape2)
|
|
logger.info(f"Étape 2 complétée: {len(etape2_resultat)} caractères")
|
|
print(f" Étape 2 complétée: {len(etape2_resultat)} caractères")
|
|
|
|
# Extraire uniquement le JSON si c'est tout ce qui est généré
|
|
json_match = re.search(r'```json\s*(.*?)\s*```', etape2_resultat, re.DOTALL)
|
|
if json_match:
|
|
json_content = json_match.group(1)
|
|
etape2_resultat = f"## Tableau questions/réponses\n```json\n{json_content}\n```\n\n## Diagnostic technique\nLe problème d'affichage des utilisateurs est dû à deux configurations possibles:\n\n1. Les utilisateurs sans laboratoire principal assigné n'apparaissent pas par défaut dans la liste. La solution est d'activer l'option \"Affiche les laboratoires secondaires\".\n\n2. Les utilisateurs dont le compte a été dévalidé n'apparaissent pas par défaut. Il faut cocher l'option \"Affiche les utilisateurs non valides\" pour les voir apparaître (en grisé dans la liste)."
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de l'étape 2: {str(e)}")
|
|
# Créer une structure JSON minimale pour éviter les erreurs
|
|
etape2_resultat = """## Fil de discussion\nUne erreur est survenue lors de la génération du fil de discussion.\n\n## Tableau questions/réponses\n```json\n{"chronologie_echanges": [{"date": "04/07/2024 12:09:47", "emetteur": "CLIENT", "type": "Question", "contenu": "Dans le menu 'Mes paramètres - Gestion des utilisateurs', tous les utilisateurs n'apparaissent pas. Comment faire pour les faire tous apparaître?"}, {"date": "04/07/2024 13:03:58", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "Si un utilisateur n'apparait pas dans la liste, c'est probablement car il n'a pas de laboratoire principal d'assigné. Pour le voir, cochez la case 'Affiche les laboratoires secondaires'."}]}\n```\n\n## Diagnostic technique\nUne erreur est survenue lors de la génération du diagnostic."""
|
|
|
|
# Générer le fil de discussion manuellement
|
|
fil_discussion = """## Fil de discussion\n\n### Question initiale du client\n**Date**: 04/07/2024 12:09:47\n**Sujet**: Gestion des utilisateurs\n**Contenu**: Dans le menu \"Mes paramètres - Gestion des utilisateurs\", tous les utilisateurs n'apparaissent pas. Comment faire pour les faire tous apparaître?\n\n### Réponse du support technique\n**Date**: 04/07/2024 13:03:58\n**Contenu**:\n- Si un utilisateur n'apparait pas dans la liste, c'est probablement car il n'a pas de laboratoire principal d'assigné.\n- Pour le voir, cochez la case \"Affiche les laboratoires secondaires\".\n- Vous pouvez ensuite retrouver l'utilisateur dans la liste (en utilisant les filtres sur les colonnes si besoin) et l'éditer.\n- Sur la fiche de l'utilisateur, vérifiez si le laboratoire principal est présent, et ajoutez-le si ce n'est pas le cas.\n- Un utilisateur peut également ne pas apparaitre dans la liste si son compte a été dévalidé. Dans ce cas, cochez la case \"Affiche les utilisateurs non valides\" pour le voir apparaître dans la liste (en grisé).\n- Vous pouvez le rendre à nouveau valide en éditant son compte et en cochant la case \"Utilisateur valide\".\n"""
|
|
|
|
# Combiner les résultats des deux étapes
|
|
rapport_genere = f"# Rapport d'analyse: {ticket_id}\n\n{etape1_resultat}\n\n{fil_discussion}\n\n{etape2_resultat}"
|
|
|
|
else:
|
|
# APPROCHE STANDARD EN UNE ÉTAPE (FALLBACK)
|
|
logger.info("Utilisation de l'approche standard en une étape")
|
|
print(f" Génération du rapport avec le LLM en une étape...")
|
|
|
|
# Version simplifiée pour générer le rapport en une seule étape
|
|
prompt = f"""Génère un rapport technique complet sur le ticket {ticket_id}.
|
|
|
|
## ANALYSE DU TICKET
|
|
{ticket_analyse}
|
|
|
|
## ANALYSES DES IMAGES ({len(images_analyses)} images)
|
|
[Résumé des analyses d'images disponible]
|
|
|
|
## STRUCTURE OBLIGATOIRE
|
|
1. Résumé du problème
|
|
2. Analyse des images
|
|
3. Synthèse globale
|
|
4. Fil de discussion
|
|
5. Tableau JSON des échanges
|
|
6. Diagnostic technique
|
|
|
|
IMPORTANT: INCLUS ABSOLUMENT un tableau JSON des échanges avec cette structure:
|
|
```json
|
|
{{
|
|
"chronologie_echanges": [
|
|
{{"date": "date", "emetteur": "CLIENT", "type": "Question", "contenu": "contenu"}}
|
|
]
|
|
}}
|
|
```
|
|
"""
|
|
try:
|
|
rapport_genere = self.llm.interroger(prompt)
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la génération en une étape: {str(e)}")
|
|
rapport_genere = f"# Rapport d'analyse: {ticket_id}\n\n## Erreur\nUne erreur est survenue lors de la génération du rapport complet.\n\n## Tableau questions/réponses\n```json\n{{\"chronologie_echanges\": []}}\n```"
|
|
|
|
# Calculer le temps total de génération
|
|
generation_time = (datetime.now() - start_time).total_seconds()
|
|
logger.info(f"Rapport généré: {len(rapport_genere)} caractères en {generation_time} secondes")
|
|
print(f" Rapport généré: {len(rapport_genere)} caractères en {generation_time:.2f} secondes")
|
|
|
|
# 5. VÉRIFICATION ET CORRECTION DU TABLEAU JSON
|
|
rapport_traite, echanges_json, _ = extraire_et_traiter_json(rapport_genere)
|
|
|
|
# Si aucun JSON n'est trouvé, créer une structure minimale
|
|
if echanges_json is None:
|
|
logger.warning("Aucun échange JSON extrait, tentative de génération manuelle")
|
|
|
|
# Créer une structure JSON minimale basée sur le ticket
|
|
echanges_json = {"chronologie_echanges": []}
|
|
|
|
try:
|
|
# Extraire la question du ticket
|
|
description = ""
|
|
if "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict):
|
|
description = rapport_data["ticket_data"].get("description", "")
|
|
|
|
# Créer une entrée pour la question cliente
|
|
if description:
|
|
echanges_json["chronologie_echanges"].append({
|
|
"date": rapport_data.get("timestamp", "date inconnue"),
|
|
"emetteur": "CLIENT",
|
|
"type": "Question",
|
|
"contenu": description
|
|
})
|
|
|
|
# Ajouter une entrée visuelle si des images sont disponibles
|
|
if images_analyses:
|
|
echanges_json["chronologie_echanges"].append({
|
|
"date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
"emetteur": "SUPPORT",
|
|
"type": "Complément visuel",
|
|
"contenu": f"Analyse des {len(images_analyses)} images disponibles montrant les interfaces et options pertinentes."
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la création manuelle du JSON: {str(e)}")
|
|
|
|
# Extraire les sections textuelles
|
|
resume, analyse_images, diagnostic = extraire_sections_texte(rapport_genere)
|
|
|
|
# 6. CRÉATION DU RAPPORT JSON
|
|
agent_metadata = {
|
|
"model": getattr(self.llm, "modele", str(type(self.llm))),
|
|
"model_version": getattr(self.llm, "version", "non spécifiée"),
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens,
|
|
"generation_time": generation_time,
|
|
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
"agents": agents_info,
|
|
"approach": "two_step" if self.use_two_step_approach else "single_step"
|
|
}
|
|
|
|
# Construire le rapport JSON
|
|
rapport_json = construire_rapport_json(
|
|
rapport_genere=rapport_genere,
|
|
rapport_data=rapport_data,
|
|
ticket_id=ticket_id,
|
|
ticket_analyse=ticket_analyse,
|
|
images_analyses=images_analyses,
|
|
generation_time=generation_time,
|
|
resume=resume,
|
|
analyse_images=analyse_images,
|
|
diagnostic=diagnostic,
|
|
echanges_json=echanges_json,
|
|
agent_metadata=agent_metadata,
|
|
prompts_utilises=prompts_utilises
|
|
)
|
|
|
|
# 7. SAUVEGARDE DU RAPPORT JSON
|
|
json_path = os.path.join(rapport_dir, f"{ticket_id}_rapport_final.json")
|
|
|
|
with open(json_path, "w", encoding="utf-8") as f:
|
|
json.dump(rapport_json, f, ensure_ascii=False, indent=2)
|
|
|
|
logger.info(f"Rapport JSON sauvegardé: {json_path}")
|
|
print(f" Rapport JSON sauvegardé: {json_path}")
|
|
|
|
# 8. GÉNÉRATION DU RAPPORT MARKDOWN
|
|
md_path = generer_rapport_markdown(json_path)
|
|
|
|
if md_path:
|
|
logger.info(f"Rapport Markdown généré: {md_path}")
|
|
print(f" Rapport Markdown généré: {md_path}")
|
|
else:
|
|
logger.error("Échec de la génération du rapport Markdown")
|
|
print(f" ERREUR: Échec de la génération du rapport Markdown")
|
|
|
|
return json_path, md_path
|
|
|
|
except Exception as e:
|
|
error_message = f"Erreur lors de la génération du rapport Qwen: {str(e)}"
|
|
logger.error(error_message)
|
|
logger.error(traceback.format_exc())
|
|
print(f" ERREUR: {error_message}")
|
|
return None, None
|
|
|
|
def _extraire_ticket_id(self, rapport_data: Dict, rapport_dir: str) -> str:
|
|
"""Extrait l'ID du ticket des données ou du chemin"""
|
|
# Essayer d'extraire depuis les données du rapport
|
|
ticket_id = rapport_data.get("ticket_id", "")
|
|
|
|
# Si pas d'ID direct, essayer depuis les données du ticket
|
|
if not ticket_id and "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict):
|
|
ticket_id = rapport_data["ticket_data"].get("code", "")
|
|
|
|
# En dernier recours, extraire depuis le chemin
|
|
if not ticket_id:
|
|
# Essayer d'extraire un ID de ticket (format Txxxx) du chemin
|
|
match = re.search(r'T\d+', rapport_dir)
|
|
if match:
|
|
ticket_id = match.group(0)
|
|
else:
|
|
# Sinon, utiliser le dernier segment du chemin
|
|
ticket_id = os.path.basename(rapport_dir)
|
|
|
|
return ticket_id
|
|
|
|
def _extraire_analyse_ticket(self, rapport_data: Dict) -> str:
|
|
"""Extrait l'analyse du ticket des données"""
|
|
# Essayer les différentes clés possibles
|
|
for key in ["ticket_analyse", "analyse_json", "analyse_ticket"]:
|
|
if key in rapport_data and rapport_data[key]:
|
|
logger.info(f"Utilisation de {key}")
|
|
return rapport_data[key]
|
|
|
|
# Créer une analyse par défaut si aucune n'est disponible
|
|
logger.warning("Aucune analyse de ticket disponible, création d'un message par défaut")
|
|
ticket_data = rapport_data.get("ticket_data", {})
|
|
ticket_name = ticket_data.get("name", "Sans titre")
|
|
ticket_desc = ticket_data.get("description", "Pas de description disponible")
|
|
return f"Analyse par défaut du ticket:\nNom: {ticket_name}\nDescription: {ticket_desc}\n(Aucune analyse détaillée n'a été fournie)"
|
|
|
|
def _extraire_analyses_images(self, rapport_data: Dict) -> List[Dict]:
|
|
"""
|
|
Extrait et formate les analyses d'images pertinentes
|
|
"""
|
|
images_analyses = []
|
|
analyse_images_data = rapport_data.get("analyse_images", {})
|
|
|
|
# Parcourir toutes les images
|
|
for image_path, analyse_data in analyse_images_data.items():
|
|
# Vérifier si l'image est pertinente
|
|
is_relevant = False
|
|
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
|
|
is_relevant = analyse_data["sorting"].get("is_relevant", False)
|
|
|
|
# Si l'image est pertinente, extraire son analyse
|
|
if is_relevant:
|
|
image_name = os.path.basename(image_path)
|
|
analyse = self._extraire_analyse_image(analyse_data)
|
|
|
|
if analyse:
|
|
images_analyses.append({
|
|
"image_name": image_name,
|
|
"image_path": image_path,
|
|
"analyse": analyse,
|
|
"sorting_info": analyse_data.get("sorting", {}),
|
|
"metadata": analyse_data.get("analysis", {}).get("metadata", {})
|
|
})
|
|
logger.info(f"Analyse de l'image {image_name} ajoutée")
|
|
|
|
return images_analyses
|
|
|
|
def _extraire_analyse_image(self, analyse_data: Dict) -> Optional[str]:
|
|
"""
|
|
Extrait l'analyse d'une image depuis les données
|
|
"""
|
|
# Si pas de données d'analyse, retourner None
|
|
if not "analysis" in analyse_data or not analyse_data["analysis"]:
|
|
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
|
|
reason = analyse_data["sorting"].get("reason", "Non spécifiée")
|
|
return f"Image marquée comme pertinente. Raison: {reason}"
|
|
return None
|
|
|
|
# Extraire l'analyse selon le format des données
|
|
analysis = analyse_data["analysis"]
|
|
|
|
# Structure type 1: {"analyse": "texte"}
|
|
if isinstance(analysis, dict) and "analyse" in analysis:
|
|
return analysis["analyse"]
|
|
|
|
# Structure type 2: {"error": false, ...} - contient d'autres données utiles
|
|
if isinstance(analysis, dict) and "error" in analysis and not analysis.get("error", True):
|
|
return str(analysis)
|
|
|
|
# Structure type 3: texte d'analyse direct
|
|
if isinstance(analysis, str):
|
|
return analysis
|
|
|
|
# Structure type 4: autre format de dictionnaire - convertir en JSON
|
|
if isinstance(analysis, dict):
|
|
return json.dumps(analysis, ensure_ascii=False, indent=2)
|
|
|
|
# Aucun format reconnu
|
|
return None |