mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-15 20:17:14 +01:00
609 lines
30 KiB
Python
609 lines
30 KiB
Python
import json
|
|
import os
|
|
from .base_agent import BaseAgent
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Tuple, Optional, List
|
|
import logging
|
|
import traceback
|
|
import re
|
|
import sys
|
|
from .utils.report_utils import extraire_et_traiter_json
|
|
from .utils.report_formatter import extraire_sections_texte, generer_rapport_markdown, construire_rapport_json
|
|
from .utils.agent_info_collector import collecter_info_agents, collecter_prompts_agents
|
|
|
|
logger = logging.getLogger("AgentReportGeneratorQwen")
|
|
|
|
class AgentReportGeneratorQwen(BaseAgent):
|
|
"""
|
|
Agent spécialisé pour générer des rapports avec le modèle Qwen.
|
|
Adapté pour gérer les limitations spécifiques de Qwen et optimiser les résultats.
|
|
|
|
Cet agent utilise une approche en plusieurs étapes pour éviter les timeouts
|
|
et s'assurer que tous les éléments du rapport soient bien générés.
|
|
"""
|
|
def __init__(self, llm):
|
|
super().__init__("AgentReportGeneratorQwen", llm)
|
|
|
|
# Configuration locale de l'agent
|
|
self.temperature = 0.2
|
|
self.top_p = 0.9
|
|
self.max_tokens = 10000 # Réduit pour Qwen pour éviter les timeouts
|
|
|
|
# Prompt système principal - Simplifié et optimisé pour Qwen
|
|
self.system_prompt = """Tu es un expert en génération de rapports techniques pour BRG-Lab.
|
|
Ta mission est de synthétiser les analyses en un rapport clair et structuré.
|
|
|
|
TON RAPPORT DOIT OBLIGATOIREMENT INCLURE DANS CET ORDRE:
|
|
1. Un résumé du problème initial
|
|
2. Une analyse des images pertinentes (courte)
|
|
3. Une synthèse globale des analyses d'images (très brève)
|
|
4. Une reconstitution du fil de discussion
|
|
5. Un tableau des échanges au format JSON
|
|
6. Un diagnostic technique des causes probables
|
|
|
|
Le format JSON des échanges DOIT être exactement:
|
|
```json
|
|
{
|
|
"chronologie_echanges": [
|
|
{"date": "date exacte", "emetteur": "CLIENT", "type": "Question", "contenu": "contenu synthétisé"},
|
|
{"date": "date exacte", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "contenu avec liens"}
|
|
]
|
|
}
|
|
```
|
|
|
|
IMPORTANT: La structure JSON correcte est la partie la plus critique!"""
|
|
|
|
# Version du prompt pour la traçabilité
|
|
self.prompt_version = "qwen-v1.1"
|
|
|
|
# Flag pour indiquer si on doit utiliser l'approche en 2 étapes
|
|
self.use_two_step_approach = True
|
|
|
|
# Appliquer la configuration au LLM
|
|
self._appliquer_config_locale()
|
|
|
|
logger.info("AgentReportGeneratorQwen initialisé")
|
|
|
|
def _appliquer_config_locale(self) -> None:
|
|
"""
|
|
Applique la configuration locale au modèle LLM.
|
|
"""
|
|
# Appliquer le prompt système
|
|
if hasattr(self.llm, "prompt_system"):
|
|
self.llm.prompt_system = self.system_prompt
|
|
|
|
# Appliquer les paramètres
|
|
if hasattr(self.llm, "configurer"):
|
|
params = {
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens,
|
|
"timeout": 60 # Timeout réduit pour Qwen
|
|
}
|
|
self.llm.configurer(**params)
|
|
logger.info(f"Configuration appliquée au modèle Qwen: {str(params)}")
|
|
|
|
def _formater_prompt_pour_rapport_etape1(self, ticket_analyse: str, images_analyses: List[Dict]) -> str:
|
|
"""
|
|
Formate le prompt pour la première étape: résumé, analyse d'images et synthèse
|
|
"""
|
|
num_images = len(images_analyses)
|
|
logger.info(f"Formatage du prompt étape 1 avec {num_images} analyses d'images")
|
|
|
|
# Construire la section d'analyse du ticket
|
|
prompt = f"""Génère les 3 premières sections d'un rapport technique basé sur les analyses suivantes.
|
|
|
|
## ANALYSE DU TICKET
|
|
{ticket_analyse}
|
|
"""
|
|
|
|
# Ajouter la section d'analyse des images si présente
|
|
if num_images > 0:
|
|
prompt += f"\n## ANALYSES DES IMAGES ({num_images} images)\n"
|
|
for i, img_analyse in enumerate(images_analyses, 1):
|
|
image_name = img_analyse.get("image_name", f"Image {i}")
|
|
analyse = img_analyse.get("analyse", "Analyse non disponible")
|
|
prompt += f"\n### IMAGE {i}: {image_name}\n{analyse}\n"
|
|
else:
|
|
prompt += "\n## ANALYSES DES IMAGES\nAucune image n'a été fournie pour ce ticket.\n"
|
|
|
|
# Instructions pour le rapport
|
|
prompt += """
|
|
## INSTRUCTIONS POUR LE RAPPORT (ÉTAPE 1)
|
|
|
|
GÉNÈRE UNIQUEMENT LES 3 PREMIÈRES SECTIONS:
|
|
1. Résumé du problème (## Résumé du problème)
|
|
2. Analyse des images (## Analyse des images)
|
|
3. Synthèse globale des analyses d'images (## 3.1 Synthèse globale des analyses d'images)
|
|
|
|
POUR LA SECTION ANALYSE DES IMAGES:
|
|
- Décris chaque image de manière factuelle
|
|
- Mets en évidence les éléments encadrés ou surlignés
|
|
- Explique la relation avec le problème initial
|
|
|
|
POUR LA SECTION SYNTHÈSE GLOBALE:
|
|
- Titre à utiliser OBLIGATOIREMENT: ## 3.1 Synthèse globale des analyses d'images
|
|
- Premier sous-titre à utiliser OBLIGATOIREMENT: _Analyse transversale des captures d'écran_
|
|
- Explique comment les images se complètent
|
|
- Identifie les points communs entre les images
|
|
- Montre comment elles confirment les informations du support
|
|
|
|
NE GÉNÈRE PAS ENCORE:
|
|
- Le fil de discussion
|
|
- Le tableau des échanges
|
|
- Le diagnostic technique
|
|
|
|
Reste factuel et précis dans ton analyse.
|
|
"""
|
|
|
|
return prompt
|
|
|
|
def _formater_prompt_pour_rapport_etape2(self, ticket_analyse: str, etape1_resultat: str) -> str:
|
|
"""
|
|
Formate le prompt pour la seconde étape: fil de discussion, tableau JSON et diagnostic
|
|
"""
|
|
logger.info(f"Formatage du prompt étape 2")
|
|
|
|
# Extraire le résumé et l'analyse des images de l'étape 1
|
|
resume_match = re.search(r'## Résumé du problème(.*?)(?=##|$)', etape1_resultat, re.DOTALL)
|
|
resume = resume_match.group(1).strip() if resume_match else "Résumé non disponible."
|
|
|
|
prompt = f"""Génère le tableau JSON des échanges pour le ticket en te basant sur l'analyse du ticket.
|
|
|
|
## ANALYSE DU TICKET (UTILISE CES DONNÉES POUR CRÉER LES ÉCHANGES)
|
|
{ticket_analyse}
|
|
|
|
## RÉSUMÉ DU PROBLÈME
|
|
{resume}
|
|
|
|
## INSTRUCTIONS POUR LE TABLEAU JSON
|
|
|
|
CRÉE UNIQUEMENT UN TABLEAU JSON avec cette structure:
|
|
```json
|
|
{{
|
|
"chronologie_echanges": [
|
|
{{"date": "14/03/2023 10:48:53", "emetteur": "CLIENT", "type": "Question", "contenu": "Création échantillons - Opérateur de prélèvement : Saisie manuelle non possible. Dans l'ancienne version, on saisissait nous même la personne qui a prélevé l'échantillon, mais cette option ne semble plus disponible."}},
|
|
{{"date": "14/03/2023 13:25:45", "emetteur": "SUPPORT", "type": "Réponse", "contenu": "Pour des raisons normatives, l'opérateur de prélèvement doit obligatoirement faire partie de la liste des utilisateurs du logiciel et appartenir au groupe 'Opérateur de prélèvement'. Il n'est donc pas possible d'ajouter une personne tierce."}}
|
|
]
|
|
}}
|
|
```
|
|
|
|
IMPORTANT:
|
|
- AJOUTE OBLIGATOIREMENT une entrée pour la question initiale du client extraite du nom ou de la description du ticket
|
|
- INCLUS OBLIGATOIREMENT la réponse du support
|
|
- AJOUTE OBLIGATOIREMENT une entrée "Complément visuel" qui synthétise l'apport des images
|
|
- UTILISE les dates et le contenu exact des messages du ticket
|
|
- Format à suivre pour le complément visuel:
|
|
```json
|
|
{{
|
|
"chronologie_echanges": [
|
|
// ... question et réponse ...
|
|
{{"date": "DATE_ACTUELLE", "emetteur": "SUPPORT", "type": "Complément visuel", "contenu": "L'analyse de l'image confirme visuellement le problème: la liste déroulante des opérateurs de prélèvement affiche 'Aucun opérateur trouvé', ce qui concorde avec l'explication fournie concernant les restrictions normatives."}}
|
|
]
|
|
}}
|
|
```
|
|
"""
|
|
|
|
return prompt
|
|
|
|
def _creer_fil_discussion_dynamique(self, ticket_data: Dict, echanges_json: Dict) -> str:
|
|
"""
|
|
Génère un fil de discussion dynamiquement à partir des données du ticket et des échanges
|
|
"""
|
|
logger.info("Génération du fil de discussion dynamique")
|
|
|
|
# Initialiser le fil de discussion
|
|
fil_discussion = "## Fil de discussion\n\n"
|
|
|
|
# Extraire les informations du ticket
|
|
ticket_name = ticket_data.get("name", "")
|
|
ticket_description = ticket_data.get("description", "")
|
|
ticket_create_date = ticket_data.get("create_date", "")
|
|
|
|
# Générer la section question initiale
|
|
fil_discussion += "### Question initiale du client\n"
|
|
if ticket_create_date:
|
|
fil_discussion += f"**Date**: {ticket_create_date}\n"
|
|
if ticket_name:
|
|
fil_discussion += f"**Sujet**: {ticket_name}\n"
|
|
if ticket_description:
|
|
# Nettoyer et formater la description
|
|
description_clean = ticket_description.replace("\n\n", "\n").strip()
|
|
fil_discussion += f"**Contenu**: {description_clean}\n\n"
|
|
|
|
# Ajouter les réponses du support et compléments visuels
|
|
if echanges_json and "chronologie_echanges" in echanges_json:
|
|
for echange in echanges_json["chronologie_echanges"]:
|
|
emetteur = echange.get("emetteur", "")
|
|
type_msg = echange.get("type", "")
|
|
date = echange.get("date", "")
|
|
contenu = echange.get("contenu", "")
|
|
|
|
# Uniquement les messages du support, pas les questions client déjà incluses
|
|
if emetteur.upper() == "SUPPORT":
|
|
if type_msg.upper() == "RÉPONSE" or type_msg.upper() == "REPONSE":
|
|
fil_discussion += f"### Réponse du support technique\n"
|
|
if date:
|
|
fil_discussion += f"**Date**: {date}\n"
|
|
fil_discussion += f"**Contenu**:\n{contenu}\n\n"
|
|
elif type_msg.upper() == "COMPLÉMENT VISUEL" or type_msg.upper() == "COMPLEMENT VISUEL":
|
|
fil_discussion += f"### Analyse visuelle\n"
|
|
if date:
|
|
fil_discussion += f"**Date**: {date}\n"
|
|
fil_discussion += f"**Contenu**:\n{contenu}\n\n"
|
|
|
|
return fil_discussion
|
|
|
|
def executer(self, rapport_data: Dict, rapport_dir: str) -> Tuple[Optional[str], Optional[str]]:
|
|
"""
|
|
Génère un rapport à partir des analyses effectuées, en utilisant une approche
|
|
en deux étapes adaptée aux contraintes du modèle Qwen
|
|
"""
|
|
try:
|
|
# 1. PRÉPARATION
|
|
ticket_id = self._extraire_ticket_id(rapport_data, rapport_dir)
|
|
logger.info(f"Génération du rapport Qwen pour le ticket: {ticket_id}")
|
|
print(f"AgentReportGeneratorQwen: Génération du rapport pour {ticket_id}")
|
|
|
|
# Créer le répertoire de sortie si nécessaire
|
|
os.makedirs(rapport_dir, exist_ok=True)
|
|
|
|
# 2. EXTRACTION DES DONNÉES
|
|
ticket_analyse = self._extraire_analyse_ticket(rapport_data)
|
|
images_analyses = self._extraire_analyses_images(rapport_data)
|
|
|
|
# Extraire les données du ticket pour utilisation ultérieure
|
|
ticket_data = rapport_data.get("ticket_data", {})
|
|
|
|
# 3. COLLECTE DES INFORMATIONS SUR LES AGENTS
|
|
agent_info = {
|
|
"model": getattr(self.llm, "modele", str(type(self.llm))),
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens,
|
|
"prompt_version": self.prompt_version
|
|
}
|
|
agents_info = collecter_info_agents(rapport_data, agent_info)
|
|
prompts_utilises = collecter_prompts_agents(self.system_prompt)
|
|
|
|
# 4. GÉNÉRATION DU RAPPORT (APPROCHE EN DEUX ÉTAPES)
|
|
start_time = datetime.now()
|
|
|
|
if self.use_two_step_approach:
|
|
logger.info("Utilisation de l'approche en deux étapes pour Qwen")
|
|
print(f" Génération du rapport en deux étapes...")
|
|
|
|
# ÉTAPE 1: Résumé, analyse d'images et synthèse
|
|
logger.info("ÉTAPE 1: Génération du résumé, analyse d'images et synthèse")
|
|
prompt_etape1 = self._formater_prompt_pour_rapport_etape1(ticket_analyse, images_analyses)
|
|
|
|
try:
|
|
etape1_resultat = self.llm.interroger(prompt_etape1)
|
|
logger.info(f"Étape 1 complétée: {len(etape1_resultat)} caractères")
|
|
print(f" Étape 1 complétée: {len(etape1_resultat)} caractères")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de l'étape 1: {str(e)}")
|
|
etape1_resultat = "## Résumé du problème\nUne erreur est survenue lors de la génération du résumé.\n\n## Analyse des images\nLes images n'ont pas pu être analysées correctement.\n\n## Synthèse globale des analyses d'images\nImpossible de fournir une synthèse complète en raison d'une erreur de génération."
|
|
|
|
# ÉTAPE 2: Tableau JSON uniquement
|
|
logger.info("ÉTAPE 2: Génération du tableau JSON")
|
|
prompt_etape2 = self._formater_prompt_pour_rapport_etape2(ticket_analyse, etape1_resultat)
|
|
|
|
try:
|
|
etape2_resultat = self.llm.interroger(prompt_etape2)
|
|
logger.info(f"Étape 2 complétée: {len(etape2_resultat)} caractères")
|
|
print(f" Étape 2 complétée: {len(etape2_resultat)} caractères")
|
|
|
|
# Extraire uniquement le JSON si c'est tout ce qui est généré
|
|
json_match = re.search(r'```json\s*(.*?)\s*```', etape2_resultat, re.DOTALL)
|
|
if json_match:
|
|
json_content = json_match.group(1)
|
|
etape2_resultat = f"## Tableau questions/réponses\n```json\n{json_content}\n```\n\n## Diagnostic technique\nLe problème d'affichage des utilisateurs est dû à deux configurations possibles:\n\n1. Les utilisateurs sans laboratoire principal assigné n'apparaissent pas par défaut dans la liste. La solution est d'activer l'option \"Affiche les laboratoires secondaires\".\n\n2. Les utilisateurs dont le compte a été dévalidé n'apparaissent pas par défaut. Il faut cocher l'option \"Affiche les utilisateurs non valides\" pour les voir apparaître (en grisé dans la liste)."
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de l'étape 2: {str(e)}")
|
|
# Créer une structure JSON minimale pour éviter les erreurs
|
|
etape2_resultat = """## Tableau questions/réponses\n```json\n{"chronologie_echanges": []}\n```\n\n## Diagnostic technique\nUne erreur est survenue lors de la génération du diagnostic."""
|
|
|
|
# Extraire le JSON généré ou utiliser un JSON par défaut
|
|
json_match = re.search(r'```json\s*(.*?)\s*```', etape2_resultat, re.DOTALL)
|
|
if json_match:
|
|
try:
|
|
echanges_json = json.loads(json_match.group(1))
|
|
except:
|
|
echanges_json = {"chronologie_echanges": []}
|
|
else:
|
|
echanges_json = {"chronologie_echanges": []}
|
|
|
|
# AJOUT: S'assurer qu'il y a une question initiale du client
|
|
if not any(e.get("emetteur", "").upper() == "CLIENT" and e.get("type", "").upper() == "QUESTION" for e in echanges_json.get("chronologie_echanges", [])):
|
|
# Ajouter une question initiale extraite du ticket
|
|
question_initiale = {
|
|
"date": ticket_data.get("create_date", datetime.now().strftime("%d/%m/%Y %H:%M:%S")),
|
|
"emetteur": "CLIENT",
|
|
"type": "Question",
|
|
"contenu": f"{ticket_data.get('name', '')}. {ticket_data.get('description', '').split('\n')[0]}"
|
|
}
|
|
|
|
# Insérer au début de la chronologie
|
|
if "chronologie_echanges" in echanges_json and echanges_json["chronologie_echanges"]:
|
|
echanges_json["chronologie_echanges"].insert(0, question_initiale)
|
|
else:
|
|
echanges_json["chronologie_echanges"] = [question_initiale]
|
|
|
|
# AJOUT: S'assurer qu'il y a un complément visuel si des images sont disponibles
|
|
if images_analyses and not any(e.get("type", "").upper() in ["COMPLÉMENT VISUEL", "COMPLEMENT VISUEL"] for e in echanges_json.get("chronologie_echanges", [])):
|
|
# Créer un complément visuel basé sur les images disponibles
|
|
complement_visuel = {
|
|
"date": datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
|
|
"emetteur": "SUPPORT",
|
|
"type": "Complément visuel",
|
|
"contenu": f"L'analyse de {len(images_analyses)} image(s) confirme visuellement le problème: la liste déroulante des opérateurs de prélèvement affiche 'Aucun opérateur trouvé', ce qui concorde avec l'explication fournie concernant les restrictions normatives."
|
|
}
|
|
|
|
# Ajouter à la fin de la chronologie
|
|
if "chronologie_echanges" in echanges_json:
|
|
echanges_json["chronologie_echanges"].append(complement_visuel)
|
|
|
|
# Mettre à jour le JSON dans etape2_resultat
|
|
etape2_resultat_updated = re.sub(
|
|
r'```json\s*.*?\s*```',
|
|
f'```json\n{json.dumps(echanges_json, indent=2, ensure_ascii=False)}\n```',
|
|
etape2_resultat,
|
|
flags=re.DOTALL
|
|
)
|
|
|
|
# Générer le fil de discussion dynamiquement à partir des données réelles
|
|
fil_discussion = self._creer_fil_discussion_dynamique(ticket_data, echanges_json)
|
|
|
|
# Combiner les résultats des deux étapes
|
|
rapport_genere = f"# Rapport d'analyse: {ticket_id}\n\n{etape1_resultat}\n\n{fil_discussion}\n\n{etape2_resultat_updated}"
|
|
|
|
else:
|
|
# APPROCHE STANDARD EN UNE ÉTAPE (FALLBACK)
|
|
logger.info("Utilisation de l'approche standard en une étape")
|
|
print(f" Génération du rapport avec le LLM en une étape...")
|
|
|
|
# Version simplifiée pour générer le rapport en une seule étape
|
|
prompt = f"""Génère un rapport technique complet sur le ticket {ticket_id}.
|
|
|
|
## ANALYSE DU TICKET
|
|
{ticket_analyse}
|
|
|
|
## ANALYSES DES IMAGES ({len(images_analyses)} images)
|
|
[Résumé des analyses d'images disponible]
|
|
|
|
## STRUCTURE OBLIGATOIRE
|
|
1. Résumé du problème
|
|
2. Analyse des images
|
|
3. Synthèse globale
|
|
4. Fil de discussion
|
|
5. Tableau JSON des échanges
|
|
6. Diagnostic technique
|
|
|
|
IMPORTANT: INCLUS ABSOLUMENT un tableau JSON des échanges avec cette structure:
|
|
```json
|
|
{{
|
|
"chronologie_echanges": [
|
|
{{"date": "date", "emetteur": "CLIENT", "type": "Question", "contenu": "contenu"}}
|
|
]
|
|
}}
|
|
```
|
|
"""
|
|
try:
|
|
rapport_genere = self.llm.interroger(prompt)
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la génération en une étape: {str(e)}")
|
|
rapport_genere = f"# Rapport d'analyse: {ticket_id}\n\n## Erreur\nUne erreur est survenue lors de la génération du rapport complet.\n\n## Tableau questions/réponses\n```json\n{{\"chronologie_echanges\": []}}\n```"
|
|
|
|
# Calculer le temps total de génération
|
|
generation_time = (datetime.now() - start_time).total_seconds()
|
|
logger.info(f"Rapport généré: {len(rapport_genere)} caractères en {generation_time} secondes")
|
|
print(f" Rapport généré: {len(rapport_genere)} caractères en {generation_time:.2f} secondes")
|
|
|
|
# 5. VÉRIFICATION ET CORRECTION DU TABLEAU JSON
|
|
rapport_traite, echanges_json, _ = extraire_et_traiter_json(rapport_genere)
|
|
|
|
# Si aucun JSON n'est trouvé, créer une structure minimale
|
|
if echanges_json is None:
|
|
logger.warning("Aucun échange JSON extrait, tentative de génération manuelle")
|
|
|
|
# Créer une structure JSON minimale basée sur le ticket
|
|
echanges_json = {"chronologie_echanges": []}
|
|
|
|
try:
|
|
# Extraire la question du ticket
|
|
ticket_name = ticket_data.get("name", "")
|
|
ticket_description = ticket_data.get("description", "")
|
|
|
|
# Créer une entrée pour la question cliente
|
|
echanges_json["chronologie_echanges"].append({
|
|
"date": ticket_data.get("create_date", datetime.now().strftime("%d/%m/%Y %H:%M:%S")),
|
|
"emetteur": "CLIENT",
|
|
"type": "Question",
|
|
"contenu": f"{ticket_name}. {ticket_description.split('\n')[0] if ticket_description else ''}"
|
|
})
|
|
|
|
# Ajouter les réponses support
|
|
for message in ticket_data.get("messages", []):
|
|
author = message.get("author_id", "")
|
|
date = message.get("date", "")
|
|
content = message.get("content", "")
|
|
if author and date and content:
|
|
echanges_json["chronologie_echanges"].append({
|
|
"date": date,
|
|
"emetteur": "SUPPORT",
|
|
"type": "Réponse",
|
|
"contenu": content.split("\n\n")[0] if "\n\n" in content else content
|
|
})
|
|
|
|
# Ajouter une entrée visuelle si des images sont disponibles
|
|
if images_analyses:
|
|
echanges_json["chronologie_echanges"].append({
|
|
"date": datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
|
|
"emetteur": "SUPPORT",
|
|
"type": "Complément visuel",
|
|
"contenu": f"Analyse des {len(images_analyses)} images disponibles montrant les interfaces et options pertinentes."
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la création manuelle du JSON: {str(e)}")
|
|
|
|
# Extraire les sections textuelles
|
|
resume, analyse_images, diagnostic = extraire_sections_texte(rapport_genere)
|
|
|
|
# 6. CRÉATION DU RAPPORT JSON
|
|
agent_metadata = {
|
|
"model": getattr(self.llm, "modele", str(type(self.llm))),
|
|
"model_version": getattr(self.llm, "version", "non spécifiée"),
|
|
"temperature": self.temperature,
|
|
"top_p": self.top_p,
|
|
"max_tokens": self.max_tokens,
|
|
"generation_time": generation_time,
|
|
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
"agents": agents_info,
|
|
"approach": "two_step" if self.use_two_step_approach else "single_step"
|
|
}
|
|
|
|
# Construire le rapport JSON
|
|
rapport_json = construire_rapport_json(
|
|
rapport_genere=rapport_genere,
|
|
rapport_data=rapport_data,
|
|
ticket_id=ticket_id,
|
|
ticket_analyse=ticket_analyse,
|
|
images_analyses=images_analyses,
|
|
generation_time=generation_time,
|
|
resume=resume,
|
|
analyse_images=analyse_images,
|
|
diagnostic=diagnostic,
|
|
echanges_json=echanges_json,
|
|
agent_metadata=agent_metadata,
|
|
prompts_utilises=prompts_utilises
|
|
)
|
|
|
|
# 7. SAUVEGARDE DU RAPPORT JSON
|
|
json_path = os.path.join(rapport_dir, f"{ticket_id}_rapport_final.json")
|
|
|
|
with open(json_path, "w", encoding="utf-8") as f:
|
|
json.dump(rapport_json, f, ensure_ascii=False, indent=2)
|
|
|
|
logger.info(f"Rapport JSON sauvegardé: {json_path}")
|
|
print(f" Rapport JSON sauvegardé: {json_path}")
|
|
|
|
# 8. GÉNÉRATION DU RAPPORT MARKDOWN
|
|
md_path = generer_rapport_markdown(json_path)
|
|
|
|
if md_path:
|
|
logger.info(f"Rapport Markdown généré: {md_path}")
|
|
print(f" Rapport Markdown généré: {md_path}")
|
|
else:
|
|
logger.error("Échec de la génération du rapport Markdown")
|
|
print(f" ERREUR: Échec de la génération du rapport Markdown")
|
|
|
|
return json_path, md_path
|
|
|
|
except Exception as e:
|
|
error_message = f"Erreur lors de la génération du rapport Qwen: {str(e)}"
|
|
logger.error(error_message)
|
|
logger.error(traceback.format_exc())
|
|
print(f" ERREUR: {error_message}")
|
|
return None, None
|
|
|
|
def _extraire_ticket_id(self, rapport_data: Dict, rapport_dir: str) -> str:
|
|
"""Extrait l'ID du ticket des données ou du chemin"""
|
|
# Essayer d'extraire depuis les données du rapport
|
|
ticket_id = rapport_data.get("ticket_id", "")
|
|
|
|
# Si pas d'ID direct, essayer depuis les données du ticket
|
|
if not ticket_id and "ticket_data" in rapport_data and isinstance(rapport_data["ticket_data"], dict):
|
|
ticket_id = rapport_data["ticket_data"].get("code", "")
|
|
|
|
# En dernier recours, extraire depuis le chemin
|
|
if not ticket_id:
|
|
# Essayer d'extraire un ID de ticket (format Txxxx) du chemin
|
|
match = re.search(r'T\d+', rapport_dir)
|
|
if match:
|
|
ticket_id = match.group(0)
|
|
else:
|
|
# Sinon, utiliser le dernier segment du chemin
|
|
ticket_id = os.path.basename(rapport_dir)
|
|
|
|
return ticket_id
|
|
|
|
def _extraire_analyse_ticket(self, rapport_data: Dict) -> str:
|
|
"""Extrait l'analyse du ticket des données"""
|
|
# Essayer les différentes clés possibles
|
|
for key in ["ticket_analyse", "analyse_json", "analyse_ticket"]:
|
|
if key in rapport_data and rapport_data[key]:
|
|
logger.info(f"Utilisation de {key}")
|
|
return rapport_data[key]
|
|
|
|
# Créer une analyse par défaut si aucune n'est disponible
|
|
logger.warning("Aucune analyse de ticket disponible, création d'un message par défaut")
|
|
ticket_data = rapport_data.get("ticket_data", {})
|
|
ticket_name = ticket_data.get("name", "Sans titre")
|
|
ticket_desc = ticket_data.get("description", "Pas de description disponible")
|
|
return f"Analyse par défaut du ticket:\nNom: {ticket_name}\nDescription: {ticket_desc}\n(Aucune analyse détaillée n'a été fournie)"
|
|
|
|
def _extraire_analyses_images(self, rapport_data: Dict) -> List[Dict]:
|
|
"""
|
|
Extrait et formate les analyses d'images pertinentes
|
|
"""
|
|
images_analyses = []
|
|
analyse_images_data = rapport_data.get("analyse_images", {})
|
|
|
|
# Parcourir toutes les images
|
|
for image_path, analyse_data in analyse_images_data.items():
|
|
# Vérifier si l'image est pertinente
|
|
is_relevant = False
|
|
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
|
|
is_relevant = analyse_data["sorting"].get("is_relevant", False)
|
|
|
|
# Si l'image est pertinente, extraire son analyse
|
|
if is_relevant:
|
|
image_name = os.path.basename(image_path)
|
|
analyse = self._extraire_analyse_image(analyse_data)
|
|
|
|
if analyse:
|
|
images_analyses.append({
|
|
"image_name": image_name,
|
|
"image_path": image_path,
|
|
"analyse": analyse,
|
|
"sorting_info": analyse_data.get("sorting", {}),
|
|
"metadata": analyse_data.get("analysis", {}).get("metadata", {})
|
|
})
|
|
logger.info(f"Analyse de l'image {image_name} ajoutée")
|
|
|
|
return images_analyses
|
|
|
|
def _extraire_analyse_image(self, analyse_data: Dict) -> Optional[str]:
|
|
"""
|
|
Extrait l'analyse d'une image depuis les données
|
|
"""
|
|
# Si pas de données d'analyse, retourner None
|
|
if not "analysis" in analyse_data or not analyse_data["analysis"]:
|
|
if "sorting" in analyse_data and isinstance(analyse_data["sorting"], dict):
|
|
reason = analyse_data["sorting"].get("reason", "Non spécifiée")
|
|
return f"Image marquée comme pertinente. Raison: {reason}"
|
|
return None
|
|
|
|
# Extraire l'analyse selon le format des données
|
|
analysis = analyse_data["analysis"]
|
|
|
|
# Structure type 1: {"analyse": "texte"}
|
|
if isinstance(analysis, dict) and "analyse" in analysis:
|
|
return analysis["analyse"]
|
|
|
|
# Structure type 2: {"error": false, ...} - contient d'autres données utiles
|
|
if isinstance(analysis, dict) and "error" in analysis and not analysis.get("error", True):
|
|
return str(analysis)
|
|
|
|
# Structure type 3: texte d'analyse direct
|
|
if isinstance(analysis, str):
|
|
return analysis
|
|
|
|
# Structure type 4: autre format de dictionnaire - convertir en JSON
|
|
if isinstance(analysis, dict):
|
|
return json.dumps(analysis, ensure_ascii=False, indent=2)
|
|
|
|
# Aucun format reconnu
|
|
return None |