mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-16 00:36:52 +01:00
982 lines
42 KiB
Python
982 lines
42 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
|
Script de post-traitement pour améliorer les fichiers JSON avant analyse.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import re
|
|
import unicodedata
|
|
import shutil
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
def nettoyer_html(texte: str, debug: bool = False) -> str:
|
|
"""
|
|
Nettoie le contenu HTML en supprimant les balises et le formatage.
|
|
|
|
Args:
|
|
texte: Texte HTML à nettoyer
|
|
debug: Afficher des informations de débogage pendant le nettoyage
|
|
|
|
Returns:
|
|
Texte nettoyé
|
|
"""
|
|
# Vérifier et convertir l'entrée
|
|
if not texte:
|
|
return ""
|
|
|
|
if not isinstance(texte, str):
|
|
try:
|
|
texte = str(texte)
|
|
except Exception as e:
|
|
print(f"AVERTISSEMENT: Impossible de convertir en texte: {e}")
|
|
return ""
|
|
|
|
if debug:
|
|
print(f"Texte original ({len(texte)} caractères): {texte[:100]}...")
|
|
|
|
# Détection de HTML
|
|
contient_html = bool(re.search(r'<[a-z]+[^>]*>', texte, re.IGNORECASE))
|
|
if debug and contient_html:
|
|
print(f"Le texte contient du HTML, nettoyage nécessaire")
|
|
|
|
# Supprimer les balises HTML - regex plus agressive pour capturer tous types de balises
|
|
try:
|
|
# Première passe - balises standard
|
|
texte_nettoye = re.sub(r'</?[a-z]+[^>]*>', ' ', texte, flags=re.IGNORECASE)
|
|
|
|
# Deuxième passe - balises restantes, y compris les mal formées
|
|
texte_nettoye = re.sub(r'<[^>]*>', ' ', texte_nettoye)
|
|
|
|
if debug and contient_html:
|
|
print(f"Après suppression des balises HTML: {texte_nettoye[:100]}...")
|
|
except Exception as e:
|
|
print(f"AVERTISSEMENT: Erreur lors du nettoyage HTML: {e}")
|
|
texte_nettoye = texte
|
|
|
|
# Remplacer les références aux images
|
|
try:
|
|
texte_nettoye = re.sub(r'\[Image:[^\]]+\]', '[Image]', texte_nettoye)
|
|
texte_nettoye = re.sub(r'<img[^>]+>', '[Image]', texte_nettoye, flags=re.IGNORECASE)
|
|
except Exception as e:
|
|
if debug:
|
|
print(f"AVERTISSEMENT: Erreur lors du traitement des images: {e}")
|
|
|
|
# Supprimer les éléments courants non pertinents
|
|
patterns_a_supprimer = [
|
|
r'Cordialement,[\s\S]*?$',
|
|
r'Bien cordialement,[\s\S]*?$',
|
|
r'Bonne réception[\s\S]*?$',
|
|
r'À votre disposition[\s\S]*?$',
|
|
r'Support technique[\s\S]*?$',
|
|
r'L\'objectif du Support Technique[\s\S]*?$',
|
|
r'Notre service est ouvert[\s\S]*?$',
|
|
r'Dès réception[\s\S]*?$',
|
|
r'Confidentialité[\s\S]*?$',
|
|
r'Ce message électronique[\s\S]*?$',
|
|
r'Droit à la déconnexion[\s\S]*?$',
|
|
r'Afin d\'assurer une meilleure traçabilité[\s\S]*?$',
|
|
r'tél\s*:\s*[\d\s\+]+',
|
|
r'mobile\s*:\s*[\d\s\+]+',
|
|
r'www\.[^\s]+\.[a-z]{2,3}',
|
|
]
|
|
|
|
try:
|
|
for pattern in patterns_a_supprimer:
|
|
texte_avant = texte_nettoye
|
|
texte_nettoye = re.sub(pattern, '', texte_nettoye, flags=re.IGNORECASE)
|
|
if debug and texte_avant != texte_nettoye:
|
|
print(f"Suppression de pattern '{pattern[:20]}...'")
|
|
except Exception as e:
|
|
# En cas d'échec des expressions régulières, conserver le texte tel quel
|
|
if debug:
|
|
print(f"AVERTISSEMENT: Erreur lors de la suppression des patterns: {e}")
|
|
|
|
try:
|
|
# Supprimer les lignes multiples vides
|
|
texte_nettoye = re.sub(r'\n\s*\n', '\n', texte_nettoye)
|
|
|
|
# Supprimer les espaces multiples
|
|
texte_nettoye = re.sub(r'\s+', ' ', texte_nettoye)
|
|
except Exception as e:
|
|
if debug:
|
|
print(f"AVERTISSEMENT: Erreur lors du nettoyage des espaces: {e}")
|
|
|
|
# Normaliser les caractères accentués
|
|
try:
|
|
texte_nettoye = normaliser_accents(texte_nettoye)
|
|
except Exception as e:
|
|
print(f"AVERTISSEMENT: Erreur lors de la normalisation des accents: {e}")
|
|
|
|
if debug:
|
|
print(f"Texte final ({len(texte_nettoye)} caractères): {texte_nettoye[:100]}...")
|
|
|
|
return texte_nettoye.strip()
|
|
|
|
def normaliser_accents(texte: str) -> str:
|
|
"""
|
|
Normalise les caractères accentués pour éviter les problèmes d'encodage.
|
|
|
|
Args:
|
|
texte: Texte à normaliser
|
|
|
|
Returns:
|
|
Texte avec caractères accentués normalisés
|
|
"""
|
|
# Si l'entrée n'est pas une chaîne, convertir en chaîne ou retourner vide
|
|
if not isinstance(texte, str):
|
|
if texte is None:
|
|
return ""
|
|
try:
|
|
texte = str(texte)
|
|
except:
|
|
return ""
|
|
|
|
# Convertir les caractères spéciaux HTML (comme é)
|
|
special_chars = {
|
|
'á': 'á', 'é': 'é', 'í': 'í', 'ó': 'ó', 'ú': 'ú',
|
|
'Á': 'Á', 'É': 'É', 'Í': 'Í', 'Ó': 'Ó', 'Ú': 'Ú',
|
|
'à': 'à', 'è': 'è', 'ì': 'ì', 'ò': 'ò', 'ù': 'ù',
|
|
'À': 'À', 'È': 'È', 'Ì': 'Ì', 'Ò': 'Ò', 'Ù': 'Ù',
|
|
'â': 'â', 'ê': 'ê', 'î': 'î', 'ô': 'ô', 'û': 'û',
|
|
'Â': 'Â', 'Ê': 'Ê', 'Î': 'Î', 'Ô': 'Ô', 'Û': 'Û',
|
|
'ã': 'ã', '&etilde;': 'ẽ', 'ĩ': 'ĩ', 'õ': 'õ', 'ũ': 'ũ',
|
|
'Ã': 'Ã', '&Etilde;': 'Ẽ', 'Ĩ': 'Ĩ', 'Õ': 'Õ', 'Ũ': 'Ũ',
|
|
'ä': 'ä', 'ë': 'ë', 'ï': 'ï', 'ö': 'ö', 'ü': 'ü',
|
|
'Ä': 'Ä', 'Ë': 'Ë', 'Ï': 'Ï', 'Ö': 'Ö', 'Ü': 'Ü',
|
|
'ç': 'ç', 'Ç': 'Ç', 'ñ': 'ñ', 'Ñ': 'Ñ',
|
|
' ': ' ', '<': '<', '>': '>', '&': '&', '"': '"', ''': "'",
|
|
'€': '€', '©': '©', '®': '®', '™': '™'
|
|
}
|
|
|
|
for html, char in special_chars.items():
|
|
texte = texte.replace(html, char)
|
|
|
|
# Normaliser les caractères composés (par exemple, e plus accent)
|
|
# Exemple: 'é' qui pourrait être stocké comme 'e' + accent combinant
|
|
return unicodedata.normalize('NFC', texte)
|
|
|
|
def detecter_role(message: Dict[str, Any]) -> str:
|
|
"""
|
|
Détecte si un message provient du client ou du support.
|
|
|
|
Args:
|
|
message: Dictionnaire contenant les informations du message
|
|
|
|
Returns:
|
|
"Client" ou "Support"
|
|
"""
|
|
# Vérifier le champ 'role' s'il existe déjà
|
|
if "role" in message and message["role"] in ["Client", "Support"]:
|
|
return message["role"]
|
|
|
|
# Indices de support dans l'email
|
|
domaines_support = ["@cbao.fr", "@odoo.com", "support@", "ticket.support"]
|
|
indices_nom_support = ["support", "cbao", "technique", "odoo"]
|
|
|
|
email = message.get("email_from", "").lower()
|
|
# Nettoyer le format "Nom <email@domaine.com>"
|
|
if "<" in email and ">" in email:
|
|
match = re.search(r'<([^>]+)>', email)
|
|
if match:
|
|
email = match.group(1).lower()
|
|
|
|
# Vérifier le domaine email
|
|
if any(domaine in email for domaine in domaines_support):
|
|
return "Support"
|
|
|
|
# Vérifier le nom d'auteur
|
|
auteur = ""
|
|
if "author_id" in message and isinstance(message["author_id"], list) and len(message["author_id"]) > 1:
|
|
auteur = str(message["author_id"][1]).lower()
|
|
|
|
if any(indice in auteur for indice in indices_nom_support):
|
|
return "Support"
|
|
|
|
# Par défaut, considérer comme client
|
|
return "Client"
|
|
|
|
def transformer_messages(input_file: str, output_file: Optional[str] = None, debug: bool = False) -> None:
|
|
"""
|
|
Transforme le fichier messages.json en un format amélioré pour l'analyse LLM.
|
|
|
|
Args:
|
|
input_file: Chemin du fichier messages.json original
|
|
output_file: Chemin du fichier de sortie (par défaut, écrase le fichier d'entrée)
|
|
debug: Activer le mode débogage pour afficher plus d'informations
|
|
"""
|
|
if output_file is None:
|
|
output_file = input_file
|
|
|
|
if debug:
|
|
print(f"Transformation du fichier {input_file} vers {output_file} (mode débogage activé)")
|
|
|
|
try:
|
|
# Lire le fichier messages.json original
|
|
with open(input_file, 'r', encoding='utf-8') as f:
|
|
messages = json.load(f)
|
|
|
|
# Trouver le répertoire du ticket et charger les informations
|
|
ticket_dir = os.path.dirname(input_file)
|
|
ticket_info_path = os.path.join(ticket_dir, "ticket_info.json")
|
|
ticket_info = {}
|
|
|
|
# Extraire le code du ticket du nom du répertoire
|
|
try:
|
|
dir_name = os.path.basename(ticket_dir)
|
|
ticket_code = dir_name.replace("ticket_", "") if dir_name.startswith("ticket_") else dir_name
|
|
except Exception as e:
|
|
print(f"AVERTISSEMENT: Impossible d'extraire le code du ticket du répertoire: {e}")
|
|
ticket_code = "UNKNOWN"
|
|
|
|
# Charger les informations du ticket
|
|
if os.path.exists(ticket_info_path):
|
|
try:
|
|
with open(ticket_info_path, 'r', encoding='utf-8') as f:
|
|
ticket_info = json.load(f)
|
|
except Exception as e:
|
|
print(f"AVERTISSEMENT: Impossible de charger ticket_info.json: {e}")
|
|
|
|
# Créer une version améliorée des messages
|
|
processed_messages = []
|
|
|
|
# Extraire et formater les informations du ticket de manière sécurisée
|
|
ticket_name = ""
|
|
ticket_description = ""
|
|
ticket_date = ""
|
|
|
|
try:
|
|
if ticket_info:
|
|
ticket_name = str(ticket_info.get("name", "")) if ticket_info.get("name") is not None else ""
|
|
ticket_description = str(ticket_info.get("description", "")) if ticket_info.get("description") is not None else ""
|
|
ticket_date = str(ticket_info.get("create_date", "")) if ticket_info.get("create_date") is not None else ""
|
|
ticket_code = str(ticket_info.get("code", ticket_code)) if ticket_info.get("code") is not None else ticket_code
|
|
except Exception as e:
|
|
print(f"AVERTISSEMENT: Problème lors de l'extraction des données du ticket: {e}")
|
|
|
|
# Nettoyer la description pour éliminer les balises HTML
|
|
ticket_description_nettoyee = nettoyer_html(ticket_description, debug=debug)
|
|
if debug:
|
|
print(f"Description originale: {ticket_description[:100]}...")
|
|
print(f"Description nettoyée: {ticket_description_nettoyee[:100]}...")
|
|
|
|
# Ajouter les informations du ticket comme premier "message"
|
|
formatted_ticket_info = {
|
|
"id": "ticket_info",
|
|
"name": normaliser_accents(ticket_name) or f"Ticket {ticket_code}",
|
|
"code": ticket_code,
|
|
"description": ticket_description_nettoyee,
|
|
"date_create": ticket_date,
|
|
"role": "system",
|
|
"type": "contexte",
|
|
"body": f"TICKET {ticket_code}: {normaliser_accents(ticket_name)}.\n\nDESCRIPTION: {ticket_description_nettoyee or 'Aucune description disponible.'}"
|
|
}
|
|
processed_messages.append(formatted_ticket_info)
|
|
|
|
# Vérifier que messages est bien une liste
|
|
if not isinstance(messages, list):
|
|
print(f"AVERTISSEMENT: Le fichier messages.json ne contient pas une liste valide. Type: {type(messages)}")
|
|
messages = []
|
|
|
|
# Transformer chaque message
|
|
valid_messages = 0
|
|
for msg in messages:
|
|
# Vérifier que msg est un dictionnaire
|
|
if not isinstance(msg, dict):
|
|
print(f"AVERTISSEMENT: Message ignoré car ce n'est pas un dictionnaire. Type: {type(msg)}")
|
|
continue
|
|
|
|
# Ignorer les messages vides
|
|
body = msg.get("body", "")
|
|
if not body or not isinstance(body, str):
|
|
continue
|
|
|
|
if debug:
|
|
contient_html = bool(re.search(r'<[a-z]+[^>]*>', body, re.IGNORECASE))
|
|
if contient_html:
|
|
print(f"Message {msg.get('id', 'unknown')} contient du HTML")
|
|
|
|
# Déterminer le type (question/réponse) basé sur le rôle
|
|
role = detecter_role(msg)
|
|
message_type = "Question" if role == "Client" else "Réponse"
|
|
|
|
# Nettoyer le contenu de manière sécurisée
|
|
contenu_nettoye = nettoyer_html(body, debug=debug)
|
|
if not contenu_nettoye:
|
|
if debug:
|
|
print(f"Message {msg.get('id', 'unknown')} ignoré - contenu vide après nettoyage")
|
|
continue
|
|
|
|
# Normaliser les champs textuels
|
|
email_from = normaliser_accents(msg.get("email_from", ""))
|
|
subject = normaliser_accents(msg.get("subject", ""))
|
|
|
|
# Créer le message transformé avec des valeurs sécurisées
|
|
msg_id = msg.get("id", "")
|
|
if not msg_id:
|
|
msg_id = f"msg_{valid_messages+1}"
|
|
|
|
# Convertir msg_id en string si ce n'est pas le cas
|
|
if not isinstance(msg_id, str):
|
|
try:
|
|
msg_id = str(msg_id)
|
|
except:
|
|
msg_id = f"msg_{valid_messages+1}"
|
|
|
|
# Récupérer author_id de manière sécurisée
|
|
author_id = msg.get("author_id", [0, ""])
|
|
if not isinstance(author_id, list):
|
|
author_id = [0, ""]
|
|
|
|
# Récupérer la date de manière sécurisée
|
|
date = msg.get("date", "")
|
|
if not isinstance(date, str):
|
|
try:
|
|
date = str(date)
|
|
except:
|
|
date = ""
|
|
|
|
processed_message = {
|
|
"id": msg_id,
|
|
"author_id": author_id,
|
|
"role": role,
|
|
"type": message_type,
|
|
"date": date,
|
|
"email_from": email_from,
|
|
"subject": subject,
|
|
"body": contenu_nettoye
|
|
}
|
|
|
|
processed_messages.append(processed_message)
|
|
valid_messages += 1
|
|
|
|
# Trier par date (sauf le premier message qui est le contexte)
|
|
try:
|
|
processed_messages[1:] = sorted(processed_messages[1:], key=lambda x: x.get("date", ""))
|
|
except Exception as e:
|
|
print(f"AVERTISSEMENT: Impossible de trier les messages par date: {e}")
|
|
|
|
# Vérifier qu'il y a au moins un message valide en plus du contexte
|
|
if valid_messages == 0:
|
|
print("AVERTISSEMENT: Aucun message valide trouvé après nettoyage.")
|
|
# Ajouter un message factice pour éviter les erreurs
|
|
processed_messages.append({
|
|
"id": "msg_default",
|
|
"role": "Client",
|
|
"type": "Question",
|
|
"date": formatted_ticket_info.get("date_create", ""),
|
|
"body": f"Problème concernant {formatted_ticket_info.get('name', 'ce ticket')}."
|
|
})
|
|
|
|
# Écrire le fichier transformé
|
|
with open(output_file, 'w', encoding='utf-8') as f:
|
|
json.dump(processed_messages, f, indent=2, ensure_ascii=False)
|
|
|
|
print(f"Transformation réussie: {len(processed_messages)} messages traités ({valid_messages} messages réels)")
|
|
|
|
except Exception as e:
|
|
print(f"Erreur lors de la transformation des messages: {str(e)}")
|
|
import traceback
|
|
print(f"Détails: {traceback.format_exc()}")
|
|
raise
|
|
|
|
def corriger_json_accents(input_file: str, output_file: Optional[str] = None) -> None:
|
|
"""
|
|
Corrige les problèmes d'accents dans un fichier JSON.
|
|
|
|
Args:
|
|
input_file: Chemin du fichier JSON à corriger
|
|
output_file: Chemin du fichier de sortie (par défaut, écrase le fichier d'entrée)
|
|
"""
|
|
if output_file is None:
|
|
output_file = input_file
|
|
|
|
try:
|
|
# Lire le fichier JSON
|
|
with open(input_file, 'r', encoding='utf-8') as f:
|
|
content = json.load(f)
|
|
|
|
# Fonction récursive pour normaliser tous les textes dans le JSON
|
|
def normaliser_json(obj):
|
|
if isinstance(obj, str):
|
|
return normaliser_accents(obj)
|
|
elif isinstance(obj, list):
|
|
return [normaliser_json(item) for item in obj]
|
|
elif isinstance(obj, dict):
|
|
return {k: normaliser_json(v) for k, v in obj.items()}
|
|
else:
|
|
return obj
|
|
|
|
# Normaliser tout le contenu JSON
|
|
content_normalise = normaliser_json(content)
|
|
|
|
# Écrire le fichier normalisé
|
|
with open(output_file, 'w', encoding='utf-8') as f:
|
|
json.dump(content_normalise, f, indent=2, ensure_ascii=False)
|
|
|
|
print(f"Correction des accents réussie pour {os.path.basename(input_file)}")
|
|
|
|
except Exception as e:
|
|
print(f"Erreur lors de la correction des accents: {str(e)}")
|
|
raise
|
|
|
|
def corriger_markdown_accents(input_file: str, output_file: Optional[str] = None) -> None:
|
|
"""
|
|
Corrige les problèmes d'accents dans un fichier Markdown.
|
|
|
|
Args:
|
|
input_file: Chemin du fichier Markdown à corriger
|
|
output_file: Chemin du fichier de sortie (par défaut, écrase le fichier d'entrée)
|
|
"""
|
|
if output_file is None:
|
|
output_file = input_file
|
|
|
|
try:
|
|
# Lire le fichier Markdown
|
|
with open(input_file, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
# Normaliser les accents
|
|
content_normalise = normaliser_accents(content)
|
|
|
|
# Vérifier si des changements ont été effectués
|
|
if content != content_normalise:
|
|
# Écrire le fichier normalisé
|
|
with open(output_file, 'w', encoding='utf-8') as f:
|
|
f.write(content_normalise)
|
|
|
|
print(f"Correction des accents réussie pour {os.path.basename(input_file)}")
|
|
else:
|
|
print(f"Aucune correction nécessaire pour {os.path.basename(input_file)}")
|
|
|
|
except Exception as e:
|
|
print(f"Erreur lors de la correction des accents dans le markdown: {str(e)}")
|
|
raise
|
|
|
|
def reparer_ticket(ticket_dir: str) -> bool:
|
|
"""
|
|
Répare et réinitialise le traitement d'un ticket dont les données sont corrompues.
|
|
|
|
Args:
|
|
ticket_dir: Chemin du répertoire du ticket
|
|
|
|
Returns:
|
|
True si la réparation a réussi, False sinon
|
|
"""
|
|
try:
|
|
print(f"Tentative de réparation du ticket dans {ticket_dir}...")
|
|
|
|
# Vérifier que le répertoire existe
|
|
if not os.path.isdir(ticket_dir):
|
|
print(f"ERREUR: Répertoire de ticket introuvable: {ticket_dir}")
|
|
return False
|
|
|
|
# Chemins des fichiers critiques
|
|
ticket_info_path = os.path.join(ticket_dir, "ticket_info.json")
|
|
messages_path = os.path.join(ticket_dir, "messages.json")
|
|
attachments_path = os.path.join(ticket_dir, "attachments_info.json")
|
|
|
|
# Vérifier et réparer ticket_info.json
|
|
if os.path.exists(ticket_info_path):
|
|
try:
|
|
with open(ticket_info_path, 'r', encoding='utf-8') as f:
|
|
ticket_info = json.load(f)
|
|
|
|
# Vérifier la structure minimale
|
|
if not isinstance(ticket_info, dict):
|
|
raise ValueError("ticket_info.json n'est pas un dictionnaire valide")
|
|
|
|
# Réparer les champs manquants ou invalides
|
|
code = os.path.basename(ticket_dir).replace("ticket_", "")
|
|
if "code" not in ticket_info or not isinstance(ticket_info["code"], str):
|
|
ticket_info["code"] = code
|
|
|
|
if "name" not in ticket_info or not isinstance(ticket_info["name"], str):
|
|
ticket_info["name"] = f"Ticket {code}"
|
|
|
|
if "description" not in ticket_info or not isinstance(ticket_info["description"], str):
|
|
ticket_info["description"] = ""
|
|
|
|
# Réécrire le fichier nettoyé
|
|
with open(ticket_info_path, 'w', encoding='utf-8') as f:
|
|
json.dump(ticket_info, f, indent=2, ensure_ascii=False)
|
|
|
|
print(f"✓ ticket_info.json réparé")
|
|
|
|
except Exception as e:
|
|
print(f"! Erreur lors de la réparation de ticket_info.json: {str(e)}")
|
|
# Créer un ticket_info minimal
|
|
ticket_info = {
|
|
"code": os.path.basename(ticket_dir).replace("ticket_", ""),
|
|
"name": f"Ticket {os.path.basename(ticket_dir).replace('ticket_', '')}",
|
|
"description": "",
|
|
"create_date": ""
|
|
}
|
|
|
|
# Sauvegarder la version minimale
|
|
with open(ticket_info_path, 'w', encoding='utf-8') as f:
|
|
json.dump(ticket_info, f, indent=2, ensure_ascii=False)
|
|
|
|
print(f"✓ ticket_info.json recréé avec structure minimale")
|
|
else:
|
|
# Créer un ticket_info minimal
|
|
ticket_info = {
|
|
"code": os.path.basename(ticket_dir).replace("ticket_", ""),
|
|
"name": f"Ticket {os.path.basename(ticket_dir).replace('ticket_', '')}",
|
|
"description": "",
|
|
"create_date": ""
|
|
}
|
|
|
|
# Sauvegarder la version minimale
|
|
with open(ticket_info_path, 'w', encoding='utf-8') as f:
|
|
json.dump(ticket_info, f, indent=2, ensure_ascii=False)
|
|
|
|
print(f"✓ ticket_info.json créé avec structure minimale")
|
|
|
|
# Vérifier et réparer messages.json
|
|
messages_valides = False
|
|
if os.path.exists(messages_path):
|
|
try:
|
|
# Sauvegarder l'original s'il n'y a pas encore de backup
|
|
backup_file = os.path.join(ticket_dir, "messages.json.original")
|
|
if not os.path.exists(backup_file):
|
|
shutil.copy2(messages_path, backup_file)
|
|
print(f"✓ Sauvegarde originale créée: {backup_file}")
|
|
|
|
# Essayer de charger le fichier
|
|
with open(messages_path, 'r', encoding='utf-8') as f:
|
|
messages = json.load(f)
|
|
|
|
# Vérifier que c'est une liste
|
|
if not isinstance(messages, list):
|
|
raise ValueError("messages.json n'est pas une liste valide")
|
|
|
|
messages_valides = True
|
|
print(f"✓ messages.json valide ({len(messages)} messages)")
|
|
except Exception as e:
|
|
print(f"! Erreur dans messages.json: {str(e)}")
|
|
print(" Tentative de récupération...")
|
|
|
|
# Essayer de récupérer depuis la sauvegarde
|
|
backup_file = os.path.join(ticket_dir, "messages.json.original")
|
|
if os.path.exists(backup_file):
|
|
try:
|
|
with open(backup_file, 'r', encoding='utf-8') as f:
|
|
messages = json.load(f)
|
|
|
|
if isinstance(messages, list):
|
|
# Sauvegarder la version récupérée
|
|
with open(messages_path, 'w', encoding='utf-8') as f:
|
|
json.dump(messages, f, indent=2, ensure_ascii=False)
|
|
|
|
messages_valides = True
|
|
print(f"✓ messages.json récupéré depuis la sauvegarde")
|
|
except Exception:
|
|
print(" Échec de la récupération depuis la sauvegarde")
|
|
|
|
# Si les messages sont toujours invalides, créer un fichier minimal
|
|
if not messages_valides:
|
|
# Créer un fichier messages minimal
|
|
messages = [{
|
|
"id": 1,
|
|
"body": f"Message par défaut pour le ticket {os.path.basename(ticket_dir)}",
|
|
"date": "",
|
|
"email_from": "client@example.com"
|
|
}]
|
|
|
|
# Sauvegarder la version minimale
|
|
with open(messages_path, 'w', encoding='utf-8') as f:
|
|
json.dump(messages, f, indent=2, ensure_ascii=False)
|
|
|
|
print(f"✓ messages.json recréé avec message par défaut")
|
|
|
|
# Transformer messages.json pour le format attendu
|
|
print("Transformation des messages pour le bon format...")
|
|
transformer_messages(messages_path)
|
|
print("✓ Transformation des messages terminée")
|
|
|
|
# Vérifier et réparer attachments_info.json
|
|
if os.path.exists(attachments_path):
|
|
try:
|
|
with open(attachments_path, 'r', encoding='utf-8') as f:
|
|
attachments = json.load(f)
|
|
|
|
# Vérifier que c'est une liste
|
|
if not isinstance(attachments, list):
|
|
attachments = []
|
|
with open(attachments_path, 'w', encoding='utf-8') as f:
|
|
json.dump(attachments, f, indent=2, ensure_ascii=False)
|
|
print(f"✓ attachments_info.json réparé (liste vide)")
|
|
else:
|
|
print(f"✓ attachments_info.json valide ({len(attachments)} pièces jointes)")
|
|
except Exception as e:
|
|
print(f"! Erreur dans attachments_info.json: {str(e)}")
|
|
# Créer une liste vide
|
|
with open(attachments_path, 'w', encoding='utf-8') as f:
|
|
json.dump([], f, indent=2, ensure_ascii=False)
|
|
print(f"✓ attachments_info.json recréé (liste vide)")
|
|
else:
|
|
# Créer une liste vide
|
|
with open(attachments_path, 'w', encoding='utf-8') as f:
|
|
json.dump([], f, indent=2, ensure_ascii=False)
|
|
print(f"✓ attachments_info.json créé (liste vide)")
|
|
|
|
print(f"Réparation du ticket terminée avec succès!")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"ERREUR lors de la réparation du ticket: {str(e)}")
|
|
import traceback
|
|
print(f"Détails: {traceback.format_exc()}")
|
|
return False
|
|
|
|
def diagnostiquer_ticket(ticket_dir: str) -> Dict[str, Any]:
|
|
"""
|
|
Diagnostique les problèmes dans un ticket et propose des solutions.
|
|
|
|
Args:
|
|
ticket_dir: Chemin du répertoire du ticket
|
|
|
|
Returns:
|
|
Rapport de diagnostic avec les problèmes identifiés et solutions proposées
|
|
"""
|
|
diagnostic = {
|
|
"problemes": [],
|
|
"suggestions": [],
|
|
"etat_fichiers": {}
|
|
}
|
|
|
|
print(f"Diagnostic du ticket dans {ticket_dir}...")
|
|
|
|
# Vérifier que le répertoire existe
|
|
if not os.path.isdir(ticket_dir):
|
|
diagnostic["problemes"].append(f"Répertoire de ticket introuvable: {ticket_dir}")
|
|
diagnostic["suggestions"].append("Créer le répertoire du ticket")
|
|
return diagnostic
|
|
|
|
# Chemins des fichiers critiques
|
|
ticket_info_path = os.path.join(ticket_dir, "ticket_info.json")
|
|
messages_path = os.path.join(ticket_dir, "messages.json")
|
|
messages_backup_path = os.path.join(ticket_dir, "messages.json.backup")
|
|
attachments_path = os.path.join(ticket_dir, "attachments_info.json")
|
|
attachments_dir = os.path.join(ticket_dir, "attachments")
|
|
questions_reponses_path = os.path.join(ticket_dir, "questions_reponses.md")
|
|
rapport_dir = os.path.join(ticket_dir, "rapport")
|
|
|
|
# Vérifier ticket_info.json
|
|
if os.path.exists(ticket_info_path):
|
|
try:
|
|
with open(ticket_info_path, 'r', encoding='utf-8') as f:
|
|
ticket_info = json.load(f)
|
|
|
|
diagnostic["etat_fichiers"]["ticket_info.json"] = "valide"
|
|
|
|
# Vérifier la structure minimale
|
|
if not isinstance(ticket_info, dict):
|
|
diagnostic["problemes"].append("ticket_info.json n'est pas un dictionnaire valide")
|
|
diagnostic["suggestions"].append("Réparer ticket_info.json avec --repair")
|
|
diagnostic["etat_fichiers"]["ticket_info.json"] = "invalide"
|
|
|
|
# Vérifier les champs HTML
|
|
description = ticket_info.get("description", "")
|
|
if isinstance(description, str) and re.search(r'<[a-z]+[^>]*>', description, re.IGNORECASE):
|
|
diagnostic["problemes"].append("La description contient du HTML non traité")
|
|
diagnostic["suggestions"].append("Traiter les balises HTML dans la description")
|
|
|
|
except Exception as e:
|
|
diagnostic["problemes"].append(f"Erreur dans ticket_info.json: {str(e)}")
|
|
diagnostic["suggestions"].append("Réparer ticket_info.json avec --repair")
|
|
diagnostic["etat_fichiers"]["ticket_info.json"] = "corrompu"
|
|
else:
|
|
diagnostic["problemes"].append(f"Fichier manquant: ticket_info.json")
|
|
diagnostic["suggestions"].append("Créer ticket_info.json avec --repair")
|
|
diagnostic["etat_fichiers"]["ticket_info.json"] = "manquant"
|
|
|
|
# Vérifier messages.json
|
|
if os.path.exists(messages_path):
|
|
try:
|
|
with open(messages_path, 'r', encoding='utf-8') as f:
|
|
messages = json.load(f)
|
|
|
|
diagnostic["etat_fichiers"]["messages.json"] = "valide"
|
|
|
|
# Vérifier que c'est une liste
|
|
if not isinstance(messages, list):
|
|
diagnostic["problemes"].append("messages.json n'est pas une liste valide")
|
|
diagnostic["suggestions"].append("Réparer messages.json avec --repair")
|
|
diagnostic["etat_fichiers"]["messages.json"] = "invalide"
|
|
|
|
# Vérifier le contenu HTML dans les messages
|
|
html_count = 0
|
|
for msg in messages:
|
|
if not isinstance(msg, dict):
|
|
continue
|
|
|
|
body = msg.get("body", "")
|
|
if isinstance(body, str) and re.search(r'<[a-z]+[^>]*>', body, re.IGNORECASE):
|
|
html_count += 1
|
|
|
|
if html_count > 0:
|
|
diagnostic["problemes"].append(f"{html_count} message(s) contiennent du HTML non traité")
|
|
diagnostic["suggestions"].append("Retraiter messages.json avec --debug pour voir les problèmes")
|
|
|
|
# Vérifier les accents dans les messages
|
|
accents_count = 0
|
|
for msg in messages:
|
|
if not isinstance(msg, dict):
|
|
continue
|
|
|
|
body = msg.get("body", "")
|
|
if isinstance(body, str):
|
|
# Vérifier les entités HTML pour les accents
|
|
if re.search(r'&[aeiounc][a-z]{3,5};', body, re.IGNORECASE):
|
|
accents_count += 1
|
|
|
|
if accents_count > 0:
|
|
diagnostic["problemes"].append(f"{accents_count} message(s) contiennent des entités HTML d'accent non converties")
|
|
diagnostic["suggestions"].append("Corriger les accents avec --fix-all")
|
|
|
|
except Exception as e:
|
|
diagnostic["problemes"].append(f"Erreur dans messages.json: {str(e)}")
|
|
diagnostic["suggestions"].append("Réparer messages.json avec --repair")
|
|
diagnostic["etat_fichiers"]["messages.json"] = "corrompu"
|
|
else:
|
|
diagnostic["problemes"].append(f"Fichier manquant: messages.json")
|
|
diagnostic["suggestions"].append("Créer messages.json avec --repair")
|
|
diagnostic["etat_fichiers"]["messages.json"] = "manquant"
|
|
|
|
# Vérifier si une sauvegarde des messages existe
|
|
if os.path.exists(messages_backup_path):
|
|
diagnostic["etat_fichiers"]["messages.json.backup"] = "présent"
|
|
else:
|
|
diagnostic["etat_fichiers"]["messages.json.backup"] = "manquant"
|
|
|
|
# Vérifier le fichier des questions et réponses
|
|
if os.path.exists(questions_reponses_path):
|
|
try:
|
|
with open(questions_reponses_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
diagnostic["etat_fichiers"]["questions_reponses.md"] = "présent"
|
|
|
|
# Vérifier si des questions/réponses sont présentes
|
|
if "| Question | Réponse |" in content and not re.search(r'\| \*\*[^|]+\*\*: ', content):
|
|
diagnostic["problemes"].append("Le fichier questions_reponses.md ne contient pas de questions/réponses")
|
|
diagnostic["suggestions"].append("Retraiter le ticket pour extraire les questions/réponses")
|
|
|
|
except Exception as e:
|
|
diagnostic["problemes"].append(f"Erreur dans questions_reponses.md: {str(e)}")
|
|
diagnostic["etat_fichiers"]["questions_reponses.md"] = "invalide"
|
|
else:
|
|
diagnostic["etat_fichiers"]["questions_reponses.md"] = "manquant"
|
|
|
|
# Vérifier les pièces jointes
|
|
if os.path.exists(attachments_path):
|
|
try:
|
|
with open(attachments_path, 'r', encoding='utf-8') as f:
|
|
attachments = json.load(f)
|
|
|
|
diagnostic["etat_fichiers"]["attachments_info.json"] = "valide"
|
|
|
|
# Vérifier que c'est une liste
|
|
if not isinstance(attachments, list):
|
|
diagnostic["problemes"].append("attachments_info.json n'est pas une liste valide")
|
|
diagnostic["suggestions"].append("Réparer attachments_info.json avec --repair")
|
|
diagnostic["etat_fichiers"]["attachments_info.json"] = "invalide"
|
|
|
|
# Vérifier que les fichiers attachés existent
|
|
if os.path.exists(attachments_dir):
|
|
diagnostic["etat_fichiers"]["attachments/"] = "présent"
|
|
|
|
for attachment in attachments:
|
|
if not isinstance(attachment, dict):
|
|
continue
|
|
|
|
file_path = attachment.get("file_path", "")
|
|
if not file_path:
|
|
continue
|
|
|
|
# Normaliser le chemin
|
|
if not os.path.isabs(file_path):
|
|
file_path = os.path.join(attachments_dir, os.path.basename(file_path))
|
|
|
|
if not os.path.exists(file_path):
|
|
file_name = os.path.basename(file_path)
|
|
diagnostic["problemes"].append(f"Fichier attaché manquant: {file_name}")
|
|
else:
|
|
diagnostic["etat_fichiers"]["attachments/"] = "manquant"
|
|
diagnostic["problemes"].append("Répertoire attachments/ manquant")
|
|
|
|
except Exception as e:
|
|
diagnostic["problemes"].append(f"Erreur dans attachments_info.json: {str(e)}")
|
|
diagnostic["suggestions"].append("Réparer attachments_info.json avec --repair")
|
|
diagnostic["etat_fichiers"]["attachments_info.json"] = "corrompu"
|
|
else:
|
|
diagnostic["etat_fichiers"]["attachments_info.json"] = "manquant"
|
|
diagnostic["problemes"].append("Fichier attachments_info.json manquant")
|
|
diagnostic["suggestions"].append("Créer attachments_info.json avec --repair")
|
|
|
|
# Vérifier le répertoire rapport
|
|
if os.path.exists(rapport_dir):
|
|
diagnostic["etat_fichiers"]["rapport/"] = "présent"
|
|
|
|
rapport_json = os.path.join(rapport_dir, "ticket_analysis.json")
|
|
rapport_md = os.path.join(rapport_dir, "ticket_analysis.md")
|
|
|
|
if os.path.exists(rapport_json):
|
|
diagnostic["etat_fichiers"]["rapport/ticket_analysis.json"] = "présent"
|
|
else:
|
|
diagnostic["etat_fichiers"]["rapport/ticket_analysis.json"] = "manquant"
|
|
diagnostic["problemes"].append("Rapport JSON manquant")
|
|
|
|
if os.path.exists(rapport_md):
|
|
diagnostic["etat_fichiers"]["rapport/ticket_analysis.md"] = "présent"
|
|
else:
|
|
diagnostic["etat_fichiers"]["rapport/ticket_analysis.md"] = "manquant"
|
|
diagnostic["problemes"].append("Rapport Markdown manquant")
|
|
else:
|
|
diagnostic["etat_fichiers"]["rapport/"] = "manquant"
|
|
|
|
# Ajouter des suggestions globales si nécessaires
|
|
if len(diagnostic["problemes"]) > 3:
|
|
diagnostic["suggestions"].insert(0, "Utiliser l'option --repair pour essayer de corriger tous les problèmes automatiquement")
|
|
|
|
# Afficher le résumé du diagnostic
|
|
print(f"\nRésumé du diagnostic pour {os.path.basename(ticket_dir)}:")
|
|
print(f"- Problèmes identifiés: {len(diagnostic['problemes'])}")
|
|
|
|
for i, probleme in enumerate(diagnostic["problemes"]):
|
|
print(f" {i+1}. {probleme}")
|
|
|
|
print("\nSuggestions:")
|
|
for suggestion in diagnostic["suggestions"]:
|
|
print(f"- {suggestion}")
|
|
|
|
return diagnostic
|
|
|
|
def main():
|
|
"""
|
|
Point d'entrée principal du script.
|
|
"""
|
|
# Analyser les arguments
|
|
if len(sys.argv) < 2:
|
|
print("Usage: python post_process.py <ticket_dir> [options]")
|
|
print("Options:")
|
|
print(" --fix-all Corriger les accents dans tous les fichiers JSON et Markdown")
|
|
print(" --fix-md Corriger uniquement les fichiers Markdown")
|
|
print(" --repair Réparer un ticket corrompu")
|
|
print(" --debug Activer le mode débogage")
|
|
print(" --diagnose Diagnostiquer les problèmes du ticket")
|
|
print(" --help Afficher cette aide")
|
|
sys.exit(1)
|
|
|
|
# Afficher l'aide
|
|
if "--help" in sys.argv:
|
|
print("Usage: python post_process.py <ticket_dir> [options]")
|
|
print("Options:")
|
|
print(" --fix-all Corriger les accents dans tous les fichiers JSON et Markdown")
|
|
print(" --fix-md Corriger uniquement les fichiers Markdown")
|
|
print(" --repair Réparer un ticket corrompu")
|
|
print(" --debug Activer le mode débogage")
|
|
print(" --diagnose Diagnostiquer les problèmes du ticket")
|
|
print(" --help Afficher cette aide")
|
|
sys.exit(0)
|
|
|
|
ticket_dir = sys.argv[1]
|
|
fix_all = "--fix-all" in sys.argv
|
|
fix_md = "--fix-md" in sys.argv
|
|
repair = "--repair" in sys.argv
|
|
debug = "--debug" in sys.argv
|
|
diagnose = "--diagnose" in sys.argv
|
|
|
|
# Vérifier que le répertoire existe
|
|
if not os.path.isdir(ticket_dir):
|
|
print(f"ERREUR: Répertoire non trouvé: {ticket_dir}")
|
|
sys.exit(1)
|
|
|
|
# Option de diagnostic du ticket
|
|
if diagnose:
|
|
diagnostiquer_ticket(ticket_dir)
|
|
sys.exit(0)
|
|
|
|
# Option de réparation du ticket
|
|
if repair:
|
|
success = reparer_ticket(ticket_dir)
|
|
if not success:
|
|
print("La réparation du ticket a échoué.")
|
|
sys.exit(1)
|
|
print("Ticket réparé avec succès!")
|
|
sys.exit(0)
|
|
|
|
# Option de correction des accents dans les fichiers Markdown uniquement
|
|
if fix_md:
|
|
rapport_dir = os.path.join(ticket_dir, "rapport")
|
|
corrected = False
|
|
|
|
# Corriger les fichiers Markdown du répertoire rapport
|
|
if os.path.exists(rapport_dir):
|
|
for root, _, files in os.walk(rapport_dir):
|
|
for file in files:
|
|
if file.endswith(".md"):
|
|
md_file = os.path.join(root, file)
|
|
corriger_markdown_accents(md_file)
|
|
corrected = True
|
|
|
|
# Corriger les fichiers Markdown à la racine du ticket
|
|
for file in os.listdir(ticket_dir):
|
|
if file.endswith(".md"):
|
|
md_file = os.path.join(ticket_dir, file)
|
|
corriger_markdown_accents(md_file)
|
|
corrected = True
|
|
|
|
if corrected:
|
|
print("Correction des accents terminée dans les fichiers Markdown.")
|
|
else:
|
|
print("Aucun fichier Markdown trouvé.")
|
|
sys.exit(0)
|
|
|
|
# Transformation standard des messages
|
|
messages_file = os.path.join(ticket_dir, "messages.json")
|
|
if not os.path.exists(messages_file):
|
|
print(f"Fichier non trouvé: {messages_file}")
|
|
sys.exit(1)
|
|
|
|
try:
|
|
# Transformer les messages
|
|
transformer_messages(messages_file, debug=debug)
|
|
print(f"Post-traitement terminé pour {messages_file}")
|
|
|
|
# Corriger les accents dans tous les fichiers si demandé
|
|
if fix_all:
|
|
rapport_dir = os.path.join(ticket_dir, "rapport")
|
|
if os.path.exists(rapport_dir):
|
|
# Corriger les fichiers JSON
|
|
for root, _, files in os.walk(rapport_dir):
|
|
for file in files:
|
|
if file.endswith(".json"):
|
|
json_file = os.path.join(root, file)
|
|
corriger_json_accents(json_file)
|
|
|
|
# Corriger les fichiers Markdown
|
|
for root, _, files in os.walk(rapport_dir):
|
|
for file in files:
|
|
if file.endswith(".md"):
|
|
md_file = os.path.join(root, file)
|
|
corriger_markdown_accents(md_file)
|
|
|
|
# Corriger les fichiers Markdown à la racine du ticket
|
|
for file in os.listdir(ticket_dir):
|
|
if file.endswith(".md"):
|
|
md_file = os.path.join(ticket_dir, file)
|
|
corriger_markdown_accents(md_file)
|
|
|
|
print("Correction des accents terminée dans tous les fichiers.")
|
|
except Exception as e:
|
|
print(f"ERREUR lors du post-traitement: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |