import os import base64 import logging import requests import re from typing import List, Dict, Any, Optional, Set from .auth_manager import AuthManager from core.utils import save_json, normalize_filename from utils.image_extractor.html_image_extractor import extract_images_from_ticket class AttachmentManager: """ Gestionnaire de pièces jointes pour extraire et sauvegarder les fichiers attachés aux tickets. """ def __init__(self, auth: AuthManager): """ Initialise le gestionnaire de pièces jointes. Args: auth: Gestionnaire d'authentification """ self.auth = auth self.model_name = "project.task" self.excluded_mime_types = [] # Types MIME à exclure si nécessaire def get_ticket_attachments(self, ticket_id: int) -> List[Dict[str, Any]]: """ Récupère les pièces jointes associées à un ticket. Args: ticket_id: ID du ticket Returns: Liste des pièces jointes avec leurs métadonnées """ params = { "model": "ir.attachment", "method": "search_read", "args": [[["res_id", "=", ticket_id], ["res_model", "=", self.model_name]]], "kwargs": { "fields": ["id", "name", "mimetype", "file_size", "create_date", "create_uid", "datas", "description", "res_name"] } } attachments = self.auth._rpc_call("/web/dataset/call_kw", params) # Résoudre les informations sur le créateur for attachment in attachments: if "create_uid" in attachment and isinstance(attachment["create_uid"], list) and len(attachment["create_uid"]) >= 2: attachment["creator_name"] = attachment["create_uid"][1] attachment["creator_id"] = attachment["create_uid"][0] elif "create_uid" in attachment and isinstance(attachment["create_uid"], int): # Récupérer le nom du créateur params = { "model": "res.users", "method": "name_get", "args": [[attachment["create_uid"]]], "kwargs": {} } result = self.auth._rpc_call("/web/dataset/call_kw", params) if result and isinstance(result, list) and result[0] and len(result[0]) >= 2: attachment["creator_name"] = result[0][1] attachment["creator_id"] = result[0][0] return attachments if isinstance(attachments, list) else [] def download_attachment(self, attachment: Dict[str, Any], output_dir: str) -> Dict[str, Any]: """ Télécharge et sauvegarde une pièce jointe dans le répertoire spécifié. Args: attachment: Dictionnaire contenant les métadonnées de la pièce jointe output_dir: Répertoire où sauvegarder la pièce jointe Returns: Dictionnaire avec les informations sur le fichier sauvegardé """ result = { "id": attachment.get("id"), "name": attachment.get("name", "Sans nom"), "mimetype": attachment.get("mimetype", "application/octet-stream"), "file_size": attachment.get("file_size", 0), "create_date": attachment.get("create_date"), "creator": attachment.get("creator_name", "Inconnu"), "status": "error", "file_path": "", "error": "" } if not attachment.get("datas"): result["error"] = "Données de pièce jointe manquantes" return result try: # Créer le dossier attachments s'il n'existe pas attachments_dir = os.path.join(output_dir, "attachments") os.makedirs(attachments_dir, exist_ok=True) # Construire un nom de fichier sécurisé safe_filename = normalize_filename(attachment.get("name", f"attachment_{attachment.get('id')}.bin")) file_path = os.path.join(attachments_dir, safe_filename) # Vérifier si un fichier avec le même nom existe déjà if os.path.exists(file_path): base, ext = os.path.splitext(safe_filename) counter = 1 while os.path.exists(file_path): new_filename = f"{base}_{counter}{ext}" file_path = os.path.join(attachments_dir, new_filename) counter += 1 # Décoder et sauvegarder le contenu file_content = base64.b64decode(attachment["datas"]) with open(file_path, "wb") as f: f.write(file_content) result["status"] = "success" result["file_path"] = file_path return result except Exception as e: logging.error(f"Erreur lors du téléchargement de la pièce jointe {attachment.get('name', '')}: {e}") result["error"] = str(e) return result def download_image_from_url(self, url: str, output_dir: str, filename: str = None) -> Dict[str, Any]: """ Télécharge une image à partir d'une URL et la sauvegarde dans le répertoire des pièces jointes. Args: url: URL de l'image à télécharger output_dir: Répertoire de sortie filename: Nom de fichier à utiliser (facultatif) Returns: Dictionnaire avec les informations sur le fichier téléchargé """ result = { "url": url, "status": "error", "file_path": "", "error": "" } try: # Extraire le nom de fichier de l'URL si non fourni if not filename: # Extraire le nom de fichier de l'URL url_path = url.split('?')[0] # Supprimer les paramètres de requête path_parts = url_path.split('/') filename = path_parts[-1] if path_parts else f"image_{hash(url)}.jpg" # Créer le dossier attachments s'il n'existe pas attachments_dir = os.path.join(output_dir, "attachments") os.makedirs(attachments_dir, exist_ok=True) # Construire un nom de fichier sécurisé safe_filename = normalize_filename(filename) file_path = os.path.join(attachments_dir, safe_filename) # Vérifier si un fichier avec le même nom existe déjà if os.path.exists(file_path): base, ext = os.path.splitext(safe_filename) counter = 1 while os.path.exists(file_path): new_filename = f"{base}_{counter}{ext}" file_path = os.path.join(attachments_dir, new_filename) counter += 1 # Télécharger l'image en utilisant la session authentifiée response = requests.get(url, cookies=self.auth.cookies, headers=self.auth.headers, verify=self.auth.verify_ssl) response.raise_for_status() # Sauvegarder l'image with open(file_path, 'wb') as f: f.write(response.content) # Déterminer le type MIME basé sur l'extension _, ext = os.path.splitext(file_path) mimetype = { '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif', '.svg': 'image/svg+xml' }.get(ext.lower(), 'application/octet-stream') result.update({ "status": "success", "file_path": file_path, "mimetype": mimetype, "file_size": len(response.content), "name": os.path.basename(file_path) }) return result except Exception as e: logging.error(f"Erreur lors du téléchargement de l'image depuis {url}: {e}") result["error"] = str(e) return result def extract_missing_images(self, messages_data: Dict[str, Any], output_dir: str) -> List[Dict[str, Any]]: """ Extrait les images manquantes qui sont intégrées dans les messages HTML mais non attachées au ticket. Args: messages_data: Données des messages du ticket output_dir: Répertoire de sortie Returns: Liste des informations sur les images extraites """ extracted_images = [] # Utiliser l'extracteur d'images HTML pour trouver les images intégrées try: # Vérifier si l'extracteur d'images est disponible image_paths = extract_images_from_ticket(output_dir) logging.info(f"Images trouvées par l'extracteur HTML: {len(image_paths)}") # Chercher aussi les URL d'images dans les messages image_urls = self._extract_image_urls_from_messages(messages_data) logging.info(f"URLs d'images trouvées dans les messages: {len(image_urls)}") # Télécharger les images depuis les URLs for url in image_urls: result = self.download_image_from_url(url, output_dir) if result["status"] == "success": extracted_images.append(result) logging.info(f"Image téléchargée depuis l'URL: {url}") except Exception as e: logging.error(f"Erreur lors de l'extraction des images manquantes: {e}") return extracted_images def _extract_image_urls_from_messages(self, messages_data: Dict[str, Any]) -> Set[str]: """ Extrait les URLs d'images des messages. Args: messages_data: Données des messages du ticket Returns: Ensemble des URLs d'images trouvées """ image_urls = set() # Récupérer la liste des messages messages = messages_data.get("messages", []) if not messages: return image_urls # Parcourir chaque message for message in messages: # Chercher dans body_original s'il existe, sinon dans body body = message.get("body_original", message.get("body", "")) if not body or not isinstance(body, str): continue # Recherche des URLs d'images dans le HTML # 1. Images Odoo internes avec /web/image/ ou /web/content/ odoo_image_urls = re.findall(r'src=["\']((https?://[^"\']+)?/web/(image|content)/[^"\']+)["\']', body) for match in odoo_image_urls: url = match[0] # Ajouter le domaine Odoo si l'URL est relative if not url.startswith(('http://', 'https://')): # Utiliser le domaine de base de l'API Odoo if self.auth.url: base_url = self.auth.url.split('/xmlrpc')[0] # Extraire le domaine de base url = f"{base_url}{url if url.startswith('/') else '/' + url}" # Ne pas inclure les URLs avec data: if url and not url.startswith('data:'): image_urls.add(url) # 2. Images externes external_image_urls = re.findall(r'src=["\']((https?://[^"\']+)\.(jpe?g|png|gif|svg)([^"\']*)?)["\']', body) for match in external_image_urls: url = match[0] if url and not url.startswith('data:'): image_urls.add(url) return image_urls def save_attachments(self, ticket_id: int, output_dir: str, download: bool = True, messages_data: Dict[str, Any] = None) -> List[Dict[str, Any]]: """ Récupère et sauvegarde toutes les pièces jointes d'un ticket. Args: ticket_id: ID du ticket output_dir: Répertoire de sortie download: Si True, télécharge les pièces jointes, sinon récupère seulement les métadonnées messages_data: Données des messages pour extraire les images intégrées (optionnel) Returns: Liste des informations sur les pièces jointes """ # Récupérer les pièces jointes attachments = self.get_ticket_attachments(ticket_id) if not attachments: logging.info(f"Aucune pièce jointe trouvée pour le ticket {ticket_id}") # Si aucune pièce jointe trouvée mais que nous avons les messages, # on peut quand même chercher des images intégrées attachments_info = [] else: logging.info(f"Traitement de {len(attachments)} pièces jointes pour le ticket {ticket_id}") # Préparer les résultats attachments_info = [] # Télécharger chaque pièce jointe for i, attachment in enumerate(attachments): # Ne pas inclure le contenu binaire dans les métadonnées attachment_meta = {key: value for key, value in attachment.items() if key != "datas"} if download: # Télécharger et sauvegarder la pièce jointe download_result = self.download_attachment(attachment, output_dir) attachment_meta.update({ "download_status": download_result.get("status"), "local_path": download_result.get("file_path", ""), "error": download_result.get("error", "") }) if download_result.get("status") == "success": logging.info(f"Pièce jointe téléchargée: {attachment_meta.get('name')} ({i+1}/{len(attachments)})") else: logging.warning(f"Échec du téléchargement de la pièce jointe: {attachment_meta.get('name')} - {download_result.get('error')}") else: # Seulement récupérer les métadonnées attachment_meta.update({ "download_status": "not_attempted", "local_path": "", "error": "" }) attachments_info.append(attachment_meta) # Extraction des images intégrées si les données des messages sont fournies if messages_data: try: logging.info("Extraction des images intégrées aux messages...") missing_images = self.extract_missing_images(messages_data, output_dir) # Ajouter les images extraites à la liste des pièces jointes for image in missing_images: image_info = { "id": f"embedded_{len(attachments_info) + 1}", # Identifiant unique pour l'image intégrée "name": image.get("name", "Sans nom"), "mimetype": image.get("mimetype", "image/jpeg"), "file_size": image.get("file_size", 0), "create_date": None, "creator_name": "Extraction automatique", "download_status": image.get("status"), "local_path": image.get("file_path", ""), "error": image.get("error", ""), "is_embedded_image": True, "source_url": image.get("url", "") } attachments_info.append(image_info) logging.info(f"{len(missing_images)} images intégrées extraites et ajoutées aux pièces jointes") except Exception as e: logging.error(f"Erreur lors de l'extraction des images intégrées: {e}") # Sauvegarder les informations sur les pièces jointes attachments_info_path = os.path.join(output_dir, "attachments_info.json") save_json(attachments_info, attachments_info_path) return attachments_info