#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Script d'extraction et de prétraitement des tickets. Nettoie les données et sépare les pièces jointes des messages. """ import os import sys import json import re import shutil import argparse import unicodedata from typing import Dict, List, Any, Optional from bs4 import BeautifulSoup import logging # Configuration du logger logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("extract_ticket.log"), logging.StreamHandler() ] ) logger = logging.getLogger("extract_ticket") def nettoyer_html(texte: str) -> str: """ Nettoie le contenu HTML en utilisant BeautifulSoup. Args: texte: Texte HTML à nettoyer Returns: Texte nettoyé """ if not texte: return "" if not isinstance(texte, str): try: texte = str(texte) except Exception as e: logger.warning(f"Impossible de convertir en texte: {e}") return "" # Utiliser BeautifulSoup pour le nettoyage try: soup = BeautifulSoup(texte, 'html.parser') texte_nettoye = soup.get_text(separator=' ') except Exception as e: logger.warning(f"Erreur lors du nettoyage HTML avec BeautifulSoup: {e}") # Fallback à regex si BeautifulSoup échoue try: texte_nettoye = re.sub(r'<[^>]+>', ' ', texte) except Exception as e: logger.warning(f"Erreur lors du nettoyage HTML avec regex: {e}") texte_nettoye = texte # Remplacer les références aux images texte_nettoye = re.sub(r'\[Image:[^\]]+\]', '[Image]', texte_nettoye) # Supprimer les éléments courants non pertinents patterns_a_supprimer = [ r'Cordialement,[\s\S]*?$', r'Bien cordialement,[\s\S]*?$', r'Bonne réception[\s\S]*?$', r'À votre disposition[\s\S]*?$', r'Support technique[\s\S]*?$', r'L\'objectif du Support Technique[\s\S]*?$', r'Notre service est ouvert[\s\S]*?$', r'Dès réception[\s\S]*?$', r'Confidentialité[\s\S]*?$', r'Ce message électronique[\s\S]*?$', r'Droit à la déconnexion[\s\S]*?$', r'Afin d\'assurer une meilleure traçabilité[\s\S]*?$', r'tél\s*:\s*[\d\s\+]+', r'mobile\s*:\s*[\d\s\+]+', r'www\.[^\s]+\.[a-z]{2,3}', ] for pattern in patterns_a_supprimer: texte_nettoye = re.sub(pattern, '', texte_nettoye, flags=re.IGNORECASE) # Supprimer les lignes multiples vides texte_nettoye = re.sub(r'\n\s*\n', '\n', texte_nettoye) # Supprimer les espaces multiples texte_nettoye = re.sub(r'\s+', ' ', texte_nettoye) # Normaliser les caractères accentués texte_nettoye = normaliser_accents(texte_nettoye) return texte_nettoye.strip() def normaliser_accents(texte: str) -> str: """ Normalise les caractères accentués pour éviter les problèmes d'encodage. Args: texte: Texte à normaliser Returns: Texte avec caractères accentués normalisés """ if not isinstance(texte, str): if texte is None: return "" try: texte = str(texte) except: return "" # Convertir les caractères spéciaux HTML special_chars = { 'á': 'á', 'é': 'é', 'í': 'í', 'ó': 'ó', 'ú': 'ú', 'Á': 'Á', 'É': 'É', 'Í': 'Í', 'Ó': 'Ó', 'Ú': 'Ú', 'à': 'à', 'è': 'è', 'ì': 'ì', 'ò': 'ò', 'ù': 'ù', 'À': 'À', 'È': 'È', 'Ì': 'Ì', 'Ò': 'Ò', 'Ù': 'Ù', 'â': 'â', 'ê': 'ê', 'î': 'î', 'ô': 'ô', 'û': 'û', 'Â': 'Â', 'Ê': 'Ê', 'Î': 'Î', 'Ô': 'Ô', 'Û': 'Û', 'ã': 'ã', '&etilde;': 'ẽ', 'ĩ': 'ĩ', 'õ': 'õ', 'ũ': 'ũ', 'Ã': 'Ã', '&Etilde;': 'Ẽ', 'Ĩ': 'Ĩ', 'Õ': 'Õ', 'Ũ': 'Ũ', 'ä': 'ä', 'ë': 'ë', 'ï': 'ï', 'ö': 'ö', 'ü': 'ü', 'Ä': 'Ä', 'Ë': 'Ë', 'Ï': 'Ï', 'Ö': 'Ö', 'Ü': 'Ü', 'ç': 'ç', 'Ç': 'Ç', 'ñ': 'ñ', 'Ñ': 'Ñ', ' ': ' ', '<': '<', '>': '>', '&': '&', '"': '"', ''': "'", '€': '€', '©': '©', '®': '®', '™': '™' } for html, char in special_chars.items(): texte = texte.replace(html, char) # Normaliser les caractères composés return unicodedata.normalize('NFC', texte) def detecter_role(message: Dict[str, Any]) -> str: """ Détecte si un message provient du client ou du support. Args: message: Dictionnaire contenant les informations du message Returns: "Client" ou "Support" """ # Vérifier le champ 'role' s'il existe déjà if "role" in message and message["role"] in ["Client", "Support"]: return message["role"] # Indices de support dans l'email domaines_support = ["@cbao.fr", "@odoo.com", "support@", "ticket.support"] indices_nom_support = ["support", "cbao", "technique", "odoo"] email = message.get("email_from", "").lower() # Nettoyer le format "Nom " if "<" in email and ">" in email: match = re.search(r'<([^>]+)>', email) if match: email = match.group(1).lower() # Vérifier le domaine email if any(domaine in email for domaine in domaines_support): return "Support" # Vérifier le nom d'auteur auteur = "" if "author_id" in message and isinstance(message["author_id"], list) and len(message["author_id"]) > 1: auteur = str(message["author_id"][1]).lower() elif "auteur" in message: auteur = str(message["auteur"]).lower() if any(indice in auteur for indice in indices_nom_support): return "Support" # Par défaut, considérer comme client return "Client" def pretraiter_ticket(input_dir: str, output_dir: str) -> Dict[str, Any]: """ Prétraite les données d'un ticket et les sépare en fichiers distincts. Args: input_dir: Répertoire contenant les données brutes du ticket output_dir: Répertoire où sauvegarder les données prétraitées Returns: Rapport de prétraitement avec les fichiers générés """ logger.info(f"Prétraitement du ticket: {input_dir} -> {output_dir}") # Créer le répertoire de sortie s'il n'existe pas os.makedirs(output_dir, exist_ok=True) # Créer les sous-répertoires attachments_dir = os.path.join(output_dir, "attachments") os.makedirs(attachments_dir, exist_ok=True) # Chemins des fichiers d'entrée ticket_info_path = os.path.join(input_dir, "ticket_info.json") messages_path = os.path.join(input_dir, "messages.json") messages_backup_path = os.path.join(input_dir, "messages.json.backup") # Rapport de prétraitement rapport = { "ticket_id": os.path.basename(input_dir), "fichiers_generes": [], "erreurs": [] } # Prétraiter ticket_info.json if os.path.exists(ticket_info_path): try: with open(ticket_info_path, 'r', encoding='utf-8') as f: ticket_info = json.load(f) # Nettoyer la description if isinstance(ticket_info, dict) and "description" in ticket_info: ticket_info["description"] = nettoyer_html(ticket_info["description"]) # Sauvegarder dans le répertoire de sortie output_ticket_info_path = os.path.join(output_dir, "ticket_info.json") with open(output_ticket_info_path, 'w', encoding='utf-8') as f: json.dump(ticket_info, f, indent=2, ensure_ascii=False) rapport["fichiers_generes"].append("ticket_info.json") logger.info(f"Ticket info prétraité et sauvegardé: {output_ticket_info_path}") except Exception as e: erreur = f"Erreur lors du prétraitement de ticket_info.json: {str(e)}" rapport["erreurs"].append(erreur) logger.error(erreur) else: erreur = f"Fichier ticket_info.json non trouvé dans {input_dir}" rapport["erreurs"].append(erreur) logger.warning(erreur) # Prétraiter messages.json messages_content = None # D'abord essayer messages.json if os.path.exists(messages_path): try: with open(messages_path, 'r', encoding='utf-8') as f: messages_content = f.read() except Exception as e: logger.warning(f"Impossible de lire messages.json: {str(e)}") # Si messages.json est vide ou corrompu, essayer la sauvegarde if not messages_content and os.path.exists(messages_backup_path): try: with open(messages_backup_path, 'r', encoding='utf-8') as f: messages_content = f.read() logger.info("Utilisation de messages.json.backup comme source") except Exception as e: erreur = f"Impossible de lire messages.json.backup: {str(e)}" rapport["erreurs"].append(erreur) logger.error(erreur) # Traiter les messages si nous avons un contenu valide if messages_content: try: messages = json.loads(messages_content) # Créer une version améliorée des messages processed_messages = [] # Déterminer le code du ticket à partir du nom du répertoire ticket_code = os.path.basename(input_dir) if ticket_code.startswith("ticket_"): ticket_code = ticket_code[7:] # Extraire le code sans "ticket_" # Extraire les informations du ticket si disponibles ticket_info_dict = {} if os.path.exists(ticket_info_path): try: with open(ticket_info_path, 'r', encoding='utf-8') as f: ticket_info_dict = json.load(f) except Exception: pass # Créer le message de contexte avec les informations du ticket ticket_name = ticket_info_dict.get("name", f"Ticket {ticket_code}") ticket_description = ticket_info_dict.get("description", "") ticket_date = ticket_info_dict.get("create_date", "") # Nettoyer les informations du ticket ticket_name = normaliser_accents(ticket_name) ticket_description = nettoyer_html(ticket_description) ticket_context = { "id": "ticket_info", "name": ticket_name, "code": ticket_code, "description": ticket_description, "date_create": ticket_date, "role": "system", "type": "contexte", "body": f"TICKET {ticket_code}: {ticket_name}.\n\nDESCRIPTION: {ticket_description or 'Aucune description disponible.'}" } processed_messages.append(ticket_context) # Prétraiter chaque message attachments_info = [] valid_messages = 0 for msg in messages: if not isinstance(msg, dict): continue # Ignorer les messages vides body = msg.get("body", "") if not body or not isinstance(body, str): continue # Détecter le rôle role = detecter_role(msg) message_type = "Question" if role == "Client" else "Réponse" # Nettoyer le contenu contenu_nettoye = nettoyer_html(body) if not contenu_nettoye: continue # Normaliser les champs textuels email_from = normaliser_accents(msg.get("email_from", "")) subject = normaliser_accents(msg.get("subject", "")) # Gérer l'identifiant du message msg_id = msg.get("id", f"msg_{valid_messages+1}") if not isinstance(msg_id, str): try: msg_id = str(msg_id) except: msg_id = f"msg_{valid_messages+1}" # Récupérer les autres champs de manière sécurisée author_id = msg.get("author_id", [0, ""]) if not isinstance(author_id, list): author_id = [0, ""] date = msg.get("date", "") if not isinstance(date, str): try: date = str(date) except: date = "" # Traiter les pièces jointes si présentes if "attachments" in msg and isinstance(msg["attachments"], list): for attachment in msg["attachments"]: if not isinstance(attachment, dict): continue attachment_data = attachment.get("datas") attachment_name = attachment.get("name", "") attachment_type = attachment.get("mimetype", "") if attachment_data and attachment_name: # Générer un nom de fichier unique attachment_id = attachment.get("id", len(attachments_info) + 1) safe_name = f"{attachment_id}_{attachment_name}" file_path = os.path.join(attachments_dir, safe_name) # Traiter différemment selon le type de pièce jointe if attachment_type.startswith("image/"): try: # Sauvegarder l'image import base64 with open(file_path, 'wb') as f: f.write(base64.b64decode(attachment_data)) # Ajouter l'information à la liste des pièces jointes attachments_info.append({ "id": attachment_id, "name": attachment_name, "mimetype": attachment_type, "message_id": msg_id, "date": date, "file_path": file_path }) logger.info(f"Pièce jointe sauvegardée: {file_path}") except Exception as e: logger.warning(f"Erreur lors de la sauvegarde de la pièce jointe {attachment_name}: {str(e)}") # Créer le message transformé processed_message = { "id": msg_id, "author_id": author_id, "role": role, "type": message_type, "date": date, "email_from": email_from, "subject": subject, "body": contenu_nettoye } processed_messages.append(processed_message) valid_messages += 1 # Trier par date (sauf le premier message qui est le contexte) try: processed_messages[1:] = sorted(processed_messages[1:], key=lambda x: x.get("date", "")) except Exception as e: logger.warning(f"Impossible de trier les messages par date: {e}") # Sauvegarder les messages prétraités output_messages_path = os.path.join(output_dir, "messages.json") with open(output_messages_path, 'w', encoding='utf-8') as f: json.dump(processed_messages, f, indent=2, ensure_ascii=False) rapport["fichiers_generes"].append("messages.json") logger.info(f"Messages prétraités et sauvegardés: {output_messages_path} ({valid_messages} messages)") # Sauvegarder les informations sur les pièces jointes if attachments_info: output_attachments_info_path = os.path.join(output_dir, "attachments_info.json") with open(output_attachments_info_path, 'w', encoding='utf-8') as f: json.dump(attachments_info, f, indent=2, ensure_ascii=False) rapport["fichiers_generes"].append("attachments_info.json") rapport["nb_attachments"] = len(attachments_info) logger.info(f"Informations sur les pièces jointes sauvegardées: {output_attachments_info_path} ({len(attachments_info)} pièces jointes)") except Exception as e: erreur = f"Erreur lors du prétraitement des messages: {str(e)}" rapport["erreurs"].append(erreur) logger.error(erreur) else: erreur = "Aucun fichier messages.json ou messages.json.backup trouvé ou lisible" rapport["erreurs"].append(erreur) logger.error(erreur) # Sauvegarder le rapport de prétraitement rapport_path = os.path.join(output_dir, "pretraitement_rapport.json") with open(rapport_path, 'w', encoding='utf-8') as f: json.dump(rapport, f, indent=2, ensure_ascii=False) logger.info(f"Rapport de prétraitement sauvegardé: {rapport_path}") return rapport def main(): """ Point d'entrée du script. """ parser = argparse.ArgumentParser(description="Prétraite les données d'un ticket.") parser.add_argument("input_dir", help="Répertoire contenant les données brutes du ticket") parser.add_argument("--output-dir", help="Répertoire où sauvegarder les données prétraitées (par défaut: _processed)") parser.add_argument("--verbose", "-v", action="store_true", help="Afficher plus d'informations") args = parser.parse_args() # Configurer le niveau de log if args.verbose: logging.getLogger().setLevel(logging.DEBUG) # Déterminer le répertoire de sortie input_dir = args.input_dir if not os.path.isdir(input_dir): logger.error(f"Le répertoire d'entrée n'existe pas: {input_dir}") sys.exit(1) output_dir = args.output_dir if not output_dir: # Par défaut, ajouter "_processed" au nom du répertoire d'entrée if input_dir.endswith("/"): input_dir = input_dir[:-1] output_dir = input_dir + "_processed" # Prétraiter le ticket try: rapport = pretraiter_ticket(input_dir, output_dir) # Afficher un résumé print("\nRésumé du prétraitement:") print(f"Ticket: {rapport['ticket_id']}") print(f"Fichiers générés: {len(rapport['fichiers_generes'])}") for fichier in rapport['fichiers_generes']: print(f" - {fichier}") if "nb_attachments" in rapport: print(f"Pièces jointes: {rapport['nb_attachments']}") if rapport['erreurs']: print(f"Erreurs: {len(rapport['erreurs'])}") for erreur in rapport['erreurs']: print(f" - {erreur}") else: print("Aucune erreur") print(f"\nPrétraitement terminé. Données sauvegardées dans: {output_dir}") except Exception as e: logger.error(f"Erreur lors du prétraitement: {str(e)}") sys.exit(1) if __name__ == "__main__": main()