llm_ticket3/core/utils.py
2025-04-18 15:18:46 +02:00

246 lines
7.9 KiB
Python

"""
Utilitaires généraux pour l'ensemble de l'application.
"""
import os
import json
import logging
import re
from typing import Dict, Any, List, Optional, Union
from datetime import datetime
import unicodedata
from bs4 import Tag
def setup_logging(level: Union[str, int] = logging.INFO, log_file: Optional[str] = None) -> None:
"""
Configure la journalisation avec un format spécifique et éventuellement un fichier de logs.
Args:
level: Niveau de journalisation en tant que chaîne (ex: "INFO", "DEBUG") ou valeur entière (default: logging.INFO)
log_file: Chemin du fichier de log (default: None)
"""
# Convertir le niveau de log si c'est une chaîne
if isinstance(level, str):
numeric_level = getattr(logging, level.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError(f"Niveau de journalisation invalide: {level}")
else:
numeric_level = level
logging.basicConfig(
level=numeric_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Ajout d'un gestionnaire de fichier si log_file est spécifié
if log_file:
# S'assurer que le répertoire existe
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(numeric_level)
file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%Y-%m-%d %H:%M:%S')
file_handler.setFormatter(file_formatter)
logging.getLogger().addHandler(file_handler)
def log_separator(length: int = 60) -> None:
"""
Ajoute une ligne de séparation dans les logs.
Args:
length: Longueur de la ligne (default: 60)
"""
logging.info("-" * length)
def save_json(data: Any, file_path: str) -> bool:
"""
Sauvegarde des données au format JSON dans un fichier.
Args:
data: Données à sauvegarder
file_path: Chemin du fichier
Returns:
True si la sauvegarde a réussi, False sinon
"""
try:
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
logging.error(f"Erreur lors de la sauvegarde du fichier JSON {file_path}: {e}")
return False
def save_text(text: str, file_path: str) -> bool:
"""
Sauvegarde du texte dans un fichier.
Args:
text: Texte à sauvegarder
file_path: Chemin du fichier
Returns:
True si la sauvegarde a réussi, False sinon
"""
try:
# S'assurer que le répertoire existe
directory = os.path.dirname(file_path)
if directory and not os.path.exists(directory):
os.makedirs(directory, exist_ok=True)
with open(file_path, 'w', encoding='utf-8') as f:
f.write(text)
return True
except Exception as e:
logging.error(f"Erreur lors de la sauvegarde du fichier texte {file_path}: {e}")
return False
def normalize_filename(name: str) -> str:
"""
Normalise un nom de fichier en remplaçant les caractères non autorisés.
Args:
name: Nom à normaliser
Returns:
Nom normalisé
"""
# Enlever les accents
name = unicodedata.normalize('NFKD', name).encode('ASCII', 'ignore').decode('ASCII')
# Remplacer les caractères non alphanumériques par des underscores
name = re.sub(r'[^\w\.-]', '_', name)
# Remplacer les caractères non autorisés par des underscores
sanitized = re.sub(r'[\\/*?:"<>|]', '_', name)
# Limiter la longueur du nom à 100 caractères
if len(sanitized) > 100:
sanitized = sanitized[:97] + "..."
return sanitized.strip()
def detect_duplicate_content(messages: List[Dict[str, Any]]) -> List[int]:
"""
Détecte les messages avec un contenu dupliqué et retourne leurs indices.
Args:
messages: Liste de messages à analyser
Returns:
Liste des indices des messages dupliqués
"""
try:
# Import here to avoid circular imports
from formatters.clean_html import clean_html
except ImportError:
# Fallback to a simplified version if the import fails
def clean_html(html_content, is_forwarded=False, is_description=False, strategy="standard",
preserve_links=False, preserve_images=False, preserve_doc_links=True):
return html_content.strip() if html_content else ""
content_map = {}
duplicate_indices = []
for idx, message in enumerate(messages):
body = message.get("body", "")
if not body:
continue
# Nettoyer le contenu HTML pour la comparaison
cleaned_content = clean_html(body, is_description=False)
# Considérer uniquement les messages avec du contenu significatif
if len(cleaned_content.strip()) < 10:
continue
# Vérifier si le contenu existe déjà
if cleaned_content in content_map:
duplicate_indices.append(idx)
else:
content_map[cleaned_content] = idx
return duplicate_indices
def get_timestamp() -> str:
"""
Retourne un timestamp au format YYYYMMDD_HHMMSS
Returns:
Chaîne formatée avec le timestamp
"""
return datetime.now().strftime("%Y%m%d_%H%M%S")
def ensure_dir(path: str) -> bool:
"""
S'assure qu'un répertoire existe, le crée si nécessaire.
Args:
path: Chemin du répertoire à créer
Returns:
True si le répertoire existe ou a été créé avec succès, False sinon
"""
try:
if not os.path.exists(path):
os.makedirs(path, exist_ok=True)
return True
except Exception as e:
logging.error(f"Erreur lors de la création du répertoire {path}: {e}")
return False
def is_important_image(tag: Tag, message_text: str) -> bool:
"""
Détermine si une image est importante ou s'il s'agit d'un logo/signature.
Args:
tag: La balise d'image à analyser
message_text: Le texte complet du message pour contexte
Returns:
True si l'image semble importante, False sinon
"""
# Vérifier les attributs de l'image
src = str(tag.get('src', ''))
alt = str(tag.get('alt', ''))
title = str(tag.get('title', ''))
# Patterns pour les images inutiles
useless_img_patterns = [
'logo', 'signature', 'outlook', 'footer', 'header', 'icon',
'emoticon', 'emoji', 'cid:', 'pixel', 'spacer', 'vignette',
'banner', 'separator', 'decoration', 'mail_signature'
]
# Vérifier si c'est une image inutile
for pattern in useless_img_patterns:
if (pattern in src.lower() or
pattern in alt.lower() or
pattern in title.lower()):
return False
# Vérifier la taille
width_str = str(tag.get('width', ''))
height_str = str(tag.get('height', ''))
try:
if width_str.isdigit() and height_str.isdigit():
width = int(width_str)
height = int(height_str)
if width <= 50 and height <= 50:
return False
except (ValueError, TypeError):
pass
# Vérifier si l'image est mentionnée dans le texte
image_indicators = [
'capture', 'screenshot', 'image', 'photo', 'illustration',
'voir', 'regarder', 'ci-joint', 'écran', 'erreur', 'problème',
'bug', 'pièce jointe', 'attachment', 'veuillez trouver'
]
for indicator in image_indicators:
if indicator in message_text.lower():
return True
return True