mirror of
https://github.com/Ladebeze66/projetcbaollm.git
synced 2025-12-16 22:47:49 +01:00
520 lines
23 KiB
Python
520 lines
23 KiB
Python
from odoo_connection import OdooConnection
|
|
import os
|
|
from utils import save_json
|
|
from config import EXPORT_DIR
|
|
from data_filter import filter_ticket_data
|
|
import json
|
|
from ticket_search import TicketSearch
|
|
from ticket_display import TicketDisplay
|
|
from ticket_classification import TicketClassification
|
|
|
|
class TicketManager:
    """Fetch, inspect, classify and export Odoo tickets.

    Tickets are records of the Odoo model named by ``self.model_name``
    (``"project.task"``). Every remote call goes through
    :meth:`_safe_execute`, which reports failures on stdout and returns
    ``None`` instead of raising, so the public methods are best-effort:
    they print a (French) message and return an empty value on failure.
    """

    def __init__(self):
        """Open the Odoo connection and create the helper components."""
        self.conn = OdooConnection()
        self.odoo = self.conn.get_odoo_instance()
        self.model_name = "project.task"
        self.search = TicketSearch()
        self.display = TicketDisplay()
        self.classification = TicketClassification()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _safe_name(value):
        """Return ``str(value)`` with ``/`` and ``\\`` replaced by ``_``.

        Used for every directory or file name component derived from ticket
        data, so that a value such as ``"N/A"`` cannot be interpreted as a
        nested path.
        """
        return str(value).replace("/", "_").replace("\\", "_")

    def _check_connection(self):
        """Return True when an Odoo instance is available.

        When ``self.odoo`` is ``None`` a single reconnection attempt is made
        by building a fresh ``OdooConnection``.
        """
        if self.odoo is None:
            print("Erreur: La connexion à Odoo n'est pas établie.")
            print("Tentative de reconnexion...")
            try:
                self.conn = OdooConnection()
                self.odoo = self.conn.get_odoo_instance()
            except Exception as e:
                # Best-effort reconnect: report the failure and stay offline.
                print(f"Erreur lors de la tentative de reconnexion: {e}")
                self.odoo = None

        return self.odoo is not None

    def _safe_execute(self, model, method, *args):
        """Call ``self.odoo.execute(model, method, *args)`` without raising.

        Returns the call's result, or ``None`` when no connection is
        available or when the call raises (the error is printed).
        """
        if not self._check_connection():
            return None

        try:
            return self.odoo.execute(model, method, *args)
        except Exception as e:
            # Boundary with the remote service: any failure is reported and
            # turned into None so callers can degrade gracefully.
            print(f"Erreur lors de l'exécution de {method} sur {model}: {e}")
            return None

    @staticmethod
    def _common_in_section(tickets, section):
        """Return ``{field: value}`` for the truthy values of ``section`` in
        the first ticket that every other ticket also has, with an equal
        value, in the same section."""
        first, others = tickets[0], tickets[1:]
        shared = {}
        for field, value in first[section].items():
            if not value:
                # Empty values are never reported as common.
                continue
            if all(
                section in other
                and field in other[section]
                and other[section][field] == value
                for other in others
            ):
                shared[field] = value
        return shared

    @staticmethod
    def _common_top_level(tickets, fields):
        """Return ``{field: value}`` for each name in ``fields`` whose truthy
        top-level value in the first ticket is equal in every other ticket."""
        first, others = tickets[0], tickets[1:]
        shared = {}
        for field in fields:
            value = first.get(field)
            if not value:
                continue
            if all(field in other and other[field] == value for other in others):
                shared[field] = value
        return shared

    # ------------------------------------------------------------------
    # Field and ticket retrieval
    # ------------------------------------------------------------------
    def get_ticket_fields(self):
        """Describe the fields of the ticket model, skipping many2many ones.

        Returns:
            A pair ``(simple_fields, related_fields)`` where
            ``simple_fields`` is a list of ``(name, type)`` tuples and
            ``related_fields`` is a list of ``(name, type, relation)`` tuples
            for many2one / one2many fields (``relation`` is ``"Inconnu"``
            when the metadata has none). Both lists are empty when the
            ``fields_get`` call fails.
        """
        fields_info = self._safe_execute(self.model_name, 'fields_get')
        if fields_info is None:
            return [], []

        simple_fields = []
        related_fields = []

        for field_name, field_data in fields_info.items():
            field_type = field_data["type"]
            if field_type == "many2many":
                # Many2many fields are deliberately skipped (the original
                # author notes that reading them causes an error).
                continue
            if field_type in ("many2one", "one2many"):
                related_fields.append(
                    (field_name, field_type, field_data.get("relation", "Inconnu"))
                )
            else:
                simple_fields.append((field_name, field_type))

        return simple_fields, related_fields

    def list_available_tickets(self, limit=10):
        """Print and return up to ``limit`` tickets with id, name and code.

        Returns a list of dicts as returned by the ``read`` call, or ``[]``
        when either remote call fails.
        """
        ticket_ids = self._safe_execute(self.model_name, 'search', [], 0, limit)
        if ticket_ids is None:
            return []

        tickets_info = self._safe_execute(
            self.model_name, 'read', ticket_ids, ['id', 'name', 'code']
        )
        if tickets_info is None:
            return []

        print("\nListe des tickets disponibles:")
        for ticket in tickets_info:
            code = ticket.get('code', 'N/A')
            print(f"Code: {code} - ID: {ticket['id']} - Nom: {ticket['name']}")

        return tickets_info

    def get_field_values(self, field_name, limit=50):
        """Print and return the records a relational field can point to.

        The related model is looked up in :meth:`get_ticket_fields`; up to
        ``limit`` of its records are read with their ``id`` and ``name``.
        Returns ``[]`` when the field is not a known relational field or a
        remote call fails.
        """
        _, related_fields = self.get_ticket_fields()

        # Find the model the field relates to.
        relation_model = next(
            (relation for name, _type, relation in related_fields if name == field_name),
            None,
        )
        if not relation_model:
            print(f"Le champ {field_name} n'est pas un champ relationnel connu.")
            return []

        value_ids = self._safe_execute(relation_model, 'search', [], 0, limit)
        if value_ids is None:
            return []

        values = self._safe_execute(relation_model, 'read', value_ids, ['id', 'name'])
        if values is None:
            return []

        print(f"\nValeurs disponibles pour le champ {field_name}:")
        for value in values:
            print(f" - ID: {value['id']} - Nom: {value['name']}")

        return values

    def get_ticket_by_id(self, ticket_id):
        """Read one ticket and return it in the structured export format.

        Returns:
            A dict with the keys ``"ID du Ticket"``, ``"Nom"``, ``"Code"``,
            ``"Date Limite"``, ``"Champs Simples"``, ``"Champs Relationnels"``
            and ``"Discussions"``, or ``None`` when the ticket cannot be read.
        """
        simple_fields, related_fields = self.get_ticket_fields()
        all_fields = [f[0] for f in simple_fields + related_fields]

        ticket_data = self._safe_execute(self.model_name, 'read', [ticket_id], all_fields)
        if not ticket_data:
            return None

        ticket = ticket_data[0]
        formatted_ticket = {
            "ID du Ticket": ticket["id"],
            "Nom": ticket["name"],
            "Code": ticket.get("code", "N/A"),
            "Date Limite": ticket.get("date_deadline", "Non défini"),
            "Champs Simples": {
                field: ticket.get(field, "Non défini") for field, _type in simple_fields
            },
            "Champs Relationnels": {
                field: ticket.get(field, "Aucun") for field, _type, _rel in related_fields
            },
            "Discussions": [],
        }

        if ticket.get("message_ids"):
            formatted_ticket["Discussions"] = self.get_ticket_discussions(
                ticket["message_ids"]
            )

        print(f"\nTicket sélectionné : {formatted_ticket['Code']} - {formatted_ticket['Nom']}")
        return formatted_ticket

    def get_ticket_by_code(self, code):
        """Return the first ticket whose ``code`` equals ``code``, or None."""
        domain = [('code', '=', code)]
        ticket_ids = self._safe_execute(self.model_name, 'search', domain)

        if not ticket_ids:
            print(f"Aucun ticket trouvé avec le code : {code}")
            return None

        # Reuse the id-based loader for the details.
        return self.get_ticket_by_id(ticket_ids[0])

    def get_tickets_by_codes(self, codes):
        """Return the tickets found for each code in ``codes`` (misses are
        skipped), in the order of ``codes``."""
        tickets = []
        for code in codes:
            ticket = self.get_ticket_by_code(code)
            if ticket:
                tickets.append(ticket)
        return tickets

    def search_tickets_by_fields(self, field_criteria, limit=50):
        """Return the tickets matching every ``field == value`` criterion.

        Args:
            field_criteria: mapping of field name to the exact value wanted.
            limit: maximum number of ticket ids requested from the search.

        Returns:
            A list of structured tickets (see :meth:`get_ticket_by_id`);
            empty when nothing matches or the search fails.
        """
        domain = [(field, '=', value) for field, value in field_criteria.items()]

        print(f"\nRecherche de tickets avec les critères: {field_criteria}")

        ticket_ids = self._safe_execute(self.model_name, 'search', domain, 0, limit)
        if not ticket_ids:
            print("Aucun ticket ne correspond aux critères de recherche.")
            return []

        print(f"Nombre de tickets trouvés: {len(ticket_ids)}")

        tickets = []
        for ticket_id in ticket_ids:
            ticket = self.get_ticket_by_id(ticket_id)
            if ticket:
                tickets.append(ticket)

        return tickets

    def get_ticket_discussions(self, message_ids):
        """Read the ``mail.message`` records in ``message_ids`` and return
        the ones that still have content after cleaning.

        Each returned dict has the keys ``"ID Message"``, ``"Sujet"``,
        ``"Contenu"``, ``"Auteur"`` and ``"Date"``. Returns ``[]`` when
        ``message_ids`` is empty or the read fails.
        """
        # Imported here as in the original code (presumably to avoid an
        # import cycle - confirm before moving it to module level).
        from data_formatter import clean_ticket_data

        model_name = "mail.message"

        if not message_ids:
            return []

        messages = self._safe_execute(
            model_name, 'read', message_ids,
            ["id", "subject", "body", "author_id", "date"],
        )
        if messages is None:
            return []

        formatted_messages = []
        for msg in messages:
            cleaned_content = clean_ticket_data({"description": msg["body"]})["description"]
            if not cleaned_content:
                # Messages with no remaining content are dropped.
                continue
            formatted_messages.append({
                "ID Message": msg["id"],
                "Sujet": msg["subject"],
                "Contenu": cleaned_content,
                "Auteur": msg["author_id"][1] if msg["author_id"] else "Inconnu",
                "Date": msg["date"],
            })

        return formatted_messages

    def display_field(self, ticket, field_path):
        """Print one field of a structured ticket.

        Args:
            ticket: structured ticket dict (a falsy value prints a notice).
            field_path: either ``"Key"`` for a top-level entry or
                ``"Section.Key"`` for an entry of a nested dict section.

        A section that is not a dict (for example a plain string or number)
        is reported as "field not found" instead of raising, and paths with
        more than two components are reported as invalid.
        """
        if not ticket:
            print("Aucun ticket à afficher.")
            return

        parts = field_path.split('.')

        if len(parts) == 1:
            # Top-level field.
            if parts[0] in ticket:
                print(f"{parts[0]}: {ticket[parts[0]]}")
            else:
                print(f"Champ {parts[0]} introuvable.")
        elif len(parts) == 2:
            # Field inside a section.
            section, field = parts
            container = ticket[section] if section in ticket else None
            if isinstance(container, dict) and field in container:
                print(f"{section}.{field}: {container[field]}")
            else:
                print(f"Champ {field} introuvable dans {section}.")
        else:
            print(f"Chemin de champ invalide: {field_path}")

    # ------------------------------------------------------------------
    # Classification / export
    # ------------------------------------------------------------------
    def classify_tickets_by_field(self, codes, field_name):
        """Group tickets by the value of one field and export them as JSON.

        The tickets identified by ``codes`` are loaded, passed through
        ``filter_ticket_data`` and grouped by the value of ``field_name``,
        looked up first in ``"Champs Simples"``, then in
        ``"Champs Relationnels"`` (using the display name of ``[id, name]``
        pairs), then among the top-level keys. One directory per value is
        created under ``EXPORT_DIR/classified_by_<field_name>`` and each
        ticket is saved there as ``ticket_<code>_<id>.json``.

        Returns:
            ``{directory_name: [tickets]}`` on success, ``None`` when there
            is nothing to classify.
        """
        tickets = self.get_tickets_by_codes(codes)

        if not tickets:
            print("Aucun ticket à classer.")
            return

        classified_tickets = {}
        field_found = False

        for ticket in tickets:
            # Clean the ticket (discussions in particular) before export.
            ticket = filter_ticket_data(ticket)
            simple = ticket.get("Champs Simples") or {}
            relational = ticket.get("Champs Relationnels") or {}

            if field_name in simple:
                field_found = True
                label = simple[field_name]
                if label is None:
                    continue
            elif field_name in relational:
                field_found = True
                field_value = relational[field_name]
                if field_value is None or field_value == "Aucun":
                    continue
                # Relational values of the form [id, name]: group by name.
                if isinstance(field_value, list) and len(field_value) > 1:
                    label = field_value[1]
                else:
                    label = field_value
            elif field_name in ticket and ticket[field_name] is not None:
                field_found = True
                label = ticket[field_name]
            else:
                continue

            classified_tickets.setdefault(self._safe_name(label), []).append(ticket)

        if not field_found:
            print(f"Le champ '{field_name}' n'a pas été trouvé dans les tickets.")
            return

        if not classified_tickets:
            print(f"Aucun ticket avec des valeurs pour le champ '{field_name}'.")
            return

        # Root directory for this field.
        field_dir = os.path.join(EXPORT_DIR, f"classified_by_{field_name}")
        os.makedirs(field_dir, exist_ok=True)

        print(f"\nClassification des tickets par le champ '{field_name}':")

        for value, value_tickets in classified_tickets.items():
            value_dir = os.path.join(field_dir, value)
            os.makedirs(value_dir, exist_ok=True)

            print(f"- Valeur '{value}': {len(value_tickets)} ticket(s)")

            for ticket in value_tickets:
                # The code is sanitised because it may contain a path
                # separator (the default code is "N/A").
                filename = (
                    f"ticket_{self._safe_name(ticket['Code'])}"
                    f"_{ticket['ID du Ticket']}.json"
                )
                save_json(os.path.join(value_dir, filename), ticket)
                print(f" • Ticket {ticket['Code']} sauvegardé")

        print(f"\nTickets classés et sauvegardés dans {field_dir}/")
        return classified_tickets

    def classify_tickets_by_project_and_stage(self, project_id):
        """Export the tickets of one project, grouped by stage.

        Tickets are searched with ``project_id == int(project_id)`` (using
        the default limit of :meth:`search_tickets_by_fields`), cleaned with
        ``filter_ticket_data`` and written as ``ticket_<code>.json`` under
        ``EXPORT_DIR/classified_by_project_id/<project_id>/<stage_id>_<stage_name>``.
        Tickets whose ``stage_id`` is not a non-empty list are skipped.

        Errors are printed, never raised. Returns ``None``.
        """
        # Only the conversion can legitimately produce the "not a valid
        # number" message, so it gets its own narrow try block.
        try:
            project_id = int(project_id)
        except (TypeError, ValueError):
            print(f"Erreur: Le project_id '{project_id}' n'est pas un nombre valide.")
            return

        try:
            tickets = self.search_tickets_by_fields({'project_id': project_id})

            if not tickets:
                print(f"Aucun ticket trouvé pour le project_id : {project_id}")
                return

            # Root directory, then one directory per project id.
            base_dir = os.path.join(EXPORT_DIR, "classified_by_project_id")
            os.makedirs(base_dir, exist_ok=True)

            project_dir = os.path.join(base_dir, str(project_id))
            os.makedirs(project_dir, exist_ok=True)

            # Project display name, taken from the first ticket when its
            # project_id value is an [id, name] pair.
            project_name = "Inconnu"
            first_relational = tickets[0].get('Champs Relationnels') or {}
            project_ref = first_relational.get('project_id')
            if isinstance(project_ref, list) and len(project_ref) > 1:
                project_name = project_ref[1]

            print(f"\nClassification des tickets pour le projet {project_id} ({project_name}):")
            print(f"Nombre de tickets trouvés: {len(tickets)}")

            # Group by (stage id, stage name).
            tickets_by_stage = {}
            for ticket in tickets:
                ticket = filter_ticket_data(ticket)
                relational = ticket.get('Champs Relationnels') or {}
                stage_ref = relational.get('stage_id')
                if not (isinstance(stage_ref, list) and stage_ref):
                    continue
                stage_id = stage_ref[0]
                stage_name = stage_ref[1] if len(stage_ref) > 1 else "Inconnu"
                tickets_by_stage.setdefault((stage_id, stage_name), []).append(ticket)

            if not tickets_by_stage:
                print("Aucun ticket n'a pu être classé par stage_id.")
                return

            for (stage_id, stage_name), stage_tickets in tickets_by_stage.items():
                stage_dir = os.path.join(
                    project_dir, f"{stage_id}_{self._safe_name(stage_name)}"
                )
                os.makedirs(stage_dir, exist_ok=True)

                print(f"- Stage '{stage_name}' (ID: {stage_id}): {len(stage_tickets)} tickets")

                last_index = len(stage_tickets) - 1
                for i, ticket in enumerate(stage_tickets):
                    # Sanitised so that a code such as "N/A" does not point
                    # into a non-existent sub-directory.
                    ticket_file = os.path.join(
                        stage_dir, f"ticket_{self._safe_name(ticket['Code'])}.json"
                    )
                    with open(ticket_file, 'w', encoding='utf-8') as f:
                        json.dump(ticket, f, ensure_ascii=False, indent=4)

                    # Progress line on every fifth index and on the last one.
                    if i % 5 == 0 or i == last_index:
                        print(f" • {i+1}/{len(stage_tickets)} tickets sauvegardés")

            print(f"\nTickets classés et sauvegardés dans {project_dir}/")

        except Exception as e:
            # Top-level boundary of a best-effort export: report and stop.
            print(f"Erreur lors de la classification des tickets: {e}")

    def analyze_common_values(self, tickets):
        """Return the non-empty field values shared by every ticket.

        Structured tickets (with ``"Champs Simples"`` /
        ``"Champs Relationnels"`` sections in the first ticket) yield keys
        of the form ``"<Section>.<field>"``. Otherwise a fixed list of
        top-level fields is compared, and shared relational ``[id, name]``
        values are rendered as ``"name (ID: id)"``.

        Returns ``{}`` (after printing a notice) when fewer than two tickets
        are given.
        """
        if not tickets or len(tickets) < 2:
            print("L'analyse des valeurs communes nécessite au moins 2 tickets.")
            return {}

        common_fields = {}
        first_ticket = tickets[0]

        # Simple fields.
        if 'Champs Simples' in first_ticket:
            for field, value in self._common_in_section(tickets, 'Champs Simples').items():
                common_fields[f"Champs Simples.{field}"] = value
        else:
            simple_fields = ['description', 'priority', 'sequence', 'date_deadline',
                             'create_date', 'write_date']
            common_fields.update(self._common_top_level(tickets, simple_fields))

        # Relational fields.
        if 'Champs Relationnels' in first_ticket:
            for field, value in self._common_in_section(tickets, 'Champs Relationnels').items():
                common_fields[f"Champs Relationnels.{field}"] = value
        else:
            relational_fields = ['stage_id', 'project_id', 'user_id']
            for field, value in self._common_top_level(tickets, relational_fields).items():
                # Prefer the display name over the bare id when available.
                if isinstance(value, list) and len(value) > 1:
                    common_fields[field] = f"{value[1]} (ID: {value[0]})"
                else:
                    common_fields[field] = value

        return common_fields

    # ------------------------------------------------------------------
    # Model introspection
    # ------------------------------------------------------------------
    def list_models(self):
        """Print and return the ``ir.model`` records (``model`` and ``name``
        fields); returns ``[]`` when none are available."""
        models = self._safe_execute('ir.model', 'search_read', [], ['model', 'name'])
        if not models:
            print("Aucun modèle disponible.")
            return []

        print("\nListe des modèles disponibles:")
        for model in models:
            print(f"Modèle: {model['name']} (ID: {model['model']})")
        return models

    def list_model_fields(self, model_name):
        """Print each field of ``model_name`` with its type.

        Returns the ``fields_get`` result, or ``[]`` when it is empty or the
        call fails.
        """
        fields_info = self._safe_execute(model_name, 'fields_get')
        if not fields_info:
            print(f"Aucun champ trouvé pour le modèle {model_name}.")
            return []

        print(f"\nChamps du modèle {model_name}:")
        for field_name, field_data in fields_info.items():
            print(f"Champ: {field_name} - Type: {field_data['type']}")
        return fields_info

    def export_model_fields_to_json(self, model_name, filename):
        """Write the field metadata of ``model_name`` to
        ``EXPORT_DIR/<filename>.json``.

        The target directory is created when missing. Returns True on
        success, False when no field information is available or the file
        cannot be written.
        """
        fields_info = self.list_model_fields(model_name)
        if not fields_info:
            return False

        file_path = os.path.join(EXPORT_DIR, f"{filename}.json")
        try:
            os.makedirs(os.path.dirname(file_path) or ".", exist_ok=True)
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(fields_info, f, indent=4, ensure_ascii=False)
        except (OSError, TypeError, ValueError) as e:
            # OSError: file system problem; TypeError / ValueError: metadata
            # that json cannot serialise.
            print(f"Erreur lors de l'exportation des champs: {str(e)}")
            return False

        print(f"Informations des champs exportées dans {file_path}")
        return True