mirror of
https://github.com/Ladebeze66/llm_ticket3.git
synced 2025-12-15 21:16:52 +01:00
524 lines
23 KiB
Python
524 lines
23 KiB
Python
from typing import List, Dict, Any, Optional, Tuple
|
|
from .auth_manager import AuthManager
|
|
from formatters.clean_html import clean_html
|
|
from core.utils import save_json, save_text, detect_duplicate_content, normalize_filename
|
|
import os
|
|
import re
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
class MessageManager:
|
|
"""
|
|
Gestionnaire de messages pour traiter les messages associés aux tickets.
|
|
"""
|
|
|
|
def __init__(self, auth: AuthManager):
|
|
"""
|
|
Initialise le gestionnaire de messages.
|
|
|
|
Args:
|
|
auth: Gestionnaire d'authentification
|
|
"""
|
|
self.auth = auth
|
|
self.model_name = "project.task"
|
|
self.cleaning_strategies = {
|
|
"simple": {"preserve_links": False, "preserve_images": False, "strategy": "strip_tags"},
|
|
"standard": {"preserve_links": True, "preserve_images": True, "strategy": "html2text"},
|
|
"advanced": {"preserve_links": True, "preserve_images": True, "strategy": "soup"},
|
|
"raw": {"preserve_links": False, "preserve_images": False, "strategy": "none"}
|
|
}
|
|
self.default_strategy = "standard"
|
|
|
|
def get_ticket_messages(self, ticket_id: int, fields: Optional[List[str]] = None) -> List[Dict[str, Any]]:
|
|
"""
|
|
Récupère tous les messages associés à un ticket.
|
|
|
|
Args:
|
|
ticket_id: ID du ticket
|
|
fields: Liste des champs à récupérer (facultatif)
|
|
|
|
Returns:
|
|
Liste des messages associés au ticket
|
|
"""
|
|
if fields is None:
|
|
fields = ["id", "body", "date", "author_id", "email_from", "message_type",
|
|
"parent_id", "subtype_id", "subject", "tracking_value_ids", "attachment_ids"]
|
|
|
|
params = {
|
|
"model": "mail.message",
|
|
"method": "search_read",
|
|
"args": [[["res_id", "=", ticket_id], ["model", "=", self.model_name]]],
|
|
"kwargs": {
|
|
"fields": fields,
|
|
"order": "date asc"
|
|
}
|
|
}
|
|
|
|
messages = self.auth._rpc_call("/web/dataset/call_kw", params)
|
|
return messages if isinstance(messages, list) else []
|
|
|
|
def is_system_message(self, message: Dict[str, Any]) -> bool:
|
|
"""
|
|
Vérifie si le message est un message système ou OdooBot.
|
|
|
|
Args:
|
|
message: Le message à vérifier
|
|
|
|
Returns:
|
|
True si c'est un message système, False sinon
|
|
"""
|
|
is_system = False
|
|
|
|
# Vérifier le nom de l'auteur
|
|
if 'author_id' in message and isinstance(message['author_id'], list) and len(message['author_id']) > 1:
|
|
author_name = message['author_id'][1].lower()
|
|
if 'odoobot' in author_name or 'bot' in author_name or 'système' in author_name or 'system' in author_name:
|
|
is_system = True
|
|
|
|
# Vérifier le type de message
|
|
if message.get('message_type') in ['notification', 'auto_comment']:
|
|
is_system = True
|
|
|
|
# Vérifier le sous-type du message
|
|
if 'subtype_id' in message and isinstance(message['subtype_id'], list) and len(message['subtype_id']) > 1:
|
|
subtype = message['subtype_id'][1].lower()
|
|
if 'notification' in subtype or 'system' in subtype or 'note' in subtype:
|
|
is_system = True
|
|
|
|
return is_system
|
|
|
|
def is_stage_change_message(self, message: Dict[str, Any]) -> bool:
|
|
"""
|
|
Vérifie si le message est un changement d'état.
|
|
|
|
Args:
|
|
message: Le message à vérifier
|
|
|
|
Returns:
|
|
True si c'est un message de changement d'état, False sinon
|
|
"""
|
|
if not isinstance(message.get('body', ''), str):
|
|
return False
|
|
|
|
body = message.get('body', '').lower()
|
|
|
|
# Patterns pour les changements d'état
|
|
stage_patterns = [
|
|
'étape changée', 'stage changed', 'modifié l\'étape',
|
|
'changed the stage', 'ticket transféré', 'ticket transferred',
|
|
'statut modifié', 'status changed', 'état du ticket'
|
|
]
|
|
|
|
# Vérifier aussi les valeurs de tracking si disponibles
|
|
if message.get('tracking_value_ids'):
|
|
try:
|
|
tracking_values = self.auth.read("mail.tracking.value", message.get('tracking_value_ids', []),
|
|
["field", "field_desc", "old_value_char", "new_value_char"])
|
|
for value in tracking_values:
|
|
if value.get("field") == "stage_id" or "stage" in value.get("field_desc", "").lower():
|
|
return True
|
|
except Exception as e:
|
|
logging.warning(f"Erreur lors de la vérification des valeurs de tracking: {e}")
|
|
|
|
return any(pattern in body for pattern in stage_patterns)
|
|
|
|
def is_forwarded_message(self, message: Dict[str, Any]) -> bool:
|
|
"""
|
|
Détecte si un message est un message transféré.
|
|
|
|
Args:
|
|
message: Le message à analyser
|
|
|
|
Returns:
|
|
True si le message est transféré, False sinon
|
|
"""
|
|
if not message.get('body'):
|
|
return False
|
|
|
|
# Indicateurs de message transféré
|
|
forwarded_indicators = [
|
|
"message transféré", "forwarded message",
|
|
"transféré de", "forwarded from",
|
|
"début du message transféré", "begin forwarded message",
|
|
"message d'origine", "original message",
|
|
"from:", "de:", "to:", "à:", "subject:", "objet:",
|
|
"envoyé:", "sent:", "date:", "cc:"
|
|
]
|
|
|
|
# Vérifier le contenu du message
|
|
body_lower = message.get('body', '').lower() if isinstance(message.get('body', ''), str) else ""
|
|
|
|
# Vérifier la présence d'indicateurs de transfert
|
|
for indicator in forwarded_indicators:
|
|
if indicator in body_lower:
|
|
return True
|
|
|
|
# Vérifier si le sujet contient des préfixes courants de transfert
|
|
subject_value = message.get('subject', '')
|
|
if not isinstance(subject_value, str):
|
|
subject_value = str(subject_value) if subject_value is not None else ""
|
|
|
|
subject_lower = subject_value.lower()
|
|
forwarded_prefixes = ["tr:", "fwd:", "fw:"]
|
|
for prefix in forwarded_prefixes:
|
|
if subject_lower.startswith(prefix):
|
|
return True
|
|
|
|
# Patterns typiques dans les messages transférés
|
|
patterns = [
|
|
r"-{3,}Original Message-{3,}",
|
|
r"_{3,}Original Message_{3,}",
|
|
r">{3,}", # Plusieurs signes > consécutifs indiquent souvent un message cité
|
|
r"Le .* a écrit :"
|
|
]
|
|
for pattern in patterns:
|
|
if re.search(pattern, body_lower):
|
|
return True
|
|
|
|
return False
|
|
|
|
def get_message_author_details(self, message: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Récupère les détails de l'auteur d'un message.
|
|
|
|
Args:
|
|
message: Le message dont il faut récupérer l'auteur
|
|
|
|
Returns:
|
|
Dictionnaire avec les détails de l'auteur
|
|
"""
|
|
author_details = {
|
|
"name": "Inconnu",
|
|
"email": message.get('email_from', ''),
|
|
"is_system": False
|
|
}
|
|
|
|
try:
|
|
author_id_field = message.get('author_id')
|
|
if author_id_field and isinstance(author_id_field, list) and len(author_id_field) > 0:
|
|
author_id = author_id_field[0]
|
|
params = {
|
|
"model": "res.partner",
|
|
"method": "read",
|
|
"args": [[author_id]],
|
|
"kwargs": {"fields": ['name', 'email', 'phone', 'function', 'company_id']}
|
|
}
|
|
author_data = self.auth._rpc_call("/web/dataset/call_kw", params)
|
|
if author_data and isinstance(author_data, list) and len(author_data) > 0:
|
|
author_details.update(author_data[0])
|
|
|
|
# Vérifier si c'est un auteur système
|
|
if author_details.get('name'):
|
|
author_name = author_details['name'].lower()
|
|
if 'odoobot' in author_name or 'bot' in author_name or 'système' in author_name:
|
|
author_details['is_system'] = True
|
|
except Exception as e:
|
|
logging.warning(f"Erreur lors de la récupération des détails de l'auteur: {e}")
|
|
|
|
return author_details
|
|
|
|
def process_messages(self, ticket_id: int, ticket_code: str, ticket_name: str, output_dir: str,
|
|
strategy: str = "standard") -> Dict[str, Any]:
|
|
"""
|
|
Traite tous les messages d'un ticket, nettoie le contenu et génère des fichiers structurés.
|
|
|
|
Args:
|
|
ticket_id: ID du ticket
|
|
ticket_code: Code du ticket
|
|
ticket_name: Nom du ticket
|
|
output_dir: Répertoire de sortie
|
|
strategy: Stratégie de nettoyage (simple, standard, advanced, raw)
|
|
|
|
Returns:
|
|
Dictionnaire avec les chemins des fichiers créés
|
|
"""
|
|
# Validation de la stratégie
|
|
if strategy not in self.cleaning_strategies:
|
|
logging.warning(f"Stratégie de nettoyage '{strategy}' inconnue, utilisation de la stratégie par défaut '{self.default_strategy}'")
|
|
strategy = self.default_strategy
|
|
|
|
cleaning_config = self.cleaning_strategies[strategy]
|
|
|
|
# Récupérer les messages
|
|
messages = self.get_ticket_messages(ticket_id)
|
|
|
|
# Détecter les messages dupliqués
|
|
duplicate_indices = detect_duplicate_content(messages)
|
|
|
|
# Nettoyer et structurer les messages
|
|
processed_messages = []
|
|
|
|
# Créer un dictionnaire de métadonnées pour chaque message
|
|
message_metadata = {}
|
|
|
|
for index, message in enumerate(messages):
|
|
message_id = message.get('id')
|
|
|
|
# Ajouter des métadonnées au message
|
|
message_metadata[message_id] = {
|
|
"is_system": self.is_system_message(message),
|
|
"is_stage_change": self.is_stage_change_message(message),
|
|
"is_forwarded": self.is_forwarded_message(message),
|
|
"is_duplicate": index in duplicate_indices
|
|
}
|
|
|
|
# Créer une copie du message pour éviter de modifier l'original
|
|
message_copy = message.copy()
|
|
|
|
# Ajouter les métadonnées au message copié
|
|
for key, value in message_metadata[message_id].items():
|
|
message_copy[key] = value
|
|
|
|
# Nettoyer le corps du message selon la stratégie choisie
|
|
if message_copy.get('body'):
|
|
# Toujours conserver l'original
|
|
message_copy['body_original'] = message_copy.get('body', '')
|
|
|
|
# Appliquer la stratégie de nettoyage, sauf si raw
|
|
if strategy != "raw":
|
|
cleaned_body = clean_html(
|
|
message_copy.get('body', ''),
|
|
is_forwarded=message_copy.get('is_forwarded', False)
|
|
)
|
|
|
|
# Nettoyer davantage le code HTML qui pourrait rester
|
|
if cleaned_body:
|
|
# Supprimer les balises style et script avec leur contenu
|
|
cleaned_body = re.sub(r'<style[^>]*>.*?</style>', '', cleaned_body, flags=re.DOTALL)
|
|
cleaned_body = re.sub(r'<script[^>]*>.*?</script>', '', cleaned_body, flags=re.DOTALL)
|
|
# Supprimer les balises HTML restantes
|
|
cleaned_body = re.sub(r'<[^>]+>', '', cleaned_body)
|
|
|
|
message_copy['body'] = cleaned_body
|
|
|
|
# Récupérer les détails de l'auteur
|
|
message_copy['author_details'] = self.get_message_author_details(message_copy)
|
|
|
|
# Vérifier si le message contient des éléments importants
|
|
has_attachments = bool(message_copy.get('attachment_ids'))
|
|
has_images = False
|
|
has_meaningful_content = False
|
|
|
|
# Vérifier la présence d'images dans le HTML
|
|
if message_copy.get('body_original'):
|
|
# Rechercher les balises img dans le HTML
|
|
has_images = '<img' in message_copy.get('body_original', '').lower() or '/web/image/' in message_copy.get('body_original', '').lower()
|
|
|
|
# Rechercher des éléments d'images Odoo spécifiques
|
|
if '/web/image/' in message_copy.get('body_original', ''):
|
|
has_images = True
|
|
|
|
# Vérifier si le corps du message contient du texte significatif
|
|
body_text = message_copy.get('body', '')
|
|
if body_text and len(body_text.strip()) > 30: # Texte non vide et d'une certaine longueur
|
|
has_meaningful_content = True
|
|
|
|
# Déterminer si le message doit être conservé malgré son statut système
|
|
is_important = (
|
|
has_attachments or
|
|
has_images or
|
|
message_copy.get('is_forwarded') or
|
|
has_meaningful_content or
|
|
message_copy.get('is_stage_change')
|
|
)
|
|
|
|
# Ne pas inclure les messages système UNIQUEMENT s'ils n'ont rien d'important
|
|
if message_copy.get('is_system') and not is_important:
|
|
# Enregistrer l'exclusion dans les métadonnées
|
|
message_metadata[message_id]['excluded'] = "system_message"
|
|
continue
|
|
|
|
# Si le message est marqué comme exclu dans les métadonnées mais qu'il est transféré, le réintégrer
|
|
if message_metadata.get(message_id, {}).get('excluded') == "system_message" and message_copy.get('is_forwarded'):
|
|
# Supprimer l'exclusion des métadonnées
|
|
del message_metadata[message_id]['excluded']
|
|
|
|
# Vérifier aussi les messages qui sont déjà exclus dans les métadonnées d'entrée
|
|
# et les réintégrer s'ils sont transférés
|
|
if 'excluded' in message_metadata.get(message_id, {}) and message_copy.get('is_forwarded'):
|
|
# Supprimer l'exclusion des métadonnées
|
|
del message_metadata[message_id]['excluded']
|
|
|
|
# Ignorer les messages dupliqués si demandé
|
|
if message_copy.get('is_duplicate'):
|
|
# Enregistrer l'exclusion dans les métadonnées
|
|
message_metadata[message_id]['excluded'] = "duplicate_content"
|
|
continue
|
|
|
|
processed_messages.append(message_copy)
|
|
|
|
# Trier les messages par date
|
|
processed_messages.sort(key=lambda x: x.get('date', ''))
|
|
|
|
# Étape supplémentaire: Vérifier si des messages transférés ont été exclus et les réintégrer
|
|
processed_ids = {msg['id'] for msg in processed_messages if 'id' in msg}
|
|
for message in messages:
|
|
message_id = message.get('id')
|
|
if (message_id not in processed_ids and
|
|
message_metadata.get(message_id, {}).get('is_forwarded') and
|
|
'excluded' in message_metadata.get(message_id, {})):
|
|
# Créer une copie du message
|
|
message_copy = message.copy()
|
|
# Ajouter les métadonnées au message
|
|
for key, value in message_metadata[message_id].items():
|
|
if key != 'excluded': # Ne pas ajouter le tag d'exclusion
|
|
message_copy[key] = value
|
|
# Si le message a un corps, on applique le même traitement de nettoyage
|
|
if message_copy.get('body'):
|
|
# Toujours conserver l'original
|
|
message_copy['body_original'] = message_copy.get('body', '')
|
|
# Appliquer la stratégie de nettoyage, sauf si raw
|
|
if strategy != "raw":
|
|
cleaned_body = clean_html(
|
|
message_copy.get('body', ''),
|
|
is_forwarded=message_copy.get('is_forwarded', False)
|
|
)
|
|
# Nettoyage supplémentaire
|
|
if cleaned_body:
|
|
cleaned_body = re.sub(r'<style[^>]*>.*?</style>', '', cleaned_body, flags=re.DOTALL)
|
|
cleaned_body = re.sub(r'<script[^>]*>.*?</script>', '', cleaned_body, flags=re.DOTALL)
|
|
cleaned_body = re.sub(r'<[^>]+>', '', cleaned_body)
|
|
message_copy['body'] = cleaned_body
|
|
# Récupérer les détails de l'auteur
|
|
message_copy['author_details'] = self.get_message_author_details(message_copy)
|
|
# Supprimer l'exclusion des métadonnées
|
|
if 'excluded' in message_metadata[message_id]:
|
|
del message_metadata[message_id]['excluded']
|
|
# Ajouter le message aux messages traités
|
|
processed_messages.append(message_copy)
|
|
|
|
# Trier à nouveau les messages par date après la réintégration
|
|
processed_messages.sort(key=lambda x: x.get('date', ''))
|
|
|
|
# Récupérer les informations supplémentaires du ticket
|
|
try:
|
|
ticket_data = self.auth._rpc_call("/web/dataset/call_kw", {
|
|
"model": "project.task",
|
|
"method": "read",
|
|
"args": [[ticket_id]],
|
|
"kwargs": {"fields": ["project_id", "stage_id"]}
|
|
})
|
|
|
|
project_id = None
|
|
stage_id = None
|
|
project_name = None
|
|
stage_name = None
|
|
|
|
if ticket_data and isinstance(ticket_data, list) and len(ticket_data) > 0:
|
|
if "project_id" in ticket_data[0] and ticket_data[0]["project_id"]:
|
|
project_id = ticket_data[0]["project_id"][0] if isinstance(ticket_data[0]["project_id"], list) else ticket_data[0]["project_id"]
|
|
project_name = ticket_data[0]["project_id"][1] if isinstance(ticket_data[0]["project_id"], list) else None
|
|
|
|
if "stage_id" in ticket_data[0] and ticket_data[0]["stage_id"]:
|
|
stage_id = ticket_data[0]["stage_id"][0] if isinstance(ticket_data[0]["stage_id"], list) else ticket_data[0]["stage_id"]
|
|
stage_name = ticket_data[0]["stage_id"][1] if isinstance(ticket_data[0]["stage_id"], list) else None
|
|
except Exception as e:
|
|
logging.error(f"Erreur lors de la récupération des informations du ticket: {e}")
|
|
project_id = None
|
|
stage_id = None
|
|
project_name = None
|
|
stage_name = None
|
|
|
|
# Créer la structure pour le JSON
|
|
messages_with_summary = {
|
|
"ticket_summary": {
|
|
"id": ticket_id,
|
|
"code": ticket_code,
|
|
"name": ticket_name,
|
|
"project_id": project_id,
|
|
"project_name": project_name,
|
|
"stage_id": stage_id,
|
|
"stage_name": stage_name,
|
|
"date_extraction": datetime.now().isoformat()
|
|
},
|
|
"metadata": {
|
|
"message_count": {
|
|
"total": len(messages),
|
|
"processed": len(processed_messages),
|
|
"excluded": len(messages) - len(processed_messages)
|
|
},
|
|
"cleaning_strategy": strategy,
|
|
"cleaning_config": cleaning_config
|
|
},
|
|
"messages": processed_messages
|
|
}
|
|
|
|
# Sauvegarder les messages en JSON
|
|
all_messages_path = os.path.join(output_dir, "all_messages.json")
|
|
save_json(messages_with_summary, all_messages_path)
|
|
|
|
# Sauvegarder également les messages bruts
|
|
raw_messages_path = os.path.join(output_dir, "messages_raw.json")
|
|
save_json({
|
|
"ticket_id": ticket_id,
|
|
"ticket_code": ticket_code,
|
|
"message_metadata": message_metadata,
|
|
"messages": messages
|
|
}, raw_messages_path)
|
|
|
|
# Créer un fichier texte pour une lecture plus facile
|
|
messages_text_path = os.path.join(output_dir, "all_messages.txt")
|
|
|
|
try:
|
|
text_content = self._generate_messages_text(ticket_code, ticket_name, processed_messages)
|
|
save_text(text_content, messages_text_path)
|
|
except Exception as e:
|
|
logging.error(f"Erreur lors de la création du fichier texte: {e}")
|
|
|
|
return {
|
|
"all_messages_path": all_messages_path,
|
|
"raw_messages_path": raw_messages_path,
|
|
"messages_text_path": messages_text_path,
|
|
"messages_count": len(processed_messages),
|
|
"total_messages": len(messages)
|
|
}
|
|
|
|
def _generate_messages_text(self, ticket_code: str, ticket_name: str,
|
|
processed_messages: List[Dict[str, Any]]) -> str:
|
|
"""
|
|
Génère un fichier texte formaté à partir des messages traités.
|
|
|
|
Args:
|
|
ticket_code: Code du ticket
|
|
ticket_name: Nom du ticket
|
|
processed_messages: Liste des messages traités
|
|
|
|
Returns:
|
|
Contenu du fichier texte
|
|
"""
|
|
content = []
|
|
|
|
# Informations sur le ticket
|
|
content.append(f"TICKET: {ticket_code} - {ticket_name}")
|
|
content.append(f"Date d'extraction: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
|
content.append(f"Nombre de messages: {len(processed_messages)}")
|
|
content.append("\n" + "="*80 + "\n")
|
|
|
|
# Parcourir les messages filtrés
|
|
for msg in processed_messages:
|
|
author = msg.get('author_details', {}).get('name', msg.get('email_from', 'Inconnu'))
|
|
date = msg.get('date', '')
|
|
subject = msg.get('subject', 'Sans objet')
|
|
body = msg.get('body', '')
|
|
|
|
# Formater différemment les messages spéciaux
|
|
if msg.get('is_stage_change'):
|
|
content.append("*"*80)
|
|
content.append("*** CHANGEMENT D'ÉTAT ***")
|
|
content.append("*"*80 + "\n")
|
|
elif msg.get('is_forwarded'):
|
|
content.append("*"*80)
|
|
content.append("*** MESSAGE TRANSFÉRÉ ***")
|
|
content.append("*"*80 + "\n")
|
|
|
|
# En-tête du message
|
|
content.append(f"DATE: {date}")
|
|
content.append(f"DE: {author}")
|
|
if subject:
|
|
content.append(f"OBJET: {subject}")
|
|
content.append("")
|
|
content.append(f"{body}")
|
|
content.append("\n" + "-"*80 + "\n")
|
|
|
|
return "\n".join(content)
|