This commit is contained in:
Ladebeze66 2025-04-03 10:45:52 +02:00
parent aa58f872ce
commit db180b340f
6 changed files with 273 additions and 289 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,9 @@
{
"date_extraction": "2025-04-03T10:44:16.768543",
"ticket_dir": "output/ticket_T11067",
"fichiers_json": [
"ticket_info.json",
"messages_raw.json",
"all_messages.json"
]
}

View File

@ -0,0 +1 @@
{}

View File

@ -1,18 +1,11 @@
import os import os
import json import json
import base64 import base64
from typing import Dict, List, Any, Optional
import requests import requests
import re from typing import Dict, List, Any, Optional
from html import unescape
from datetime import datetime from datetime import datetime
class TicketManager: class TicketManager:
"""
Gestionnaire de tickets pour extraire des données depuis Odoo.
"""
def __init__(self, url: str, db: str, username: str, api_key: str): def __init__(self, url: str, db: str, username: str, api_key: str):
self.url = url self.url = url
self.db = db self.db = db
@ -23,7 +16,6 @@ class TicketManager:
self.model_name = "project.task" self.model_name = "project.task"
def login(self) -> bool: def login(self) -> bool:
try:
login_url = f"{self.url}/web/session/authenticate" login_url = f"{self.url}/web/session/authenticate"
login_data = { login_data = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
@ -34,296 +26,139 @@ class TicketManager:
} }
} }
response = requests.post(login_url, json=login_data) response = requests.post(login_url, json=login_data)
response.raise_for_status()
result = response.json() result = response.json()
if result.get("error"): if result.get("error"):
print(f"Erreur de connexion: {result['error']['message']}") print(f"Erreur de connexion: {result['error']['message']}")
return False return False
self.uid = result.get("result", {}).get("uid") self.uid = result.get("result", {}).get("uid")
self.session_id = response.cookies.get("session_id") self.session_id = response.cookies.get("session_id")
if not self.uid: return True if self.uid else False
print("Erreur: Impossible de récupérer l'ID utilisateur")
return False
print(f"Connecté avec succès à {self.url} (User ID: {self.uid})")
return True
except Exception as e:
print(f"Erreur de connexion: {str(e)}")
return False
def _rpc_call(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]: def _rpc_call(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
if not self.uid and not self.login():
return {"error": "Non connecté"}
try:
full_url = f"{self.url}{endpoint}" full_url = f"{self.url}{endpoint}"
headers = {"Content-Type": "application/json"} headers = {"Content-Type": "application/json"}
data = {"jsonrpc": "2.0", "method": "call", "params": params} data = {"jsonrpc": "2.0", "method": "call", "params": params}
response = requests.post( response = requests.post(full_url, json=data, headers=headers, cookies={"session_id": self.session_id})
full_url, return response.json().get("result", {})
json=data,
headers=headers,
cookies={"session_id": self.session_id} if self.session_id else None
)
response.raise_for_status()
result = response.json()
if result.get("error"):
return {"error": result["error"]["message"]}
return result.get("result", {})
except Exception as e:
return {"error": str(e)}
def search_read(self, model: str, domain: List, fields: List[str], order: Optional[str] = None, limit: Optional[int] = None) -> List[Dict[str, Any]]: def get_ticket(self, ticket_id: int) -> Dict[str, Any]:
params = { params = {
"model": model, "model": self.model_name,
"method": "search_read", "method": "read",
"args": [domain, fields], "args": [[ticket_id]],
"kwargs": {} "kwargs": {}
} }
return self._rpc_call("/web/dataset/call_kw", params)
if order is not None:
params["kwargs"]["order"] = order
if limit is not None:
params["kwargs"]["limit"] = limit
result = self._rpc_call("/web/dataset/call_kw", params)
return result if isinstance(result, list) else []
def read(self, model: str, ids: List[int], fields: List[str]) -> List[Dict[str, Any]]:
params = {"model": model, "method": "read", "args": [ids, fields], "kwargs": {}}
result = self._rpc_call("/web/dataset/call_kw", params)
return result if isinstance(result, list) else []
def get_ticket_by_code(self, ticket_code: str) -> Dict[str, Any]:
tickets = self.search_read(self.model_name, [("code", "=", ticket_code)], ["id"])
if not tickets:
return {}
return self.get_ticket_by_id(tickets[0]["id"])
def get_ticket_by_id(self, ticket_id: int) -> Dict[str, Any]:
ticket_fields = [
"id", "name", "description", "stage_id", "user_id", "partner_id",
"create_date", "write_date", "date_deadline", "priority",
"tag_ids", "code", "project_id", "kanban_state", "color",
"active", "company_id", "display_name"
]
tickets = self.read(self.model_name, [ticket_id], ticket_fields)
return tickets[0] if tickets else {}
def get_ticket_messages(self, ticket_id: int) -> List[Dict[str, Any]]: def get_ticket_messages(self, ticket_id: int) -> List[Dict[str, Any]]:
messages = self.search_read( messages = self._rpc_call("/web/dataset/call_kw", {
"mail.message", "model": "mail.message",
[ "method": "search_read",
("res_id", "=", ticket_id), "args": [[["res_id", "=", ticket_id], ["model", "=", "project.task"]]],
("model", "=", self.model_name), "kwargs": {"fields": ["id", "body", "author_id", "date"]}
("message_type", "in", ["comment", "notification", "email"]) })
], return messages
["id", "body", "date", "author_id", "email_from", "message_type", "parent_id", "subtype_id", "tracking_value_ids"],
order="date asc"
)
return self._clean_messages(messages)
def _clean_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: def get_ticket_by_code(self, ticket_code: str) -> Dict[str, Any]:
cleaned_messages = []
for message in messages:
if message.get("body"):
body = message["body"]
body = unescape(body)
# Stocker la version HTML
message["body_html"] = body
# Créer une version texte nettoyée
body_text = re.sub(r'<style.*?>.*?</style>', '', body, flags=re.DOTALL)
body_text = re.sub(r'<script.*?>.*?</script>', '', body_text, flags=re.DOTALL)
body_text = re.sub(r'<[^>]+>', ' ', body_text)
body_text = re.sub(r'\s+', ' ', body_text).strip()
message["body_text"] = body_text
# Organiser les messages en fils de discussion
if message.get("parent_id"):
parent_id = message["parent_id"][0] if isinstance(message["parent_id"], (list, tuple)) else message["parent_id"]
message["parent_id"] = parent_id
cleaned_messages.append(message)
return cleaned_messages
def get_ticket_attachments(self, ticket_id: int) -> List[Dict[str, Any]]:
""" """
Récupère les pièces jointes associées à un ticket. Récupère un ticket par son code unique.
Args: Args:
ticket_id: ID du ticket ticket_code: Code du ticket à récupérer (par exemple, T11067)
Returns: Returns:
Liste des pièces jointes avec leurs métadonnées. Dictionnaire contenant les informations du ticket ou une erreur si non trouvé
""" """
attachments = self.search_read( # Recherche du ticket par code
"ir.attachment", params = {
[ "model": self.model_name,
("res_id", "=", ticket_id), "method": "search_read",
("res_model", "=", self.model_name) "args": [[["code", "=", ticket_code]], ["id", "name", "description", "stage_id"]],
], "kwargs": {"limit": 1}
["id", "name", "mimetype", "file_size", "create_date", "create_uid", "datas", "description"] }
)
return attachments
def download_attachment(self, attachment: Dict[str, Any], output_dir: str) -> str: result = self._rpc_call("/web/dataset/call_kw", params)
"""
Télécharge et sauvegarde une pièce jointe dans le répertoire spécifié.
Args: if not result:
attachment: Dictionnaire contenant les métadonnées de la pièce jointe print(f"Aucun ticket trouvé avec le code {ticket_code}")
output_dir: Répertoire sauvegarder la pièce jointe return {}
Returns: # Retourne le premier ticket trouvé
Chemin du fichier sauvegardé return result[0] if isinstance(result, list) and len(result) > 0 else {}
"""
if not attachment.get("datas"):
return ""
# Créer le dossier attachments s'il n'existe pas def save_json(self, data: Any, path: str):
attachments_dir = os.path.join(output_dir, "attachments") with open(path, "w", encoding="utf-8") as f:
os.makedirs(attachments_dir, exist_ok=True) json.dump(data, f, indent=2, ensure_ascii=False)
# Construire un nom de fichier sécurisé def extract_ticket_data(self, ticket_id: int, output_dir: str):
filename = re.sub(r'[^\w\.-]', '_', attachment["name"])
file_path = os.path.join(attachments_dir, filename)
# Décoder et sauvegarder le contenu
try:
file_content = base64.b64decode(attachment["datas"])
with open(file_path, "wb") as f:
f.write(file_content)
return file_path
except Exception as e:
print(f"Erreur lors du téléchargement de la pièce jointe {attachment['name']}: {str(e)}")
return ""
def organize_messages_by_thread(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Organise les messages en fils de discussion.
Args:
messages: Liste des messages à organiser
Returns:
Liste des messages racines avec leurs réponses imbriquées.
"""
# Créer un dictionnaire pour stocker tous les messages par ID
messages_by_id = {msg["id"]: {**msg, "replies": []} for msg in messages}
# Identifier les messages racines et ajouter les réponses aux parents
root_messages = []
for msg_id, msg in messages_by_id.items():
if not msg.get("parent_id") or msg["parent_id"] == 0:
root_messages.append(msg)
else:
parent_id = msg["parent_id"]
if parent_id in messages_by_id:
messages_by_id[parent_id]["replies"].append(msg)
# Trier les messages racines par date
root_messages.sort(key=lambda m: m.get("date", ""))
return root_messages
def extract_ticket_data(self, ticket_id: int, output_dir: str) -> Dict[str, Any]:
"""
Extrait toutes les données d'un ticket, y compris messages et pièces jointes.
Args:
ticket_id: ID du ticket
output_dir: Répertoire de sortie
Returns:
Dictionnaire contenant les chemins des fichiers créés.
"""
os.makedirs(output_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True)
ticket = self.get_ticket_by_id(ticket_id)
if not ticket: ticket_data = self.get_ticket(ticket_id)
return {"error": f"Ticket {ticket_id} non trouvé"} self.save_json(ticket_data, os.path.join(output_dir, "ticket_info.json"))
# Récupération des messages associés au ticket
messages = self.get_ticket_messages(ticket_id) messages = self.get_ticket_messages(ticket_id)
# Organisation des messages en fils de discussion # Save messages in raw and cleaned formats
thread_messages = self.organize_messages_by_thread(messages) self.save_json(messages, os.path.join(output_dir, "messages_raw.json"))
# Récupération des pièces jointes cleaned_messages = []
attachments = self.get_ticket_attachments(ticket_id) for msg in messages:
attachment_files = [] cleaned_messages.append({
"message_id": msg["id"],
"sender": msg["author_id"][1] if msg["author_id"] else "Unknown",
"timestamp": msg["date"],
"content": self.clean_html(msg["body"])
})
# Téléchargement des pièces jointes self.save_json(cleaned_messages, os.path.join(output_dir, "all_messages.json"))
for attachment in attachments:
file_path = self.download_attachment(attachment, output_dir)
if file_path:
# Supprimer les données binaires avant de sauvegarder dans le JSON
attachment_info = {k: v for k, v in attachment.items() if k != "datas"}
attachment_info["local_path"] = file_path
attachment_files.append(attachment_info)
# Constitution des données complètes du ticket # Generate structure.json
ticket_data = { structure = {
**ticket, "date_extraction": datetime.now().isoformat(),
"messages": messages, "ticket_dir": output_dir,
"threads": thread_messages, "fichiers_json": [
"attachments": attachment_files "ticket_info.json",
"messages_raw.json",
"all_messages.json"
]
} }
self.save_json(structure, os.path.join(output_dir, "structure.json"))
# Sauvegarde des données du ticket dans un fichier JSON def clean_html(self, html_content: str) -> str:
ticket_path = os.path.join(output_dir, "ticket_data.json") import re
with open(ticket_path, "w", encoding="utf-8") as f: from html import unescape
json.dump(ticket_data, f, indent=2, ensure_ascii=False)
# Sauvegarder séparément les messages pour compatibilité text = re.sub(r'<.*?>', '', html_content)
messages_path = os.path.join(output_dir, "messages.json") text = unescape(text)
with open(messages_path, "w", encoding="utf-8") as f: text = re.sub(r'\s+', ' ', text).strip()
json.dump({"ticket": ticket, "messages": messages}, f, indent=2, ensure_ascii=False)
# Journal d'extraction pour référence return text
log_path = os.path.join(output_dir, "extraction_log.txt")
with open(log_path, "w", encoding="utf-8") as f:
f.write(f"Extraction du ticket {ticket_id} le {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"Nom du ticket: {ticket.get('name', 'N/A')}\n")
f.write(f"Nombre de messages: {len(messages)}\n")
f.write(f"Nombre de pièces jointes: {len(attachments)}\n")
print(f"Données complètes sauvegardées dans {ticket_path}")
print(f"Pièces jointes ({len(attachment_files)}) sauvegardées dans {os.path.join(output_dir, 'attachments')}")
# Retourner un dictionnaire contenant les informations du ticket
return {
"ticket_info": ticket,
"messages_file": messages_path,
"ticket_data_file": ticket_path,
"attachments": attachment_files,
"log_file": log_path
}
if __name__ == "__main__": if __name__ == "__main__":
import sys import sys
if len(sys.argv) < 2: if len(sys.argv) < 2:
print("Usage: python retrieve_ticket.py <ticket_code>") print("Usage: python ticket_manager2.py <ticket_id>")
sys.exit(1) sys.exit(1)
ticket_code = sys.argv[1] ticket_id = int(sys.argv[1])
output_dir = f"output/ticket_{ticket_code}"
config = { # Configuration Odoo
"url": "https://odoo.example.com", url = "https://odoo.cbao.fr"
"db": "your_db_name", db = "database_name"
"username": "your_username", username = "username"
"api_key": "your_api_key" api_key = "api_key"
}
manager = TicketManager(config["url"], config["db"], config["username"], config["api_key"]) manager = TicketManager(url, db, username, api_key)
if manager.login():
ticket = manager.get_ticket_by_code(ticket_code) if not manager.login():
if ticket: print("Échec de connexion à Odoo")
result = manager.extract_ticket_data(ticket["id"], output_dir) sys.exit(1)
print(f"Extraction terminée. Données disponibles dans {output_dir}")
else: output_dir = f"T11067_analysis/ticket_structure"
print(f"Ticket avec code {ticket_code} non trouvé.") manager.extract_ticket_data(ticket_id, output_dir)
print("Extraction terminée avec succès.")