llm_ticket3/odoo/message_manager.py
2025-04-09 13:36:42 +02:00

447 lines
19 KiB
Python

from typing import List, Dict, Any, Optional, Tuple
from .auth_manager import AuthManager
from formatters.clean_html import clean_html
from core.utils import save_json, save_text, detect_duplicate_content, normalize_filename
import os
import re
import logging
from datetime import datetime
class MessageManager:
    """Fetch, classify, clean and export the messages attached to a ticket.

    Messages are read from the ``mail.message`` model through the RPC helper
    of the ``AuthManager`` given at construction time. Values coming back
    from the server are treated defensively: any field may be ``False``
    instead of a string or a list.
    """

    # Lower-case substrings that mark an author name as automated.
    # NOTE(review): "bot" also matches human names such as "Talbot"; kept
    # as-is because callers may rely on the current heuristic.
    _SYSTEM_AUTHOR_KEYWORDS = ("odoobot", "bot", "système", "system")
    _SYSTEM_MESSAGE_TYPES = ("notification", "auto_comment")
    _SYSTEM_SUBTYPE_KEYWORDS = ("notification", "system", "note")

    # Lower-case phrases that reveal a stage/status change in a message body.
    _STAGE_PATTERNS = (
        "étape changée", "stage changed", "modifié l'étape",
        "changed the stage", "ticket transféré", "ticket transferred",
        "statut modifié", "status changed", "état du ticket",
    )

    # Lower-case phrases that reveal a forwarded or quoted e-mail.
    _FORWARDED_INDICATORS = (
        "message transféré", "forwarded message",
        "transféré de", "forwarded from",
        "début du message transféré", "begin forwarded message",
        "message d'origine", "original message",
        "from:", "de:", "to:", "à:", "subject:", "objet:",
        "envoyé:", "sent:", "date:", "cc:",
    )
    _FORWARDED_SUBJECT_PREFIXES = ("tr:", "fwd:", "fw:")

    # Compiled once; case-insensitive so the mixed-case patterns can match
    # regardless of how the mail client capitalised the separator line.
    _FORWARDED_REGEXES = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r"-{3,}Original Message-{3,}",
            r"_{3,}Original Message_{3,}",
            r">{3,}",  # several consecutive '>' usually mean quoted text
            r"Le .* a écrit :",
        )
    )

    # Residual-HTML scrubbing applied after the configured cleaner.
    _STYLE_BLOCK_RE = re.compile(r'<style[^>]*>.*?</style>', re.DOTALL | re.IGNORECASE)
    _SCRIPT_BLOCK_RE = re.compile(r'<script[^>]*>.*?</script>', re.DOTALL | re.IGNORECASE)
    _ANY_TAG_RE = re.compile(r'<[^>]+>')

    def __init__(self, auth: "AuthManager"):
        """Store the authentication manager and the cleaning presets.

        Args:
            auth: Authentication manager used for every RPC call.
        """
        self.auth = auth
        self.model_name = "project.task"
        # Each preset is forwarded to ``clean_html`` as keyword arguments.
        self.cleaning_strategies = {
            "simple": {"preserve_links": False, "preserve_images": False, "strategy": "strip_tags"},
            "standard": {"preserve_links": True, "preserve_images": True, "strategy": "html2text"},
            "advanced": {"preserve_links": True, "preserve_images": True, "strategy": "soup"},
            "raw": {"preserve_links": False, "preserve_images": False, "strategy": "none"},
        }
        self.default_strategy = "standard"

    def get_ticket_messages(self, ticket_id: int, fields: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Return every message linked to a ticket, oldest first.

        Args:
            ticket_id: ID of the ticket.
            fields: Fields to read; a default set is used when omitted.

        Returns:
            The list of message records, or an empty list when the RPC
            helper returns anything other than a list.
        """
        if fields is None:
            fields = ["id", "body", "date", "author_id", "email_from", "message_type",
                      "parent_id", "subtype_id", "subject", "tracking_value_ids", "attachment_ids"]
        params = {
            "model": "mail.message",
            "method": "search_read",
            "args": [[["res_id", "=", ticket_id], ["model", "=", self.model_name]]],
            "kwargs": {
                "fields": fields,
                "order": "date asc",
            },
        }
        messages = self.auth._rpc_call("/web/dataset/call_kw", params)
        return messages if isinstance(messages, list) else []

    def is_system_message(self, message: Dict[str, Any]) -> bool:
        """Tell whether a message was produced by the system or a bot.

        Args:
            message: The message record to inspect.

        Returns:
            True when the author name, the message type or the subtype
            name matches one of the system markers, False otherwise.
        """
        # Author: expected as an [id, display_name] pair.
        author = message.get('author_id')
        if isinstance(author, (list, tuple)) and len(author) > 1 and isinstance(author[1], str):
            author_name = author[1].lower()
            if any(keyword in author_name for keyword in self._SYSTEM_AUTHOR_KEYWORDS):
                return True

        # Message type.
        if message.get('message_type') in self._SYSTEM_MESSAGE_TYPES:
            return True

        # Subtype: expected as an [id, display_name] pair.
        subtype = message.get('subtype_id')
        if isinstance(subtype, (list, tuple)) and len(subtype) > 1 and isinstance(subtype[1], str):
            subtype_name = subtype[1].lower()
            if any(keyword in subtype_name for keyword in self._SYSTEM_SUBTYPE_KEYWORDS):
                return True

        return False

    def is_stage_change_message(self, message: Dict[str, Any]) -> bool:
        """Tell whether a message records a stage/status change.

        Tracking values are checked first so that a message with an empty
        (non-string) body is still recognised when it tracks ``stage_id``.

        Args:
            message: The message record to inspect.

        Returns:
            True when a tracking value or the body text indicates a stage
            change, False otherwise.
        """
        tracking_ids = message.get('tracking_value_ids')
        if tracking_ids:
            try:
                tracking_values = self.auth.read(
                    "mail.tracking.value", tracking_ids,
                    ["field", "field_desc", "old_value_char", "new_value_char"])
                for value in tracking_values or []:
                    # field_desc may be False; never call .lower() on it directly.
                    field_desc = value.get("field_desc") or ""
                    if value.get("field") == "stage_id" or "stage" in str(field_desc).lower():
                        return True
            except Exception as e:
                # Best effort: fall back to the body heuristic below.
                logging.warning("Erreur lors de la vérification des valeurs de tracking: %s", e)

        body = message.get('body', '')
        if not isinstance(body, str):
            return False
        body = body.lower()
        return any(pattern in body for pattern in self._STAGE_PATTERNS)

    def is_forwarded_message(self, message: Dict[str, Any]) -> bool:
        """Detect whether a message looks like a forwarded or quoted e-mail.

        Args:
            message: The message record to analyse.

        Returns:
            True when the body contains a forwarding indicator, the subject
            starts with a forwarding prefix, or the body matches one of the
            quoting patterns; False otherwise (including an empty body).
        """
        raw_body = message.get('body')
        if not raw_body:
            return False
        body = raw_body if isinstance(raw_body, str) else ""
        body_lower = body.lower()

        # Plain substring indicators.
        if any(indicator in body_lower for indicator in self._FORWARDED_INDICATORS):
            return True

        # Common forwarding prefixes at the start of the subject.
        subject = message.get('subject')
        if not isinstance(subject, str):
            subject = str(subject) if subject else ""
        if subject.lstrip().lower().startswith(self._FORWARDED_SUBJECT_PREFIXES):
            return True

        # Separator / quoting patterns.
        return any(regex.search(body) for regex in self._FORWARDED_REGEXES)

    def get_message_author_details(self, message: Dict[str, Any]) -> Dict[str, Any]:
        """Look up the partner record of a message's author.

        Args:
            message: The message whose author should be resolved.

        Returns:
            A dictionary that always contains ``name``, ``email`` and
            ``is_system``; when the partner lookup succeeds it also carries
            the fields read from ``res.partner``. Lookup errors are logged
            and the defaults are returned.
        """
        fallback_email = message.get('email_from') or ''
        author_details = {
            "name": "Inconnu",
            "email": fallback_email,
            "is_system": False,
        }
        try:
            author_field = message.get('author_id')
            if isinstance(author_field, (list, tuple)) and author_field:
                params = {
                    "model": "res.partner",
                    "method": "read",
                    "args": [[author_field[0]]],
                    "kwargs": {"fields": ['name', 'email', 'phone', 'function', 'company_id']},
                }
                author_data = self.auth._rpc_call("/web/dataset/call_kw", params)
                if isinstance(author_data, list) and author_data:
                    author_details.update(author_data[0])
                    # Empty partner fields must not erase the useful defaults.
                    if not author_details.get('name'):
                        author_details['name'] = "Inconnu"
                    if not author_details.get('email'):
                        author_details['email'] = fallback_email
                    # Same keyword list as is_system_message, for consistency.
                    author_name = str(author_details['name']).lower()
                    if any(keyword in author_name for keyword in self._SYSTEM_AUTHOR_KEYWORDS):
                        author_details['is_system'] = True
        except Exception as e:
            logging.warning("Erreur lors de la récupération des détails de l'auteur: %s", e)
        return author_details

    def _clean_body(self, body: Any, cleaning_config: Dict[str, Any]) -> Any:
        """Run the configured HTML cleaner, then strip any leftover markup."""
        cleaned_body = clean_html(
            body,
            strategy=cleaning_config['strategy'],
            preserve_links=cleaning_config['preserve_links'],
            preserve_images=cleaning_config['preserve_images'],
        )
        if cleaned_body:
            # Drop <style>/<script> blocks together with their content.
            cleaned_body = self._STYLE_BLOCK_RE.sub('', cleaned_body)
            cleaned_body = self._SCRIPT_BLOCK_RE.sub('', cleaned_body)
            # NOTE(review): this removes every '<...>' span, including any
            # literal angle-bracket text left by the cleaner -- confirm
            # that this is intended.
            cleaned_body = self._ANY_TAG_RE.sub('', cleaned_body)
        return cleaned_body

    def _read_ticket_project_and_stage(self, ticket_id: int) -> Dict[str, Any]:
        """Read the project and stage (id and name) of a ticket; None on failure."""
        empty = {"project_id": None, "project_name": None, "stage_id": None, "stage_name": None}
        info = dict(empty)
        try:
            ticket_data = self.auth._rpc_call("/web/dataset/call_kw", {
                "model": self.model_name,
                "method": "read",
                "args": [[ticket_id]],
                "kwargs": {"fields": ["project_id", "stage_id"]},
            })
            if isinstance(ticket_data, list) and ticket_data:
                record = ticket_data[0]
                for prefix in ("project", "stage"):
                    value = record.get(f"{prefix}_id")
                    if not value:
                        continue
                    if isinstance(value, list):
                        # [id, display_name] pair.
                        info[f"{prefix}_id"] = value[0]
                        info[f"{prefix}_name"] = value[1]
                    else:
                        info[f"{prefix}_id"] = value
        except Exception as e:
            logging.error("Erreur lors de la récupération des informations du ticket: %s", e)
            return empty
        return info

    def process_messages(self, ticket_id: int, ticket_code: str, ticket_name: str, output_dir: str,
                         strategy: str = "standard") -> Dict[str, Any]:
        """Process every message of a ticket and write the result files.

        System messages that are not stage changes and messages flagged as
        duplicates are excluded from the processed output; the reason is
        recorded in the per-message metadata saved with the raw messages.

        Args:
            ticket_id: ID of the ticket.
            ticket_code: Code of the ticket.
            ticket_name: Name of the ticket.
            output_dir: Directory receiving the generated files.
            strategy: Cleaning strategy (simple, standard, advanced, raw);
                an unknown value falls back to the default strategy.

        Returns:
            A dictionary with the paths of the generated files and the
            processed/total message counts.
        """
        if strategy not in self.cleaning_strategies:
            logging.warning(
                "Stratégie de nettoyage '%s' inconnue, utilisation de la stratégie par défaut '%s'",
                strategy, self.default_strategy)
            strategy = self.default_strategy
        cleaning_config = self.cleaning_strategies[strategy]

        messages = self.get_ticket_messages(ticket_id)
        duplicate_indices = detect_duplicate_content(messages)

        processed_messages = []
        message_metadata = {}
        for index, message in enumerate(messages):
            message_id = message.get('id')
            flags = {
                "is_system": self.is_system_message(message),
                "is_stage_change": self.is_stage_change_message(message),
                "is_forwarded": self.is_forwarded_message(message),
                "is_duplicate": index in duplicate_indices,
            }
            message_metadata[message_id] = flags

            # Decide on exclusion first: excluded messages need neither
            # cleaning nor a partner lookup (one RPC call each).
            if flags["is_system"] and not flags["is_stage_change"]:
                flags['excluded'] = "system_message"
                continue
            if flags["is_duplicate"]:
                flags['excluded'] = "duplicate_content"
                continue

            # Work on a copy so the raw message list stays untouched.
            message_copy = {**message, **flags}

            body = message_copy.get('body')
            if body:
                # Always keep the original body next to the cleaned one.
                message_copy['body_original'] = body
                if strategy != "raw":
                    message_copy['body'] = self._clean_body(body, cleaning_config)

            message_copy['author_details'] = self.get_message_author_details(message_copy)
            processed_messages.append(message_copy)

        # A missing date may come back as False; normalise so the sort
        # never compares a bool with a str.
        processed_messages.sort(key=lambda x: x.get('date') or '')

        ticket_info = self._read_ticket_project_and_stage(ticket_id)

        messages_with_summary = {
            "ticket_summary": {
                "id": ticket_id,
                "code": ticket_code,
                "name": ticket_name,
                "project_id": ticket_info["project_id"],
                "project_name": ticket_info["project_name"],
                "stage_id": ticket_info["stage_id"],
                "stage_name": ticket_info["stage_name"],
                "date_extraction": datetime.now().isoformat(),
            },
            "metadata": {
                "message_count": {
                    "total": len(messages),
                    "processed": len(processed_messages),
                    "excluded": len(messages) - len(processed_messages),
                },
                "cleaning_strategy": strategy,
                "cleaning_config": cleaning_config,
            },
            "messages": processed_messages,
        }

        # Make sure the target directory exists before writing into it.
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        # Processed messages with the ticket summary.
        all_messages_path = os.path.join(output_dir, "all_messages.json")
        save_json(messages_with_summary, all_messages_path)

        # Raw messages together with the classification metadata.
        raw_messages_path = os.path.join(output_dir, "messages_raw.json")
        save_json({
            "ticket_id": ticket_id,
            "ticket_code": ticket_code,
            "message_metadata": message_metadata,
            "messages": messages,
        }, raw_messages_path)

        # Human-readable text export; a failure here is logged, not raised.
        messages_text_path = os.path.join(output_dir, "all_messages.txt")
        try:
            text_content = self._generate_messages_text(ticket_code, ticket_name, processed_messages)
            save_text(text_content, messages_text_path)
        except Exception as e:
            logging.error("Erreur lors de la création du fichier texte: %s", e)

        return {
            "all_messages_path": all_messages_path,
            "raw_messages_path": raw_messages_path,
            "messages_text_path": messages_text_path,
            "messages_count": len(processed_messages),
            "total_messages": len(messages),
        }

    def _generate_messages_text(self, ticket_code: str, ticket_name: str,
                                processed_messages: List[Dict[str, Any]]) -> str:
        """Build the plain-text export of the processed messages.

        Args:
            ticket_code: Code of the ticket.
            ticket_name: Name of the ticket.
            processed_messages: Messages kept after filtering and cleaning.

        Returns:
            The full text content, one section per message.
        """
        content = []

        # Ticket header.
        content.append(f"TICKET: {ticket_code} - {ticket_name}")
        content.append(f"Date d'extraction: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        content.append(f"Nombre de messages: {len(processed_messages)}")
        content.append("\n" + "="*80 + "\n")

        for msg in processed_messages:
            # Empty server fields arrive as False; use `or` fallbacks so the
            # export never shows a literal "False".
            author = ((msg.get('author_details') or {}).get('name')
                      or msg.get('email_from')
                      or 'Inconnu')
            date = msg.get('date') or ''
            subject = msg.get('subject', 'Sans objet')
            body = msg.get('body') or ''

            # Banner for special messages.
            if msg.get('is_stage_change'):
                content.append("*"*80)
                content.append("*** CHANGEMENT D'ÉTAT ***")
                content.append("*"*80 + "\n")
            elif msg.get('is_forwarded'):
                content.append("*"*80)
                content.append("*** MESSAGE TRANSFÉRÉ ***")
                content.append("*"*80 + "\n")

            # Message header and body.
            content.append(f"DATE: {date}")
            content.append(f"DE: {author}")
            if subject:
                content.append(f"OBJET: {subject}")
            content.append("")
            content.append(f"{body}")
            content.append("\n" + "-"*80 + "\n")

        return "\n".join(content)