mirror of
https://github.com/Ladebeze66/odoo_toolkit.git
synced 2025-12-13 10:46:52 +01:00
456 lines
23 KiB
Python
456 lines
23 KiB
Python
from odoo_connection import OdooConnection
|
|
import os
|
|
import json
|
|
import base64
|
|
from utils import print_error
|
|
from config import EXPORT_DIR
|
|
|
|
class TicketManager:
    """Simplified ticket manager exposing only the essential operations.

    Reads ``project.task`` records (and records linked to them) through the
    Odoo instance returned by ``OdooConnection`` and writes JSON / binary
    exports below ``EXPORT_DIR``.
    """

    # Partner fields read when enriching message authors and followers.
    _PARTNER_SUMMARY_FIELDS = ['name', 'email', 'phone', 'function', 'company_id']
    # Sub-type fields read when enriching messages and followers.
    _SUBTYPE_FIELDS = ['name', 'description', 'default']

    def __init__(self):
        """Open the Odoo connection and target the ``project.task`` model."""
        self.conn = OdooConnection()
        self.odoo = self.conn.get_odoo_instance()
        self.model_name = "project.task"

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _safe_path_component(value):
        """Return *value* as text with ``/`` and ``\\`` replaced by ``_``."""
        return str(value).replace('/', '_').replace('\\', '_')

    @staticmethod
    def _first_id(value):
        """Return the first element of a non-empty list value, else ``None``."""
        if isinstance(value, list) and value:
            return value[0]
        return None

    @staticmethod
    def _many2one_parts(value, default_name):
        """Split a ``[id, name]`` value into ``(id, name)``.

        Falls back to ``0`` for the id and *default_name* for the name when
        the value is not a list or is too short.
        """
        if isinstance(value, list) and value:
            return value[0], (value[1] if len(value) > 1 else default_name)
        return 0, default_name

    @staticmethod
    def _dump_json(path, data):
        """Write *data* to *path* as indented UTF-8 JSON (non-ASCII kept)."""
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=4, ensure_ascii=False)

    @staticmethod
    def _group_messages_by_thread(messages):
        """Group messages into ``{root_id: {'main_message', 'replies'}}``.

        A message without a parent is the main message of its own thread; a
        message with a parent is appended to the parent's ``replies``. The
        grouping is independent of the order in which messages are listed.
        """
        threads = {}
        for message in messages:
            parent = message.get('parent_id', False)
            parent_id = parent[0] if isinstance(parent, list) and parent else False
            if parent_id:
                thread = threads.setdefault(parent_id, {'main_message': None, 'replies': []})
                thread['replies'].append(message)
            else:
                thread = threads.setdefault(message['id'], {'main_message': None, 'replies': []})
                # The thread may already exist (created by a reply seen first):
                # fill in the main message instead of leaving it empty.
                if thread['main_message'] is None:
                    thread['main_message'] = message
        return threads

    # ------------------------------------------------------------------
    # Connection / RPC
    # ------------------------------------------------------------------
    def _ensure_connection(self):
        """Check the connection and try to establish it when missing.

        Returns:
            True when an Odoo instance is available, False otherwise.
        """
        if not self.odoo:
            self.odoo = self.conn.get_odoo_instance()
        return self.odoo is not None

    def _safe_execute(self, model, method, *args):
        """Run ``method`` on ``model`` through the Odoo instance.

        The connection is checked first. Any failure is reported with
        ``print_error`` and turned into a ``None`` result.

        Args:
            model: Odoo model name (e.g. ``"project.task"``).
            method: Name of the model method to call.
            *args: Positional arguments forwarded to the call.

        Returns:
            The value returned by the remote call, or ``None`` on failure.
        """
        if not self._ensure_connection():
            print_error("Connexion Odoo indisponible.")
            return None

        try:
            return self.odoo.execute(model, method, *args)  # type: ignore
        except Exception as e:
            # This module does not import odoorpc, so naming
            # odoorpc.error.RPCError in an except clause would itself raise
            # NameError. RPC errors are recognised by class name instead.
            is_rpc_error = any(cls.__name__ == "RPCError" for cls in type(e).__mro__)
            if is_rpc_error:
                print_error(f" Erreur RPC lors de '{method}' sur '{model}': {e}")
            else:
                print_error(f" Erreur inattendue lors de '{method}' sur '{model}': {e}")
            return None

    def _read_one_cached(self, model, record_id, fields, cache):
        """Read one record, memoising the result (failures included) in *cache*."""
        if record_id not in cache:
            cache[record_id] = self._safe_execute(model, 'read', [record_id], fields)
        return cache[record_id]

    # ------------------------------------------------------------------
    # Model introspection
    # ------------------------------------------------------------------
    def get_model_fields(self, model_name):
        """Return the names of the usable fields of a model.

        Fields whose type is ``many2many`` are filtered out.

        Returns:
            A list of field names, or an empty list when the fields could
            not be retrieved.
        """
        fields_info = self._safe_execute(model_name, 'fields_get', [], ['name', 'type'])
        if not fields_info:
            print_error(f"Impossible de récupérer les champs pour {model_name}")
            return []

        invalid_types = ['many2many']
        return [
            field
            for field, info in fields_info.items()
            if info.get("type") not in invalid_types
        ]

    def save_raw_ticket_data(self, ticket_data, filename="raw_ticket_data.json"):
        """Save raw data as JSON in ``EXPORT_DIR``.

        Args:
            ticket_data: JSON-serialisable data to write.
            filename: File name, or path relative to ``EXPORT_DIR``; missing
                parent directories are created.
        """
        file_path = os.path.join(EXPORT_DIR, filename)
        parent_dir = os.path.dirname(file_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self._dump_json(file_path, ticket_data)
        print(f"Données brutes du ticket sauvegardées dans : {file_path}")

    def get_all_models(self):
        """Fetch every available model and save the list as ``all_models.json``.

        Returns:
            A ``{model: name}`` dictionary, or ``None`` on failure.
        """
        models = self._safe_execute('ir.model', 'search_read', [], ['model', 'name'])
        if not models:
            print_error("Impossible de récupérer la liste des modèles.")
            return None

        models_dict = {model['model']: model['name'] for model in models}

        self.save_raw_ticket_data(models_dict, "all_models.json")
        # The message names the file that is actually written above.
        print("Liste des modèles sauvegardée dans 'all_models.json'")
        return models_dict

    def get_model_fields_with_types(self, model_name):
        """Fetch and save the fields of a model with their attributes.

        Two files are written: ``fields_<model>_complete.json`` with every
        attribute returned by ``fields_get`` and ``fields_<model>.json`` with
        a simplified view.

        Returns:
            The simplified ``{field: attributes}`` dictionary, or an empty
            dictionary when the fields could not be retrieved.
        """
        fields_info = self._safe_execute(model_name, 'fields_get', [])
        if not fields_info:
            print_error(f"Impossible de récupérer les champs pour {model_name}")
            return {}

        self.save_raw_ticket_data(fields_info, f"fields_{model_name}_complete.json")
        print(f"Liste complète des champs du modèle '{model_name}' sauvegardée dans 'fields_{model_name}_complete.json'")

        # Simplified dictionary kept for compatibility.
        fields_dict = {
            field: {
                "type": info.get("type", "unknown"),
                "relation": info.get("relation", None),
                "string": info.get("string", field),
                "help": info.get("help", ""),
                "required": info.get("required", False),
                "readonly": info.get("readonly", False),
                "selection": info.get("selection", []) if info.get("type") == "selection" else None,
            }
            for field, info in fields_info.items()
        }

        self.save_raw_ticket_data(fields_dict, f"fields_{model_name}.json")
        print(f"Liste des champs du modèle '{model_name}' sauvegardée dans 'fields_{model_name}.json'")
        return fields_dict

    # ------------------------------------------------------------------
    # Ticket lookup
    # ------------------------------------------------------------------
    def get_ticket_by_id(self, ticket_id):
        """Read a ticket by id and save it as ``ticket_<id>_raw.json``.

        Only the fields returned by ``get_model_fields`` are requested.

        Returns:
            The ticket dictionary with resolved many2one labels, or ``None``
            when nothing was read.
        """
        fields_to_read = self.get_model_fields(self.model_name)
        ticket_data = self._safe_execute(self.model_name, 'read', [ticket_id], fields_to_read)

        if ticket_data:
            ticket = self.resolve_relational_fields(ticket_data[0])
            self.save_raw_ticket_data(ticket, f"ticket_{ticket_id}_raw.json")
            return ticket

        print_error(f"Aucun ticket trouvé avec l'ID {ticket_id}")
        return None

    def resolve_relational_fields(self, ticket, fields_info=None):
        """Add a ``<field>_value`` entry for every many2one field of *ticket*.

        Args:
            ticket: Record dictionary; it is modified in place.
            fields_info: Optional ``{field: {'type': ...}}`` metadata. When
                omitted it is fetched with ``fields_get``; pass it explicitly
                to avoid one RPC per ticket when processing many records.

        Returns:
            The same *ticket* dictionary. It is returned unchanged when no
            field metadata is available.
        """
        if fields_info is None:
            fields_info = self._safe_execute(self.model_name, 'fields_get', [], ['type'])
        if not fields_info:
            return ticket

        for field, info in fields_info.items():
            value = ticket.get(field)
            if info.get("type") == "many2one" and isinstance(value, list):
                # Keep the readable label next to the raw [id, label] pair.
                ticket[f"{field}_value"] = value[1] if len(value) > 1 else None
        return ticket

    def get_ticket_by_code(self, ticket_code):
        """Fetch a ticket through its ``code`` field.

        Returns:
            The ticket dictionary, or ``None`` when no ticket matches.
        """
        domain = [('code', '=', ticket_code)]
        ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, 1)
        if not ticket_ids:
            return None

        # Keep the raw search result for analysis.
        safe_code = self._safe_path_component(ticket_code)
        self.save_raw_ticket_data(ticket_ids, f"ticket_{safe_code}_raw.json")

        return self.get_ticket_by_id(ticket_ids[0])

    def get_available_projects(self):
        """Return the available projects as an ``{id: name}`` dictionary."""
        projects = self._safe_execute('project.project', 'search_read', [], ['id', 'name'])
        if not projects:
            print_error("Aucun projet trouvé.")
            return {}
        return {proj['id']: proj['name'] for proj in projects}

    # ------------------------------------------------------------------
    # Bulk export
    # ------------------------------------------------------------------
    def _describe_filters(self, project_id, stage_id):
        """Build the human-readable filter description used in error messages."""
        filters = []
        if project_id:
            project = self._safe_execute('project.project', 'read', [project_id], ['name'])
            project_name = project[0]['name'] if project else str(project_id)
            filters.append(f"projet '{project_name}'")
        if stage_id:
            stage = self._safe_execute('project.task.type', 'read', [stage_id], ['name'])
            stage_name = stage[0]['name'] if stage else str(stage_id)
            filters.append(f"étape '{stage_name}'")
        return " et ".join(filters) if filters else "critères spécifiés"

    def _group_tickets_by_project_stage(self, tickets_data, fields_info):
        """Organise tickets as ``{project_key: {stage_key: {ticket_id: ticket}}}``.

        Keys are ``"<id>_<name>"`` strings made safe for use as directory
        names. Ticket ids are used as keys to avoid duplicates.
        """
        grouped = {}
        for ticket_data in tickets_data:
            ticket = self.resolve_relational_fields(ticket_data, fields_info)

            project_id_value, project_name = self._many2one_parts(
                ticket.get('project_id', [0, "Sans projet"]), "Sans projet")
            stage_id_value, stage_name = self._many2one_parts(
                ticket.get('stage_id', [0, "Non classé"]), "Non classé")

            project_key = self._safe_path_component(f"{project_id_value}_{project_name}")
            stage_key = self._safe_path_component(f"{stage_id_value}_{stage_name}")

            stage_bucket = grouped.setdefault(project_key, {}).setdefault(stage_key, {})
            stage_bucket[str(ticket.get('id', 0))] = ticket
        return grouped

    def export_tickets_by_project_and_stage(self, project_id=None, stage_id=None, limit=1000):
        """Export tickets filtered by project and/or stage.

        Tickets are written one file each under
        ``EXPORT_DIR/tickets_export/<project>/<stage>/`` together with an
        ``index.json`` per stage.

        Args:
            project_id: Id of the project to filter on (optional).
            stage_id: Id of the stage to filter on (optional).
            limit: Maximum number of tickets to search for.
        """
        if not self._ensure_connection():
            print_error("Connexion Odoo indisponible.")
            return

        domain = []
        if project_id:
            domain.append(('project_id', '=', project_id))
        if stage_id:
            domain.append(('stage_id', '=', stage_id))

        ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, limit)
        if not ticket_ids:
            filter_text = self._describe_filters(project_id, stage_id)
            print_error(f"Aucun ticket trouvé pour les {filter_text}.")
            return

        # Read the tickets directly rather than through get_ticket_by_id so
        # that no per-ticket raw file is created.
        fields_to_read = self.get_model_fields(self.model_name)
        all_tickets_data = self._safe_execute(self.model_name, 'read', ticket_ids, fields_to_read)
        if not all_tickets_data:
            print_error("Impossible de récupérer les détails des tickets.")
            return

        # Field metadata is the same for every ticket: fetch it once.
        fields_info = self._safe_execute(self.model_name, 'fields_get', [], ['type']) or {}
        tickets_by_project_stage = self._group_tickets_by_project_stage(all_tickets_data, fields_info)

        export_base_dir = os.path.join(EXPORT_DIR, "tickets_export")
        os.makedirs(export_base_dir, exist_ok=True)

        for project_key, stages in tickets_by_project_stage.items():
            project_dir = os.path.join(export_base_dir, project_key)
            os.makedirs(project_dir, exist_ok=True)

            for stage_key, tickets in stages.items():
                stage_dir = os.path.join(project_dir, stage_key)
                os.makedirs(stage_dir, exist_ok=True)

                # Index of every ticket of the stage.
                tickets_index = {
                    ticket_id: {
                        "name": ticket.get('name', 'Sans nom'),
                        "code": ticket.get('code', ''),
                        "filename": f"ticket_{ticket_id}.json",
                    }
                    for ticket_id, ticket in tickets.items()
                }
                self._dump_json(os.path.join(stage_dir, "index.json"), tickets_index)

                for ticket_id, ticket in tickets.items():
                    self._dump_json(os.path.join(stage_dir, f"ticket_{ticket_id}.json"), ticket)

                print(f"Sauvegarde de {len(tickets)} tickets pour le projet '{project_key}', étape '{stage_key}'")

        print(f"Exportation terminée. Les fichiers sont organisés dans : {export_base_dir}/")

    # ------------------------------------------------------------------
    # Single-ticket extraction
    # ------------------------------------------------------------------
    def _export_contact(self, ticket, rel_dir):
        """Save the details of the ticket's partner as ``contact_info.json``."""
        partner = ticket.get('partner_id')
        if not partner:
            return
        partner_id = partner[0] if isinstance(partner, list) else partner
        partner_details = self._safe_execute(
            'res.partner', 'read', [partner_id],
            ['name', 'email', 'phone', 'mobile', 'street', 'city', 'zip', 'country_id', 'comment'])
        if partner_details:
            self.save_raw_ticket_data(partner_details[0], os.path.join(rel_dir, "contact_info.json"))
            print(f"Informations de contact extraites pour le partenaire {partner_id}")

    def _export_activities(self, ticket, ticket_id, rel_dir):
        """Save the activities linked to the ticket as ``activities.json``."""
        activity_ids = ticket.get('activity_ids', [])
        if not activity_ids:
            return
        activities = self._safe_execute(
            'mail.activity', 'read', activity_ids,
            ['id', 'res_id', 'activity_type_id', 'summary', 'note', 'date_deadline', 'user_id', 'create_date'])
        if activities:
            self.save_raw_ticket_data(activities, os.path.join(rel_dir, "activities.json"))
            print(f"{len(activities)} activités extraites pour le ticket {ticket_id}")

    def _export_messages(self, ticket, ticket_id, rel_dir, ticket_dir):
        """Save the ticket's messages: full list, threads, and one file each."""
        message_ids = ticket.get('message_ids', [])
        if not message_ids:
            return
        messages = self._safe_execute(
            'mail.message', 'read', message_ids,
            ['id', 'body', 'date', 'author_id', 'email_from', 'subject', 'parent_id', 'message_type', 'subtype_id', 'attachment_ids'])
        if not messages:
            return

        self.save_raw_ticket_data(messages, os.path.join(rel_dir, "messages.json"))
        self.save_raw_ticket_data(
            self._group_messages_by_thread(messages),
            os.path.join(rel_dir, "message_threads.json"))

        # One file per message for readability.
        messages_dir = os.path.join(ticket_dir, "messages")
        os.makedirs(messages_dir, exist_ok=True)

        author_cache = {}
        subtype_cache = {}
        for message in messages:
            message_id = message.get('id', 0)
            # 'date' may be a false value rather than a string.
            message_date = str(message.get('date') or '').replace(':', '-').replace(' ', '_')
            message_path = os.path.join(messages_dir, f"message_{message_id}_{message_date}.json")

            author_id = self._first_id(message.get('author_id'))
            if author_id is not None:
                author_details = self._read_one_cached(
                    'res.partner', author_id, self._PARTNER_SUMMARY_FIELDS, author_cache)
                if author_details:
                    message['author_details'] = author_details[0]

            subtype_id = self._first_id(message.get('subtype_id'))
            if subtype_id is not None:
                subtype_details = self._read_one_cached(
                    'mail.message.subtype', subtype_id, self._SUBTYPE_FIELDS, subtype_cache)
                if subtype_details:
                    # Stored as the list returned by the read call.
                    message['subtype_details'] = subtype_details

            self._dump_json(message_path, message)

        print(f"{len(messages)} messages extraits pour le ticket {ticket_id}")

    def _export_followers(self, ticket, ticket_id, rel_dir):
        """Save the ticket's followers, enriched, as ``followers.json``."""
        follower_ids = ticket.get('message_follower_ids', [])
        if not follower_ids:
            return
        followers = self._safe_execute(
            'mail.followers', 'read', follower_ids,
            ['id', 'partner_id', 'name', 'email', 'subtype_ids'])
        if not followers:
            return

        partner_cache = {}
        for follower in followers:
            partner_id = self._first_id(follower.get('partner_id'))
            if partner_id is not None:
                partner_details = self._read_one_cached(
                    'res.partner', partner_id, self._PARTNER_SUMMARY_FIELDS, partner_cache)
                if partner_details:
                    follower['partner_details'] = partner_details[0]

            subtype_ids = follower.get('subtype_ids')
            if subtype_ids:
                subtype_details = self._safe_execute(
                    'mail.message.subtype', 'read', subtype_ids, self._SUBTYPE_FIELDS)
                if subtype_details:
                    follower['subtype_details'] = subtype_details

        self.save_raw_ticket_data(followers, os.path.join(rel_dir, "followers.json"))
        print(f"{len(followers)} suiveurs extraits pour le ticket {ticket_id}")

    def _export_attachments(self, ticket_id, rel_dir, ticket_dir):
        """Save attachment metadata and decode each attachment to a file."""
        attachment_ids = self._safe_execute(
            'ir.attachment', 'search',
            [('res_model', '=', self.model_name), ('res_id', '=', ticket_id)])
        if not attachment_ids:
            return
        attachments = self._safe_execute(
            'ir.attachment', 'read', attachment_ids,
            ['id', 'name', 'datas', 'mimetype', 'create_date', 'create_uid', 'description', 'res_name', 'public', 'type'])
        if not attachments:
            return

        # Compute each file name once so that the metadata and the file
        # actually written always agree.
        named = []
        for att in attachments:
            att_id = att.get('id', 0)
            display_name = self._safe_path_component(att.get('name') or f"attachment_{att_id}")
            named.append((att, display_name, f"{att_id}_{display_name}"))

        attachments_info = [
            {
                'id': att.get('id'),
                'name': att.get('name'),
                'mimetype': att.get('mimetype'),
                'create_date': att.get('create_date'),
                'description': att.get('description'),
                'res_name': att.get('res_name'),
                'type': att.get('type'),
                'file_path': f"attachments/{file_name}",
            }
            for att, _display_name, file_name in named
        ]
        self.save_raw_ticket_data(attachments_info, os.path.join(rel_dir, "attachments_info.json"))

        attachments_dir = os.path.join(ticket_dir, "attachments")
        os.makedirs(attachments_dir, exist_ok=True)

        for att, display_name, file_name in named:
            encoded = att.get('datas')
            if not encoded:
                continue
            try:
                binary_data = base64.b64decode(encoded)
                file_path = os.path.join(attachments_dir, file_name)
                with open(file_path, "wb") as f:
                    f.write(binary_data)
                print(f"Pièce jointe {display_name} sauvegardée dans {file_path}")
            except (ValueError, TypeError, OSError) as e:
                # Best effort: a bad attachment must not abort the others.
                print_error(f"Erreur lors de la sauvegarde de la pièce jointe {display_name}: {e}")

        print(f"{len(attachments)} pièces jointes extraites pour le ticket {ticket_id}")

    def _export_timesheets(self, ticket, ticket_id, rel_dir):
        """Save the ticket's timesheet lines as ``timesheets.json``."""
        timesheet_ids = ticket.get('timesheet_ids', [])
        if not timesheet_ids:
            return
        timesheets = self._safe_execute(
            'account.analytic.line', 'read', timesheet_ids,
            ['id', 'date', 'user_id', 'name', 'unit_amount', 'employee_id', 'department_id'])
        if timesheets:
            self.save_raw_ticket_data(timesheets, os.path.join(rel_dir, "timesheets.json"))
            print(f"{len(timesheets)} feuilles de temps extraites pour le ticket {ticket_id}")

    def extract_ticket_attachments_and_messages(self, ticket_id):
        """Extract everything attached to a ticket into a dedicated directory.

        Writes the ticket itself, its contact, activities, messages,
        followers, attachments and timesheets below
        ``EXPORT_DIR/ticket_<id>_<name>/``.

        Args:
            ticket_id: Id of the ticket to extract.

        Returns:
            The path of the ticket directory, or ``None`` when the ticket
            could not be read.
        """
        ticket = self.get_ticket_by_id(ticket_id)
        if not ticket:
            print_error(f"Impossible de trouver le ticket avec l'ID {ticket_id}")
            return None

        ticket_name = self._safe_path_component(ticket.get('name') or 'Sans nom')
        # save_raw_ticket_data prefixes EXPORT_DIR itself, so JSON files are
        # addressed relative to EXPORT_DIR to avoid a doubled prefix.
        rel_dir = f"ticket_{ticket_id}_{ticket_name}"
        ticket_dir = os.path.join(EXPORT_DIR, rel_dir)
        os.makedirs(ticket_dir, exist_ok=True)

        self.save_raw_ticket_data(ticket, os.path.join(rel_dir, "ticket_info.json"))

        self._export_contact(ticket, rel_dir)
        self._export_activities(ticket, ticket_id, rel_dir)
        self._export_messages(ticket, ticket_id, rel_dir, ticket_dir)
        self._export_followers(ticket, ticket_id, rel_dir)
        self._export_attachments(ticket_id, rel_dir, ticket_dir)
        self._export_timesheets(ticket, ticket_id, rel_dir)

        print(f"Extraction terminée. Les fichiers sont disponibles dans: {ticket_dir}")
        return ticket_dir
|
|
|