mirror of
https://github.com/Ladebeze66/odoo_toolkit.git
synced 2025-12-13 09:06:52 +01:00
535 lines
23 KiB
Python
535 lines
23 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import json
|
|
import base64
|
|
import re
|
|
import html
|
|
import argparse
|
|
from typing import Dict, List, Any, Optional
|
|
from bs4 import BeautifulSoup
|
|
import odoorpc
|
|
from datetime import datetime
|
|
|
|
# Odoo connection settings. Each value is read from the environment variable
# of the same name; host, database and user fall back to the defaults below.
ODOO_HOST = os.getenv('ODOO_HOST', 'odoo.cbao.fr')
ODOO_DB = os.getenv('ODOO_DB', 'production_cbao')
ODOO_USER = os.getenv('ODOO_USER', 'fernand@cbao.fr')
# SECURITY: the password has no built-in fallback on purpose. A secret must
# never be committed to the source tree; export ODOO_PASSWORD before running
# the script. NOTE(review): a real password used to be hard-coded here and is
# still present in the repository history -- it should be rotated.
ODOO_PASSWORD = os.getenv('ODOO_PASSWORD', '')

# Output directories, created next to this script.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
EXPORT_DIR = os.path.join(_SCRIPT_DIR, "exported_tickets")
FILTERED_DIR = os.path.join(_SCRIPT_DIR, "filtered_tickets")

# Make sure the output directories exist as soon as the module is loaded.
os.makedirs(EXPORT_DIR, exist_ok=True)
os.makedirs(FILTERED_DIR, exist_ok=True)
|
|
|
|
class OdooConnection:
    """Manage the connection to the Odoo instance.

    Attributes:
        odoo: The ``odoorpc.ODOO`` client once a login has succeeded,
            otherwise ``None``.
        connected: ``True`` only after a successful login.
    """

    def __init__(self):
        """Start in the disconnected state; no network access happens here."""
        self.odoo = None
        self.connected = False

    def connect(self):
        """Open a JSON-RPC/SSL session on port 443 and log in.

        Uses the module-level ``ODOO_HOST``, ``ODOO_DB``, ``ODOO_USER`` and
        ``ODOO_PASSWORD`` settings. Errors are reported on stdout instead of
        being raised.

        Returns:
            True when the login succeeded, False otherwise.
        """
        try:
            client = odoorpc.ODOO(ODOO_HOST, port=443, protocol='jsonrpc+ssl')
            print(f"Connexion réussie à {ODOO_HOST}")

            client.login(ODOO_DB, ODOO_USER, ODOO_PASSWORD)
            print(f"Authentifié en tant que {ODOO_USER}")
        except odoorpc.error.RPCError as e:
            print(f"Erreur RPC Odoo : {e}")
        except Exception as e:
            # Deliberate catch-all: network/SSL failures must not crash the
            # caller, which only looks at the boolean result.
            print(f"Erreur inattendue : {e}")
        else:
            # Publish the client only once it is authenticated, so that
            # ``self.odoo`` is never a half-initialised session.
            self.odoo = client
            self.connected = True
            return True

        self.odoo = None
        self.connected = False
        return False

    def get_odoo_instance(self):
        """Return the authenticated Odoo client, connecting first if needed.

        Returns:
            The logged-in client, or ``None`` when the connection or the
            login failed.
        """
        if not self.connected:
            self.connect()
        return self.odoo if self.connected else None
|
|
|
|
class TicketExtractor:
    """Extract an Odoo ticket (``project.task`` record) and export it to disk.

    All RPC calls go through :meth:`_safe_execute`, which reports failures on
    stdout and returns ``None`` instead of raising.
    """

    # Fields requested when reading the ticket record.
    _TICKET_FIELDS = (
        'id', 'name', 'description', 'code', 'stage_id', 'project_id',
        'partner_id', 'user_id', 'date_start', 'date_end', 'date_assign',
        'date_deadline', 'date_last_stage_update', 'create_date', 'write_date',
        'notes', 'planned_hours', 'remaining_hours', 'effective_hours',
        'total_hours_spent', 'progress', 'priority', 'color', 'email_from',
        'email_cc', 'working_hours_open', 'working_hours_close',
        'working_days_open', 'working_days_close', 'website_message_ids',
        'message_follower_ids', 'message_ids', 'message_main_attachment_id',
        'failed_message_ids', 'rating_ids', 'rating_last_value', 'access_token',
        'activity_ids', 'timesheet_ids', 'milestone_id', 'sale_line_id',
        'sale_order_id', 'billable_type', 'parent_id', 'child_ids',
    )

    # Fields read for message authors and followers.
    _PARTNER_SUMMARY_FIELDS = ('name', 'email', 'phone', 'function', 'company_id')
    # Fields read for message subtypes.
    _SUBTYPE_FIELDS = ('name', 'description', 'default')

    # Substrings (lower case) that mark a message as system generated.
    # NOTE(review): 'bot' also matches human names such as "Talbot", and
    # generic words like 'changed'/'updated' match ordinary customer text, so
    # genuine messages can be skipped -- confirm this is acceptable.
    _SYSTEM_AUTHOR_MARKERS = ('odoobot', 'bot', 'système')
    _SYSTEM_SUBTYPE_MARKERS = ('notification', 'system')
    _SYSTEM_BODY_PATTERNS = (
        'assigné à', 'étape changée', 'créé automatiquement',
        'assigned to', 'stage changed', 'automatically created',
        'updated', 'mis à jour', 'a modifié', 'changed',
    )

    # Substrings (lower case) identifying decorative images.
    _USELESS_IMAGE_PATTERNS = (
        'logo', 'signature', 'outlook', 'footer', 'header', 'icon',
        'emoticon', 'emoji', 'cid:', 'pixel', 'spacer', 'vignette',
        'banner', 'separator', 'decoration', 'mail_signature',
    )

    # CSS selectors of elements removed as e-mail signatures.
    _SIGNATURE_SELECTORS = (
        'div.signature', '.gmail_signature', '.signature',
        'hr + div', 'hr + p', '.footer', '.mail-signature',
    )

    # Disclaimer/signature patterns. Each one removes everything from its
    # first match to the end of the text (``.*`` with DOTALL).
    _FOOTER_REGEXES = tuple(
        re.compile(pattern, re.IGNORECASE | re.DOTALL)
        for pattern in (
            r'Sent from my .*',
            r'Envoyé depuis mon .*',
            r'Ce message .*confidentiel.*',
            r'This email .*confidential.*',
            r'DISCLAIMER.*',
            r'CONFIDENTIAL.*',
            r'CONFIDENTIEL.*',
            r'Le contenu de ce courriel est confidentiel.*',
            r'This message and any attachments.*',
            r'Ce message et ses pièces jointes.*',
            r'AVIS DE CONFIDENTIALITÉ.*',
            r'PRIVACY NOTICE.*',
        )
    )

    def __init__(self):
        """Create the connection helper and try to connect right away."""
        self.conn = OdooConnection()
        self.odoo = self.conn.get_odoo_instance()
        self.model_name = "project.task"

    # ------------------------------------------------------------------
    # RPC plumbing
    # ------------------------------------------------------------------
    def _ensure_connection(self):
        """Connect if necessary; return True when an authenticated client exists."""
        # A client object alone is not enough: it must also be logged in.
        if not self.odoo or not self.conn.connected:
            self.odoo = self.conn.get_odoo_instance()
        return self.odoo is not None and bool(self.conn.connected)

    def _safe_execute(self, model, method, *args):
        """Call ``method`` on ``model`` through odoorpc without raising.

        Args:
            model: Odoo model name, e.g. ``'res.partner'``.
            method: Name of the model method to call.
            *args: Positional arguments forwarded to the method.

        Returns:
            The RPC result, or ``None`` when the connection is unavailable or
            the call failed (the error is printed).
        """
        if not self._ensure_connection():
            print("Connexion Odoo indisponible.")
            return None

        try:
            return self.odoo.execute(model, method, *args)
        except odoorpc.error.RPCError as e:
            print(f"Erreur RPC lors de '{method}' sur '{model}': {e}")
        except Exception as e:
            # Deliberate catch-all: the export is best effort.
            print(f"Erreur inattendue lors de '{method}' sur '{model}': {e}")
        return None

    def _read_first(self, model, record_id, fields):
        """Read one record and return its dict, or ``None`` on failure."""
        records = self._safe_execute(model, 'read', [record_id], list(fields))
        return records[0] if records else None

    # ------------------------------------------------------------------
    # Ticket lookup
    # ------------------------------------------------------------------
    def get_ticket_by_code(self, ticket_code: str) -> Optional[Dict[str, Any]]:
        """Fetch a ticket by its ``code`` field.

        Args:
            ticket_code: The ticket code to look up.

        Returns:
            The ticket data with many2one labels resolved, or ``None`` when no
            ticket matches or the read failed.
        """
        # offset=0, limit=1: only the first match is used.
        ticket_ids = self._safe_execute(
            self.model_name, 'search', [('code', '=', ticket_code)], 0, 1)
        if not ticket_ids:
            print(f"Aucun ticket trouvé avec le code '{ticket_code}'")
            return None

        # Pass a list of ids, like every other ``read`` call in this class, so
        # the result is a list of records.
        records = self._safe_execute(
            self.model_name, 'read', ticket_ids[:1], list(self._TICKET_FIELDS))
        if not records:
            print(f"Impossible de récupérer les données du ticket {ticket_code}")
            return None

        return self.resolve_relational_fields(records[0])

    def resolve_relational_fields(self, ticket: Dict[str, Any]) -> Dict[str, Any]:
        """Add a ``<field>_value`` label for every many2one field of the ticket.

        Args:
            ticket: The ticket data; it is modified in place.

        Returns:
            The same dict. For each many2one field whose value is a list, the
            key ``<field>_value`` holds the second list item (``None`` when
            the list has fewer than two items). The dict is returned unchanged
            when the field metadata cannot be fetched.
        """
        fields_info = self._safe_execute(self.model_name, 'fields_get', [], ['type'])
        if not fields_info:
            # The RPC failed (already reported): keep the raw values.
            return ticket

        for field, info in fields_info.items():
            value = ticket.get(field)
            if info.get("type") == "many2one" and isinstance(value, list):
                ticket[f"{field}_value"] = value[1] if len(value) > 1 else None

        return ticket

    # ------------------------------------------------------------------
    # Message filtering and cleaning
    # ------------------------------------------------------------------
    def is_odoobot_author(self, message: Dict[str, Any]) -> bool:
        """Tell whether a message looks system generated (OdooBot or similar).

        The checks are, in order: author name, ``message_type`` equal to
        ``'notification'``, subtype name, then known phrases in the body.

        Args:
            message: The message record to inspect.

        Returns:
            True when any check matches, False otherwise.
        """
        author = message.get('author_id')
        if isinstance(author, list) and len(author) > 1:
            author_name = author[1].lower()
            if any(marker in author_name for marker in self._SYSTEM_AUTHOR_MARKERS):
                return True

        if message.get('message_type') == 'notification':
            return True

        subtype = message.get('subtype_id')
        if isinstance(subtype, list) and len(subtype) > 1:
            subtype_name = subtype[1].lower()
            if any(marker in subtype_name for marker in self._SYSTEM_SUBTYPE_MARKERS):
                return True

        body = message.get('body')
        if isinstance(body, str):
            lowered = body.lower()
            if any(pattern in lowered for pattern in self._SYSTEM_BODY_PATTERNS):
                return True

        return False

    @staticmethod
    def _parse_dimension(raw):
        """Return ``raw`` as an int when it is made only of digits, else ``None``."""
        text = '' if raw is None else str(raw)
        if not text.isdigit():
            return None
        try:
            return int(text)
        except ValueError:
            # ``isdigit`` accepts some characters ``int`` rejects (e.g. '²').
            return None

    def is_important_image(self, tag, message_text: str) -> bool:
        """Decide whether an image is content rather than a logo or signature.

        Args:
            tag: The image tag; only its ``get(name, default)`` accessor is used.
            message_text: Full text of the message. It is kept for interface
                compatibility and does not influence the result.

        Returns:
            False when ``src``, ``alt``, ``title`` or a CSS class contains a
            known decorative pattern, or when both dimensions are given and
            are at most 50; True otherwise.
        """
        src = (tag.get('src', '') or '').lower()
        alt = (tag.get('alt', '') or '').lower()
        title = (tag.get('title', '') or '').lower()

        # ``class`` may be a list of names or a single space-separated string.
        css_class = tag.get('class', '') or []
        if isinstance(css_class, str):
            css_class = css_class.split()
        class_names = [name.lower() for name in css_class if isinstance(name, str)]

        candidates = [src, alt, title, *class_names]
        if any(pattern in text
               for pattern in self._USELESS_IMAGE_PATTERNS
               for text in candidates):
            return False

        width = self._parse_dimension(tag.get('width', ''))
        height = self._parse_dimension(tag.get('height', ''))
        # Compare against None so that a 0-sized image is also filtered out.
        if width is not None and height is not None and width <= 50 and height <= 50:
            return False

        return True

    def clean_html(self, html_content: str) -> str:
        """Convert an HTML message body to plain text.

        Signature elements and horizontal rules are removed, relevant images
        become ``[Image: ...]`` markers, links whose URL contains
        'attachment', 'download' or 'file' become ``[Pièce jointe: ...]``
        markers, whitespace is collapsed to single spaces and known
        disclaimer footers are cut off.

        Args:
            html_content: The HTML to clean.

        Returns:
            The cleaned text, or ``""`` for empty input.
        """
        if not html_content:
            return ""

        soup = BeautifulSoup(html_content, 'html.parser')

        # Drop signature blocks first: 'hr + div' / 'hr + p' need the <hr>
        # elements to still be present.
        for selector in self._SIGNATURE_SELECTORS:
            for element in soup.select(selector):
                element.decompose()

        for hr in soup.find_all('hr'):
            hr.decompose()

        full_text = soup.get_text(' ', strip=True)

        for img in soup.find_all('img'):
            if self.is_important_image(img, full_text):
                alt_text = img.get('alt', '') or img.get('title', '') or '[Image importante]'
                img.replace_with(f" [Image: {alt_text}] ")
            else:
                img.decompose()

        for a in soup.find_all('a', href=True):
            href = a.get('href', '').lower()
            if 'attachment' in href or 'download' in href or 'file' in href:
                a.replace_with(f" [Pièce jointe: {a.get_text()}] ")

        text = soup.get_text(separator=' ', strip=True)
        text = html.unescape(text)
        # Every whitespace run (newlines included) becomes one space, so the
        # result is a single line.
        text = re.sub(r'\s+', ' ', text)

        for footer_regex in self._FOOTER_REGEXES:
            text = footer_regex.sub('', text)

        return text.strip()

    # ------------------------------------------------------------------
    # Export helpers
    # ------------------------------------------------------------------
    def save_json(self, data: Any, filepath: str) -> None:
        """Write ``data`` to ``filepath`` as UTF-8 JSON, indented by 4 spaces.

        Args:
            data: The JSON-serialisable data to save.
            filepath: Destination file path.
        """
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4, ensure_ascii=False)

    @staticmethod
    def _safe_filename(name, fallback):
        """Return ``name`` with path separators replaced by underscores.

        ``fallback`` is used when ``name`` is not a non-empty string (Odoo
        returns ``False`` for empty fields).
        """
        text = name if isinstance(name, str) and name else fallback
        return text.replace('/', '_').replace('\\', '_')

    def _export_records(self, model, record_ids, fields, filepath):
        """Read ``record_ids`` from ``model`` and save them as one JSON file."""
        if not record_ids:
            return
        records = self._safe_execute(model, 'read', record_ids, fields)
        if records:
            self.save_json(records, filepath)

    def _export_contact(self, ticket, ticket_dir):
        """Save the ticket's partner details to ``contact_info.json``."""
        partner = ticket.get('partner_id')
        if not partner:
            return
        partner_id = partner[0] if isinstance(partner, list) else partner
        details = self._read_first(
            'res.partner', partner_id,
            ['name', 'email', 'phone', 'mobile', 'street', 'city', 'zip', 'country_id', 'comment'])
        if details:
            self.save_json(details, os.path.join(ticket_dir, "contact_info.json"))

    def _export_messages(self, ticket, ticket_dir):
        """Save all messages, plus one cleaned file per non-system message."""
        message_ids = ticket.get('message_ids') or []
        if not message_ids:
            return

        messages_dir = os.path.join(ticket_dir, "messages")
        os.makedirs(messages_dir, exist_ok=True)

        messages = self._safe_execute(
            'mail.message', 'read', message_ids,
            ['id', 'body', 'date', 'author_id', 'email_from', 'subject', 'parent_id',
             'message_type', 'subtype_id', 'attachment_ids'])
        if not messages:
            return

        # The full collection is written first, before bodies are cleaned.
        self.save_json(messages, os.path.join(ticket_dir, "messages.json"))

        for message in messages:
            if self.is_odoobot_author(message):
                continue

            message_id = message.get('id', 0)
            # ``date`` may be False; ':' and ' ' are replaced to keep the
            # file name portable.
            message_date = str(message.get('date') or '').replace(':', '-').replace(' ', '_')
            message_path = os.path.join(messages_dir, f"message_{message_id}_{message_date}.json")

            if message.get('body'):
                message['body'] = self.clean_html(message['body'])

            author = message.get('author_id')
            if isinstance(author, list) and author:
                author_details = self._read_first(
                    'res.partner', author[0], self._PARTNER_SUMMARY_FIELDS)
                if author_details:
                    message['author_details'] = author_details

            subtype = message.get('subtype_id')
            if isinstance(subtype, list) and subtype:
                subtype_details = self._safe_execute(
                    'mail.message.subtype', 'read', [subtype[0]], list(self._SUBTYPE_FIELDS))
                if subtype_details:
                    # Stored as the list returned by ``read``.
                    message['subtype_details'] = subtype_details

            self.save_json(message, message_path)

    def _export_followers(self, ticket, ticket_dir):
        """Save the followers, enriched with partner and subtype details."""
        follower_ids = ticket.get('message_follower_ids') or []
        if not follower_ids:
            return

        followers = self._safe_execute(
            'mail.followers', 'read', follower_ids,
            ['id', 'partner_id', 'name', 'email', 'subtype_ids'])
        if not followers:
            return

        for follower in followers:
            partner = follower.get('partner_id')
            if isinstance(partner, list) and partner:
                partner_details = self._read_first(
                    'res.partner', partner[0], self._PARTNER_SUMMARY_FIELDS)
                if partner_details:
                    follower['partner_details'] = partner_details

            if follower.get('subtype_ids'):
                subtype_details = self._safe_execute(
                    'mail.message.subtype', 'read',
                    follower.get('subtype_ids'), list(self._SUBTYPE_FIELDS))
                if subtype_details:
                    follower['subtype_details'] = subtype_details

        self.save_json(followers, os.path.join(ticket_dir, "followers.json"))

    def _export_attachments(self, ticket, ticket_dir):
        """Save attachment metadata and decode each attachment to a file."""
        attachment_ids = self._safe_execute(
            'ir.attachment', 'search',
            [('res_model', '=', self.model_name), ('res_id', '=', ticket.get('id'))])
        if not attachment_ids:
            return

        attachments = self._safe_execute(
            'ir.attachment', 'read', attachment_ids,
            ['id', 'name', 'datas', 'mimetype', 'create_date', 'create_uid',
             'description', 'res_name', 'public', 'type'])
        if not attachments:
            return

        # Build each file name once so that the metadata and the file written
        # to disk always agree.
        named_attachments = []
        for attachment in attachments:
            attachment_id = attachment.get('id', 0)
            attachment_name = self._safe_filename(
                attachment.get('name'), f"attachment_{attachment_id}")
            named_attachments.append(
                (attachment, attachment_name, f"{attachment_id}_{attachment_name}"))

        attachments_info = [{
            'id': att.get('id'),
            'name': att.get('name'),
            'mimetype': att.get('mimetype'),
            'create_date': att.get('create_date'),
            'description': att.get('description'),
            'res_name': att.get('res_name'),
            'type': att.get('type'),
            'file_path': f"attachments/{filename}",
        } for att, _name, filename in named_attachments]
        self.save_json(attachments_info, os.path.join(ticket_dir, "attachments_info.json"))

        attachments_dir = os.path.join(ticket_dir, "attachments")
        os.makedirs(attachments_dir, exist_ok=True)

        for attachment, attachment_name, filename in named_attachments:
            attachment_data = attachment.get('datas')
            if not attachment_data:
                # Nothing to decode for this attachment.
                continue
            try:
                binary_data = base64.b64decode(attachment_data)
                with open(os.path.join(attachments_dir, filename), "wb") as f:
                    f.write(binary_data)
            except (ValueError, TypeError, OSError) as e:
                # One bad attachment must not abort the whole export.
                print(f"Erreur lors de la sauvegarde de la pièce jointe {attachment_name}: {e}")

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------
    def extract_ticket_data(self, ticket_code: str) -> Optional[str]:
        """Export every piece of data attached to a ticket.

        A directory ``ticket_<id>_<name>`` is created under ``EXPORT_DIR`` and
        filled with ``ticket_info.json`` and, when data is available,
        ``contact_info.json``, ``activities.json``, ``messages.json`` with a
        ``messages/`` folder, ``followers.json``, ``attachments_info.json``
        with an ``attachments/`` folder, and ``timesheets.json``.

        Args:
            ticket_code: The code of the ticket to export.

        Returns:
            The path of the created directory, or ``None`` when the ticket
            could not be fetched.
        """
        ticket = self.get_ticket_by_code(ticket_code)
        if not ticket:
            return None

        ticket_name = self._safe_filename(ticket.get('name'), 'Sans nom')
        ticket_dir = os.path.join(EXPORT_DIR, f"ticket_{ticket.get('id')}_{ticket_name}")
        os.makedirs(ticket_dir, exist_ok=True)

        self.save_json(ticket, os.path.join(ticket_dir, "ticket_info.json"))

        self._export_contact(ticket, ticket_dir)
        self._export_records(
            'mail.activity', ticket.get('activity_ids'),
            ['id', 'res_id', 'activity_type_id', 'summary', 'note', 'date_deadline',
             'user_id', 'create_date'],
            os.path.join(ticket_dir, "activities.json"))
        self._export_messages(ticket, ticket_dir)
        self._export_followers(ticket, ticket_dir)
        self._export_attachments(ticket, ticket_dir)
        self._export_records(
            'account.analytic.line', ticket.get('timesheet_ids'),
            ['id', 'date', 'user_id', 'name', 'unit_amount', 'employee_id', 'department_id'],
            os.path.join(ticket_dir, "timesheets.json"))

        print(f"Extraction terminée. Les fichiers sont disponibles dans: {ticket_dir}")
        return ticket_dir
|
|
|
|
def main():
    """Command-line entry point: export the ticket whose code is given.

    Returns:
        0 when the ticket was exported, 1 when the code is malformed or the
        extraction failed. (Invalid command-line usage is handled by
        ``argparse``, which exits on its own.)
    """
    parser = argparse.ArgumentParser(
        description="Extrait et filtre les données d'un ticket Odoo par son code.",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "ticket_code",
        help="Code du ticket à extraire (ex: T12546)",
        type=str
    )
    args = parser.parse_args()

    # ``fullmatch`` rejects a trailing newline, which ``^...$`` would accept.
    if not re.fullmatch(r'T\d+', args.ticket_code):
        print("Erreur: Le code du ticket doit être au format T suivi de chiffres (ex: T12546)")
        return 1

    print(f"\nExtraction du ticket {args.ticket_code}...")

    extractor = TicketExtractor()
    ticket_dir = extractor.extract_ticket_data(args.ticket_code)

    if not ticket_dir:
        print("\nL'extraction a échoué. Vérifiez le code du ticket et réessayez.")
        return 1

    print(f"\nLes données du ticket ont été extraites avec succès dans: {ticket_dir}")
    return 0


if __name__ == "__main__":
    # Propagate the result as the process exit status so that shell scripts
    # can detect a failed extraction.
    raise SystemExit(main())