mirror of
https://github.com/Ladebeze66/odoo_toolkit.git
synced 2025-12-15 19:26:55 +01:00
183 lines
7.9 KiB
Python
183 lines
7.9 KiB
Python
from odoo_connection import OdooConnection
|
|
import os
|
|
import json
|
|
from utils import print_error
|
|
from config import EXPORT_DIR
|
|
|
|
class TicketManager:
|
|
"""Gestionnaire de tickets simplifié avec seulement les fonctionnalités essentielles"""
|
|
|
|
def __init__(self):
|
|
"""Initialise le gestionnaire de tickets"""
|
|
self.conn = OdooConnection()
|
|
self.odoo = self.conn.get_odoo_instance()
|
|
self.model_name = "project.task"
|
|
|
|
def _ensure_connection(self):
|
|
"""Vérifie et établit la onnexion si nécessaire"""
|
|
if not self.odoo:
|
|
self.odoo = self.conn.get_odoo_instance()
|
|
return self.odoo is not None
|
|
|
|
def _safe_execute(self, model, method, *args):
|
|
"""
|
|
Exécute une méthode Odoo en toute sécurité via odoorpc.
|
|
Vérifie la connexion avant d'exécuter.
|
|
"""
|
|
# Vérifier que la connexion est bien établie
|
|
if not self._ensure_connection():
|
|
print_error("Connexion Odoo indisponible.")
|
|
return None
|
|
|
|
try:
|
|
# Exécuter la méthode sur le modèle via OdooRPC
|
|
return self.odoo.execute(model, method, *args) # type: ignore
|
|
|
|
except odoorpc.error.RPCError as e: # type: ignore
|
|
print_error(f" Erreur RPC lors de '{method}' sur '{model}': {e}")
|
|
return None
|
|
except Exception as e:
|
|
print_error(f" Erreur inattendue lors de '{method}' sur '{model}': {e}")
|
|
return None
|
|
|
|
def get_model_fields(self, model_name):
|
|
"""Récupére tous les champs disponibles pour un modèle donné, en filtrant ceux qui ne sont pas exploitables"""
|
|
fields_info = self._safe_execute(model_name, 'fields_get', [], ['name', 'type'])
|
|
if not fields_info:
|
|
print_error(f"Impossible de récupérer les champs pour {model_name}")
|
|
return []
|
|
|
|
#On filtre les champs qui ne sont pas exploitables
|
|
invalid_types = ['many2many']
|
|
valid_fields = [field for field, info in fields_info.items() if info.get("type") not in invalid_types]
|
|
return valid_fields #Retourne la liste des champs exploitables
|
|
|
|
def save_raw_ticket_data(self, ticket_data, filename="raw_ticket_data.json"):
|
|
"""Sauvegarde les données brutes du ticket dans un fichier JSON"""
|
|
file_path = os.path.join(EXPORT_DIR, filename)
|
|
with open(file_path, "w", encoding="utf-8") as f:
|
|
json.dump(ticket_data, f, indent=4, ensure_ascii=False)
|
|
print(f"Données brutes du ticket sauvegardées dans : {file_path}")
|
|
|
|
def get_all_models(self):
|
|
"""Récupérer et sauvegarder tous les modèles disponibles"""
|
|
models = self._safe_execute('ir.model', 'search_read', [], ['model', 'name'] )
|
|
if not models:
|
|
print_error("Impossible de récupérer la liste des modèles.")
|
|
return None
|
|
|
|
#Convertir en dictionnaire {nom_du_modèle: description}
|
|
models_dict = {model['model']: model['name'] for model in models}
|
|
|
|
#Sauvegarder dans un fichier JSON
|
|
self.save_raw_ticket_data(models_dict, "all_models.json")
|
|
print(f"Liste des modèles sauvegardée dans 'available_models.json")
|
|
return models_dict
|
|
|
|
def get_model_fields_with_types(self, model_name):
|
|
"""Récupère et sauvegarde les champs d'un modèle avec leurs types"""
|
|
fields_info = self._safe_execute(model_name, 'fields_get', [], ['name', 'type', 'relation'])
|
|
if not fields_info:
|
|
print_error(f"Impossible de récupérer les champs pour {model_name}")
|
|
return {}
|
|
|
|
# Construire un dictionnaire {champ: {type, relation (si relationnel)}}
|
|
fields_dict = {
|
|
field: {
|
|
"type": info["type"],
|
|
"relation": info.get("relation", None)
|
|
}
|
|
for field, info in fields_info.items()
|
|
}
|
|
|
|
# Sauvegarde en JSON
|
|
self.save_raw_ticket_data(fields_dict, f"fields_{model_name}.json")
|
|
print(f"Liste des champs du modèle '{model_name}' sauvegardée dans 'fields_{model_name}.json'")
|
|
return fields_dict
|
|
|
|
def get_ticket_by_id(self, ticket_id):
|
|
"""Récupère les détails d'un ticket par son ID et exclut dynamiquement les champs invalides"""
|
|
fields_to_read = self.get_model_fields(self.model_name) # Récupère tous les champs
|
|
ticket_data = self._safe_execute(self.model_name, 'read', [ticket_id], fields_to_read)
|
|
|
|
if ticket_data:
|
|
ticket = self.resolve_relational_fields(ticket_data[0])
|
|
self.save_raw_ticket_data(ticket, f"ticket_{ticket_id}_raw.json")
|
|
return ticket
|
|
|
|
print_error(f"Aucun ticket trouvé avec l'ID {ticket_id}")
|
|
return None
|
|
|
|
def resolve_relational_fields(self, ticket):
|
|
"""Ajoute les valeurs des champs relationnels"""
|
|
fields_info = self._safe_execute(self.model_name, 'fields_get', [], ['type'])
|
|
|
|
for field, info in fields_info.items(): # type: ignore
|
|
if info.get("type") == "many2one" and isinstance(ticket.get(field), list):
|
|
ticket[f"{field}_value"] = ticket[field][1] if len(ticket[field]) > 1 else None #Ajoute la valeur lisible en plus du ID
|
|
return ticket
|
|
|
|
|
|
def get_ticket_by_code(self, ticket_code):
|
|
"""Récupérer un ticket via son code"""
|
|
domain = [('code', '=', ticket_code)]
|
|
ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, 1)
|
|
if not ticket_ids:
|
|
return None
|
|
|
|
# Sauvegarde des données brutes pour analyse
|
|
self.save_raw_ticket_data(ticket_ids, f"ticket_{ticket_code}_raw.json")
|
|
|
|
return self.get_ticket_by_id(ticket_ids[0])
|
|
|
|
def get_available_projects(self):
|
|
"""Retourne la liste des projets disponibles"""
|
|
projects = self._safe_execute('project.project', 'search_read', [], ['id', 'name'])
|
|
if not projects:
|
|
print_error("Aucun projet trouvé.")
|
|
return {}
|
|
return {proj['id']: proj['name'] for proj in projects}
|
|
|
|
def export_tickets_by_project_and_stage(self, project_id):
|
|
""" Exporte les tickets d'un projet classés par étape """
|
|
|
|
# Vérifier la connexion Odoo
|
|
if not self._ensure_connection():
|
|
print_error("Connexion Odoo indisponible.")
|
|
return
|
|
|
|
# Récupérer les tickets du projet
|
|
domain = [('project_id', '=', project_id)]
|
|
ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, 10)
|
|
if not ticket_ids:
|
|
print_error(f"Aucun ticket trouvé pour le projet {project_id}.")
|
|
return
|
|
|
|
# Lire les détails des tickets
|
|
tickets = []
|
|
for ticket_id in ticket_ids:
|
|
ticket = self.get_ticket_by_id(ticket_id)
|
|
if ticket:
|
|
tickets.append(ticket)
|
|
|
|
#trier les tickets par étape
|
|
tickets_by_stage = {}
|
|
for ticket in tickets:
|
|
champs_relationnels = ticket.get("Champs Relationnels", {})
|
|
stage_data = champs_relationnels.get("stage_id", [0, "Non classé"])
|
|
stage_id = stage_data[0] if isinstance(stage_data, list) and len(stage_data) > 0 else 0
|
|
stage_name = stage_data[1] if isinstance(stage_data, list) and len(stage_data) > 1 else "Non classé"
|
|
|
|
key = f"{stage_id}_{stage_name}"
|
|
tickets_by_stage.setdefault(key, []).append(ticket)
|
|
|
|
# Sauvegarde des fichiers
|
|
project_dir = ensure_export_directory(f"project_{project_id}")
|
|
for stage_key, stage_tickets in tickets_by_stage.items():
|
|
stage_dir = os.path.join(project_dir, stage_key)
|
|
os.makedirs(stage_dir, exist_ok=True)
|
|
save_json(os.path.join(stage_dir, "all_tickets.json"), stage_tickets)
|
|
|
|
print(f"Exportation terminée. Les fichiers sont enregistrés dans : {project_dir}/")
|
|
|