#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Script pour extraire le tableau de questions/réponses du rapport final et le convertir en CSV. """ import json import csv import os import sys import re import logging from typing import List, Dict, Any, Optional, Tuple # Configuration du logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger("report_csv_exporter") def extraire_tableau_markdown(texte: str) -> List[List[str]]: """ Extrait un tableau au format Markdown du texte. Args: texte: Texte contenant un tableau Markdown Returns: Liste de lignes du tableau, chaque ligne étant une liste de cellules """ lignes = [] # Chercher le début du tableau pattern_table = r"\|\s*ÉMETTEUR\s*\|\s*TYPE\s*\|\s*DATE\s*\|\s*CONTENU\s*\|\s*ÉLÉMENTS VISUELS\s*\|" if not re.search(pattern_table, texte, re.IGNORECASE): # Si l'en-tête exacte n'est pas trouvée, chercher un format plus générique pattern_table = r"\|\s*\w+\s*\|\s*\w+\s*\|\s*\w+\s*\|\s*\w+\s*\|\s*[\w\s]+\s*\|" if not re.search(pattern_table, texte, re.IGNORECASE): logger.warning("Aucun tableau trouvé dans le texte") return [] # Extraire toutes les lignes qui commencent et se terminent par | tableau_lines = re.findall(r"^\|.*\|$", texte, re.MULTILINE) if len(tableau_lines) < 3: # En-tête + séparateur + au moins une ligne de données logger.warning(f"Tableau incomplet: seulement {len(tableau_lines)} lignes trouvées") return [] # Ignorer la ligne de séparation (2ème ligne) qui contient juste des | et des - en_tete = tableau_lines[0] donnees = tableau_lines[2:] # Commencer à la 3ème ligne # Extraire les en-têtes en_tetes = [h.strip() for h in en_tete.split('|')[1:-1]] lignes.append(en_tetes) # Extraire les données for ligne in donnees: # Split sur | mais ignorer les | dans le contenu entre guillemets ou parenthèses cellules = [] cellule_actuelle = "" entre_parentheses = 0 entre_guillemets = False for char in ligne[1:-1]: # Ignorer les | aux extrémités if char == '|' and entre_parentheses == 0 and not entre_guillemets: cellules.append(cellule_actuelle.strip()) cellule_actuelle = "" else: cellule_actuelle += char if char == '(': entre_parentheses += 1 elif char == ')': entre_parentheses = max(0, entre_parentheses - 1) elif char == '"': entre_guillemets = not entre_guillemets if cellule_actuelle: cellules.append(cellule_actuelle.strip()) # S'assurer que nous avons le bon nombre de cellules if len(cellules) != len(en_tetes): # Ajuster au besoin if len(cellules) < len(en_tetes): cellules.extend([""] * (len(en_tetes) - len(cellules))) else: cellules = cellules[:len(en_tetes)] lignes.append(cellules) return lignes def extraire_tableau_du_rapport(json_path: str) -> List[List[str]]: """ Extrait le tableau du rapport JSON. Args: json_path: Chemin du fichier JSON contenant le rapport Returns: Tableau extrait sous forme de liste de lignes """ try: with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) # Vérifier si c'est une liste (comme dans certains formats de rapport) if isinstance(data, list): # Chercher l'élément qui contient la réponse for item in data: if isinstance(item, dict) and "response" in item: texte = item["response"] return extraire_tableau_markdown(texte) # Si c'est un dictionnaire simple elif isinstance(data, dict) and "response" in data: texte = data["response"] return extraire_tableau_markdown(texte) logger.warning(f"Format de données non reconnu dans {json_path}") return [] except Exception as e: logger.error(f"Erreur lors de l'extraction du tableau: {e}") return [] def generer_csv_depuis_rapport(json_path: str, output_dir: Optional[str] = None) -> Optional[str]: """ Génère un fichier CSV à partir du tableau extrait du rapport. Args: json_path: Chemin du fichier JSON contenant le rapport output_dir: Répertoire de sortie pour le fichier CSV (optionnel) Returns: Chemin du fichier CSV généré """ # Extraire l'ID du ticket du chemin du fichier basename = os.path.basename(json_path) ticket_id = None # Essayer d'extraire l'ID du ticket du nom de fichier match = re.search(r'(T\d+)', basename) if match: ticket_id = match.group(1) else: # Essayer d'extraire à partir du chemin match = re.search(r'ticket_(T\d+)', json_path) if match: ticket_id = match.group(1) if not ticket_id: logger.warning(f"Impossible d'extraire l'ID du ticket de {json_path}") ticket_id = "unknown" # Déterminer le répertoire de sortie if not output_dir: # Utiliser le répertoire CSV à la racine du projet project_root = os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) output_dir = os.path.join(project_root, "CSV", ticket_id) os.makedirs(output_dir, exist_ok=True) # Créer le chemin du fichier CSV model_info = "" if "_results" in basename: model_match = re.search(r'_([a-zA-Z0-9-]+)_results', basename) if model_match: model_info = f"_{model_match.group(1)}" csv_path = os.path.join(output_dir, f"{ticket_id}{model_info}_exchanges.csv") # Extraire le tableau du rapport tableau = extraire_tableau_du_rapport(json_path) if not tableau: logger.warning(f"Aucun tableau à exporter depuis {json_path}") return None # Écrire le CSV try: with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerows(tableau) logger.info(f"Fichier CSV généré: {csv_path}") return csv_path except Exception as e: logger.error(f"Erreur lors de la génération du CSV: {e}") return None def extraire_questions_reponses(tableau: List[List[str]]) -> List[Dict[str, str]]: """ Extrait les questions et réponses du tableau en paires Q/R. Args: tableau: Tableau extrait du rapport Returns: Liste de dictionnaires {question, réponse, contexte} """ if not tableau or len(tableau) < 2: return [] # Récupérer les indices des colonnes header = tableau[0] try: idx_emetteur = header.index("ÉMETTEUR") idx_type = header.index("TYPE") idx_contenu = header.index("CONTENU") idx_elements = header.index("ÉLÉMENTS VISUELS") except ValueError: # Si les en-têtes exacts ne sont pas trouvés, essayer des correspondances partielles idx_emetteur = next((i for i, h in enumerate(header) if "METTEUR" in h.upper()), 0) idx_type = next((i for i, h in enumerate(header) if "TYPE" in h.upper()), 1) idx_contenu = next((i for i, h in enumerate(header) if "CONTENU" in h.upper()), 3) idx_elements = next((i for i, h in enumerate(header) if "VISUEL" in h.upper()), 4) resultat = [] question_courante = {} # Parcourir les lignes de données for ligne in tableau[1:]: if len(ligne) <= max(idx_emetteur, idx_type, idx_contenu, idx_elements): continue emetteur = ligne[idx_emetteur].upper() type_msg = ligne[idx_type].lower() if ligne[idx_type] else "" contenu = ligne[idx_contenu] elements = ligne[idx_elements] # Si c'est une question du client if emetteur == "CLIENT" and (type_msg == "question" or "?" in contenu): # Si on a déjà une question en cours avec une réponse, l'ajouter aux résultats if question_courante and "question" in question_courante and "réponse" in question_courante: resultat.append(question_courante) # Créer une nouvelle question question_courante = { "question": contenu, "contexte_question": elements } # Si c'est une réponse du support et qu'on a une question en cours elif emetteur == "SUPPORT" and "question" in question_courante: question_courante["réponse"] = contenu question_courante["contexte_réponse"] = elements # Ajouter la dernière paire Q/R si elle est complète if question_courante and "question" in question_courante and "réponse" in question_courante: resultat.append(question_courante) return resultat def generer_csv_qr(json_path: str, output_dir: Optional[str] = None) -> Optional[str]: """ Génère un fichier CSV de questions/réponses à partir du tableau extrait du rapport. Args: json_path: Chemin du fichier JSON contenant le rapport output_dir: Répertoire de sortie pour le fichier CSV (optionnel) Returns: Chemin du fichier CSV généré """ # Extraire l'ID du ticket comme précédemment basename = os.path.basename(json_path) ticket_id = None match = re.search(r'(T\d+)', basename) if match: ticket_id = match.group(1) else: match = re.search(r'ticket_(T\d+)', json_path) if match: ticket_id = match.group(1) if not ticket_id: logger.warning(f"Impossible d'extraire l'ID du ticket de {json_path}") ticket_id = "unknown" # Déterminer le répertoire de sortie if not output_dir: project_root = os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) output_dir = os.path.join(project_root, "CSV", ticket_id) os.makedirs(output_dir, exist_ok=True) # Créer le chemin du fichier CSV model_info = "" if "_results" in basename: model_match = re.search(r'_([a-zA-Z0-9-]+)_results', basename) if model_match: model_info = f"_{model_match.group(1)}" csv_path = os.path.join(output_dir, f"{ticket_id}{model_info}_qa.csv") # Extraire le tableau du rapport tableau = extraire_tableau_du_rapport(json_path) if not tableau: logger.warning(f"Aucun tableau à exporter depuis {json_path}") return None # Extraire les paires Q/R paires_qr = extraire_questions_reponses(tableau) if not paires_qr: logger.warning(f"Aucune paire question/réponse extraite de {json_path}") return None # Écrire le CSV try: with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # Écrire l'en-tête writer.writerow(["Question", "Réponse", "Contexte Question", "Contexte Réponse"]) # Écrire les données for paire in paires_qr: writer.writerow([ paire.get("question", ""), paire.get("réponse", ""), paire.get("contexte_question", ""), paire.get("contexte_réponse", "") ]) logger.info(f"Fichier CSV Q/R généré: {csv_path}") return csv_path except Exception as e: logger.error(f"Erreur lors de la génération du CSV Q/R: {e}") return None def traiter_rapports_ticket(ticket_id: str) -> None: """ Traite tous les rapports finaux d'un ticket pour générer des CSV. Args: ticket_id: ID du ticket à traiter """ # Déterminer le chemin du dossier pipeline base_dir = "output" ticket_dir = f"ticket_{ticket_id}" ticket_path = os.path.join(base_dir, ticket_dir) if not os.path.exists(ticket_path): logger.warning(f"Répertoire du ticket non trouvé: {ticket_path}") return # Trouver la dernière extraction extractions = [] for extract in os.listdir(ticket_path): extraction_path = os.path.join(ticket_path, extract) if os.path.isdir(extraction_path) and extract.startswith(ticket_id): extractions.append(extraction_path) if not extractions: logger.warning(f"Aucune extraction trouvée pour le ticket {ticket_id}") return # Trier par date (plus récente en premier) extractions.sort(key=lambda x: os.path.getmtime(x), reverse=True) latest_extraction = extractions[0] # Trouver le dossier pipeline pipeline_dir = os.path.join(latest_extraction, f"{ticket_id}_rapports", "pipeline") if not os.path.exists(pipeline_dir): logger.warning(f"Dossier pipeline non trouvé: {pipeline_dir}") return # Chercher tous les rapports finaux for filename in os.listdir(pipeline_dir): if filename.startswith("rapport_final") and filename.endswith(".json"): json_path = os.path.join(pipeline_dir, filename) logger.info(f"Traitement du rapport: {filename}") # Générer les deux types de CSV csv_path = generer_csv_depuis_rapport(json_path) csv_qr_path = generer_csv_qr(json_path) if csv_path: logger.info(f"CSV généré: {csv_path}") if csv_qr_path: logger.info(f"CSV Q/R généré: {csv_qr_path}") if __name__ == "__main__": if len(sys.argv) < 2: print("Usage: python report_csv_exporter.py ") print("Exemples:") print(" python report_csv_exporter.py T11143") print(" python report_csv_exporter.py chemin/vers/rapport_final.json") sys.exit(1) arg = sys.argv[1] # Vérifier si l'argument est un fichier JSON existant if os.path.isfile(arg) and arg.endswith(".json"): # Générer les CSV directement à partir du fichier JSON spécifié csv_path = generer_csv_depuis_rapport(arg) csv_qr_path = generer_csv_qr(arg) if csv_path: print(f"CSV généré: {csv_path}") if csv_qr_path: print(f"CSV Q/R généré: {csv_qr_path}") else: # Sinon, considérer l'argument comme un ID de ticket ticket_id = arg if not ticket_id.startswith("T"): ticket_id = f"T{ticket_id}" traiter_rapports_ticket(ticket_id) print("Terminé!")