import os import base64 import logging import requests import re import json import datetime from typing import List, Dict, Any, Optional, Set from .auth_manager import AuthManager from core.utils import save_json, normalize_filename class AttachmentManager: """ Gestionnaire de pièces jointes pour extraire et sauvegarder les fichiers attachés aux tickets. """ def __init__(self, auth: AuthManager): """ Initialise le gestionnaire de pièces jointes. Args: auth: Gestionnaire d'authentification """ self.auth = auth self.model_name = "project.task" self.excluded_mime_types = [] # Types MIME à exclure si nécessaire def get_ticket_attachments(self, ticket_id: int) -> List[Dict[str, Any]]: """ Récupère les pièces jointes associées à un ticket. Args: ticket_id: ID du ticket Returns: Liste des pièces jointes avec leurs métadonnées """ params = { "model": "ir.attachment", "method": "search_read", "args": [[["res_id", "=", ticket_id], ["res_model", "=", self.model_name]]], "kwargs": { "fields": ["id", "name", "mimetype", "file_size", "create_date", "create_uid", "datas", "description", "res_name"] } } attachments = self.auth._rpc_call("/web/dataset/call_kw", params) # Résoudre les informations sur le créateur for attachment in attachments: if "create_uid" in attachment and isinstance(attachment["create_uid"], list) and len(attachment["create_uid"]) >= 2: attachment["creator_name"] = attachment["create_uid"][1] attachment["creator_id"] = attachment["create_uid"][0] elif "create_uid" in attachment and isinstance(attachment["create_uid"], int): # Récupérer le nom du créateur params = { "model": "res.users", "method": "name_get", "args": [[attachment["create_uid"]]], "kwargs": {} } result = self.auth._rpc_call("/web/dataset/call_kw", params) if result and isinstance(result, list) and result[0] and len(result[0]) >= 2: attachment["creator_name"] = result[0][1] attachment["creator_id"] = result[0][0] return attachments if isinstance(attachments, list) else [] def download_attachment(self, attachment: Dict[str, Any], output_dir: str) -> Dict[str, Any]: """ Télécharge et sauvegarde une pièce jointe dans le répertoire spécifié. Args: attachment: Dictionnaire contenant les métadonnées de la pièce jointe output_dir: Répertoire où sauvegarder la pièce jointe Returns: Dictionnaire avec les informations sur le fichier sauvegardé """ result = { "id": attachment.get("id"), "name": attachment.get("name", "Sans nom"), "mimetype": attachment.get("mimetype", "application/octet-stream"), "file_size": attachment.get("file_size", 0), "create_date": attachment.get("create_date"), "creator": attachment.get("creator_name", "Inconnu"), "status": "error", "file_path": "", "error": "" } if not attachment.get("datas"): result["error"] = "Données de pièce jointe manquantes" return result try: # Créer le dossier attachments s'il n'existe pas attachments_dir = os.path.join(output_dir, "attachments") os.makedirs(attachments_dir, exist_ok=True) # Construire un nom de fichier sécurisé safe_filename = normalize_filename(attachment.get("name", f"attachment_{attachment.get('id')}.bin")) file_path = os.path.join(attachments_dir, safe_filename) # Vérifier si un fichier avec le même nom existe déjà if os.path.exists(file_path): base, ext = os.path.splitext(safe_filename) counter = 1 while os.path.exists(file_path): new_filename = f"{base}_{counter}{ext}" file_path = os.path.join(attachments_dir, new_filename) counter += 1 # Décoder et sauvegarder le contenu file_content = base64.b64decode(attachment["datas"]) with open(file_path, "wb") as f: f.write(file_content) result["status"] = "success" result["file_path"] = file_path return result except Exception as e: logging.error(f"Erreur lors du téléchargement de la pièce jointe {attachment.get('name', '')}: {e}") result["error"] = str(e) return result def download_image_from_url(self, url: str, output_dir: str, filename: Optional[str] = None) -> Dict[str, Any]: """ Télécharge une image à partir d'une URL et la sauvegarde dans le répertoire des pièces jointes. Args: url: URL de l'image à télécharger output_dir: Répertoire de sortie filename: Nom de fichier à utiliser (facultatif) Returns: Dictionnaire avec les informations sur le fichier téléchargé """ result = { "url": url, "status": "error", "file_path": "", "error": "" } try: # Extraire l'ID de l'image et le token d'accès si présents img_id = None access_token = None # Pattern pour /web/image/ID?access_token=... id_match = re.search(r"/web/image/(\d+)", url) if id_match: img_id = int(id_match.group(1)) # Extraire le token d'accès token_match = re.search(r"access_token=([^&]+)", url) if token_match: access_token = token_match.group(1) # Extraire le nom de fichier de l'URL si non fourni if not filename: if img_id: # Utiliser l'ID de l'image filename = f"image_{img_id}.png" else: # Extraire le nom de fichier de l'URL url_path = url.split('?')[0] # Supprimer les paramètres de requête path_parts = url_path.split('/') filename = path_parts[-1] if path_parts else f"image_{hash(url)}.jpg" # Créer le dossier attachments s'il n'existe pas attachments_dir = os.path.join(output_dir, "attachments") os.makedirs(attachments_dir, exist_ok=True) # Construire un nom de fichier sécurisé if filename is None: filename = f"image_{hash(url)}.jpg" safe_filename = normalize_filename(filename) file_path = os.path.join(attachments_dir, safe_filename) # Vérifier si un fichier avec le même nom existe déjà if os.path.exists(file_path): base, ext = os.path.splitext(safe_filename) counter = 1 while os.path.exists(file_path): new_filename = f"{base}_{counter}{ext}" file_path = os.path.join(attachments_dir, new_filename) counter += 1 # Télécharger l'image en utilisant la session authentifiée # Vérifier les attributs disponibles dans l'objet auth cookies = getattr(self.auth, 'cookies', None) headers = getattr(self.auth, 'headers', None) verify_ssl = getattr(self.auth, 'verify_ssl', True) response = requests.get(url, cookies=cookies, headers=headers, verify=verify_ssl) response.raise_for_status() # Sauvegarder l'image with open(file_path, 'wb') as f: f.write(response.content) # Déterminer le type MIME basé sur l'extension _, ext = os.path.splitext(file_path) mimetype = { '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif', '.svg': 'image/svg+xml' }.get(ext.lower(), 'application/octet-stream') # Mettre à jour le résultat avec les informations du fichier result["status"] = "success" result["file_path"] = file_path result["mimetype"] = mimetype result["file_size"] = str(len(response.content)) result["name"] = os.path.basename(file_path) if img_id is not None: result["id"] = str(img_id) if access_token: result["access_token"] = access_token return result except Exception as e: logging.error(f"Erreur lors du téléchargement de l'image depuis {url}: {e}") result["error"] = str(e) return result def extract_missing_images(self, messages_data: Dict[str, Any], output_dir: str) -> List[Dict[str, Any]]: """ Extrait les images manquantes qui sont intégrées dans les messages HTML mais non attachées au ticket. Args: messages_data: Données des messages du ticket output_dir: Répertoire de sortie Returns: Liste des informations sur les images extraites """ extracted_images = [] try: # Chercher les URL d'images dans les messages image_urls = self._extract_image_urls_from_messages(messages_data) logging.info(f"URLs d'images trouvées dans les messages: {len(image_urls)}") # Récupérer les IDs des pièces jointes existantes pour éviter les doublons existing_attachments = {} attachments_info_path = os.path.join(output_dir, "attachments_info.json") if os.path.exists(attachments_info_path): try: with open(attachments_info_path, 'r', encoding='utf-8') as f: existing_attachments_info = json.load(f) for att in existing_attachments_info: if "id" in att and att["id"]: existing_attachments[str(att["id"])] = att except Exception as e: logging.error(f"Erreur lors du chargement des pièces jointes existantes: {e}") # Télécharger les images depuis les URLs for url in image_urls: # Vérifier si l'image a déjà un ID extrait de l'URL img_id = None id_match = re.search(r"/web/image/(\d+)", url) if id_match: img_id = id_match.group(1) # Vérifier si cette image existe déjà if img_id in existing_attachments: logging.info(f"Image avec ID {img_id} déjà présente dans les pièces jointes, ignorée") continue # Télécharger l'image result = self.download_image_from_url(url, output_dir) if result["status"] == "success": # Ajouter des informations sur la source result["source"] = "message_embedded" result["extraction_date"] = datetime.datetime.now().isoformat() # Traiter le message source si disponible message_id = None message_author = None messages = messages_data.get("messages", []) for msg in messages: body = msg.get("body_original", msg.get("body", "")) if body and url in body: message_id = msg.get("id") if "author_id" in msg and isinstance(msg["author_id"], list) and len(msg["author_id"]) >= 2: message_author = msg["author_id"][1] elif "author_id" in msg and isinstance(msg["author_id"], int): message_author = f"User ID: {msg['author_id']}" break if message_id: result["source_message_id"] = message_id if message_author: result["message_author"] = message_author extracted_images.append(result) logging.info(f"Image téléchargée depuis l'URL: {url}") logging.info(f"Total des images extraites: {len(extracted_images)}") except Exception as e: logging.error(f"Erreur lors de l'extraction des images manquantes: {e}") return extracted_images def _extract_image_urls_from_messages(self, messages_data: Dict[str, Any]) -> Set[str]: """ Extrait les URLs d'images des messages. Args: messages_data: Données des messages du ticket Returns: Ensemble des URLs d'images trouvées """ image_urls = set() # Récupérer la liste des messages messages = messages_data.get("messages", []) if messages_data else [] if not messages: return image_urls # Parcourir chaque message for message in messages: # Chercher dans body_original s'il existe, sinon dans body body = message.get("body_original", message.get("body", "")) if not body or not isinstance(body, str): continue # Recherche des URLs d'images dans le HTML # 1. Trouver toutes les balises avec leur src complet img_tags = re.finditer(r']+src=["\']([^"\']+)["\'][^>]*>', body) for match in img_tags: img_url = match.group(1) # Ignorer les URLs data: if img_url.startswith('data:'): continue # Ajouter le domaine Odoo si l'URL est relative if img_url.startswith('/web/'): base_url = "" if hasattr(self.auth, 'url') and self.auth.url: base_url = self.auth.url.split('/xmlrpc')[0] # Extraire le domaine de base img_url = f"{base_url}{img_url if img_url.startswith('/') else '/' + img_url}" image_urls.add(img_url) # 2. Images Odoo internes avec /web/image/ ou /web/content/ odoo_image_urls = re.findall(r'src=["\']((https?://[^"\']+)?/web/(image|content)/[^"\']+)["\']', body) for match in odoo_image_urls: url = match[0] # Ajouter le domaine Odoo si l'URL est relative if not url.startswith(('http://', 'https://')): base_url = "" if hasattr(self.auth, 'url') and self.auth.url: base_url = self.auth.url.split('/xmlrpc')[0] # Extraire le domaine de base url = f"{base_url}{url if url.startswith('/') else '/' + url}" # Ne pas inclure les URLs avec data: if url and not url.startswith('data:'): image_urls.add(url) # 3. Images externes external_image_urls = re.findall(r'src=["\']((https?://[^"\']+)\.(jpe?g|png|gif|svg)([^"\']*)?)["\']', body) for match in external_image_urls: url = match[0] if url and not url.startswith('data:'): image_urls.add(url) return image_urls def save_attachments(self, ticket_id: int, output_dir: str, download: bool = True, messages_data: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: """ Récupère et sauvegarde toutes les pièces jointes d'un ticket. Args: ticket_id: ID du ticket output_dir: Répertoire de sortie download: Si True, télécharge les pièces jointes, sinon récupère seulement les métadonnées messages_data: Données des messages pour extraire les images intégrées (optionnel) Returns: Liste des informations sur les pièces jointes """ # Récupérer les pièces jointes attachments = self.get_ticket_attachments(ticket_id) if not attachments: logging.info(f"Aucune pièce jointe trouvée pour le ticket {ticket_id}") # Si aucune pièce jointe trouvée mais que nous avons les messages, # on peut quand même chercher des images intégrées attachments_info = [] else: logging.info(f"Traitement de {len(attachments)} pièces jointes pour le ticket {ticket_id}") # Préparer les résultats attachments_info = [] # Télécharger chaque pièce jointe for i, attachment in enumerate(attachments): # Ne pas inclure le contenu binaire dans les métadonnées attachment_meta = {key: value for key, value in attachment.items() if key != "datas"} if download: # Télécharger et sauvegarder la pièce jointe download_result = self.download_attachment(attachment, output_dir) attachment_meta.update({ "download_status": download_result.get("status"), "local_path": download_result.get("file_path", ""), "error": download_result.get("error", "") }) if download_result.get("status") == "success": logging.info(f"Pièce jointe téléchargée: {attachment_meta.get('name')} ({i+1}/{len(attachments)})") else: logging.warning(f"Échec du téléchargement de la pièce jointe: {attachment_meta.get('name')} - {download_result.get('error')}") else: # Seulement récupérer les métadonnées attachment_meta.update({ "download_status": "not_attempted", "local_path": "", "error": "" }) attachments_info.append(attachment_meta) # Extraction des images intégrées si les données des messages sont fournies if messages_data: try: logging.info("Extraction des images intégrées aux messages...") missing_images = self.extract_missing_images(messages_data, output_dir) # Ajouter les images extraites à la liste des pièces jointes for image in missing_images: # Convertir l'ID en chaîne si c'est un entier image_id = image.get("id") if image_id is not None and isinstance(image_id, int): image_id = str(image_id) else: image_id = f"embedded_{len(attachments_info) + 1}" image_info = { "id": image_id, "name": image.get("name", "Sans nom"), "mimetype": image.get("mimetype", "image/jpeg"), "file_size": str(image.get("file_size", 0)), "create_date": None, "creator_name": "Extraction automatique", "download_status": image.get("status"), "local_path": image.get("file_path", ""), "error": image.get("error", ""), "is_embedded_image": True, "source_url": image.get("url", ""), "source": image.get("source", ""), "extraction_date": image.get("extraction_date", ""), "source_message_id": image.get("source_message_id", ""), "message_author": image.get("message_author", "") } attachments_info.append(image_info) logging.info(f"{len(missing_images)} images intégrées extraites et ajoutées aux pièces jointes") except Exception as e: logging.error(f"Erreur lors de l'extraction des images intégrées: {e}") # Sauvegarder les informations sur les pièces jointes attachments_info_path = os.path.join(output_dir, "attachments_info.json") save_json(attachments_info, attachments_info_path) return attachments_info