#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Script pour extraire le tableau de questions/réponses du rapport final et le convertir en CSV. """ import json import csv import os import sys import re import logging from typing import List, Dict, Any, Optional, Tuple from datetime import datetime # Configuration du logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('ReportCSVExporter') class ReportCSVExporter: """ Classe responsable de l'exportation des rapports au format CSV. Permet de générer des fichiers CSV à partir des données JSON des rapports. """ @staticmethod def generate_csv_from_json(json_file, model_name=None, output_dir=None): """ Génère un fichier CSV à partir des données du tableau questions/réponses contenues dans le fichier JSON du rapport. Args: json_file (str): Chemin du fichier JSON contenant les données model_name (str, optional): Nom du modèle à inclure dans le nom du fichier output_dir (str, optional): Répertoire de sortie personnalisé Returns: str: Chemin du fichier CSV généré """ try: # Vérifier que le fichier JSON existe if not os.path.exists(json_file): logger.error(f"Le fichier JSON n'existe pas: {json_file}") return None # Extraire l'ID du ticket du nom du fichier filename = os.path.basename(json_file) ticket_id = filename.split('_')[0] if '_' in filename else 'unknown' # Définir le répertoire de sortie if output_dir: csv_dir = output_dir else: # Créer le répertoire CSV à la racine du projet project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')) csv_root_dir = os.path.join(project_root, 'CSV') csv_dir = os.path.join(csv_root_dir, ticket_id) # Créer les répertoires si nécessaire os.makedirs(csv_dir, exist_ok=True) # Charger les données JSON with open(json_file, 'r', encoding='utf-8') as f: data = json.load(f) # Utiliser le modèle depuis les métadonnées si non spécifié if not model_name: model_name = data.get('metadata', {}).get('model', 'unknown') # Ajouter un timestamp pour éviter les écrasements timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") csv_filename = f"{ticket_id}_{model_name}_{timestamp}.csv" csv_file = os.path.join(csv_dir, csv_filename) # Extraire les échanges exchanges = data.get('chronologie_echanges', []) if not exchanges: logger.warning(f"Aucun échange trouvé dans {json_file}") return None # Créer et écrire dans le fichier CSV with open(csv_file, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # Écrire l'en-tête avec métadonnées du rapport writer.writerow(['Question', 'Réponse']) current_question = None current_answers = [] # Parcourir les échanges pour les combiner en paires questions/réponses for exchange in exchanges: emetteur = exchange.get('emetteur', '').upper() type_msg = exchange.get('type', '').lower() contenu = exchange.get('contenu', '') # Si c'est une question client if emetteur == 'CLIENT' and (type_msg == 'question' or '?' in contenu): # Si une question précédente existe, l'écrire avec ses réponses if current_question: combined_answer = "\n".join(current_answers) if current_answers else "Pas de réponse" writer.writerow([current_question, combined_answer]) # Réinitialiser pour la nouvelle question current_question = contenu current_answers = [] # Si c'est une réponse ou un complément du support elif emetteur == 'SUPPORT' and (type_msg == 'réponse' or type_msg == 'complément visuel' or type_msg == 'information technique'): if current_question: # S'assurer qu'il y a une question en cours current_answers.append(contenu) # Écrire la dernière question et ses réponses if current_question: combined_answer = "\n".join(current_answers) if current_answers else "Pas de réponse" writer.writerow([current_question, combined_answer]) logger.info(f"Fichier CSV créé: {csv_file}") return csv_file except Exception as e: logger.error(f"Erreur lors de la génération du CSV: {str(e)}") return None @staticmethod def process_batch(json_directory, output_directory=None): """ Traite tous les fichiers JSON de rapport dans un répertoire. Args: json_directory (str): Répertoire contenant les fichiers JSON à traiter output_directory (str, optional): Répertoire de sortie personnalisé Returns: list: Liste des chemins des fichiers CSV générés """ csv_files = [] if not os.path.exists(json_directory): logger.error(f"Le répertoire n'existe pas: {json_directory}") return csv_files # Parcourir tous les fichiers JSON dans le répertoire for filename in os.listdir(json_directory): if filename.endswith('.json') and 'rapport_final' in filename: json_file = os.path.join(json_directory, filename) csv_file = ReportCSVExporter.generate_csv_from_json( json_file=json_file, output_dir=output_directory ) if csv_file: csv_files.append(csv_file) if not csv_files: logger.warning(f"Aucun fichier CSV généré à partir de {json_directory}") else: logger.info(f"{len(csv_files)} fichiers CSV générés avec succès") return csv_files def extraire_tableau_markdown(texte: str) -> List[List[str]]: """ Extrait un tableau au format Markdown du texte. Args: texte: Texte contenant un tableau Markdown Returns: Liste de lignes du tableau, chaque ligne étant une liste de cellules """ lignes = [] # Chercher le début du tableau pattern_table = r"\|\s*ÉMETTEUR\s*\|\s*TYPE\s*\|\s*DATE\s*\|\s*CONTENU\s*\|\s*ÉLÉMENTS VISUELS\s*\|" if not re.search(pattern_table, texte, re.IGNORECASE): # Si l'en-tête exacte n'est pas trouvée, chercher un format plus générique pattern_table = r"\|\s*\w+\s*\|\s*\w+\s*\|\s*\w+\s*\|\s*\w+\s*\|\s*[\w\s]+\s*\|" if not re.search(pattern_table, texte, re.IGNORECASE): logger.warning("Aucun tableau trouvé dans le texte") return [] # Extraire toutes les lignes qui commencent et se terminent par | tableau_lines = re.findall(r"^\|.*\|$", texte, re.MULTILINE) if len(tableau_lines) < 3: # En-tête + séparateur + au moins une ligne de données logger.warning(f"Tableau incomplet: seulement {len(tableau_lines)} lignes trouvées") return [] # Ignorer la ligne de séparation (2ème ligne) qui contient juste des | et des - en_tete = tableau_lines[0] donnees = tableau_lines[2:] # Commencer à la 3ème ligne # Extraire les en-têtes en_tetes = [h.strip() for h in en_tete.split('|')[1:-1]] lignes.append(en_tetes) # Extraire les données for ligne in donnees: # Split sur | mais ignorer les | dans le contenu entre guillemets ou parenthèses cellules = [] cellule_actuelle = "" entre_parentheses = 0 entre_guillemets = False for char in ligne[1:-1]: # Ignorer les | aux extrémités if char == '|' and entre_parentheses == 0 and not entre_guillemets: cellules.append(cellule_actuelle.strip()) cellule_actuelle = "" else: cellule_actuelle += char if char == '(': entre_parentheses += 1 elif char == ')': entre_parentheses = max(0, entre_parentheses - 1) elif char == '"': entre_guillemets = not entre_guillemets if cellule_actuelle: cellules.append(cellule_actuelle.strip()) # S'assurer que nous avons le bon nombre de cellules if len(cellules) != len(en_tetes): # Ajuster au besoin if len(cellules) < len(en_tetes): cellules.extend([""] * (len(en_tetes) - len(cellules))) else: cellules = cellules[:len(en_tetes)] lignes.append(cellules) return lignes def extraire_tableau_du_rapport(json_path: str) -> List[List[str]]: """ Extrait le tableau du rapport JSON. Args: json_path: Chemin du fichier JSON contenant le rapport Returns: Tableau extrait sous forme de liste de lignes """ try: with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) # Vérifier si c'est une liste (comme dans certains formats de rapport) if isinstance(data, list): # Chercher l'élément qui contient la réponse for item in data: if isinstance(item, dict) and "response" in item: texte = item["response"] return extraire_tableau_markdown(texte) # Si c'est un dictionnaire simple elif isinstance(data, dict) and "response" in data: texte = data["response"] return extraire_tableau_markdown(texte) logger.warning(f"Format de données non reconnu dans {json_path}") return [] except Exception as e: logger.error(f"Erreur lors de l'extraction du tableau: {e}") return [] def generer_csv_depuis_rapport(json_path: str, output_dir: Optional[str] = None) -> Optional[str]: """ Génère un fichier CSV à partir du tableau extrait du rapport. Args: json_path: Chemin du fichier JSON contenant le rapport output_dir: Répertoire de sortie pour le fichier CSV (optionnel) Returns: Chemin du fichier CSV généré """ # Extraire l'ID du ticket du chemin du fichier basename = os.path.basename(json_path) ticket_id = None # Essayer d'extraire l'ID du ticket du nom de fichier match = re.search(r'(T\d+)', basename) if match: ticket_id = match.group(1) else: # Essayer d'extraire à partir du chemin match = re.search(r'ticket_(T\d+)', json_path) if match: ticket_id = match.group(1) if not ticket_id: logger.warning(f"Impossible d'extraire l'ID du ticket de {json_path}") ticket_id = "unknown" # Déterminer le répertoire de sortie if not output_dir: # Utiliser le répertoire CSV à la racine du projet project_root = os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) output_dir = os.path.join(project_root, "CSV", ticket_id) os.makedirs(output_dir, exist_ok=True) # Créer le chemin du fichier CSV model_info = "" if "_results" in basename: model_match = re.search(r'_([a-zA-Z0-9-]+)_results', basename) if model_match: model_info = f"_{model_match.group(1)}" csv_path = os.path.join(output_dir, f"{ticket_id}{model_info}_exchanges.csv") # Extraire le tableau du rapport tableau = extraire_tableau_du_rapport(json_path) if not tableau: logger.warning(f"Aucun tableau à exporter depuis {json_path}") return None # Écrire le CSV try: with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerows(tableau) logger.info(f"Fichier CSV généré: {csv_path}") return csv_path except Exception as e: logger.error(f"Erreur lors de la génération du CSV: {e}") return None def extraire_questions_reponses(tableau: List[List[str]]) -> List[Dict[str, str]]: """ Extrait les questions et réponses du tableau en paires Q/R. Args: tableau: Tableau extrait du rapport Returns: Liste de dictionnaires {question, réponse, contexte} """ if not tableau or len(tableau) < 2: return [] # Récupérer les indices des colonnes header = tableau[0] try: idx_emetteur = header.index("ÉMETTEUR") idx_type = header.index("TYPE") idx_contenu = header.index("CONTENU") idx_elements = header.index("ÉLÉMENTS VISUELS") except ValueError: # Si les en-têtes exacts ne sont pas trouvés, essayer des correspondances partielles idx_emetteur = next((i for i, h in enumerate(header) if "METTEUR" in h.upper()), 0) idx_type = next((i for i, h in enumerate(header) if "TYPE" in h.upper()), 1) idx_contenu = next((i for i, h in enumerate(header) if "CONTENU" in h.upper()), 3) idx_elements = next((i for i, h in enumerate(header) if "VISUEL" in h.upper()), 4) resultat = [] question_courante = {} # Parcourir les lignes de données for ligne in tableau[1:]: if len(ligne) <= max(idx_emetteur, idx_type, idx_contenu, idx_elements): continue emetteur = ligne[idx_emetteur].upper() type_msg = ligne[idx_type].lower() if ligne[idx_type] else "" contenu = ligne[idx_contenu] elements = ligne[idx_elements] # Si c'est une question du client if emetteur == "CLIENT" and (type_msg == "question" or "?" in contenu): # Si on a déjà une question en cours avec une réponse, l'ajouter aux résultats if question_courante and "question" in question_courante and "réponse" in question_courante: resultat.append(question_courante) # Créer une nouvelle question question_courante = { "question": contenu, "contexte_question": elements } # Si c'est une réponse du support et qu'on a une question en cours elif emetteur == "SUPPORT" and "question" in question_courante: question_courante["réponse"] = contenu question_courante["contexte_réponse"] = elements # Ajouter la dernière paire Q/R si elle est complète if question_courante and "question" in question_courante and "réponse" in question_courante: resultat.append(question_courante) return resultat def generer_csv_qr(json_path: str, output_dir: Optional[str] = None) -> Optional[str]: """ Génère un fichier CSV de questions/réponses à partir du tableau extrait du rapport. Args: json_path: Chemin du fichier JSON contenant le rapport output_dir: Répertoire de sortie pour le fichier CSV (optionnel) Returns: Chemin du fichier CSV généré """ # Extraire l'ID du ticket comme précédemment basename = os.path.basename(json_path) ticket_id = None match = re.search(r'(T\d+)', basename) if match: ticket_id = match.group(1) else: match = re.search(r'ticket_(T\d+)', json_path) if match: ticket_id = match.group(1) if not ticket_id: logger.warning(f"Impossible d'extraire l'ID du ticket de {json_path}") ticket_id = "unknown" # Déterminer le répertoire de sortie if not output_dir: project_root = os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) output_dir = os.path.join(project_root, "CSV", ticket_id) os.makedirs(output_dir, exist_ok=True) # Créer le chemin du fichier CSV model_info = "" if "_results" in basename: model_match = re.search(r'_([a-zA-Z0-9-]+)_results', basename) if model_match: model_info = f"_{model_match.group(1)}" csv_path = os.path.join(output_dir, f"{ticket_id}{model_info}_qa.csv") # Extraire le tableau du rapport tableau = extraire_tableau_du_rapport(json_path) if not tableau: logger.warning(f"Aucun tableau à exporter depuis {json_path}") return None # Extraire les paires Q/R paires_qr = extraire_questions_reponses(tableau) if not paires_qr: logger.warning(f"Aucune paire question/réponse extraite de {json_path}") return None # Écrire le CSV try: with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # Écrire l'en-tête writer.writerow(["Question", "Réponse", "Contexte Question", "Contexte Réponse"]) # Écrire les données for paire in paires_qr: writer.writerow([ paire.get("question", ""), paire.get("réponse", ""), paire.get("contexte_question", ""), paire.get("contexte_réponse", "") ]) logger.info(f"Fichier CSV Q/R généré: {csv_path}") return csv_path except Exception as e: logger.error(f"Erreur lors de la génération du CSV Q/R: {e}") return None def traiter_rapports_ticket(ticket_id: str) -> None: """ Traite tous les rapports finaux d'un ticket pour générer des CSV. Args: ticket_id: ID du ticket à traiter """ # Déterminer le chemin du dossier pipeline base_dir = "output" ticket_dir = f"ticket_{ticket_id}" ticket_path = os.path.join(base_dir, ticket_dir) if not os.path.exists(ticket_path): logger.warning(f"Répertoire du ticket non trouvé: {ticket_path}") return # Trouver la dernière extraction extractions = [] for extract in os.listdir(ticket_path): extraction_path = os.path.join(ticket_path, extract) if os.path.isdir(extraction_path) and extract.startswith(ticket_id): extractions.append(extraction_path) if not extractions: logger.warning(f"Aucune extraction trouvée pour le ticket {ticket_id}") return # Trier par date (plus récente en premier) extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True) latest_extraction = extractions[0] # Trouver le dossier pipeline pipeline_dir = os.path.join(latest_extraction, f"{ticket_id}_rapports", "pipeline") if not os.path.exists(pipeline_dir): logger.warning(f"Dossier pipeline non trouvé: {pipeline_dir}") return # Chercher tous les rapports finaux for filename in os.listdir(pipeline_dir): if filename.startswith("rapport_final") and filename.endswith(".json"): json_path = os.path.join(pipeline_dir, filename) logger.info(f"Traitement du rapport: {filename}") # Générer les deux types de CSV csv_path = generer_csv_depuis_rapport(json_path) csv_qr_path = generer_csv_qr(json_path) if csv_path: logger.info(f"CSV généré: {csv_path}") if csv_qr_path: logger.info(f"CSV Q/R généré: {csv_qr_path}") if __name__ == "__main__": # Configurer l'analyse des arguments if len(sys.argv) < 2: print("Erreur: arguments insuffisants") print("Usage:") print(" 1. Traiter un seul fichier: python report_csv_exporter.py chemin/vers/rapport_final.json [nom_modele] [rep_sortie]") print(" 2. Traiter un répertoire: python report_csv_exporter.py --batch chemin/vers/repertoire [rep_sortie]") sys.exit(1) # Traitement par lot if sys.argv[1] == '--batch': if len(sys.argv) < 3: print("Erreur: veuillez spécifier le répertoire contenant les fichiers JSON") sys.exit(1) json_directory = sys.argv[2] output_directory = sys.argv[3] if len(sys.argv) > 3 else None ReportCSVExporter.process_batch(json_directory, output_directory) # Traitement d'un seul fichier else: json_file = sys.argv[1] model_name = sys.argv[2] if len(sys.argv) > 2 else None output_directory = sys.argv[3] if len(sys.argv) > 3 else None ReportCSVExporter.generate_csv_from_json(json_file, model_name, output_directory) print("Terminé!")