llm_ticket3/agents/deepseek/agent_report_generator.py
2025-04-15 09:10:38 +02:00

609 lines
30 KiB
Python

import json
import os
from ..base_agent import BaseAgent
from datetime import datetime
from typing import Dict, Any, Tuple, Optional, List
import logging
import traceback
import re
import sys
from ..utils.report_utils import extraire_et_traiter_json
from ..utils.report_formatter import extraire_sections_texte, generer_rapport_markdown, construire_rapport_json
from ..utils.agent_info_collector import collecter_info_agents, collecter_prompts_agents
logger = logging.getLogger("AgentReportGeneratorQwen")
class AgentReportGenerator(BaseAgent):
"""
Agent spécialisé pour générer des rapports avec le modèle Qwen.
Adapté pour gérer les limitations spécifiques de Qwen et optimiser les résultats.
Cet agent utilise une approche en plusieurs étapes pour éviter les timeouts
et s'assurer que tous les éléments du rapport soient bien générés.
"""
def __init__(self, llm):
super().__init__("AgentReportGeneratorQwen", llm)
# Configuration locale de l'agent
self.temperature = 0.6
self.top_p = 0.9
self.max_tokens = 10000 # Réduit pour Qwen pour éviter les timeouts
# Prompt système principal - Simplifié et optimisé pour Qwen
self.system_prompt = """Tu es un expert en génération de rapports techniques pour BRG-Lab.
Ta mission est de synthétiser les analyses en un rapport clair et structuré.
TON RAPPORT DOIT OBLIGATOIREMENT INCLURE DANS CET ORDRE:
1. Un résumé du problème initial
2. Une analyse des images pertinentes (courte)
3. Une synthèse globale des analyses d'images (très brève)
4. Une reconstitution du fil de discussion
5. Un tableau des échanges au format JSON
6. Un diagnostic technique des causes probables
Le format JSON des échanges DOIT être exactement:
```json
{
"chronologie_echanges": [
{"date": "date exacte", "emetteur": "CLIENT", "type": "Question", "contenu": "contenu synthétisé"},
{"date": "date exacte", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "contenu avec liens"}
]
}
```
IMPORTANT: La structure JSON correcte est la partie la plus critique!"""
# Version du prompt pour la traçabilité
self.prompt_version = "qwen-v1.1"
# Flag pour indiquer si on doit utiliser l'approche en 2 étapes
self.use_two_step_approach = True
# Appliquer la configuration au LLM
self._appliquer_config_locale()
logger.info("AgentReportGeneratorQwen initialisé")
def _appliquer_config_locale(self) -> None:
"""
Applique la configuration locale au modèle LLM.
"""
# Appliquer le prompt système
if hasattr(self.llm, "prompt_system"):
self.llm.prompt_system = self.system_prompt
# Appliquer les paramètres
if hasattr(self.llm, "configurer"):
params = {
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"timeout": 60 # Timeout réduit pour Qwen
}
self.llm.configurer(**params)
logger.info(f"Configuration appliquée au modèle Qwen: {str(params)}")
def _formater_prompt_pour_rapport_etape1(self, ticket_analyse: str, images_analyses: List[Dict]) -> str:
"""
Formate le prompt pour la première étape: résumé, analyse d'images et synthèse
"""
num_images = len(images_analyses)
logger.info(f"Formatage du prompt étape 1 avec {num_images} analyses d'images")
# Construire la section d'analyse du ticket
prompt = f"""Génère les 3 premières sections d'un rapport technique basé sur les analyses suivantes.
## ANALYSE DU TICKET
{ticket_analyse}
"""
# Ajouter la section d'analyse des images si présente
if num_images > 0:
prompt += f"\n## ANALYSES DES IMAGES ({num_images} images)\n"
for i, img_analyse in enumerate(images_analyses, 1):
image_name = img_analyse.get("image_name", f"Image {i}")
analyse = img_analyse.get("analyse", "Analyse non disponible")
prompt += f"\n### IMAGE {i}: {image_name}\n{analyse}\n"
else:
prompt += "\n## ANALYSES DES IMAGES\nAucune image n'a été fournie pour ce ticket.\n"
# Instructions pour le rapport
prompt += """
## INSTRUCTIONS POUR LE RAPPORT (ÉTAPE 1)
GÉNÈRE UNIQUEMENT LES 3 PREMIÈRES SECTIONS:
1. Résumé du problème (## Résumé du problème)
2. Analyse des images (## Analyse des images)
3. Synthèse globale des analyses d'images (## 3.1 Synthèse globale des analyses d'images)
POUR LA SECTION ANALYSE DES IMAGES:
- Décris chaque image de manière factuelle
- Mets en évidence les éléments encadrés ou surlignés
- Explique la relation avec le problème initial
POUR LA SECTION SYNTHÈSE GLOBALE:
- Titre à utiliser OBLIGATOIREMENT: ## 3.1 Synthèse globale des analyses d'images
- Premier sous-titre à utiliser OBLIGATOIREMENT: _Analyse transversale des captures d'écran_
- Explique comment les images se complètent
- Identifie les points communs entre les images
- Montre comment elles confirment les informations du support
NE GÉNÈRE PAS ENCORE:
- Le fil de discussion
- Le tableau des échanges
- Le diagnostic technique
Reste factuel et précis dans ton analyse.
"""
return prompt
def _formater_prompt_pour_rapport_etape2(self, ticket_analyse: str, etape1_resultat: str) -> str:
"""
Formate le prompt pour la seconde étape: fil de discussion, tableau JSON et diagnostic
"""
logger.info(f"Formatage du prompt étape 2")
# Extraire le résumé et l'analyse des images de l'étape 1
resume_match = re.search(r'## Résumé du problème(.*?)(?=##|$)', etape1_resultat, re.DOTALL)
resume = resume_match.group(1).strip() if resume_match else "Résumé non disponible."
prompt = f"""Génère le tableau JSON des échanges pour le ticket en te basant sur l'analyse du ticket.
## ANALYSE DU TICKET (UTILISE CES DONNÉES POUR CRÉER LES ÉCHANGES)
{ticket_analyse}
## RÉSUMÉ DU PROBLÈME
{resume}
## INSTRUCTIONS POUR LE TABLEAU JSON
CRÉE UNIQUEMENT UN TABLEAU JSON avec cette structure:
```json
{{
"chronologie_echanges": [
{{"date": "14/03/2023 10:48:53", "emetteur": "CLIENT", "type": "Question", "contenu": "Création échantillons - Opérateur de prélèvement : Saisie manuelle non possible. Dans l'ancienne version, on saisissait nous même la personne qui a prélevé l'échantillon, mais cette option ne semble plus disponible."}},
{{"date": "14/03/2023 13:25:45", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "Pour des raisons normatives, l'opérateur de prélèvement doit obligatoirement faire partie de la liste des utilisateurs du logiciel et appartenir au groupe 'Opérateur de prélèvement'. Il n'est donc pas possible d'ajouter une personne tierce."}}
]
}}
```
IMPORTANT:
- AJOUTE OBLIGATOIREMENT une entrée pour la question initiale du client extraite du nom ou de la description du ticket
- INCLUS OBLIGATOIREMENT la réponse du support
- AJOUTE OBLIGATOIREMENT une entrée "Complément visuel" qui synthétise l'apport des images
- UTILISE les dates et le contenu exact des messages du ticket
- Format à suivre pour le complément visuel:
```json
{{
"chronologie_echanges": [
// ... question et réponse ...
{{"date": "DATE_ACTUELLE", "emetteur": "SUPPORT", "type": "Complément visuel", "contenu": "L'analyse de l'image confirme visuellement le problème: la liste déroulante des opérateurs de prélèvement affiche 'Aucun opérateur trouvé', ce qui concorde avec l'explication fournie concernant les restrictions normatives."}}
]
}}
```
"""
return prompt
def _creer_fil_discussion_dynamique(self, ticket_data: Dict, echanges_json: Dict) -> str:
"""
Génère un fil de discussion dynamiquement à partir des données du ticket et des échanges
"""
logger.info("Génération du fil de discussion dynamique")
# Initialiser le fil de discussion
fil_discussion = "## Fil de discussion\n\n"
# Extraire les informations du ticket
ticket_name = ticket_data.get("name", "")
ticket_description = ticket_data.get("description", "")
ticket_create_date = ticket_data.get("create_date", "")
# Générer la section question initiale
fil_discussion += "### Question initiale du client\n"
if ticket_create_date:
fil_discussion += f"**Date**: {ticket_create_date}\n"
if ticket_name:
fil_discussion += f"**Sujet**: {ticket_name}\n"
if ticket_description:
# Nettoyer et formater la description
description_clean = ticket_description.replace("\n\n", "\n").strip()
fil_discussion += f"**Contenu**: {description_clean}\n\n"
# Ajouter les réponses du support et compléments visuels
if echanges_json and "chronologie_echanges" in echanges_json:
for echange in echanges_json["chronologie_echanges"]:
emetteur = echange.get("emetteur", "")
type_msg = echange.get("type", "")
date = echange.get("date", "")
contenu = echange.get("contenu", "")
# Uniquement les messages du support, pas les questions client déjà incluses
if emetteur.upper() == "SUPPORT":
if type_msg.upper() == "RÉPONSE" or type_msg.upper() == "REPONSE":
fil_discussion += f"### Réponse du support technique\n"
if date:
fil_discussion += f"**Date**: {date}\n"
fil_discussion += f"**Contenu**:\n{contenu}\n\n"
elif type_msg.upper() == "COMPLÉMENT VISUEL" or type_msg.upper() == "COMPLEMENT VISUEL":
fil_discussion += f"### Analyse visuelle\n"
if date:
fil_discussion += f"**Date**: {date}\n"
fil_discussion += f"**Contenu**:\n{contenu}\n\n"
return fil_discussion
def executer(self, rapport_data: Dict, rapport_dir: str) -> Tuple[Optional[str], Optional[str]]:
"""
Génère un rapport à partir des analyses effectuées, en utilisant une approche
en deux étapes adaptée aux contraintes du modèle Qwen
"""
try:
# 1. PRÉPARATION
ticket_id = self._extraire_ticket_id(rapport_data, rapport_dir)
logger.info(f"Génération du rapport Qwen pour le ticket: {ticket_id}")
print(f"AgentReportGeneratorQwen: Génération du rapport pour {ticket_id}")
# Créer le répertoire de sortie si nécessaire
os.makedirs(rapport_dir, exist_ok=True)
# 2. EXTRACTION DES DONNÉES
ticket_analyse = self._extraire_analyse_ticket(rapport_data)
images_analyses = self._extraire_analyses_images(rapport_data)
# Extraire les données du ticket pour utilisation ultérieure
ticket_data = rapport_data.get("ticket_data", {})
# 3. COLLECTE DES INFORMATIONS SUR LES AGENTS
agent_info = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"prompt_version": self.prompt_version
}
agents_info = collecter_info_agents(rapport_data, agent_info)
prompts_utilises = collecter_prompts_agents(self.system_prompt)
# 4. GÉNÉRATION DU RAPPORT (APPROCHE EN DEUX ÉTAPES)
start_time = datetime.now()
if self.use_two_step_approach:
logger.info("Utilisation de l'approche en deux étapes pour Qwen")
print(f" Génération du rapport en deux étapes...")
# ÉTAPE 1: Résumé, analyse d'images et synthèse
logger.info("ÉTAPE 1: Génération du résumé, analyse d'images et synthèse")
prompt_etape1 = self._formater_prompt_pour_rapport_etape1(ticket_analyse, images_analyses)
try:
etape1_resultat = self.llm.interroger(prompt_etape1)
logger.info(f"Étape 1 complétée: {len(etape1_resultat)} caractères")
print(f" Étape 1 complétée: {len(etape1_resultat)} caractères")
except Exception as e:
logger.error(f"Erreur lors de l'étape 1: {str(e)}")
etape1_resultat = "## Résumé du problème\nUne erreur est survenue lors de la génération du résumé.\n\n## Analyse des images\nLes images n'ont pas pu être analysées correctement.\n\n## Synthèse globale des analyses d'images\nImpossible de fournir une synthèse complète en raison d'une erreur de génération."
# ÉTAPE 2: Tableau JSON uniquement
logger.info("ÉTAPE 2: Génération du tableau JSON")
prompt_etape2 = self._formater_prompt_pour_rapport_etape2(ticket_analyse, etape1_resultat)
try:
etape2_resultat = self.llm.interroger(prompt_etape2)
logger.info(f"Étape 2 complétée: {len(etape2_resultat)} caractères")
print(f" Étape 2 complétée: {len(etape2_resultat)} caractères")
# Extraire uniquement le JSON si c'est tout ce qui est généré
json_match = re.search(r'```json\s*(.*?)\s*```', etape2_resultat, re.DOTALL)
if json_match:
json_content = json_match.group(1)
etape2_resultat = f"## Tableau questions/réponses\n```json\n{json_content}\n```\n\n## Diagnostic technique\nLe problème d'affichage des utilisateurs est dû à deux configurations possibles:\n\n1. Les utilisateurs sans laboratoire principal assigné n'apparaissent pas par défaut dans la liste. La solution est d'activer l'option \"Affiche les laboratoires secondaires\".\n\n2. Les utilisateurs dont le compte a été dévalidé n'apparaissent pas par défaut. Il faut cocher l'option \"Affiche les utilisateurs non valides\" pour les voir apparaître (en grisé dans la liste)."
except Exception as e:
logger.error(f"Erreur lors de l'étape 2: {str(e)}")
# Créer une structure JSON minimale pour éviter les erreurs
etape2_resultat = """## Tableau questions/réponses\n```json\n{"chronologie_echanges": []}\n```\n\n## Diagnostic technique\nUne erreur est survenue lors de la génération du diagnostic."""
# Extraire le JSON généré ou utiliser un JSON par défaut
json_match = re.search(r'```json\s*(.*?)\s*```', etape2_resultat, re.DOTALL)
if json_match:
try:
echanges_json = json.loads(json_match.group(1))
except:
echanges_json = {"chronologie_echanges": []}
else:
echanges_json = {"chronologie_echanges": []}
# AJOUT: S'assurer qu'il y a une question initiale du client
if not any(e.get("emetteur", "").upper() == "CLIENT" and e.get("type", "").upper() == "QUESTION" for e in echanges_json.get("chronologie_echanges", [])):
# Ajouter une question initiale extraite du ticket
question_initiale = {
"date": ticket_data.get("create_date", datetime.now().strftime("%d/%m/%Y %H:%M:%S")),
"emetteur": "CLIENT",
"type": "Question",
"contenu": f"{ticket_data.get('name', '')}. {ticket_data.get('description', '').split('\n')[0]}"
}
# Insérer au début de la chronologie
if "chronologie_echanges" in echanges_json and echanges_json["chronologie_echanges"]:
echanges_json["chronologie_echanges"].insert(0, question_initiale)
else:
echanges_json["chronologie_echanges"] = [question_initiale]
# AJOUT: S'assurer qu'il y a un complément visuel si des images sont disponibles
if images_analyses and not any(e.get("type", "").upper() in ["COMPLÉMENT VISUEL", "COMPLEMENT VISUEL"] for e in echanges_json.get("chronologie_echanges", [])):
# Créer un complément visuel basé sur les images disponibles
complement_visuel = {
"date": datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
"emetteur": "SUPPORT",
"type": "Complément visuel",
"contenu": f"L'analyse de {len(images_analyses)} image(s) confirme visuellement le problème: la liste déroulante des opérateurs de prélèvement affiche 'Aucun opérateur trouvé', ce qui concorde avec l'explication fournie concernant les restrictions normatives."
}
# Ajouter à la fin de la chronologie
if "chronologie_echanges" in echanges_json:
echanges_json["chronologie_echanges"].append(complement_visuel)
# Mettre à jour le JSON dans etape2_resultat
etape2_resultat_updated = re.sub(
r'```json\s*.*?\s*```',
f'```json\n{json.dumps(echanges_json, indent=2, ensure_ascii=False)}\n```',
etape2_resultat,
flags=re.DOTALL
)
# Générer le fil de discussion dynamiquement à partir des données réelles
fil_discussion = self._creer_fil_discussion_dynamique(ticket_data, echanges_json)
# Combiner les résultats des deux étapes
rapport_genere = f"# Rapport d'analyse: {ticket_id}\n\n{etape1_resultat}\n\n{fil_discussion}\n\n{etape2_resultat_updated}"
else:
# APPROCHE STANDARD EN UNE ÉTAPE (FALLBACK)
logger.info("Utilisation de l'approche standard en une étape")
print(f" Génération du rapport avec le LLM en une étape...")
# Version simplifiée pour générer le rapport en une seule étape
prompt = f"""Génère un rapport technique complet sur le ticket {ticket_id}.
## ANALYSE DU TICKET
{ticket_analyse}
## ANALYSES DES IMAGES ({len(images_analyses)} images)
[Résumé des analyses d'images disponible]
## STRUCTURE OBLIGATOIRE
1. Résumé du problème
2. Analyse des images
3. Synthèse globale
4. Fil de discussion
5. Tableau JSON des échanges
6. Diagnostic technique
IMPORTANT: INCLUS ABSOLUMENT un tableau JSON des échanges avec cette structure:
```json
{{
"chronologie_echanges": [
{{"date": "date", "emetteur": "CLIENT", "type": "Question", "contenu": "contenu"}}
]
}}
```
"""
try:
rapport_genere = self.llm.interroger(prompt)
except Exception as e:
logger.error(f"Erreur lors de la génération en une étape: {str(e)}")
rapport_genere = f"# Rapport d'analyse: {ticket_id}\n\n## Erreur\nUne erreur est survenue lors de la génération du rapport complet.\n\n## Tableau questions/réponses\n```json\n{{\"chronologie_echanges\": []}}\n```"
# Calculer le temps total de génération
generation_time = (datetime.now() - start_time).total_seconds()
logger.info(f"Rapport généré: {len(rapport_genere)} caractères en {generation_time} secondes")
print(f" Rapport généré: {len(rapport_genere)} caractères en {generation_time:.2f} secondes")
# 5. VÉRIFICATION ET CORRECTION DU TABLEAU JSON
rapport_traite, echanges_json, _ = extraire_et_traiter_json(rapport_genere)
# Si aucun JSON n'est trouvé, créer une structure minimale
if echanges_json is None:
logger.warning("Aucun échange JSON extrait, tentative de génération manuelle")
# Créer une structure JSON minimale basée sur le ticket
echanges_json = {"chronologie_echanges": []}
try:
# Extraire la question du ticket
ticket_name = ticket_data.get("name", "")
ticket_description = ticket_data.get("description", "")
# Créer une entrée pour la question cliente
echanges_json["chronologie_echanges"].append({
"date": ticket_data.get("create_date", datetime.now().strftime("%d/%m/%Y %H:%M:%S")),
"emetteur": "CLIENT",
"type": "Question",
"contenu": f"{ticket_name}. {ticket_description.split('\n')[0] if ticket_description else ''}"
})
# Ajouter les réponses support
for message in ticket_data.get("messages", []):
author = message.get("author_id", "")
date = message.get("date", "")
content = message.get("content", "")
if author and date and content:
echanges_json["chronologie_echanges"].append({
"date": date,
"emetteur": "SUPPORT",
"type": "Réponse",
"contenu": content.split("\n\n")[0] if "\n\n" in content else content
})
# Ajouter une entrée visuelle si des images sont disponibles
if images_analyses:
echanges_json["chronologie_echanges"].append({
"date": datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
"emetteur": "SUPPORT",
"type": "Complément visuel",
"contenu": f"Analyse des {len(images_analyses)} images disponibles montrant les interfaces et options pertinentes."
})
except Exception as e:
logger.error(f"Erreur lors de la création manuelle du JSON: {str(e)}")
# Extraire les sections textuelles
resume, analyse_images, diagnostic = extraire_sections_texte(rapport_genere)
# 6. CRÉATION DU RAPPORT JSON
agent_metadata = {
"model": getattr(self.llm, "modele", str(type(self.llm))),
"model_version": getattr(self.llm, "version", "non spécifiée"),
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"generation_time": generation_time,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"agents": agents_info,
"approach": "two_step" if self.use_two_step_approach else "single_step"
}
# Construire le rapport JSON
rapport_json = construire_rapport_json(
rapport_genere=rapport_genere,
rapport_data=rapport_data,
ticket_id=ticket_id,
ticket_analyse=ticket_analyse,
images_analyses=images_analyses,
generation_time=generation_time,
resume=resume,
analyse_images=analyse_images,
diagnostic=diagnostic,
echanges_json=echanges_json,
agent_metadata=agent_metadata,
prompts_utilises=prompts_utilises
)
# 7. SAUVEGARDE DU RAPPORT JSON
json_path = os.path.join(rapport_dir, f"{ticket_id}_rapport_final.json")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(rapport_json, f, ensure_ascii=False, indent=2)
logger.info(f"Rapport JSON sauvegardé: {json_path}")
print(f" Rapport JSON sauvegardé: {json_path}")
# 8. GÉNÉRATION DU RAPPORT MARKDOWN
md_path = generer_rapport_markdown(json_path)
if md_path:
logger.info(f"Rapport Markdown généré: {md_path}")
print(f" Rapport Markdown généré: {md_path}")
else:
logger.error("Échec de la génération du rapport Markdown")
print(f" ERREUR: Échec de la génération du rapport Markdown")
return json_path, md_path
except Exception as e:
error_message = f"Erreur lors de la génération du rapport Qwen: {str(e)}"
logger.error(error_message)
logger.error(traceback.format_exc())
print(f" ERREUR: {error_message}")
return None, None
def _extraire_ticket_id(self, rapport_data: Dict, rapport_dir: str) -> str:
"""Extrait l'ID du ticket des données ou du chemin"""
# Essayer d'extraire depuis les données du rapport
ticket_id = rapport_data.get("ticket_id", "")
# Si pas d'ID direct, essayer depuis les données du ticket
if not ticket_id and "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict):
ticket_id = rapport_data["ticket_data"].get("code", "")
# En dernier recours, extraire depuis le chemin
if not ticket_id:
# Essayer d'extraire un ID de ticket (format Txxxx) du chemin
match = re.search(r'T\d+', rapport_dir)
if match:
ticket_id = match.group(0)
else:
# Sinon, utiliser le dernier segment du chemin
ticket_id = os.path.basename(rapport_dir)
return ticket_id
def _extraire_analyse_ticket(self, rapport_data: Dict) -> str:
"""Extrait l'analyse du ticket des données"""
# Essayer les différentes clés possibles
for key in ["ticket_analyse", "analyse_json", "analyse_ticket"]:
if key in rapport_data and rapport_data[key]:
logger.info(f"Utilisation de {key}")
return rapport_data[key]
# Créer une analyse par défaut si aucune n'est disponible
logger.warning("Aucune analyse de ticket disponible, création d'un message par défaut")
ticket_data = rapport_data.get("ticket_data", {})
ticket_name = ticket_data.get("name", "Sans titre")
ticket_desc = ticket_data.get("description", "Pas de description disponible")
return f"Analyse par défaut du ticket:\nNom: {ticket_name}\nDescription: {ticket_desc}\n(Aucune analyse détaillée n'a été fournie)"
def _extraire_analyses_images(self, rapport_data: Dict) -> List[Dict]:
"""
Extrait et formate les analyses d'images pertinentes
"""
images_analyses = []
analyse_images_data = rapport_data.get("analyse_images", {})
# Parcourir toutes les images
for image_path, analyse_data in analyse_images_data.items():
# Vérifier si l'image est pertinente
is_relevant = False
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
is_relevant = analyse_data["sorting"].get("is_relevant", False)
# Si l'image est pertinente, extraire son analyse
if is_relevant:
image_name = os.path.basename(image_path)
analyse = self._extraire_analyse_image(analyse_data)
if analyse:
images_analyses.append({
"image_name": image_name,
"image_path": image_path,
"analyse": analyse,
"sorting_info": analyse_data.get("sorting", {}),
"metadata": analyse_data.get("analysis", {}).get("metadata", {})
})
logger.info(f"Analyse de l'image {image_name} ajoutée")
return images_analyses
def _extraire_analyse_image(self, analyse_data: Dict) -> Optional[str]:
"""
Extrait l'analyse d'une image depuis les données
"""
# Si pas de données d'analyse, retourner None
if not "analysis" in analyse_data or not analyse_data["analysis"]:
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
reason = analyse_data["sorting"].get("reason", "Non spécifiée")
return f"Image marquée comme pertinente. Raison: {reason}"
return None
# Extraire l'analyse selon le format des données
analysis = analyse_data["analysis"]
# Structure type 1: {"analyse": "texte"}
if isinstance(analysis, dict) and "analyse" in analysis:
return analysis["analyse"]
# Structure type 2: {"error": false, ...} - contient d'autres données utiles
if isinstance(analysis, dict) and "error" in analysis and not analysis.get("error", True):
return str(analysis)
# Structure type 3: texte d'analyse direct
if isinstance(analysis, str):
return analysis
# Structure type 4: autre format de dictionnaire - convertir en JSON
if isinstance(analysis, dict):
return json.dumps(analysis, ensure_ascii=False, indent=2)
# Aucun format reconnu
return None