from typing import List, Dict, Any, Optional, Tuple from .auth_manager import AuthManager from formatters.clean_html import clean_html from core.utils import save_json, save_text, detect_duplicate_content, normalize_filename import os import re import logging from datetime import datetime class MessageManager: """ Gestionnaire de messages pour traiter les messages associés aux tickets. """ def __init__(self, auth: AuthManager): """ Initialise le gestionnaire de messages. Args: auth: Gestionnaire d'authentification """ self.auth = auth self.model_name = "project.task" self.cleaning_strategies = { "simple": {"preserve_links": False, "preserve_images": False, "strategy": "strip_tags", "preserve_doc_links": True}, "standard": {"preserve_links": True, "preserve_images": True, "strategy": "html2text", "preserve_doc_links": True}, "advanced": {"preserve_links": True, "preserve_images": True, "strategy": "soup", "preserve_doc_links": True}, "raw": {"preserve_links": False, "preserve_images": False, "strategy": "none", "preserve_doc_links": True} } self.default_strategy = "standard" def get_ticket_messages(self, ticket_id: int, fields: Optional[List[str]] = None) -> List[Dict[str, Any]]: """ Récupère tous les messages associés à un ticket. Args: ticket_id: ID du ticket fields: Liste des champs à récupérer (facultatif) Returns: Liste des messages associés au ticket """ if fields is None: fields = ["id", "body", "date", "author_id", "email_from", "message_type", "parent_id", "subtype_id", "subject", "tracking_value_ids", "attachment_ids"] params = { "model": "mail.message", "method": "search_read", "args": [[["res_id", "=", ticket_id], ["model", "=", self.model_name]]], "kwargs": { "fields": fields, "order": "date asc" } } messages = self.auth._rpc_call("/web/dataset/call_kw", params) return messages if isinstance(messages, list) else [] def is_system_message(self, message: Dict[str, Any]) -> bool: """ Vérifie si le message est un message système ou OdooBot. Args: message: Le message à vérifier Returns: True si c'est un message système, False sinon """ is_system = False # Vérifier le nom de l'auteur if 'author_id' in message and isinstance(message['author_id'], list) and len(message['author_id']) > 1: author_name = message['author_id'][1].lower() if 'odoobot' in author_name or 'bot' in author_name or 'système' in author_name or 'system' in author_name: is_system = True # Vérifier le type de message if message.get('message_type') in ['notification', 'auto_comment']: is_system = True # Vérifier le sous-type du message if 'subtype_id' in message and isinstance(message['subtype_id'], list) and len(message['subtype_id']) > 1: subtype = message['subtype_id'][1].lower() if 'notification' in subtype or 'system' in subtype or 'note' in subtype: is_system = True return is_system def is_stage_change_message(self, message: Dict[str, Any]) -> bool: """ Vérifie si le message est un changement d'état. Args: message: Le message à vérifier Returns: True si c'est un message de changement d'état, False sinon """ if not isinstance(message.get('body', ''), str): return False body = message.get('body', '').lower() # Patterns pour les changements d'état stage_patterns = [ 'étape changée', 'stage changed', 'modifié l\'étape', 'changed the stage', 'ticket transféré', 'ticket transferred', 'statut modifié', 'status changed', 'état du ticket' ] # Vérifier aussi les valeurs de tracking si disponibles if message.get('tracking_value_ids'): try: tracking_values = self.auth.read("mail.tracking.value", message.get('tracking_value_ids', []), ["field", "field_desc", "old_value_char", "new_value_char"]) for value in tracking_values: if value.get("field") == "stage_id" or "stage" in value.get("field_desc", "").lower(): return True except Exception as e: logging.warning(f"Erreur lors de la vérification des valeurs de tracking: {e}") return any(pattern in body for pattern in stage_patterns) def is_forwarded_message(self, message: Dict[str, Any]) -> bool: """ Détecte si un message est un message transféré. Args: message: Le message à analyser Returns: True si le message est transféré, False sinon """ if not message.get('body'): return False # Indicateurs de message transféré forwarded_indicators = [ "message transféré", "forwarded message", "transféré de", "forwarded from", "début du message transféré", "begin forwarded message", "message d'origine", "original message", "from:", "de:", "to:", "à:", "subject:", "objet:", "envoyé:", "sent:", "date:", "cc:" ] # Vérifier le contenu du message body_lower = message.get('body', '').lower() if isinstance(message.get('body', ''), str) else "" # Vérifier la présence d'indicateurs de transfert for indicator in forwarded_indicators: if indicator in body_lower: return True # Vérifier si le sujet contient des préfixes courants de transfert subject_value = message.get('subject', '') if not isinstance(subject_value, str): subject_value = str(subject_value) if subject_value is not None else "" subject_lower = subject_value.lower() forwarded_prefixes = ["tr:", "fwd:", "fw:"] for prefix in forwarded_prefixes: if subject_lower.startswith(prefix): return True # Patterns typiques dans les messages transférés patterns = [ r"-{3,}Original Message-{3,}", r"_{3,}Original Message_{3,}", r">{3,}", # Plusieurs signes > consécutifs indiquent souvent un message cité r"Le .* a écrit :" ] for pattern in patterns: if re.search(pattern, body_lower): return True return False def get_message_author_details(self, message: Dict[str, Any]) -> Dict[str, Any]: """ Récupère les détails de l'auteur d'un message. Args: message: Le message dont il faut récupérer l'auteur Returns: Dictionnaire avec les détails de l'auteur """ author_details = { "name": "Inconnu", "email": message.get('email_from', ''), "is_system": False } try: author_id_field = message.get('author_id') if author_id_field and isinstance(author_id_field, list) and len(author_id_field) > 0: author_id = author_id_field[0] params = { "model": "res.partner", "method": "read", "args": [[author_id]], "kwargs": {"fields": ['name', 'email', 'phone', 'function', 'company_id']} } author_data = self.auth._rpc_call("/web/dataset/call_kw", params) if author_data and isinstance(author_data, list) and len(author_data) > 0: author_details.update(author_data[0]) # Vérifier si c'est un auteur système if author_details.get('name'): author_name = author_details['name'].lower() if 'odoobot' in author_name or 'bot' in author_name or 'système' in author_name: author_details['is_system'] = True except Exception as e: logging.warning(f"Erreur lors de la récupération des détails de l'auteur: {e}") return author_details def process_messages(self, ticket_id: int, ticket_code: str, ticket_name: str, output_dir: str, strategy: str = "standard") -> Dict[str, Any]: """ Traite tous les messages d'un ticket, nettoie le contenu et génère des fichiers structurés. Args: ticket_id: ID du ticket ticket_code: Code du ticket ticket_name: Nom du ticket output_dir: Répertoire de sortie strategy: Stratégie de nettoyage (simple, standard, advanced, raw) Returns: Dictionnaire avec les chemins des fichiers créés """ # Validation de la stratégie if strategy not in self.cleaning_strategies: logging.warning(f"Stratégie de nettoyage '{strategy}' inconnue, utilisation de la stratégie par défaut '{self.default_strategy}'") strategy = self.default_strategy cleaning_config = self.cleaning_strategies[strategy] # Récupérer les messages messages = self.get_ticket_messages(ticket_id) # Détecter les messages dupliqués duplicate_indices = detect_duplicate_content(messages) # Nettoyer et structurer les messages processed_messages = [] # Créer un dictionnaire de métadonnées pour chaque message message_metadata = {} for index, message in enumerate(messages): message_id = message.get('id') # Ajouter des métadonnées au message message_metadata[message_id] = { "is_system": self.is_system_message(message), "is_stage_change": self.is_stage_change_message(message), "is_forwarded": self.is_forwarded_message(message), "is_duplicate": index in duplicate_indices } # Créer une copie du message pour éviter de modifier l'original message_copy = message.copy() # Ajouter les métadonnées au message copié for key, value in message_metadata[message_id].items(): message_copy[key] = value # Nettoyer le corps du message selon la stratégie choisie if message_copy.get('body'): # Toujours conserver l'original message_copy['body_original'] = message_copy.get('body', '') # Appliquer la stratégie de nettoyage, sauf si raw if strategy != "raw": cleaned_body = clean_html( message_copy.get('body', ''), is_forwarded=message_copy.get('is_forwarded', False), preserve_doc_links=cleaning_config.get('preserve_doc_links', True) ) # Nettoyer davantage le code HTML qui pourrait rester if cleaned_body: # Supprimer les balises style et script avec leur contenu cleaned_body = re.sub(r']*>.*?', '', cleaned_body, flags=re.DOTALL) cleaned_body = re.sub(r']*>.*?', '', cleaned_body, flags=re.DOTALL) # Supprimer les balises HTML restantes cleaned_body = re.sub(r'<[^>]+>', '', cleaned_body) message_copy['body'] = cleaned_body # Récupérer les détails de l'auteur message_copy['author_details'] = self.get_message_author_details(message_copy) # Vérifier si le message contient des éléments importants has_attachments = bool(message_copy.get('attachment_ids')) has_images = False has_meaningful_content = False # Vérifier la présence d'images dans le HTML if message_copy.get('body_original'): # Rechercher les balises img dans le HTML has_images = ' 30: # Texte non vide et d'une certaine longueur has_meaningful_content = True # Déterminer si le message doit être conservé malgré son statut système is_important = ( has_attachments or has_images or message_copy.get('is_forwarded') or has_meaningful_content or message_copy.get('is_stage_change') ) # Ne pas inclure les messages système UNIQUEMENT s'ils n'ont rien d'important if message_copy.get('is_system') and not is_important: # Enregistrer l'exclusion dans les métadonnées message_metadata[message_id]['excluded'] = "system_message" continue # Si le message est marqué comme exclu dans les métadonnées mais qu'il est transféré, le réintégrer if message_metadata.get(message_id, {}).get('excluded') == "system_message" and message_copy.get('is_forwarded'): # Supprimer l'exclusion des métadonnées del message_metadata[message_id]['excluded'] # Vérifier aussi les messages qui sont déjà exclus dans les métadonnées d'entrée # et les réintégrer s'ils sont transférés if 'excluded' in message_metadata.get(message_id, {}) and message_copy.get('is_forwarded'): # Supprimer l'exclusion des métadonnées del message_metadata[message_id]['excluded'] # Ignorer les messages dupliqués si demandé if message_copy.get('is_duplicate'): # Enregistrer l'exclusion dans les métadonnées message_metadata[message_id]['excluded'] = "duplicate_content" continue processed_messages.append(message_copy) # Trier les messages par date processed_messages.sort(key=lambda x: x.get('date', '')) # Étape supplémentaire: Vérifier si des messages transférés ont été exclus et les réintégrer processed_ids = {msg['id'] for msg in processed_messages if 'id' in msg} for message in messages: message_id = message.get('id') if (message_id not in processed_ids and message_metadata.get(message_id, {}).get('is_forwarded') and 'excluded' in message_metadata.get(message_id, {})): # Créer une copie du message message_copy = message.copy() # Ajouter les métadonnées au message for key, value in message_metadata[message_id].items(): if key != 'excluded': # Ne pas ajouter le tag d'exclusion message_copy[key] = value # Si le message a un corps, on applique le même traitement de nettoyage if message_copy.get('body'): # Toujours conserver l'original message_copy['body_original'] = message_copy.get('body', '') # Appliquer la stratégie de nettoyage, sauf si raw if strategy != "raw": cleaned_body = clean_html( message_copy.get('body', ''), is_forwarded=message_copy.get('is_forwarded', False), preserve_doc_links=cleaning_config.get('preserve_doc_links', True) ) # Nettoyage supplémentaire if cleaned_body: cleaned_body = re.sub(r']*>.*?', '', cleaned_body, flags=re.DOTALL) cleaned_body = re.sub(r']*>.*?', '', cleaned_body, flags=re.DOTALL) cleaned_body = re.sub(r'<[^>]+>', '', cleaned_body) message_copy['body'] = cleaned_body # Récupérer les détails de l'auteur message_copy['author_details'] = self.get_message_author_details(message_copy) # Supprimer l'exclusion des métadonnées if 'excluded' in message_metadata[message_id]: del message_metadata[message_id]['excluded'] # Ajouter le message aux messages traités processed_messages.append(message_copy) # Trier à nouveau les messages par date après la réintégration processed_messages.sort(key=lambda x: x.get('date', '')) # Récupérer les informations supplémentaires du ticket try: ticket_data = self.auth._rpc_call("/web/dataset/call_kw", { "model": "project.task", "method": "read", "args": [[ticket_id]], "kwargs": {"fields": ["project_id", "stage_id"]} }) project_id = None stage_id = None project_name = None stage_name = None if ticket_data and isinstance(ticket_data, list) and len(ticket_data) > 0: if "project_id" in ticket_data[0] and ticket_data[0]["project_id"]: project_id = ticket_data[0]["project_id"][0] if isinstance(ticket_data[0]["project_id"], list) else ticket_data[0]["project_id"] project_name = ticket_data[0]["project_id"][1] if isinstance(ticket_data[0]["project_id"], list) else None if "stage_id" in ticket_data[0] and ticket_data[0]["stage_id"]: stage_id = ticket_data[0]["stage_id"][0] if isinstance(ticket_data[0]["stage_id"], list) else ticket_data[0]["stage_id"] stage_name = ticket_data[0]["stage_id"][1] if isinstance(ticket_data[0]["stage_id"], list) else None except Exception as e: logging.error(f"Erreur lors de la récupération des informations du ticket: {e}") project_id = None stage_id = None project_name = None stage_name = None # Créer la structure pour le JSON messages_with_summary = { "ticket_summary": { "id": ticket_id, "code": ticket_code, "name": ticket_name, "project_id": project_id, "project_name": project_name, "stage_id": stage_id, "stage_name": stage_name, "date_extraction": datetime.now().isoformat() }, "metadata": { "message_count": { "total": len(messages), "processed": len(processed_messages), "excluded": len(messages) - len(processed_messages) }, "cleaning_strategy": strategy, "cleaning_config": cleaning_config }, "messages": processed_messages } # Sauvegarder les messages en JSON all_messages_path = os.path.join(output_dir, "all_messages.json") save_json(messages_with_summary, all_messages_path) # Sauvegarder également les messages bruts raw_messages_path = os.path.join(output_dir, "messages_raw.json") save_json({ "ticket_id": ticket_id, "ticket_code": ticket_code, "message_metadata": message_metadata, "messages": messages }, raw_messages_path) # Créer un fichier texte pour une lecture plus facile messages_text_path = os.path.join(output_dir, "all_messages.txt") try: text_content = self._generate_messages_text(ticket_code, ticket_name, processed_messages) save_text(text_content, messages_text_path) except Exception as e: logging.error(f"Erreur lors de la création du fichier texte: {e}") return { "all_messages_path": all_messages_path, "raw_messages_path": raw_messages_path, "messages_text_path": messages_text_path, "messages_count": len(processed_messages), "total_messages": len(messages) } def _generate_messages_text(self, ticket_code: str, ticket_name: str, processed_messages: List[Dict[str, Any]]) -> str: """ Génère un fichier texte formaté à partir des messages traités. Args: ticket_code: Code du ticket ticket_name: Nom du ticket processed_messages: Liste des messages traités Returns: Contenu du fichier texte """ content = [] # Informations sur le ticket content.append(f"TICKET: {ticket_code} - {ticket_name}") content.append(f"Date d'extraction: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") content.append(f"Nombre de messages: {len(processed_messages)}") content.append("\n" + "="*80 + "\n") # Parcourir les messages filtrés for msg in processed_messages: author = msg.get('author_details', {}).get('name', msg.get('email_from', 'Inconnu')) date = msg.get('date', '') subject = msg.get('subject', 'Sans objet') body = msg.get('body', '') # Formater différemment les messages spéciaux if msg.get('is_stage_change'): content.append("*"*80) content.append("*** CHANGEMENT D'ÉTAT ***") content.append("*"*80 + "\n") elif msg.get('is_forwarded'): content.append("*"*80) content.append("*** MESSAGE TRANSFÉRÉ ***") content.append("*"*80 + "\n") # En-tête du message content.append(f"DATE: {date}") content.append(f"DE: {author}") if subject: content.append(f"OBJET: {subject}") content.append("") content.append(f"{body}") content.append("\n" + "-"*80 + "\n") return "\n".join(content)