#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import json import base64 import re import html import argparse from typing import Dict, List, Any, Optional from bs4 import BeautifulSoup import odoorpc from datetime import datetime # Configuration Odoo ODOO_HOST = os.getenv('ODOO_HOST', 'odoo.cbao.fr') ODOO_DB = os.getenv('ODOO_DB', 'production_cbao') ODOO_USER = os.getenv('ODOO_USER', 'fernand@cbao.fr') ODOO_PASSWORD = os.getenv('ODOO_PASSWORD', 'Lestat66!') # Configuration des répertoires EXPORT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "exported_tickets") FILTERED_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "filtered_tickets") # Créer les répertoires nécessaires os.makedirs(EXPORT_DIR, exist_ok=True) os.makedirs(FILTERED_DIR, exist_ok=True) class OdooConnection: """Gère la connexion à l'instance Odoo""" def __init__(self): self.odoo = None self.connected = False def connect(self): """Établit la connexion à Odoo""" try: self.odoo = odoorpc.ODOO(ODOO_HOST, port=443, protocol='jsonrpc+ssl') print(f"Connexion réussie à {ODOO_HOST}") self.odoo.login(ODOO_DB, ODOO_USER, ODOO_PASSWORD) print(f"Authentifié en tant que {ODOO_USER}") self.connected = True return True except odoorpc.error.RPCError as e: print(f"Erreur RPC Odoo : {e}") return False except Exception as e: print(f"Erreur inattendue : {e}") return False def get_odoo_instance(self): """Retourne l'instance Odoo connectée""" if not self.connected: self.connect() return self.odoo class TicketExtractor: """Classe pour extraire et filtrer les tickets Odoo""" def __init__(self): """Initialise l'extracteur de tickets""" self.conn = OdooConnection() self.odoo = self.conn.get_odoo_instance() self.model_name = "project.task" def _ensure_connection(self): """Vérifie et établit la connexion si nécessaire""" if not self.odoo: self.odoo = self.conn.get_odoo_instance() return self.odoo is not None def _safe_execute(self, model, method, *args): """ Exécute une méthode Odoo en toute sécurité via odoorpc. Vérifie la connexion avant d'exécuter. """ if not self._ensure_connection(): print("Connexion Odoo indisponible.") return None try: return self.odoo.execute(model, method, *args) except odoorpc.error.RPCError as e: print(f"Erreur RPC lors de '{method}' sur '{model}': {e}") return None except Exception as e: print(f"Erreur inattendue lors de '{method}' sur '{model}': {e}") return None def get_ticket_by_code(self, ticket_code: str) -> Optional[Dict[str, Any]]: """ Récupère un ticket directement par son code. Args: ticket_code: Le code du ticket à rechercher Returns: Les données du ticket ou None si non trouvé """ # Rechercher le ticket par code domain = [('code', '=', ticket_code)] ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, 1) if not ticket_ids: print(f"Aucun ticket trouvé avec le code '{ticket_code}'") return None # Définir les champs à récupérer fields_to_read = [ 'id', 'name', 'description', 'code', 'stage_id', 'project_id', 'partner_id', 'user_id', 'date_start', 'date_end', 'date_assign', 'date_deadline', 'date_last_stage_update', 'create_date', 'write_date', 'notes', 'planned_hours', 'remaining_hours', 'effective_hours', 'total_hours_spent', 'progress', 'priority', 'color', 'email_from', 'email_cc', 'working_hours_open', 'working_hours_close', 'working_days_open', 'working_days_close', 'website_message_ids', 'message_follower_ids', 'message_ids', 'message_main_attachment_id', 'failed_message_ids', 'rating_ids', 'rating_last_value', 'access_token', 'activity_ids', 'timesheet_ids', 'milestone_id', 'sale_line_id', 'sale_order_id', 'billable_type', 'parent_id', 'child_ids' ] # Récupérer les données du ticket ticket_data = self._safe_execute(self.model_name, 'read', ticket_ids[0], fields_to_read) if not ticket_data: print(f"Impossible de récupérer les données du ticket {ticket_code}") return None return self.resolve_relational_fields(ticket_data[0]) def resolve_relational_fields(self, ticket: Dict[str, Any]) -> Dict[str, Any]: """ Résout les champs relationnels du ticket. Args: ticket: Les données du ticket Returns: Les données du ticket avec les champs relationnels résolus """ fields_info = self._safe_execute(self.model_name, 'fields_get', [], ['type']) for field, info in fields_info.items(): if info.get("type") == "many2one" and isinstance(ticket.get(field), list): ticket[f"{field}_value"] = ticket[field][1] if len(ticket[field]) > 1 else None return ticket def is_odoobot_author(self, message: Dict[str, Any]) -> bool: """ Vérifie si l'auteur du message est OdooBot ou un autre système. Args: message: Le message à vérifier Returns: True si le message provient d'OdooBot, False sinon """ # Vérifier le nom de l'auteur if 'author_id' in message and isinstance(message['author_id'], list) and len(message['author_id']) > 1: author_name = message['author_id'][1].lower() if 'odoobot' in author_name or 'bot' in author_name or 'système' in author_name: return True # Vérifier le type de message if message.get('message_type') == 'notification': return True # Vérifier le sous-type du message if 'subtype_id' in message and isinstance(message['subtype_id'], list) and len(message['subtype_id']) > 1: subtype = message['subtype_id'][1].lower() if 'notification' in subtype or 'system' in subtype: return True # Vérifier le contenu du message if 'body' in message and isinstance(message['body'], str): body = message['body'].lower() system_patterns = [ 'assigné à', 'étape changée', 'créé automatiquement', 'assigned to', 'stage changed', 'automatically created', 'updated', 'mis à jour', 'a modifié', 'changed' ] for pattern in system_patterns: if pattern in body: return True return False def is_important_image(self, tag, message_text: str) -> bool: """ Détermine si une image est importante ou s'il s'agit d'un logo/signature. Args: tag: La balise d'image à analyser message_text: Le texte complet du message pour contexte Returns: True si l'image semble importante, False sinon """ # Vérifier les attributs de l'image src = tag.get('src', '') alt = tag.get('alt', '') title = tag.get('title', '') css_class = tag.get('class', '') # Patterns pour les images inutiles useless_img_patterns = [ 'logo', 'signature', 'outlook', 'footer', 'header', 'icon', 'emoticon', 'emoji', 'cid:', 'pixel', 'spacer', 'vignette', 'banner', 'separator', 'decoration', 'mail_signature' ] # Vérifier si c'est une image inutile for pattern in useless_img_patterns: if (pattern in src.lower() or pattern in alt.lower() or pattern in title.lower() or (css_class and any(pattern in c.lower() for c in css_class if isinstance(c, str)))): return False # Vérifier la taille width = tag.get('width', '') height = tag.get('height', '') try: width = int(width) if width and str(width).isdigit() else None height = int(height) if height and str(height).isdigit() else None if width and height and width <= 50 and height <= 50: return False except (ValueError, TypeError): pass # Vérifier si l'image est mentionnée dans le texte image_indicators = [ 'capture', 'screenshot', 'image', 'photo', 'illustration', 'voir', 'regarder', 'ci-joint', 'écran', 'erreur', 'problème', 'bug', 'pièce jointe', 'attachment', 'veuillez trouver' ] for indicator in image_indicators: if indicator in message_text.lower(): return True return True def clean_html(self, html_content: str) -> str: """ Nettoie le contenu HTML en supprimant toutes les balises mais en préservant le texte. Traite spécifiquement les images pour garder uniquement celles pertinentes. Args: html_content: Contenu HTML à nettoyer Returns: Texte nettoyé sans balises HTML """ if not html_content: return "" # Utiliser BeautifulSoup pour manipuler le HTML soup = BeautifulSoup(html_content, 'html.parser') # Supprimer les éléments de signature signature_elements = [ 'div.signature', '.gmail_signature', '.signature', 'hr + div', 'hr + p', '.footer', '.mail-signature' ] for selector in signature_elements: for element in soup.select(selector): element.decompose() # Supprimer les lignes horizontales for hr in soup.find_all('hr'): hr.decompose() # Récupérer le texte complet pour analyse full_text = soup.get_text(' ', strip=True) # Traiter les images for img in soup.find_all('img'): if self.is_important_image(img, full_text): alt_text = img.get('alt', '') or img.get('title', '') or '[Image importante]' img.replace_with(f" [Image: {alt_text}] ") else: img.decompose() # Traiter les liens vers des pièces jointes for a in soup.find_all('a', href=True): href = a.get('href', '').lower() if 'attachment' in href or 'download' in href or 'file' in href: a.replace_with(f" [Pièce jointe: {a.get_text()}] ") # Récupérer le texte sans balises HTML text = soup.get_text(separator=' ', strip=True) # Décodage des entités HTML text = html.unescape(text) # Nettoyer les espaces multiples text = re.sub(r'\s+', ' ', text) # Nettoyer les lignes vides multiples text = re.sub(r'\n\s*\n', '\n\n', text) # Supprimer les disclaimers et signatures standards footer_patterns = [ r'Sent from my .*', r'Envoyé depuis mon .*', r'Ce message .*confidentiel.*', r'This email .*confidential.*', r'DISCLAIMER.*', r'CONFIDENTIAL.*', r'CONFIDENTIEL.*', r'Le contenu de ce courriel est confidentiel.*', r'This message and any attachments.*', r'Ce message et ses pièces jointes.*', r'AVIS DE CONFIDENTIALITÉ.*', r'PRIVACY NOTICE.*' ] for pattern in footer_patterns: text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL) return text.strip() def save_json(self, data: Any, filepath: str) -> None: """ Sauvegarde des données en JSON. Args: data: Les données à sauvegarder filepath: Le chemin du fichier """ with open(filepath, 'w', encoding='utf-8') as f: json.dump(data, f, indent=4, ensure_ascii=False) def extract_ticket_data(self, ticket_code: str) -> Optional[str]: """ Extrait toutes les données d'un ticket et les sauvegarde dans une structure organisée. Args: ticket_code: Le code du ticket à extraire Returns: Le chemin du dossier créé ou None en cas d'erreur """ # Récupérer les données du ticket ticket = self.get_ticket_by_code(ticket_code) if not ticket: return None # Créer le répertoire pour ce ticket ticket_name = ticket.get('name', 'Sans nom').replace('/', '_').replace('\\', '_') ticket_dir = os.path.join(EXPORT_DIR, f"ticket_{ticket.get('id')}_{ticket_name}") os.makedirs(ticket_dir, exist_ok=True) # Sauvegarder les données du ticket self.save_json(ticket, os.path.join(ticket_dir, "ticket_info.json")) # Récupérer les informations de contact if ticket.get('partner_id'): partner_id = ticket.get('partner_id')[0] if isinstance(ticket.get('partner_id'), list) else ticket.get('partner_id') partner_details = self._safe_execute('res.partner', 'read', [partner_id], ['name', 'email', 'phone', 'mobile', 'street', 'city', 'zip', 'country_id', 'comment']) if partner_details: self.save_json(partner_details[0], os.path.join(ticket_dir, "contact_info.json")) # Récupérer les activités activity_ids = ticket.get('activity_ids', []) if activity_ids: activities = self._safe_execute('mail.activity', 'read', activity_ids, ['id', 'res_id', 'activity_type_id', 'summary', 'note', 'date_deadline', 'user_id', 'create_date']) if activities: self.save_json(activities, os.path.join(ticket_dir, "activities.json")) # Récupérer les messages message_ids = ticket.get('message_ids', []) if message_ids: messages_dir = os.path.join(ticket_dir, "messages") os.makedirs(messages_dir, exist_ok=True) messages = self._safe_execute('mail.message', 'read', message_ids, ['id', 'body', 'date', 'author_id', 'email_from', 'subject', 'parent_id', 'message_type', 'subtype_id', 'attachment_ids']) if messages: # Sauvegarder la collection complète self.save_json(messages, os.path.join(ticket_dir, "messages.json")) # Sauvegarder chaque message individuellement for message in messages: if not self.is_odoobot_author(message): message_id = message.get('id', 0) message_date = message.get('date', '').replace(':', '-').replace(' ', '_') message_path = os.path.join(messages_dir, f"message_{message_id}_{message_date}.json") # Nettoyer le contenu HTML if message.get('body'): message['body'] = self.clean_html(message['body']) # Récupérer les détails de l'auteur if message.get('author_id') and isinstance(message.get('author_id'), list) and len(message.get('author_id')) > 0: author_id = message.get('author_id')[0] author_details = self._safe_execute('res.partner', 'read', [author_id], ['name', 'email', 'phone', 'function', 'company_id']) if author_details: message['author_details'] = author_details[0] # Récupérer les détails du sous-type if message.get('subtype_id') and isinstance(message.get('subtype_id'), list) and len(message.get('subtype_id')) > 0: subtype_id = message.get('subtype_id')[0] subtype_details = self._safe_execute('mail.message.subtype', 'read', [subtype_id], ['name', 'description', 'default']) if subtype_details: message['subtype_details'] = subtype_details self.save_json(message, message_path) # Récupérer les suiveurs follower_ids = ticket.get('message_follower_ids', []) if follower_ids: followers = self._safe_execute('mail.followers', 'read', follower_ids, ['id', 'partner_id', 'name', 'email', 'subtype_ids']) if followers: # Enrichir les informations des suiveurs for follower in followers: if follower.get('partner_id') and isinstance(follower.get('partner_id'), list) and len(follower.get('partner_id')) > 0: partner_id = follower.get('partner_id')[0] partner_details = self._safe_execute('res.partner', 'read', [partner_id], ['name', 'email', 'phone', 'function', 'company_id']) if partner_details: follower['partner_details'] = partner_details[0] if follower.get('subtype_ids'): subtype_details = self._safe_execute('mail.message.subtype', 'read', follower.get('subtype_ids'), ['name', 'description', 'default']) if subtype_details: follower['subtype_details'] = subtype_details self.save_json(followers, os.path.join(ticket_dir, "followers.json")) # Récupérer les pièces jointes attachment_ids = self._safe_execute('ir.attachment', 'search', [('res_model', '=', self.model_name), ('res_id', '=', ticket.get('id'))]) if attachment_ids: attachments = self._safe_execute('ir.attachment', 'read', attachment_ids, ['id', 'name', 'datas', 'mimetype', 'create_date', 'create_uid', 'description', 'res_name', 'public', 'type']) if attachments: # Sauvegarder les métadonnées des pièces jointes attachments_info = [{ 'id': att.get('id'), 'name': att.get('name'), 'mimetype': att.get('mimetype'), 'create_date': att.get('create_date'), 'description': att.get('description'), 'res_name': att.get('res_name'), 'type': att.get('type'), 'file_path': f"attachments/{att.get('id')}_{att.get('name', '').replace('/', '_')}" } for att in attachments] self.save_json(attachments_info, os.path.join(ticket_dir, "attachments_info.json")) # Créer un répertoire pour les pièces jointes attachments_dir = os.path.join(ticket_dir, "attachments") os.makedirs(attachments_dir, exist_ok=True) # Sauvegarder chaque pièce jointe for attachment in attachments: attachment_id = attachment.get('id', 0) attachment_name = attachment.get('name', f"attachment_{attachment_id}").replace('/', '_') attachment_data = attachment.get('datas') if attachment_data: try: binary_data = base64.b64decode(attachment_data) file_path = os.path.join(attachments_dir, f"{attachment_id}_{attachment_name}") with open(file_path, "wb") as f: f.write(binary_data) except Exception as e: print(f"Erreur lors de la sauvegarde de la pièce jointe {attachment_name}: {e}") # Extraire les historiques de timesheet timesheet_ids = ticket.get('timesheet_ids', []) if timesheet_ids: timesheets = self._safe_execute('account.analytic.line', 'read', timesheet_ids, ['id', 'date', 'user_id', 'name', 'unit_amount', 'employee_id', 'department_id']) if timesheets: self.save_json(timesheets, os.path.join(ticket_dir, "timesheets.json")) print(f"Extraction terminée. Les fichiers sont disponibles dans: {ticket_dir}") return ticket_dir def main(): """Point d'entrée principal du script""" # Configuration du parser d'arguments parser = argparse.ArgumentParser( description="Extrait et filtre les données d'un ticket Odoo par son code.", formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument( "ticket_code", help="Code du ticket à extraire (ex: T12546)", type=str ) # Parser les arguments args = parser.parse_args() # Vérifier le format du code if not re.match(r'^T\d+$', args.ticket_code): print("Erreur: Le code du ticket doit être au format T suivi de chiffres (ex: T12546)") return print(f"\nExtraction du ticket {args.ticket_code}...") # Créer l'extracteur et extraire les données extractor = TicketExtractor() ticket_dir = extractor.extract_ticket_data(args.ticket_code) if ticket_dir: print(f"\nLes données du ticket ont été extraites avec succès dans: {ticket_dir}") else: print("\nL'extraction a échoué. Vérifiez le code du ticket et réessayez.") if __name__ == "__main__": main()