#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Script pour extraire le tableau de questions/réponses du rapport final et le convertir en CSV. """ import json import csv import os import sys import re import logging from typing import List, Dict, Any, Optional, Tuple from datetime import datetime # Configuration du logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('ReportCSVExporter') class ReportCSVExporter: """ Classe responsable de l'exportation des rapports au format CSV. Permet de générer des fichiers CSV à partir des données JSON des rapports. """ @staticmethod def generate_csv_from_json(json_file, model_name=None, output_dir=None): """ Génère un fichier CSV à partir des données du tableau questions/réponses contenues dans le fichier JSON du rapport. Args: json_file (str): Chemin du fichier JSON contenant les données model_name (str, optional): Nom du modèle à inclure dans le nom du fichier output_dir (str, optional): Répertoire de sortie personnalisé Returns: str: Chemin du fichier CSV généré """ try: # Vérifier que le fichier JSON existe if not os.path.exists(json_file): logger.error(f"Le fichier JSON n'existe pas: {json_file}") return None # Extraire l'ID du ticket du nom du fichier filename = os.path.basename(json_file) ticket_id = filename.split('_')[0] if '_' in filename else 'unknown' # Définir le répertoire de sortie if output_dir: csv_dir = output_dir else: # Créer le répertoire CSV à la racine du projet project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) csv_root_dir = os.path.join(project_root, 'CSV') csv_dir = os.path.join(csv_root_dir, ticket_id) # Créer les répertoires si nécessaire os.makedirs(csv_dir, exist_ok=True) # Charger les données JSON with open(json_file, 'r', encoding='utf-8') as f: data = json.load(f) # Utiliser le modèle depuis les métadonnées si non spécifié if not model_name: # Chercher le nom du modèle dans les métadonnées (comme dans pipeline_logger.py) model_name = "unknown" if isinstance(data, dict): # Priorité 1: Vérifier model_info.model (structure principale) if "metadata" in data and "model_info" in data["metadata"] and "model" in data["metadata"]["model_info"]: model_name = data["metadata"]["model_info"]["model"] # Priorité 2: Vérifier metadata.model (structure alternative) elif "metadata" in data and "model" in data["metadata"]: model_name = data["metadata"]["model"] # S'assurer que model_name n'est pas None avant d'appeler lower() if model_name: # Nettoyer le nom du modèle pour éviter les caractères problématiques dans le nom de fichier model_name = model_name.lower().replace("/", "_").replace(":", "_").replace("\\", "_").replace(" ", "_") else: model_name = "unknown" # Ajouter un timestamp pour éviter les écrasements timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") csv_filename = f"{ticket_id}_{model_name}_{timestamp}.csv" csv_file = os.path.join(csv_dir, csv_filename) # Extraire les échanges exchanges = data.get('chronologie_echanges', []) if not exchanges: logger.warning(f"Aucun échange trouvé dans {json_file}") return None # Créer et écrire dans le fichier CSV with open(csv_file, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # Écrire l'en-tête avec métadonnées du rapport writer.writerow(['Question', 'Réponse']) current_question = None current_answers = [] # Parcourir les échanges pour les combiner en paires questions/réponses for exchange in exchanges: emetteur = exchange.get('emetteur', '').upper() type_msg = exchange.get('type', '').lower() contenu = exchange.get('contenu', '') # Si c'est une question client if emetteur == 'CLIENT' and (type_msg == 'question' or '?' in contenu): # Si une question précédente existe, l'écrire avec ses réponses if current_question: combined_answer = "\n".join(current_answers) if current_answers else "Pas de réponse" writer.writerow([current_question, combined_answer]) # Réinitialiser pour la nouvelle question current_question = contenu current_answers = [] # Si c'est une réponse ou un complément du support elif emetteur == 'SUPPORT' and (type_msg == 'réponse' or type_msg == 'complément visuel' or type_msg == 'information technique'): if current_question: # S'assurer qu'il y a une question en cours current_answers.append(contenu) # Écrire la dernière question et ses réponses if current_question: combined_answer = "\n".join(current_answers) if current_answers else "Pas de réponse" writer.writerow([current_question, combined_answer]) logger.info(f"Fichier CSV créé: {csv_file}") return csv_file except Exception as e: logger.error(f"Erreur lors de la génération du CSV: {str(e)}") return None @staticmethod def process_batch(json_directory, output_directory=None): """ Traite tous les fichiers JSON de rapport dans un répertoire. Args: json_directory (str): Répertoire contenant les fichiers JSON à traiter output_directory (str, optional): Répertoire de sortie personnalisé Returns: list: Liste des chemins des fichiers CSV générés """ csv_files = [] if not os.path.exists(json_directory): logger.error(f"Le répertoire n'existe pas: {json_directory}") return csv_files # Parcourir tous les fichiers JSON dans le répertoire for filename in os.listdir(json_directory): if filename.endswith('.json') and 'rapport_final' in filename: json_file = os.path.join(json_directory, filename) csv_file = ReportCSVExporter.generate_csv_from_json( json_file=json_file, output_dir=output_directory ) if csv_file: csv_files.append(csv_file) if not csv_files: logger.warning(f"Aucun fichier CSV généré à partir de {json_directory}") else: logger.info(f"{len(csv_files)} fichiers CSV générés avec succès") return csv_files def extraire_tableau_markdown(texte: str) -> List[List[str]]: """ Extrait un tableau au format Markdown du texte. Args: texte: Texte contenant un tableau Markdown Returns: Liste de lignes du tableau, chaque ligne étant une liste de cellules """ lignes = [] # Chercher le début du tableau pattern_table = r"\|\s*ÉMETTEUR\s*\|\s*TYPE\s*\|\s*DATE\s*\|\s*CONTENU\s*\|\s*ÉLÉMENTS VISUELS\s*\|" if not re.search(pattern_table, texte, re.IGNORECASE): # Si l'en-tête exacte n'est pas trouvée, chercher un format plus générique pattern_table = r"\|\s*\w+\s*\|\s*\w+\s*\|\s*\w+\s*\|\s*\w+\s*\|\s*[\w\s]+\s*\|" if not re.search(pattern_table, texte, re.IGNORECASE): logger.warning("Aucun tableau trouvé dans le texte") return [] # Extraire toutes les lignes qui commencent et se terminent par | tableau_lines = re.findall(r"^\|.*\|$", texte, re.MULTILINE) if len(tableau_lines) < 3: # En-tête + séparateur + au moins une ligne de données logger.warning(f"Tableau incomplet: seulement {len(tableau_lines)} lignes trouvées") return [] # Ignorer la ligne de séparation (2ème ligne) qui contient juste des | et des - en_tete = tableau_lines[0] donnees = tableau_lines[2:] # Commencer à la 3ème ligne # Extraire les en-têtes en_tetes = [h.strip() for h in en_tete.split('|')[1:-1]] lignes.append(en_tetes) # Extraire les données for ligne in donnees: # Split sur | mais ignorer les | dans le contenu entre guillemets ou parenthèses cellules = [] cellule_actuelle = "" entre_parentheses = 0 entre_guillemets = False for char in ligne[1:-1]: # Ignorer les | aux extrémités if char == '|' and entre_parentheses == 0 and not entre_guillemets: cellules.append(cellule_actuelle.strip()) cellule_actuelle = "" else: cellule_actuelle += char if char == '(': entre_parentheses += 1 elif char == ')': entre_parentheses = max(0, entre_parentheses - 1) elif char == '"': entre_guillemets = not entre_guillemets if cellule_actuelle: cellules.append(cellule_actuelle.strip()) # S'assurer que nous avons le bon nombre de cellules if len(cellules) != len(en_tetes): # Ajuster au besoin if len(cellules) < len(en_tetes): cellules.extend([""] * (len(en_tetes) - len(cellules))) else: cellules = cellules[:len(en_tetes)] lignes.append(cellules) return lignes def extraire_tableau_du_rapport(json_path: str) -> List[List[str]]: """ Extrait le tableau du rapport JSON. Args: json_path: Chemin du fichier JSON contenant le rapport Returns: Tableau extrait sous forme de liste de lignes """ try: with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) # Vérifier si c'est une liste (comme dans certains formats de rapport) if isinstance(data, list): # Chercher l'élément qui contient la réponse for item in data: if isinstance(item, dict) and "response" in item: texte = item["response"] return extraire_tableau_markdown(texte) # Si c'est un dictionnaire simple elif isinstance(data, dict) and "response" in data: texte = data["response"] return extraire_tableau_markdown(texte) logger.warning(f"Format de données non reconnu dans {json_path}") return [] except Exception as e: logger.error(f"Erreur lors de l'extraction du tableau: {e}") return [] def generer_csv_depuis_rapport(json_path: str, output_dir: Optional[str] = None) -> Optional[str]: """ Génère un fichier CSV à partir du tableau extrait du rapport. Args: json_path: Chemin du fichier JSON contenant le rapport output_dir: Répertoire de sortie pour le fichier CSV (optionnel) Returns: Chemin du fichier CSV généré """ # Extraire l'ID du ticket du chemin du fichier basename = os.path.basename(json_path) ticket_id = None # Essayer d'extraire l'ID du ticket du nom de fichier match = re.search(r'(T\d+)', basename) if match: ticket_id = match.group(1) else: # Essayer d'extraire à partir du chemin match = re.search(r'ticket_(T\d+)', json_path) if match: ticket_id = match.group(1) if not ticket_id: logger.warning(f"Impossible d'extraire l'ID du ticket de {json_path}") ticket_id = "unknown" # Déterminer le répertoire de sortie if not output_dir: # Utiliser le répertoire CSV à la racine du projet project_root = os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) output_dir = os.path.join(project_root, "CSV", ticket_id) os.makedirs(output_dir, exist_ok=True) # Extraire le nom du modèle directement à partir du contenu JSON model_name = "unknown" try: with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) # Chercher le nom du modèle dans les métadonnées (comme dans pipeline_logger.py) if isinstance(data, dict): # Priorité 1: Vérifier model_info.model (structure principale) if "metadata" in data and "model_info" in data["metadata"] and "model" in data["metadata"]["model_info"]: model_name = data["metadata"]["model_info"]["model"] # Priorité 2: Vérifier metadata.model (structure alternative) elif "metadata" in data and "model" in data["metadata"]: model_name = data["metadata"]["model"] # Nettoyer le nom du modèle pour éviter les problèmes de caractères spéciaux model_name = model_name.lower().replace("/", "_").replace(":", "_").replace("\\", "_").replace(" ", "_") except Exception as e: logger.warning(f"Erreur lors de l'extraction du nom du modèle: {e}") # Créer le chemin du fichier CSV csv_path = os.path.join(output_dir, f"{ticket_id}_{model_name}_exchanges.csv") # Extraire le tableau du rapport tableau = extraire_tableau_du_rapport(json_path) if not tableau: logger.warning(f"Aucun tableau à exporter depuis {json_path}") return None # Écrire le CSV try: with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerows(tableau) logger.info(f"Fichier CSV généré: {csv_path}") return csv_path except Exception as e: logger.error(f"Erreur lors de la génération du CSV: {e}") return None def extraire_questions_reponses(tableau: List[List[str]]) -> List[Dict[str, str]]: """ Extrait les questions et réponses du tableau en paires Q/R. Args: tableau: Tableau extrait du rapport Returns: Liste de dictionnaires {question, réponse, contexte} """ if not tableau or len(tableau) < 2: return [] # Récupérer les indices des colonnes header = tableau[0] try: idx_emetteur = header.index("ÉMETTEUR") idx_type = header.index("TYPE") idx_contenu = header.index("CONTENU") idx_elements = header.index("ÉLÉMENTS VISUELS") except ValueError: # Si les en-têtes exacts ne sont pas trouvés, essayer des correspondances partielles idx_emetteur = next((i for i, h in enumerate(header) if "METTEUR" in h.upper()), 0) idx_type = next((i for i, h in enumerate(header) if "TYPE" in h.upper()), 1) idx_contenu = next((i for i, h in enumerate(header) if "CONTENU" in h.upper()), 3) idx_elements = next((i for i, h in enumerate(header) if "VISUEL" in h.upper()), 4) resultat = [] question_courante = {} # Parcourir les lignes de données for ligne in tableau[1:]: if len(ligne) <= max(idx_emetteur, idx_type, idx_contenu, idx_elements): continue emetteur = ligne[idx_emetteur].upper() type_msg = ligne[idx_type].lower() if ligne[idx_type] else "" contenu = ligne[idx_contenu] elements = ligne[idx_elements] # Si c'est une question du client if emetteur == "CLIENT" and (type_msg == "question" or "?" in contenu): # Si on a déjà une question en cours avec une réponse, l'ajouter aux résultats if question_courante and "question" in question_courante and "réponse" in question_courante: resultat.append(question_courante) # Créer une nouvelle question question_courante = { "question": contenu, "contexte_question": elements } # Si c'est une réponse du support et qu'on a une question en cours elif emetteur == "SUPPORT" and "question" in question_courante: question_courante["réponse"] = contenu question_courante["contexte_réponse"] = elements # Ajouter la dernière paire Q/R si elle est complète if question_courante and "question" in question_courante and "réponse" in question_courante: resultat.append(question_courante) return resultat def generer_csv_qr(json_path: str, output_dir: Optional[str] = None) -> Optional[str]: """ Génère un fichier CSV de questions/réponses à partir du tableau extrait du rapport. Args: json_path: Chemin du fichier JSON contenant le rapport output_dir: Répertoire de sortie pour le fichier CSV (optionnel) Returns: Chemin du fichier CSV généré """ # Extraire l'ID du ticket comme précédemment basename = os.path.basename(json_path) ticket_id = None match = re.search(r'(T\d+)', basename) if match: ticket_id = match.group(1) else: match = re.search(r'ticket_(T\d+)', json_path) if match: ticket_id = match.group(1) if not ticket_id: logger.warning(f"Impossible d'extraire l'ID du ticket de {json_path}") ticket_id = "unknown" # Déterminer le répertoire de sortie if not output_dir: project_root = os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) output_dir = os.path.join(project_root, "CSV", ticket_id) os.makedirs(output_dir, exist_ok=True) # Extraire le nom du modèle directement à partir du contenu JSON model_name = "unknown" try: with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) # Chercher le nom du modèle dans les métadonnées (comme dans pipeline_logger.py) if isinstance(data, dict): # Priorité 1: Vérifier model_info.model (structure principale) if "metadata" in data and "model_info" in data["metadata"] and "model" in data["metadata"]["model_info"]: model_name = data["metadata"]["model_info"]["model"] # Priorité 2: Vérifier metadata.model (structure alternative) elif "metadata" in data and "model" in data["metadata"]: model_name = data["metadata"]["model"] # Nettoyer le nom du modèle pour éviter les problèmes de caractères spéciaux model_name = model_name.lower().replace("/", "_").replace(":", "_").replace("\\", "_").replace(" ", "_") except Exception as e: logger.warning(f"Erreur lors de l'extraction du nom du modèle: {e}") # Créer le chemin du fichier CSV csv_path = os.path.join(output_dir, f"{ticket_id}_{model_name}_qa.csv") # Extraire le tableau du rapport tableau = extraire_tableau_du_rapport(json_path) if not tableau: logger.warning(f"Aucun tableau à exporter depuis {json_path}") return None # Extraire les paires Q/R paires_qr = extraire_questions_reponses(tableau) if not paires_qr: logger.warning(f"Aucune paire question/réponse extraite de {json_path}") return None # Écrire le CSV try: with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # Écrire l'en-tête writer.writerow(["Question", "Réponse", "Contexte Question", "Contexte Réponse"]) # Écrire les données for paire in paires_qr: writer.writerow([ paire.get("question", ""), paire.get("réponse", ""), paire.get("contexte_question", ""), paire.get("contexte_réponse", "") ]) logger.info(f"Fichier CSV Q/R généré: {csv_path}") return csv_path except Exception as e: logger.error(f"Erreur lors de la génération du CSV Q/R: {e}") return None def traiter_rapports_ticket(ticket_id: str) -> None: """ Traite tous les rapports finaux d'un ticket pour générer des CSV. Ne génère qu'un seul fichier CSV par modèle contenant le tableau d'échanges. Args: ticket_id: ID du ticket à traiter """ # Déterminer le chemin du dossier pipeline base_dir = "output" ticket_dir = f"ticket_{ticket_id}" ticket_path = os.path.join(base_dir, ticket_dir) if not os.path.exists(ticket_path): logger.warning(f"Répertoire du ticket non trouvé: {ticket_path}") return # Trouver la dernière extraction extractions = [] for extract in os.listdir(ticket_path): extraction_path = os.path.join(ticket_path, extract) if os.path.isdir(extraction_path) and extract.startswith(ticket_id): extractions.append(extraction_path) if not extractions: logger.warning(f"Aucune extraction trouvée pour le ticket {ticket_id}") return # Trier par date (plus récente en premier) extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True) latest_extraction = extractions[0] # Trouver le dossier pipeline pipeline_dir = os.path.join(latest_extraction, f"{ticket_id}_rapports", "pipeline") if not os.path.exists(pipeline_dir): logger.warning(f"Dossier pipeline non trouvé: {pipeline_dir}") return # Créer le répertoire pour les fichiers CSV csv_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "CSV", ticket_id) os.makedirs(csv_dir, exist_ok=True) # Dictionnaire pour suivre quels modèles ont déjà été traités modeles_traites = set() # Liste les fichiers par ordre alphabétique pour un traitement stable fichiers = sorted([f for f in os.listdir(pipeline_dir) if f.startswith("rapport_final") and f.endswith(".json")]) # Chercher tous les rapports finaux for filename in fichiers: json_path = os.path.join(pipeline_dir, filename) logger.info(f"Traitement du rapport: {filename}") # Extraire le nom du modèle depuis le contenu du fichier JSON try: with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) # Utiliser le même système d'extraction que pipeline_logger.py model_name = "unknown" # Si les données sont dans une liste, prendre le premier élément if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict): data = data[0] # Extraire des métadonnées if isinstance(data, dict): # Priorité 1: Vérifier model_info.model (structure principale) if "metadata" in data and "model_info" in data["metadata"] and "model" in data["metadata"]["model_info"]: model_name = data["metadata"]["model_info"]["model"] # Priorité 2: Vérifier metadata.model (structure alternative) elif "metadata" in data and "model" in data["metadata"]: model_name = data["metadata"]["model"] # Si extraction échouée, essayer d'extraire du nom de fichier comme fallback if model_name == "unknown": model_match = re.search(r'rapport_final_([^_]+)', filename) if model_match: model_name_full = model_match.group(1) # Simplifier le nom du modèle if "mistral" in model_name_full.lower(): model_name = "mistral" elif "llama" in model_name_full.lower(): model_name = "llama" elif "pixtral" in model_name_full.lower(): model_name = "pixtral" else: model_name = model_name_full.split("-")[0].split(":")[0] # Nettoyer le nom du modèle if model_name and model_name != "unknown": model_name = model_name.lower().replace("/", "_").replace(":", "_").replace("\\", "_").replace(" ", "_") except Exception as e: logger.warning(f"Erreur lors de l'extraction du modèle depuis {json_path}: {e}") # Fallback à l'extraction par nom de fichier model_match = re.search(r'rapport_final_([^_]+)', filename) if model_match: model_name_full = model_match.group(1) if "mistral" in model_name_full.lower(): model_name = "mistral" elif "llama" in model_name_full.lower(): model_name = "llama" elif "pixtral" in model_name_full.lower(): model_name = "pixtral" else: model_name = model_name_full.split("-")[0].split(":")[0] else: model_name = "unknown" # Vérifier si ce modèle a déjà été traité pour éviter les doublons if model_name in modeles_traites: logger.info(f"Modèle {model_name} déjà traité, ignoré pour éviter les doublons") continue # Marquer ce modèle comme traité modeles_traites.add(model_name) # Créer le nom du fichier CSV (un seul fichier par modèle) csv_exchanges = os.path.join(csv_dir, f"{ticket_id}_{model_name}_exchanges.csv") # Supprimer le fichier existant pour éviter les doublons if os.path.exists(csv_exchanges): os.remove(csv_exchanges) # Extraire le tableau du rapport tableau = extraire_tableau_du_rapport(json_path) if not tableau: logger.warning(f"Aucun tableau à exporter depuis {json_path}") continue # Écrire le CSV des échanges try: with open(csv_exchanges, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerows(tableau) logger.info(f"CSV échanges généré: {csv_exchanges}") except Exception as e: logger.error(f"Erreur lors de la génération du CSV échanges: {e}") logger.info(f"Traitement terminé. {len(modeles_traites)} modèles traités: {', '.join(modeles_traites)}") if __name__ == "__main__": # Configurer l'analyse des arguments if len(sys.argv) < 2: print("Erreur: arguments insuffisants") print("Usage:") print(" 1. Traiter un seul fichier: python report_csv_exporter.py chemin/vers/rapport_final.json [nom_modele] [rep_sortie]") print(" 2. Traiter un répertoire: python report_csv_exporter.py --batch chemin/vers/repertoire [rep_sortie]") sys.exit(1) # Traitement par lot if sys.argv[1] == '--batch': if len(sys.argv) < 3: print("Erreur: veuillez spécifier le répertoire contenant les fichiers JSON") sys.exit(1) json_directory = sys.argv[2] output_directory = sys.argv[3] if len(sys.argv) > 3 else None ReportCSVExporter.process_batch(json_directory, output_directory) # Traitement d'un seul fichier else: json_file = sys.argv[1] model_name = sys.argv[2] if len(sys.argv) > 2 else None output_directory = sys.argv[3] if len(sys.argv) > 3 else None ReportCSVExporter.generate_csv_from_json(json_file, model_name, output_directory) print("Terminé!")