llm_ticket3/post_process.py
2025-04-02 09:01:55 +02:00

982 lines
42 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script de post-traitement pour améliorer les fichiers JSON avant analyse.
"""
import os
import sys
import json
import re
import unicodedata
import shutil
from typing import Dict, List, Any, Optional
def nettoyer_html(texte: str, debug: bool = False) -> str:
"""
Nettoie le contenu HTML en supprimant les balises et le formatage.
Args:
texte: Texte HTML à nettoyer
debug: Afficher des informations de débogage pendant le nettoyage
Returns:
Texte nettoyé
"""
# Vérifier et convertir l'entrée
if not texte:
return ""
if not isinstance(texte, str):
try:
texte = str(texte)
except Exception as e:
print(f"AVERTISSEMENT: Impossible de convertir en texte: {e}")
return ""
if debug:
print(f"Texte original ({len(texte)} caractères): {texte[:100]}...")
# Détection de HTML
contient_html = bool(re.search(r'<[a-z]+[^>]*>', texte, re.IGNORECASE))
if debug and contient_html:
print(f"Le texte contient du HTML, nettoyage nécessaire")
# Supprimer les balises HTML - regex plus agressive pour capturer tous types de balises
try:
# Première passe - balises standard
texte_nettoye = re.sub(r'</?[a-z]+[^>]*>', ' ', texte, flags=re.IGNORECASE)
# Deuxième passe - balises restantes, y compris les mal formées
texte_nettoye = re.sub(r'<[^>]*>', ' ', texte_nettoye)
if debug and contient_html:
print(f"Après suppression des balises HTML: {texte_nettoye[:100]}...")
except Exception as e:
print(f"AVERTISSEMENT: Erreur lors du nettoyage HTML: {e}")
texte_nettoye = texte
# Remplacer les références aux images
try:
texte_nettoye = re.sub(r'\[Image:[^\]]+\]', '[Image]', texte_nettoye)
texte_nettoye = re.sub(r'<img[^>]+>', '[Image]', texte_nettoye, flags=re.IGNORECASE)
except Exception as e:
if debug:
print(f"AVERTISSEMENT: Erreur lors du traitement des images: {e}")
# Supprimer les éléments courants non pertinents
patterns_a_supprimer = [
r'Cordialement,[\s\S]*?$',
r'Bien cordialement,[\s\S]*?$',
r'Bonne réception[\s\S]*?$',
r'À votre disposition[\s\S]*?$',
r'Support technique[\s\S]*?$',
r'L\'objectif du Support Technique[\s\S]*?$',
r'Notre service est ouvert[\s\S]*?$',
r'Dès réception[\s\S]*?$',
r'Confidentialité[\s\S]*?$',
r'Ce message électronique[\s\S]*?$',
r'Droit à la déconnexion[\s\S]*?$',
r'Afin d\'assurer une meilleure traçabilité[\s\S]*?$',
r'tél\s*:\s*[\d\s\+]+',
r'mobile\s*:\s*[\d\s\+]+',
r'www\.[^\s]+\.[a-z]{2,3}',
]
try:
for pattern in patterns_a_supprimer:
texte_avant = texte_nettoye
texte_nettoye = re.sub(pattern, '', texte_nettoye, flags=re.IGNORECASE)
if debug and texte_avant != texte_nettoye:
print(f"Suppression de pattern '{pattern[:20]}...'")
except Exception as e:
# En cas d'échec des expressions régulières, conserver le texte tel quel
if debug:
print(f"AVERTISSEMENT: Erreur lors de la suppression des patterns: {e}")
try:
# Supprimer les lignes multiples vides
texte_nettoye = re.sub(r'\n\s*\n', '\n', texte_nettoye)
# Supprimer les espaces multiples
texte_nettoye = re.sub(r'\s+', ' ', texte_nettoye)
except Exception as e:
if debug:
print(f"AVERTISSEMENT: Erreur lors du nettoyage des espaces: {e}")
# Normaliser les caractères accentués
try:
texte_nettoye = normaliser_accents(texte_nettoye)
except Exception as e:
print(f"AVERTISSEMENT: Erreur lors de la normalisation des accents: {e}")
if debug:
print(f"Texte final ({len(texte_nettoye)} caractères): {texte_nettoye[:100]}...")
return texte_nettoye.strip()
def normaliser_accents(texte: str) -> str:
"""
Normalise les caractères accentués pour éviter les problèmes d'encodage.
Args:
texte: Texte à normaliser
Returns:
Texte avec caractères accentués normalisés
"""
# Si l'entrée n'est pas une chaîne, convertir en chaîne ou retourner vide
if not isinstance(texte, str):
if texte is None:
return ""
try:
texte = str(texte)
except:
return ""
# Convertir les caractères spéciaux HTML (comme &eacute;)
special_chars = {
'&aacute;': 'á', '&eacute;': 'é', '&iacute;': 'í', '&oacute;': 'ó', '&uacute;': 'ú',
'&Aacute;': 'Á', '&Eacute;': 'É', '&Iacute;': 'Í', '&Oacute;': 'Ó', '&Uacute;': 'Ú',
'&agrave;': 'à', '&egrave;': 'è', '&igrave;': 'ì', '&ograve;': 'ò', '&ugrave;': 'ù',
'&Agrave;': 'À', '&Egrave;': 'È', '&Igrave;': 'Ì', '&Ograve;': 'Ò', '&Ugrave;': 'Ù',
'&acirc;': 'â', '&ecirc;': 'ê', '&icirc;': 'î', '&ocirc;': 'ô', '&ucirc;': 'û',
'&Acirc;': 'Â', '&Ecirc;': 'Ê', '&Icirc;': 'Î', '&Ocirc;': 'Ô', '&Ucirc;': 'Û',
'&atilde;': 'ã', '&etilde;': '', '&itilde;': 'ĩ', '&otilde;': 'õ', '&utilde;': 'ũ',
'&Atilde;': 'Ã', '&Etilde;': '', '&Itilde;': 'Ĩ', '&Otilde;': 'Õ', '&Utilde;': 'Ũ',
'&auml;': 'ä', '&euml;': 'ë', '&iuml;': 'ï', '&ouml;': 'ö', '&uuml;': 'ü',
'&Auml;': 'Ä', '&Euml;': 'Ë', '&Iuml;': 'Ï', '&Ouml;': 'Ö', '&Uuml;': 'Ü',
'&ccedil;': 'ç', '&Ccedil;': 'Ç', '&ntilde;': 'ñ', '&Ntilde;': 'Ñ',
'&nbsp;': ' ', '&lt;': '<', '&gt;': '>', '&amp;': '&', '&quot;': '"', '&apos;': "'",
'&euro;': '', '&copy;': '©', '&reg;': '®', '&trade;': ''
}
for html, char in special_chars.items():
texte = texte.replace(html, char)
# Normaliser les caractères composés (par exemple, e plus accent)
# Exemple: 'é' qui pourrait être stocké comme 'e' + accent combinant
return unicodedata.normalize('NFC', texte)
def detecter_role(message: Dict[str, Any]) -> str:
"""
Détecte si un message provient du client ou du support.
Args:
message: Dictionnaire contenant les informations du message
Returns:
"Client" ou "Support"
"""
# Vérifier le champ 'role' s'il existe déjà
if "role" in message and message["role"] in ["Client", "Support"]:
return message["role"]
# Indices de support dans l'email
domaines_support = ["@cbao.fr", "@odoo.com", "support@", "ticket.support"]
indices_nom_support = ["support", "cbao", "technique", "odoo"]
email = message.get("email_from", "").lower()
# Nettoyer le format "Nom <email@domaine.com>"
if "<" in email and ">" in email:
match = re.search(r'<([^>]+)>', email)
if match:
email = match.group(1).lower()
# Vérifier le domaine email
if any(domaine in email for domaine in domaines_support):
return "Support"
# Vérifier le nom d'auteur
auteur = ""
if "author_id" in message and isinstance(message["author_id"], list) and len(message["author_id"]) > 1:
auteur = str(message["author_id"][1]).lower()
if any(indice in auteur for indice in indices_nom_support):
return "Support"
# Par défaut, considérer comme client
return "Client"
def transformer_messages(input_file: str, output_file: Optional[str] = None, debug: bool = False) -> None:
"""
Transforme le fichier messages.json en un format amélioré pour l'analyse LLM.
Args:
input_file: Chemin du fichier messages.json original
output_file: Chemin du fichier de sortie (par défaut, écrase le fichier d'entrée)
debug: Activer le mode débogage pour afficher plus d'informations
"""
if output_file is None:
output_file = input_file
if debug:
print(f"Transformation du fichier {input_file} vers {output_file} (mode débogage activé)")
try:
# Lire le fichier messages.json original
with open(input_file, 'r', encoding='utf-8') as f:
messages = json.load(f)
# Trouver le répertoire du ticket et charger les informations
ticket_dir = os.path.dirname(input_file)
ticket_info_path = os.path.join(ticket_dir, "ticket_info.json")
ticket_info = {}
# Extraire le code du ticket du nom du répertoire
try:
dir_name = os.path.basename(ticket_dir)
ticket_code = dir_name.replace("ticket_", "") if dir_name.startswith("ticket_") else dir_name
except Exception as e:
print(f"AVERTISSEMENT: Impossible d'extraire le code du ticket du répertoire: {e}")
ticket_code = "UNKNOWN"
# Charger les informations du ticket
if os.path.exists(ticket_info_path):
try:
with open(ticket_info_path, 'r', encoding='utf-8') as f:
ticket_info = json.load(f)
except Exception as e:
print(f"AVERTISSEMENT: Impossible de charger ticket_info.json: {e}")
# Créer une version améliorée des messages
processed_messages = []
# Extraire et formater les informations du ticket de manière sécurisée
ticket_name = ""
ticket_description = ""
ticket_date = ""
try:
if ticket_info:
ticket_name = str(ticket_info.get("name", "")) if ticket_info.get("name") is not None else ""
ticket_description = str(ticket_info.get("description", "")) if ticket_info.get("description") is not None else ""
ticket_date = str(ticket_info.get("create_date", "")) if ticket_info.get("create_date") is not None else ""
ticket_code = str(ticket_info.get("code", ticket_code)) if ticket_info.get("code") is not None else ticket_code
except Exception as e:
print(f"AVERTISSEMENT: Problème lors de l'extraction des données du ticket: {e}")
# Nettoyer la description pour éliminer les balises HTML
ticket_description_nettoyee = nettoyer_html(ticket_description, debug=debug)
if debug:
print(f"Description originale: {ticket_description[:100]}...")
print(f"Description nettoyée: {ticket_description_nettoyee[:100]}...")
# Ajouter les informations du ticket comme premier "message"
formatted_ticket_info = {
"id": "ticket_info",
"name": normaliser_accents(ticket_name) or f"Ticket {ticket_code}",
"code": ticket_code,
"description": ticket_description_nettoyee,
"date_create": ticket_date,
"role": "system",
"type": "contexte",
"body": f"TICKET {ticket_code}: {normaliser_accents(ticket_name)}.\n\nDESCRIPTION: {ticket_description_nettoyee or 'Aucune description disponible.'}"
}
processed_messages.append(formatted_ticket_info)
# Vérifier que messages est bien une liste
if not isinstance(messages, list):
print(f"AVERTISSEMENT: Le fichier messages.json ne contient pas une liste valide. Type: {type(messages)}")
messages = []
# Transformer chaque message
valid_messages = 0
for msg in messages:
# Vérifier que msg est un dictionnaire
if not isinstance(msg, dict):
print(f"AVERTISSEMENT: Message ignoré car ce n'est pas un dictionnaire. Type: {type(msg)}")
continue
# Ignorer les messages vides
body = msg.get("body", "")
if not body or not isinstance(body, str):
continue
if debug:
contient_html = bool(re.search(r'<[a-z]+[^>]*>', body, re.IGNORECASE))
if contient_html:
print(f"Message {msg.get('id', 'unknown')} contient du HTML")
# Déterminer le type (question/réponse) basé sur le rôle
role = detecter_role(msg)
message_type = "Question" if role == "Client" else "Réponse"
# Nettoyer le contenu de manière sécurisée
contenu_nettoye = nettoyer_html(body, debug=debug)
if not contenu_nettoye:
if debug:
print(f"Message {msg.get('id', 'unknown')} ignoré - contenu vide après nettoyage")
continue
# Normaliser les champs textuels
email_from = normaliser_accents(msg.get("email_from", ""))
subject = normaliser_accents(msg.get("subject", ""))
# Créer le message transformé avec des valeurs sécurisées
msg_id = msg.get("id", "")
if not msg_id:
msg_id = f"msg_{valid_messages+1}"
# Convertir msg_id en string si ce n'est pas le cas
if not isinstance(msg_id, str):
try:
msg_id = str(msg_id)
except:
msg_id = f"msg_{valid_messages+1}"
# Récupérer author_id de manière sécurisée
author_id = msg.get("author_id", [0, ""])
if not isinstance(author_id, list):
author_id = [0, ""]
# Récupérer la date de manière sécurisée
date = msg.get("date", "")
if not isinstance(date, str):
try:
date = str(date)
except:
date = ""
processed_message = {
"id": msg_id,
"author_id": author_id,
"role": role,
"type": message_type,
"date": date,
"email_from": email_from,
"subject": subject,
"body": contenu_nettoye
}
processed_messages.append(processed_message)
valid_messages += 1
# Trier par date (sauf le premier message qui est le contexte)
try:
processed_messages[1:] = sorted(processed_messages[1:], key=lambda x: x.get("date", ""))
except Exception as e:
print(f"AVERTISSEMENT: Impossible de trier les messages par date: {e}")
# Vérifier qu'il y a au moins un message valide en plus du contexte
if valid_messages == 0:
print("AVERTISSEMENT: Aucun message valide trouvé après nettoyage.")
# Ajouter un message factice pour éviter les erreurs
processed_messages.append({
"id": "msg_default",
"role": "Client",
"type": "Question",
"date": formatted_ticket_info.get("date_create", ""),
"body": f"Problème concernant {formatted_ticket_info.get('name', 'ce ticket')}."
})
# Écrire le fichier transformé
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(processed_messages, f, indent=2, ensure_ascii=False)
print(f"Transformation réussie: {len(processed_messages)} messages traités ({valid_messages} messages réels)")
except Exception as e:
print(f"Erreur lors de la transformation des messages: {str(e)}")
import traceback
print(f"Détails: {traceback.format_exc()}")
raise
def corriger_json_accents(input_file: str, output_file: Optional[str] = None) -> None:
"""
Corrige les problèmes d'accents dans un fichier JSON.
Args:
input_file: Chemin du fichier JSON à corriger
output_file: Chemin du fichier de sortie (par défaut, écrase le fichier d'entrée)
"""
if output_file is None:
output_file = input_file
try:
# Lire le fichier JSON
with open(input_file, 'r', encoding='utf-8') as f:
content = json.load(f)
# Fonction récursive pour normaliser tous les textes dans le JSON
def normaliser_json(obj):
if isinstance(obj, str):
return normaliser_accents(obj)
elif isinstance(obj, list):
return [normaliser_json(item) for item in obj]
elif isinstance(obj, dict):
return {k: normaliser_json(v) for k, v in obj.items()}
else:
return obj
# Normaliser tout le contenu JSON
content_normalise = normaliser_json(content)
# Écrire le fichier normalisé
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(content_normalise, f, indent=2, ensure_ascii=False)
print(f"Correction des accents réussie pour {os.path.basename(input_file)}")
except Exception as e:
print(f"Erreur lors de la correction des accents: {str(e)}")
raise
def corriger_markdown_accents(input_file: str, output_file: Optional[str] = None) -> None:
"""
Corrige les problèmes d'accents dans un fichier Markdown.
Args:
input_file: Chemin du fichier Markdown à corriger
output_file: Chemin du fichier de sortie (par défaut, écrase le fichier d'entrée)
"""
if output_file is None:
output_file = input_file
try:
# Lire le fichier Markdown
with open(input_file, 'r', encoding='utf-8') as f:
content = f.read()
# Normaliser les accents
content_normalise = normaliser_accents(content)
# Vérifier si des changements ont été effectués
if content != content_normalise:
# Écrire le fichier normalisé
with open(output_file, 'w', encoding='utf-8') as f:
f.write(content_normalise)
print(f"Correction des accents réussie pour {os.path.basename(input_file)}")
else:
print(f"Aucune correction nécessaire pour {os.path.basename(input_file)}")
except Exception as e:
print(f"Erreur lors de la correction des accents dans le markdown: {str(e)}")
raise
def reparer_ticket(ticket_dir: str) -> bool:
"""
Répare et réinitialise le traitement d'un ticket dont les données sont corrompues.
Args:
ticket_dir: Chemin du répertoire du ticket
Returns:
True si la réparation a réussi, False sinon
"""
try:
print(f"Tentative de réparation du ticket dans {ticket_dir}...")
# Vérifier que le répertoire existe
if not os.path.isdir(ticket_dir):
print(f"ERREUR: Répertoire de ticket introuvable: {ticket_dir}")
return False
# Chemins des fichiers critiques
ticket_info_path = os.path.join(ticket_dir, "ticket_info.json")
messages_path = os.path.join(ticket_dir, "messages.json")
attachments_path = os.path.join(ticket_dir, "attachments_info.json")
# Vérifier et réparer ticket_info.json
if os.path.exists(ticket_info_path):
try:
with open(ticket_info_path, 'r', encoding='utf-8') as f:
ticket_info = json.load(f)
# Vérifier la structure minimale
if not isinstance(ticket_info, dict):
raise ValueError("ticket_info.json n'est pas un dictionnaire valide")
# Réparer les champs manquants ou invalides
code = os.path.basename(ticket_dir).replace("ticket_", "")
if "code" not in ticket_info or not isinstance(ticket_info["code"], str):
ticket_info["code"] = code
if "name" not in ticket_info or not isinstance(ticket_info["name"], str):
ticket_info["name"] = f"Ticket {code}"
if "description" not in ticket_info or not isinstance(ticket_info["description"], str):
ticket_info["description"] = ""
# Réécrire le fichier nettoyé
with open(ticket_info_path, 'w', encoding='utf-8') as f:
json.dump(ticket_info, f, indent=2, ensure_ascii=False)
print(f"✓ ticket_info.json réparé")
except Exception as e:
print(f"! Erreur lors de la réparation de ticket_info.json: {str(e)}")
# Créer un ticket_info minimal
ticket_info = {
"code": os.path.basename(ticket_dir).replace("ticket_", ""),
"name": f"Ticket {os.path.basename(ticket_dir).replace('ticket_', '')}",
"description": "",
"create_date": ""
}
# Sauvegarder la version minimale
with open(ticket_info_path, 'w', encoding='utf-8') as f:
json.dump(ticket_info, f, indent=2, ensure_ascii=False)
print(f"✓ ticket_info.json recréé avec structure minimale")
else:
# Créer un ticket_info minimal
ticket_info = {
"code": os.path.basename(ticket_dir).replace("ticket_", ""),
"name": f"Ticket {os.path.basename(ticket_dir).replace('ticket_', '')}",
"description": "",
"create_date": ""
}
# Sauvegarder la version minimale
with open(ticket_info_path, 'w', encoding='utf-8') as f:
json.dump(ticket_info, f, indent=2, ensure_ascii=False)
print(f"✓ ticket_info.json créé avec structure minimale")
# Vérifier et réparer messages.json
messages_valides = False
if os.path.exists(messages_path):
try:
# Sauvegarder l'original s'il n'y a pas encore de backup
backup_file = os.path.join(ticket_dir, "messages.json.original")
if not os.path.exists(backup_file):
shutil.copy2(messages_path, backup_file)
print(f"✓ Sauvegarde originale créée: {backup_file}")
# Essayer de charger le fichier
with open(messages_path, 'r', encoding='utf-8') as f:
messages = json.load(f)
# Vérifier que c'est une liste
if not isinstance(messages, list):
raise ValueError("messages.json n'est pas une liste valide")
messages_valides = True
print(f"✓ messages.json valide ({len(messages)} messages)")
except Exception as e:
print(f"! Erreur dans messages.json: {str(e)}")
print(" Tentative de récupération...")
# Essayer de récupérer depuis la sauvegarde
backup_file = os.path.join(ticket_dir, "messages.json.original")
if os.path.exists(backup_file):
try:
with open(backup_file, 'r', encoding='utf-8') as f:
messages = json.load(f)
if isinstance(messages, list):
# Sauvegarder la version récupérée
with open(messages_path, 'w', encoding='utf-8') as f:
json.dump(messages, f, indent=2, ensure_ascii=False)
messages_valides = True
print(f"✓ messages.json récupéré depuis la sauvegarde")
except Exception:
print(" Échec de la récupération depuis la sauvegarde")
# Si les messages sont toujours invalides, créer un fichier minimal
if not messages_valides:
# Créer un fichier messages minimal
messages = [{
"id": 1,
"body": f"Message par défaut pour le ticket {os.path.basename(ticket_dir)}",
"date": "",
"email_from": "client@example.com"
}]
# Sauvegarder la version minimale
with open(messages_path, 'w', encoding='utf-8') as f:
json.dump(messages, f, indent=2, ensure_ascii=False)
print(f"✓ messages.json recréé avec message par défaut")
# Transformer messages.json pour le format attendu
print("Transformation des messages pour le bon format...")
transformer_messages(messages_path)
print("✓ Transformation des messages terminée")
# Vérifier et réparer attachments_info.json
if os.path.exists(attachments_path):
try:
with open(attachments_path, 'r', encoding='utf-8') as f:
attachments = json.load(f)
# Vérifier que c'est une liste
if not isinstance(attachments, list):
attachments = []
with open(attachments_path, 'w', encoding='utf-8') as f:
json.dump(attachments, f, indent=2, ensure_ascii=False)
print(f"✓ attachments_info.json réparé (liste vide)")
else:
print(f"✓ attachments_info.json valide ({len(attachments)} pièces jointes)")
except Exception as e:
print(f"! Erreur dans attachments_info.json: {str(e)}")
# Créer une liste vide
with open(attachments_path, 'w', encoding='utf-8') as f:
json.dump([], f, indent=2, ensure_ascii=False)
print(f"✓ attachments_info.json recréé (liste vide)")
else:
# Créer une liste vide
with open(attachments_path, 'w', encoding='utf-8') as f:
json.dump([], f, indent=2, ensure_ascii=False)
print(f"✓ attachments_info.json créé (liste vide)")
print(f"Réparation du ticket terminée avec succès!")
return True
except Exception as e:
print(f"ERREUR lors de la réparation du ticket: {str(e)}")
import traceback
print(f"Détails: {traceback.format_exc()}")
return False
def diagnostiquer_ticket(ticket_dir: str) -> Dict[str, Any]:
"""
Diagnostique les problèmes dans un ticket et propose des solutions.
Args:
ticket_dir: Chemin du répertoire du ticket
Returns:
Rapport de diagnostic avec les problèmes identifiés et solutions proposées
"""
diagnostic = {
"problemes": [],
"suggestions": [],
"etat_fichiers": {}
}
print(f"Diagnostic du ticket dans {ticket_dir}...")
# Vérifier que le répertoire existe
if not os.path.isdir(ticket_dir):
diagnostic["problemes"].append(f"Répertoire de ticket introuvable: {ticket_dir}")
diagnostic["suggestions"].append("Créer le répertoire du ticket")
return diagnostic
# Chemins des fichiers critiques
ticket_info_path = os.path.join(ticket_dir, "ticket_info.json")
messages_path = os.path.join(ticket_dir, "messages.json")
messages_backup_path = os.path.join(ticket_dir, "messages.json.backup")
attachments_path = os.path.join(ticket_dir, "attachments_info.json")
attachments_dir = os.path.join(ticket_dir, "attachments")
questions_reponses_path = os.path.join(ticket_dir, "questions_reponses.md")
rapport_dir = os.path.join(ticket_dir, "rapport")
# Vérifier ticket_info.json
if os.path.exists(ticket_info_path):
try:
with open(ticket_info_path, 'r', encoding='utf-8') as f:
ticket_info = json.load(f)
diagnostic["etat_fichiers"]["ticket_info.json"] = "valide"
# Vérifier la structure minimale
if not isinstance(ticket_info, dict):
diagnostic["problemes"].append("ticket_info.json n'est pas un dictionnaire valide")
diagnostic["suggestions"].append("Réparer ticket_info.json avec --repair")
diagnostic["etat_fichiers"]["ticket_info.json"] = "invalide"
# Vérifier les champs HTML
description = ticket_info.get("description", "")
if isinstance(description, str) and re.search(r'<[a-z]+[^>]*>', description, re.IGNORECASE):
diagnostic["problemes"].append("La description contient du HTML non traité")
diagnostic["suggestions"].append("Traiter les balises HTML dans la description")
except Exception as e:
diagnostic["problemes"].append(f"Erreur dans ticket_info.json: {str(e)}")
diagnostic["suggestions"].append("Réparer ticket_info.json avec --repair")
diagnostic["etat_fichiers"]["ticket_info.json"] = "corrompu"
else:
diagnostic["problemes"].append(f"Fichier manquant: ticket_info.json")
diagnostic["suggestions"].append("Créer ticket_info.json avec --repair")
diagnostic["etat_fichiers"]["ticket_info.json"] = "manquant"
# Vérifier messages.json
if os.path.exists(messages_path):
try:
with open(messages_path, 'r', encoding='utf-8') as f:
messages = json.load(f)
diagnostic["etat_fichiers"]["messages.json"] = "valide"
# Vérifier que c'est une liste
if not isinstance(messages, list):
diagnostic["problemes"].append("messages.json n'est pas une liste valide")
diagnostic["suggestions"].append("Réparer messages.json avec --repair")
diagnostic["etat_fichiers"]["messages.json"] = "invalide"
# Vérifier le contenu HTML dans les messages
html_count = 0
for msg in messages:
if not isinstance(msg, dict):
continue
body = msg.get("body", "")
if isinstance(body, str) and re.search(r'<[a-z]+[^>]*>', body, re.IGNORECASE):
html_count += 1
if html_count > 0:
diagnostic["problemes"].append(f"{html_count} message(s) contiennent du HTML non traité")
diagnostic["suggestions"].append("Retraiter messages.json avec --debug pour voir les problèmes")
# Vérifier les accents dans les messages
accents_count = 0
for msg in messages:
if not isinstance(msg, dict):
continue
body = msg.get("body", "")
if isinstance(body, str):
# Vérifier les entités HTML pour les accents
if re.search(r'&[aeiounc][a-z]{3,5};', body, re.IGNORECASE):
accents_count += 1
if accents_count > 0:
diagnostic["problemes"].append(f"{accents_count} message(s) contiennent des entités HTML d'accent non converties")
diagnostic["suggestions"].append("Corriger les accents avec --fix-all")
except Exception as e:
diagnostic["problemes"].append(f"Erreur dans messages.json: {str(e)}")
diagnostic["suggestions"].append("Réparer messages.json avec --repair")
diagnostic["etat_fichiers"]["messages.json"] = "corrompu"
else:
diagnostic["problemes"].append(f"Fichier manquant: messages.json")
diagnostic["suggestions"].append("Créer messages.json avec --repair")
diagnostic["etat_fichiers"]["messages.json"] = "manquant"
# Vérifier si une sauvegarde des messages existe
if os.path.exists(messages_backup_path):
diagnostic["etat_fichiers"]["messages.json.backup"] = "présent"
else:
diagnostic["etat_fichiers"]["messages.json.backup"] = "manquant"
# Vérifier le fichier des questions et réponses
if os.path.exists(questions_reponses_path):
try:
with open(questions_reponses_path, 'r', encoding='utf-8') as f:
content = f.read()
diagnostic["etat_fichiers"]["questions_reponses.md"] = "présent"
# Vérifier si des questions/réponses sont présentes
if "| Question | Réponse |" in content and not re.search(r'\| \*\*[^|]+\*\*: ', content):
diagnostic["problemes"].append("Le fichier questions_reponses.md ne contient pas de questions/réponses")
diagnostic["suggestions"].append("Retraiter le ticket pour extraire les questions/réponses")
except Exception as e:
diagnostic["problemes"].append(f"Erreur dans questions_reponses.md: {str(e)}")
diagnostic["etat_fichiers"]["questions_reponses.md"] = "invalide"
else:
diagnostic["etat_fichiers"]["questions_reponses.md"] = "manquant"
# Vérifier les pièces jointes
if os.path.exists(attachments_path):
try:
with open(attachments_path, 'r', encoding='utf-8') as f:
attachments = json.load(f)
diagnostic["etat_fichiers"]["attachments_info.json"] = "valide"
# Vérifier que c'est une liste
if not isinstance(attachments, list):
diagnostic["problemes"].append("attachments_info.json n'est pas une liste valide")
diagnostic["suggestions"].append("Réparer attachments_info.json avec --repair")
diagnostic["etat_fichiers"]["attachments_info.json"] = "invalide"
# Vérifier que les fichiers attachés existent
if os.path.exists(attachments_dir):
diagnostic["etat_fichiers"]["attachments/"] = "présent"
for attachment in attachments:
if not isinstance(attachment, dict):
continue
file_path = attachment.get("file_path", "")
if not file_path:
continue
# Normaliser le chemin
if not os.path.isabs(file_path):
file_path = os.path.join(attachments_dir, os.path.basename(file_path))
if not os.path.exists(file_path):
file_name = os.path.basename(file_path)
diagnostic["problemes"].append(f"Fichier attaché manquant: {file_name}")
else:
diagnostic["etat_fichiers"]["attachments/"] = "manquant"
diagnostic["problemes"].append("Répertoire attachments/ manquant")
except Exception as e:
diagnostic["problemes"].append(f"Erreur dans attachments_info.json: {str(e)}")
diagnostic["suggestions"].append("Réparer attachments_info.json avec --repair")
diagnostic["etat_fichiers"]["attachments_info.json"] = "corrompu"
else:
diagnostic["etat_fichiers"]["attachments_info.json"] = "manquant"
diagnostic["problemes"].append("Fichier attachments_info.json manquant")
diagnostic["suggestions"].append("Créer attachments_info.json avec --repair")
# Vérifier le répertoire rapport
if os.path.exists(rapport_dir):
diagnostic["etat_fichiers"]["rapport/"] = "présent"
rapport_json = os.path.join(rapport_dir, "ticket_analysis.json")
rapport_md = os.path.join(rapport_dir, "ticket_analysis.md")
if os.path.exists(rapport_json):
diagnostic["etat_fichiers"]["rapport/ticket_analysis.json"] = "présent"
else:
diagnostic["etat_fichiers"]["rapport/ticket_analysis.json"] = "manquant"
diagnostic["problemes"].append("Rapport JSON manquant")
if os.path.exists(rapport_md):
diagnostic["etat_fichiers"]["rapport/ticket_analysis.md"] = "présent"
else:
diagnostic["etat_fichiers"]["rapport/ticket_analysis.md"] = "manquant"
diagnostic["problemes"].append("Rapport Markdown manquant")
else:
diagnostic["etat_fichiers"]["rapport/"] = "manquant"
# Ajouter des suggestions globales si nécessaires
if len(diagnostic["problemes"]) > 3:
diagnostic["suggestions"].insert(0, "Utiliser l'option --repair pour essayer de corriger tous les problèmes automatiquement")
# Afficher le résumé du diagnostic
print(f"\nRésumé du diagnostic pour {os.path.basename(ticket_dir)}:")
print(f"- Problèmes identifiés: {len(diagnostic['problemes'])}")
for i, probleme in enumerate(diagnostic["problemes"]):
print(f" {i+1}. {probleme}")
print("\nSuggestions:")
for suggestion in diagnostic["suggestions"]:
print(f"- {suggestion}")
return diagnostic
def main():
"""
Point d'entrée principal du script.
"""
# Analyser les arguments
if len(sys.argv) < 2:
print("Usage: python post_process.py <ticket_dir> [options]")
print("Options:")
print(" --fix-all Corriger les accents dans tous les fichiers JSON et Markdown")
print(" --fix-md Corriger uniquement les fichiers Markdown")
print(" --repair Réparer un ticket corrompu")
print(" --debug Activer le mode débogage")
print(" --diagnose Diagnostiquer les problèmes du ticket")
print(" --help Afficher cette aide")
sys.exit(1)
# Afficher l'aide
if "--help" in sys.argv:
print("Usage: python post_process.py <ticket_dir> [options]")
print("Options:")
print(" --fix-all Corriger les accents dans tous les fichiers JSON et Markdown")
print(" --fix-md Corriger uniquement les fichiers Markdown")
print(" --repair Réparer un ticket corrompu")
print(" --debug Activer le mode débogage")
print(" --diagnose Diagnostiquer les problèmes du ticket")
print(" --help Afficher cette aide")
sys.exit(0)
ticket_dir = sys.argv[1]
fix_all = "--fix-all" in sys.argv
fix_md = "--fix-md" in sys.argv
repair = "--repair" in sys.argv
debug = "--debug" in sys.argv
diagnose = "--diagnose" in sys.argv
# Vérifier que le répertoire existe
if not os.path.isdir(ticket_dir):
print(f"ERREUR: Répertoire non trouvé: {ticket_dir}")
sys.exit(1)
# Option de diagnostic du ticket
if diagnose:
diagnostiquer_ticket(ticket_dir)
sys.exit(0)
# Option de réparation du ticket
if repair:
success = reparer_ticket(ticket_dir)
if not success:
print("La réparation du ticket a échoué.")
sys.exit(1)
print("Ticket réparé avec succès!")
sys.exit(0)
# Option de correction des accents dans les fichiers Markdown uniquement
if fix_md:
rapport_dir = os.path.join(ticket_dir, "rapport")
corrected = False
# Corriger les fichiers Markdown du répertoire rapport
if os.path.exists(rapport_dir):
for root, _, files in os.walk(rapport_dir):
for file in files:
if file.endswith(".md"):
md_file = os.path.join(root, file)
corriger_markdown_accents(md_file)
corrected = True
# Corriger les fichiers Markdown à la racine du ticket
for file in os.listdir(ticket_dir):
if file.endswith(".md"):
md_file = os.path.join(ticket_dir, file)
corriger_markdown_accents(md_file)
corrected = True
if corrected:
print("Correction des accents terminée dans les fichiers Markdown.")
else:
print("Aucun fichier Markdown trouvé.")
sys.exit(0)
# Transformation standard des messages
messages_file = os.path.join(ticket_dir, "messages.json")
if not os.path.exists(messages_file):
print(f"Fichier non trouvé: {messages_file}")
sys.exit(1)
try:
# Transformer les messages
transformer_messages(messages_file, debug=debug)
print(f"Post-traitement terminé pour {messages_file}")
# Corriger les accents dans tous les fichiers si demandé
if fix_all:
rapport_dir = os.path.join(ticket_dir, "rapport")
if os.path.exists(rapport_dir):
# Corriger les fichiers JSON
for root, _, files in os.walk(rapport_dir):
for file in files:
if file.endswith(".json"):
json_file = os.path.join(root, file)
corriger_json_accents(json_file)
# Corriger les fichiers Markdown
for root, _, files in os.walk(rapport_dir):
for file in files:
if file.endswith(".md"):
md_file = os.path.join(root, file)
corriger_markdown_accents(md_file)
# Corriger les fichiers Markdown à la racine du ticket
for file in os.listdir(ticket_dir):
if file.endswith(".md"):
md_file = os.path.join(ticket_dir, file)
corriger_markdown_accents(md_file)
print("Correction des accents terminée dans tous les fichiers.")
except Exception as e:
print(f"ERREUR lors du post-traitement: {str(e)}")
sys.exit(1)
if __name__ == "__main__":
main()