import os import json import re from bs4 import BeautifulSoup import html import shutil from typing import Dict, List, Any, Optional, Tuple, Union, Set def is_odoobot_author(message: Dict[str, Any]) -> bool: """ Vérifie si l'auteur du message est OdooBot ou un autre système. Args: message: Le message à vérifier Returns: True si le message provient d'OdooBot, False sinon """ # Vérifier le nom de l'auteur if 'author_id' in message and isinstance(message['author_id'], list) and len(message['author_id']) > 1: author_name = message['author_id'][1].lower() if 'odoobot' in author_name or 'bot' in author_name or 'système' in author_name: return True # Vérifier le type de message (souvent les notifications système) if message.get('message_type') == 'notification': return True # Vérifier le sous-type du message if 'subtype_id' in message and isinstance(message['subtype_id'], list) and len(message['subtype_id']) > 1: subtype = message['subtype_id'][1].lower() if 'notification' in subtype or 'system' in subtype: return True # Vérifier le contenu du message if 'body' in message and isinstance(message['body'], str): body = message['body'].lower() system_patterns = [ 'assigné à', 'étape changée', 'créé automatiquement', 'assigned to', 'stage changed', 'automatically created', 'updated', 'mis à jour', 'a modifié', 'changed' ] for pattern in system_patterns: if pattern in body: return True return False def is_important_image(tag, message_text: str) -> bool: """ Détermine si une image est importante ou s'il s'agit d'un logo/signature. Args: tag: La balise d'image à analyser message_text: Le texte complet du message pour contexte Returns: True si l'image semble importante, False sinon """ # Vérifier les attributs de l'image src = tag.get('src', '') alt = tag.get('alt', '') title = tag.get('title', '') css_class = tag.get('class', '') # Patterns pour les images inutiles useless_img_patterns = [ 'logo', 'signature', 'outlook', 'footer', 'header', 'icon', 'emoticon', 'emoji', 'cid:', 'pixel', 'spacer', 'vignette', 'banner', 'separator', 'decoration', 'mail_signature' ] # Vérifier si c'est une image inutile for pattern in useless_img_patterns: if (pattern in src.lower() or pattern in alt.lower() or pattern in title.lower() or (css_class and any(pattern in c.lower() for c in css_class if isinstance(c, str)))): return False # Vérifier la taille (les petites images sont souvent des icônes/logos) width = tag.get('width', '') height = tag.get('height', '') try: width = int(width) if width and str(width).isdigit() else None height = int(height) if height and str(height).isdigit() else None if width and height and width <= 50 and height <= 50: return False except (ValueError, TypeError): pass # Vérifier si l'image est mentionnée dans le texte image_indicators = [ 'capture', 'screenshot', 'image', 'photo', 'illustration', 'voir', 'regarder', 'ci-joint', 'écran', 'erreur', 'problème', 'bug', 'pièce jointe', 'attachment', 'veuillez trouver' ] for indicator in image_indicators: if indicator in message_text.lower(): return True # Par défaut, considérer les images qui ne sont pas clairement inutiles comme potentiellement importantes return True def find_attachment_references(message_text: str, attachments_info: List[Dict[str, Any]]) -> List[int]: """ Identifie les pièces jointes mentionnées dans le message. Args: message_text: Texte du message attachments_info: Informations sur les pièces jointes disponibles Returns: Liste des IDs des pièces jointes pertinentes """ if not message_text or not attachments_info: return [] # Patterns indiquant des pièces jointes attachment_indicators = [ r'pi[èe]ce[s]? jointe[s]?', r'attachment[s]?', r'fichier[s]?', r'file[s]?', r'veuillez trouver', r'please find', r'voir ci-joint', r'voir ci-dessous', r'ci-joint', r'joint[e]?[s]?', r'attached', r'screenshot[s]?', r'capture[s]? d[\'e] ?[ée]cran', r'image[s]?', r'photo[s]?' ] relevant_ids = [] # Vérifier si le message mentionne des pièces jointes mention_found = False for pattern in attachment_indicators: if re.search(pattern, message_text, re.IGNORECASE): mention_found = True break if mention_found: # Identifier les pièces jointes pertinentes (non logos/images d'interface) for attachment in attachments_info: name = attachment.get('name', '').lower() if attachment.get('name') else '' # Exclure les pièces jointes qui semblent être des logos ou images d'interface useless_patterns = ['logo', 'signature', 'outlook', 'icon', 'emoticon', 'emoji'] is_useless = any(pattern in name for pattern in useless_patterns) if not is_useless and 'id' in attachment: relevant_ids.append(attachment['id']) return relevant_ids def clean_html(html_content: str) -> str: """ Nettoie le contenu HTML en supprimant toutes les balises mais en préservant le texte. Traite spécifiquement les images pour garder uniquement celles pertinentes. Args: html_content: Contenu HTML à nettoyer Returns: Texte nettoyé sans balises HTML """ if not html_content: return "" # Utiliser BeautifulSoup pour manipuler le HTML soup = BeautifulSoup(html_content, 'html.parser') # Supprimer les éléments de signature signature_elements = [ 'div.signature', '.gmail_signature', '.signature', 'hr + div', 'hr + p', '.footer', '.mail-signature' ] for selector in signature_elements: for element in soup.select(selector): element.decompose() # Supprimer les lignes horizontales (souvent utilisées pour séparer les signatures) for hr in soup.find_all('hr'): hr.decompose() # Récupérer le texte complet pour analyse full_text = soup.get_text(' ', strip=True) # Traiter les images for img in soup.find_all('img'): if is_important_image(img, full_text): # Remplacer les images importantes par une description alt_text = img.get('alt', '') or img.get('title', '') or '[Image importante]' img.replace_with(f" [Image: {alt_text}] ") else: # Supprimer les images non pertinentes img.decompose() # Traiter les liens vers des pièces jointes for a in soup.find_all('a', href=True): href = a.get('href', '').lower() if 'attachment' in href or 'download' in href or 'file' in href: a.replace_with(f" [Pièce jointe: {a.get_text()}] ") # Récupérer le texte sans balises HTML text = soup.get_text(separator=' ', strip=True) # Décodage des entités HTML spéciales text = html.unescape(text) # Nettoyer les espaces multiples text = re.sub(r'\s+', ' ', text) # Nettoyer les lignes vides multiples text = re.sub(r'\n\s*\n', '\n\n', text) # Supprimer les disclaimers et signatures standards footer_patterns = [ r'Sent from my .*', r'Envoyé depuis mon .*', r'Ce message .*confidentiel.*', r'This email .*confidential.*', r'DISCLAIMER.*', r'CONFIDENTIAL.*', r'CONFIDENTIEL.*', r'Le contenu de ce courriel est confidentiel.*', r'This message and any attachments.*', r'Ce message et ses pièces jointes.*', r'AVIS DE CONFIDENTIALITÉ.*', r'PRIVACY NOTICE.*' ] for pattern in footer_patterns: text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL) return text.strip() def process_message_file(message_file_path: str, output_dir: str, attachments_info: List[Dict[str, Any]] = None) -> None: """ Traite un fichier de message en nettoyant le contenu HTML des messages. Args: message_file_path: Chemin du fichier de message à traiter output_dir: Répertoire de sortie pour le fichier traité attachments_info: Informations sur les pièces jointes disponibles """ try: with open(message_file_path, 'r', encoding='utf-8') as f: message_data = json.load(f) # Ignorer les messages d'OdooBot if is_odoobot_author(message_data): print(f"Message ignoré (OdooBot): {os.path.basename(message_file_path)}") return # Vérifier si le message contient un corps HTML if 'body' in message_data and message_data['body']: # Remplacer le contenu HTML par le texte filtré message_data['body'] = clean_html(message_data['body']) # Identifier les pièces jointes pertinentes mentionnées dans le message if attachments_info and message_data['body']: relevant_attachments = find_attachment_references(message_data['body'], attachments_info) if relevant_attachments: message_data['relevant_attachment_ids'] = relevant_attachments # Écrire le message filtré output_file_path = os.path.join(output_dir, os.path.basename(message_file_path)) with open(output_file_path, 'w', encoding='utf-8') as f: json.dump(message_data, f, indent=4, ensure_ascii=False) print(f"Message traité: {os.path.basename(message_file_path)}") except Exception as e: print(f"Erreur lors du traitement du fichier {message_file_path}: {e}") def process_messages_threads(threads_file_path: str, output_dir: str, attachments_info: List[Dict[str, Any]] = None) -> None: """ Traite un fichier de threads de messages en nettoyant le contenu HTML. Args: threads_file_path: Chemin du fichier de threads de messages output_dir: Répertoire de sortie pour le fichier traité attachments_info: Informations sur les pièces jointes disponibles """ try: with open(threads_file_path, 'r', encoding='utf-8') as f: threads_data = json.load(f) # Liste des threads à supprimer (ceux qui ne contiennent que des messages d'OdooBot) threads_to_remove = [] # Parcourir tous les threads for thread_id, thread in threads_data.items(): # Traiter le message principal main_message_is_bot = False if thread.get('main_message'): if is_odoobot_author(thread['main_message']): main_message_is_bot = True thread['main_message'] = None elif 'body' in thread['main_message']: thread['main_message']['body'] = clean_html(thread['main_message']['body']) # Identifier les pièces jointes pertinentes if attachments_info and thread['main_message']['body']: relevant_attachments = find_attachment_references( thread['main_message']['body'], attachments_info ) if relevant_attachments: thread['main_message']['relevant_attachment_ids'] = relevant_attachments # Traiter les réponses (filtrer les messages d'OdooBot) filtered_replies = [] for reply in thread.get('replies', []): if not is_odoobot_author(reply): if 'body' in reply: reply['body'] = clean_html(reply['body']) # Identifier les pièces jointes pertinentes if attachments_info and reply['body']: relevant_attachments = find_attachment_references(reply['body'], attachments_info) if relevant_attachments: reply['relevant_attachment_ids'] = relevant_attachments filtered_replies.append(reply) # Mettre à jour les réponses thread['replies'] = filtered_replies # Si le thread ne contient que des messages de bot, le marquer pour suppression if main_message_is_bot and not filtered_replies: threads_to_remove.append(thread_id) # Supprimer les threads qui ne contiennent que des messages d'OdooBot for thread_id in threads_to_remove: del threads_data[thread_id] # Écrire le fichier de threads filtré output_file_path = os.path.join(output_dir, os.path.basename(threads_file_path)) with open(output_file_path, 'w', encoding='utf-8') as f: json.dump(threads_data, f, indent=4, ensure_ascii=False) print(f"Fichier de threads traité: {os.path.basename(threads_file_path)}") if threads_to_remove: print(f" {len(threads_to_remove)} threads supprimés (OdooBot uniquement)") except Exception as e: print(f"Erreur lors du traitement du fichier {threads_file_path}: {e}") def process_messages_collection(messages_file_path: str, output_dir: str, attachments_info: List[Dict[str, Any]] = None) -> None: """ Traite un fichier de collection de messages en nettoyant le contenu HTML. Args: messages_file_path: Chemin du fichier de collection de messages output_dir: Répertoire de sortie pour le fichier traité attachments_info: Informations sur les pièces jointes disponibles """ try: with open(messages_file_path, 'r', encoding='utf-8') as f: messages_data = json.load(f) # Filtrer les messages pour supprimer ceux d'OdooBot filtered_messages = [] for message in messages_data: if not is_odoobot_author(message): # Nettoyer le contenu HTML if 'body' in message: message['body'] = clean_html(message['body']) # Identifier les pièces jointes pertinentes if attachments_info and message['body']: relevant_attachments = find_attachment_references(message['body'], attachments_info) if relevant_attachments: message['relevant_attachment_ids'] = relevant_attachments filtered_messages.append(message) # Écrire le fichier de messages filtré output_file_path = os.path.join(output_dir, os.path.basename(messages_file_path)) with open(output_file_path, 'w', encoding='utf-8') as f: json.dump(filtered_messages, f, indent=4, ensure_ascii=False) print(f"Collection de messages traitée: {os.path.basename(messages_file_path)}") print(f" {len(messages_data) - len(filtered_messages)} messages supprimés (OdooBot)") except Exception as e: print(f"Erreur lors du traitement du fichier {messages_file_path}: {e}") def process_ticket_folder(ticket_folder: str, output_base_dir: str) -> None: """ Traite un dossier de ticket en filtrant les messages HTML. Args: ticket_folder: Chemin du dossier du ticket à traiter output_base_dir: Répertoire de base pour la sortie des fichiers filtrés """ ticket_name = os.path.basename(ticket_folder) output_ticket_dir = os.path.join(output_base_dir, ticket_name) # Créer le répertoire de sortie s'il n'existe pas if not os.path.exists(output_ticket_dir): os.makedirs(output_ticket_dir, exist_ok=True) # Copier les fichiers d'information du ticket for file_name in ['ticket_info.json', 'contact_info.json', 'activities.json', 'followers.json', 'timesheets.json']: src_file = os.path.join(ticket_folder, file_name) if os.path.exists(src_file): dst_file = os.path.join(output_ticket_dir, file_name) shutil.copy2(src_file, dst_file) print(f"Fichier copié: {file_name}") # Charger les informations sur les pièces jointes attachments_info = [] attachments_info_file = os.path.join(ticket_folder, 'attachments_info.json') if os.path.exists(attachments_info_file): try: with open(attachments_info_file, 'r', encoding='utf-8') as f: attachments_info = json.load(f) except Exception as e: print(f"Erreur lors du chargement des informations sur les pièces jointes: {e}") # Copier le fichier d'informations sur les pièces jointes dst_file = os.path.join(output_ticket_dir, 'attachments_info.json') shutil.copy2(attachments_info_file, dst_file) # Traitement des fichiers de messages src_messages_dir = os.path.join(ticket_folder, 'messages') if os.path.exists(src_messages_dir): # Créer le répertoire de messages filtré filtered_messages_dir = os.path.join(output_ticket_dir, 'messages') if not os.path.exists(filtered_messages_dir): os.makedirs(filtered_messages_dir, exist_ok=True) # Traiter chaque fichier de message individuel for file_name in os.listdir(src_messages_dir): if file_name.endswith('.json'): message_file_path = os.path.join(src_messages_dir, file_name) process_message_file(message_file_path, filtered_messages_dir, attachments_info) # Traitement des fichiers de messages regroupés messages_file = os.path.join(ticket_folder, 'messages.json') message_threads_file = os.path.join(ticket_folder, 'message_threads.json') if os.path.exists(messages_file): process_messages_collection(messages_file, output_ticket_dir, attachments_info) if os.path.exists(message_threads_file): process_messages_threads(message_threads_file, output_ticket_dir, attachments_info) # Copier le répertoire des pièces jointes (on garde toutes les pièces jointes) src_attachments_dir = os.path.join(ticket_folder, 'attachments') if os.path.exists(src_attachments_dir): dst_attachments_dir = os.path.join(output_ticket_dir, 'attachments') if os.path.exists(dst_attachments_dir): shutil.rmtree(dst_attachments_dir) shutil.copytree(src_attachments_dir, dst_attachments_dir) print(f"Répertoire des pièces jointes copié") print(f"Traitement du ticket {ticket_name} terminé") def filter_exported_tickets(source_dir: str = 'exported_tickets', output_dir: str = 'filtered_tickets') -> None: """ Filtre les tickets exportés en nettoyant les messages HTML. Args: source_dir: Répertoire source contenant les tickets exportés output_dir: Répertoire de sortie pour les tickets filtrés """ if not os.path.exists(source_dir): print(f"Le répertoire source {source_dir} n'existe pas.") return # Créer le répertoire de sortie s'il n'existe pas if not os.path.exists(output_dir): os.makedirs(output_dir, exist_ok=True) # Parcourir tous les éléments du répertoire source for item in os.listdir(source_dir): item_path = os.path.join(source_dir, item) # Vérifier si c'est un dossier qui contient un ticket if os.path.isdir(item_path) and ( item.startswith('ticket_') or os.path.exists(os.path.join(item_path, 'ticket_info.json')) ): process_ticket_folder(item_path, output_dir) # Si c'est un fichier JSON brut de ticket, le copier simplement elif os.path.isfile(item_path) and item.endswith('_raw.json'): shutil.copy2(item_path, os.path.join(output_dir, item)) print(f"Fichier copié: {item}") print(f"Filtrage des tickets terminé. Les tickets filtrés sont disponibles dans {output_dir}") def run_filter_wizard() -> None: """ Interface utilisateur en ligne de commande pour filtrer les tickets exportés. """ print("\n==== FILTRAGE DES MESSAGES DES TICKETS ====") print("Cette fonction va:") print("1. Supprimer les messages provenant d'OdooBot") print("2. Filtrer les images inutiles (logos, signatures, images Outlook)") print("3. Conserver les images pertinentes pour la demande") print("4. Identifier les pièces jointes importantes mentionnées dans les messages\n") # Demander le répertoire source default_source = 'exported_tickets' source_dir = input(f"Répertoire source (par défaut: {default_source}): ").strip() if not source_dir: source_dir = default_source # Vérifier si le répertoire source existe if not os.path.exists(source_dir): print(f"Le répertoire source {source_dir} n'existe pas.") return # Demander le répertoire de sortie default_output = 'filtered_tickets' output_dir = input(f"Répertoire de sortie (par défaut: {default_output}): ").strip() if not output_dir: output_dir = default_output # Confirmation print(f"\nLes tickets du répertoire {source_dir} seront filtrés et placés dans {output_dir}.") confirm = input("Continuer ? (o/n): ").strip().lower() if confirm == 'o': filter_exported_tickets(source_dir, output_dir) print("\nOpération terminée avec succès!") else: print("Opération annulée.") if __name__ == "__main__": run_filter_wizard()