prefiltrage

This commit is contained in:
Ladebeze66 2025-04-03 09:04:52 +02:00
parent da29526412
commit 7520372f7a
9 changed files with 225 additions and 901 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.3 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 62 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 45 KiB

View File

@ -1,23 +0,0 @@
[
{
"id": 144796,
"name": "image.png",
"mimetype": "image/png",
"create_date": "2025-03-18 14:18:51",
"file_path": "output/ticket_T11067/attachments/144796_image.png"
},
{
"id": 144794,
"name": "image004.jpg",
"mimetype": "image/jpeg",
"create_date": "2025-03-18 13:22:27",
"file_path": "output/ticket_T11067/attachments/144794_image004.jpg"
},
{
"id": 144792,
"name": "image003.png",
"mimetype": "image/png",
"create_date": "2025-03-18 13:22:27",
"file_path": "output/ticket_T11067/attachments/144792_image003.png"
}
]

File diff suppressed because one or more lines are too long

View File

@ -1,146 +0,0 @@
{
"id": 11046,
"active": true,
"name": "changement nom centrale d'enrobage",
"description": "<p><br></p>",
"sequence": 10,
"stage_id": [
8,
"Clôturé"
],
"kanban_state": "normal",
"create_date": "2025-03-18 13:22:27",
"write_date": "2025-04-02 07:16:48",
"date_start": "2025-03-18 13:22:28",
"date_end": false,
"date_assign": "2025-03-18 13:42:04",
"date_deadline": "2025-04-02",
"date_last_stage_update": "2025-04-02 07:16:48",
"project_id": [
3,
"Demandes"
],
"notes": false,
"planned_hours": 0.0,
"user_id": [
32,
"Romuald GRUSON"
],
"partner_id": [
5144,
"CONSEIL DEPARTEMENTAL DU MORBIHAN (56), Dominique CARVAL"
],
"company_id": [
1,
"CBAO S.A.R.L."
],
"color": 0,
"displayed_image_id": false,
"parent_id": false,
"child_ids": [],
"email_from": "CARVAL Dominique <dominique.carval@morbihan.fr>",
"email_cc": "",
"working_hours_open": 0.0,
"working_hours_close": 0.0,
"working_days_open": 0.0,
"working_days_close": 0.0,
"website_message_ids": [
227731,
227725
],
"remaining_hours": 0.0,
"effective_hours": 0.0,
"total_hours_spent": 0.0,
"progress": 0.0,
"subtask_effective_hours": 0.0,
"timesheet_ids": [],
"priority": "0",
"code": "T11067",
"milestone_id": false,
"sale_line_id": false,
"sale_order_id": false,
"billable_type": "no",
"activity_ids": [],
"message_follower_ids": [
89590,
89592,
89593
],
"message_ids": [
228803,
227733,
227732,
227731,
227730,
227728,
227726,
227725,
227724
],
"message_main_attachment_id": [
144792,
"image003.png"
],
"failed_message_ids": [],
"rating_ids": [],
"rating_last_value": 0.0,
"access_token": "3295983b-a3aa-4d8c-817d-2332829ca264",
"create_uid": [
1,
"OdooBot"
],
"write_uid": [
32,
"Romuald GRUSON"
],
"x_CBAO_windows_maj_ID": false,
"x_CBAO_version_signalement": false,
"x_CBAO_version_correction": false,
"x_CBAO_DateCorrection": false,
"x_CBAO_Scoring_Facilite": 0,
"x_CBAO_Scoring_Importance": 0,
"x_CBAO_Scoring_Urgence": 0,
"x_CBAO_Scoring_Incidence": 0,
"x_CBAO_Scoring_Resultat": 0,
"x_CBAO_InformationsSup": false,
"kanban_state_label": "En cours",
"subtask_planned_hours": 0.0,
"manager_id": [
22,
"Fabien LAFAY"
],
"user_email": "romuald@mail.cbao.fr",
"attachment_ids": [],
"legend_blocked": "Bloqué",
"legend_done": "Prêt pour la prochaine étape",
"legend_normal": "En cours",
"subtask_project_id": [
3,
"Demandes"
],
"subtask_count": 0,
"analytic_account_active": true,
"allow_timesheets": true,
"use_milestones": false,
"show_time_control": "start",
"is_project_map_empty": true,
"activity_state": false,
"activity_user_id": false,
"activity_type_id": false,
"activity_date_deadline": false,
"activity_summary": false,
"message_is_follower": false,
"message_unread": false,
"message_unread_counter": 0,
"message_needaction": false,
"message_needaction_counter": 0,
"message_has_error": false,
"message_has_error_counter": 0,
"message_attachment_count": 3,
"rating_last_feedback": false,
"rating_count": 0,
"access_url": "/my/task/11046",
"access_warning": "",
"display_name": "[T11067] changement nom centrale d'enrobage",
"__last_update": "2025-04-02 07:16:48"
}

View File

@ -124,16 +124,16 @@ def main():
sys.exit(1) sys.exit(1)
print(f"Extraction terminée avec succès") print(f"Extraction terminée avec succès")
print(f"- Informations du ticket: {result['files']['ticket_info']}") print(f"- Informations du ticket: {result['ticket_info']}")
print(f"- Messages: {result['files']['messages']}") print(f"- Messages: {result['messages_file']}")
print(f"- Pièces jointes: {result['files']['attachments_info']}") print(f"- Données complètes: {result['ticket_data_file']}")
print(f"- Dossier des pièces jointes: {result['files']['attachments_dir']}") print(f"- Pièces jointes: {len(result['attachments'])} fichiers")
# Afficher un résumé # Afficher un résumé
print(f"\nRésumé du ticket {args.ticket_code}:") print(f"\nRésumé du ticket {args.ticket_code}:")
print(f"- Nom: {ticket.get('name', 'N/A')}") print(f"- Nom: {ticket.get('name', 'N/A')}")
print(f"- Messages: {len(result['messages'])}") print(f"- Description: {ticket.get('description', 'N/A')[:100]}...")
print(f"- Pièces jointes: {len(result['attachments'])}") print(f"- Étape: {ticket.get('stage_id', ['N/A'])[1] if isinstance(ticket.get('stage_id'), (list, tuple)) else 'N/A'}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -1,13 +1,12 @@
"""
Module pour gérer l'extraction de tickets depuis Odoo.
Cette version est simplifiée et indépendante de odoo_toolkit.
"""
import os import os
import json import json
import base64 import base64
from typing import Dict, List, Any, Optional from typing import Dict, List, Any, Optional
import requests import requests
import re
from html import unescape
from datetime import datetime
class TicketManager: class TicketManager:
""" """
@ -15,35 +14,17 @@ class TicketManager:
""" """
def __init__(self, url: str, db: str, username: str, api_key: str): def __init__(self, url: str, db: str, username: str, api_key: str):
"""
Initialise le gestionnaire de tickets avec les paramètres de connexion.
Args:
url: URL du serveur Odoo
db: Nom de la base de données
username: Nom d'utilisateur
api_key: Clé API ou mot de passe
"""
self.url = url self.url = url
self.db = db self.db = db
self.username = username self.username = username
self.api_key = api_key self.api_key = api_key
self.uid = None self.uid = None
self.session_id = None self.session_id = None
self.model_name = "project.task" # Modèle par défaut pour les tickets self.model_name = "project.task"
def login(self) -> bool: def login(self) -> bool:
"""
Établit la connexion au serveur Odoo.
Returns:
True si la connexion réussit, False sinon
"""
try: try:
# Point d'entrée pour le login
login_url = f"{self.url}/web/session/authenticate" login_url = f"{self.url}/web/session/authenticate"
# Données pour la requête de login
login_data = { login_data = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"params": { "params": {
@ -52,70 +33,35 @@ class TicketManager:
"password": self.api_key "password": self.api_key
} }
} }
# Effectuer la requête
response = requests.post(login_url, json=login_data) response = requests.post(login_url, json=login_data)
response.raise_for_status() response.raise_for_status()
# Extraire les résultats
result = response.json() result = response.json()
if result.get("error"): if result.get("error"):
print(f"Erreur de connexion: {result['error']['message']}") print(f"Erreur de connexion: {result['error']['message']}")
return False return False
# Récupérer l'ID utilisateur et la session
self.uid = result.get("result", {}).get("uid") self.uid = result.get("result", {}).get("uid")
self.session_id = response.cookies.get("session_id") self.session_id = response.cookies.get("session_id")
if not self.uid: if not self.uid:
print("Erreur: Impossible de récupérer l'ID utilisateur") print("Erreur: Impossible de récupérer l'ID utilisateur")
return False return False
print(f"Connecté avec succès à {self.url} (User ID: {self.uid})") print(f"Connecté avec succès à {self.url} (User ID: {self.uid})")
return True return True
except Exception as e: except Exception as e:
print(f"Erreur de connexion: {str(e)}") print(f"Erreur de connexion: {str(e)}")
return False return False
def _ensure_connection(self) -> bool:
"""
Vérifie que la connexion est établie, tente de se reconnecter si nécessaire.
Returns:
True si la connexion est disponible, False sinon
"""
if not self.uid or not self.session_id:
return self.login()
return True
def _rpc_call(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]: def _rpc_call(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
""" if not self.uid and not self.login():
Effectue un appel RPC vers le serveur Odoo.
Args:
endpoint: Point d'entrée de l'API (/web/dataset/call_kw, etc.)
params: Paramètres de la requête
Returns:
Résultat de la requête
"""
if not self._ensure_connection():
return {"error": "Non connecté"} return {"error": "Non connecté"}
try: try:
# Préparer la requête
full_url = f"{self.url}{endpoint}" full_url = f"{self.url}{endpoint}"
headers = {"Content-Type": "application/json"} headers = {"Content-Type": "application/json"}
data = {"jsonrpc": "2.0", "method": "call", "params": params}
# Données de la requête
data = {
"jsonrpc": "2.0",
"method": "call",
"params": params
}
# Effectuer la requête
response = requests.post( response = requests.post(
full_url, full_url,
json=data, json=data,
@ -123,231 +69,165 @@ class TicketManager:
cookies={"session_id": self.session_id} if self.session_id else None cookies={"session_id": self.session_id} if self.session_id else None
) )
response.raise_for_status() response.raise_for_status()
# Traiter la réponse
result = response.json() result = response.json()
if result.get("error"): if result.get("error"):
return {"error": result["error"]["message"]} return {"error": result["error"]["message"]}
return result.get("result", {}) return result.get("result", {})
except Exception as e: except Exception as e:
return {"error": str(e)} return {"error": str(e)}
def search_read(self, model: str, domain: List, fields: List[str], limit: int = 0) -> List[Dict[str, Any]]: def search_read(self, model: str, domain: List, fields: List[str], order: Optional[str] = None, limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""
Recherche et lit des enregistrements selon un domaine.
Args:
model: Nom du modèle
domain: Domaine de recherche
fields: Champs à récupérer
limit: Nombre max de résultats (0 pour illimité)
Returns:
Liste des enregistrements trouvés
"""
params = { params = {
"model": model, "model": model,
"method": "search_read", "method": "search_read",
"args": [domain, fields], "args": [domain, fields],
"kwargs": {"limit": limit}
}
result = self._rpc_call("/web/dataset/call_kw", params)
if isinstance(result, dict) and "error" in result:
print(f"Erreur lors de la recherche: {result['error']}")
return []
return result if isinstance(result, list) else []
def read(self, model: str, ids: List[int], fields: List[str]) -> List[Dict[str, Any]]:
"""
Lit des enregistrements par leurs IDs.
Args:
model: Nom du modèle
ids: Liste des IDs à lire
fields: Champs à récupérer
Returns:
Liste des enregistrements lus
"""
params = {
"model": model,
"method": "read",
"args": [ids, fields],
"kwargs": {} "kwargs": {}
} }
result = self._rpc_call("/web/dataset/call_kw", params) if order is not None:
if isinstance(result, dict) and "error" in result: params["kwargs"]["order"] = order
print(f"Erreur lors de la lecture: {result['error']}") if limit is not None:
return [] params["kwargs"]["limit"] = limit
result = self._rpc_call("/web/dataset/call_kw", params)
return result if isinstance(result, list) else []
def read(self, model: str, ids: List[int], fields: List[str]) -> List[Dict[str, Any]]:
params = {"model": model, "method": "read", "args": [ids, fields], "kwargs": {}}
result = self._rpc_call("/web/dataset/call_kw", params)
return result if isinstance(result, list) else [] return result if isinstance(result, list) else []
def get_ticket_by_code(self, ticket_code: str) -> Dict[str, Any]: def get_ticket_by_code(self, ticket_code: str) -> Dict[str, Any]:
""" tickets = self.search_read(self.model_name, [("code", "=", ticket_code)], ["id"])
Récupère un ticket par son code.
Args:
ticket_code: Code du ticket à récupérer
Returns:
Données du ticket ou dictionnaire vide si non trouvé
"""
# Rechercher l'ID du ticket par son code
tickets = self.search_read(
model=self.model_name,
domain=[("code", "=", ticket_code)],
fields=["id"],
limit=1
)
if not tickets: if not tickets:
print(f"Aucun ticket trouvé avec le code {ticket_code}")
return {} return {}
return self.get_ticket_by_id(tickets[0]["id"])
# Récupérer toutes les données du ticket
ticket_id = tickets[0]["id"]
return self.get_ticket_by_id(ticket_id)
def get_ticket_by_id(self, ticket_id: int) -> Dict[str, Any]: def get_ticket_by_id(self, ticket_id: int) -> Dict[str, Any]:
""" ticket_fields = [
Récupère un ticket par son ID. "id", "name", "description", "stage_id", "user_id", "partner_id",
"create_date", "write_date", "date_deadline", "priority",
Args: "tag_ids", "code", "project_id", "kanban_state", "color",
ticket_id: ID du ticket à récupérer "active", "company_id", "display_name"
Returns:
Données du ticket ou dictionnaire vide si non trouvé
"""
# Récupérer les champs disponibles pour le modèle
fields_info = self._get_model_fields(self.model_name)
# Lire les données du ticket
tickets = self.read(
model=self.model_name,
ids=[ticket_id],
fields=fields_info
)
if not tickets:
print(f"Aucun ticket trouvé avec l'ID {ticket_id}")
return {}
return tickets[0]
def _get_model_fields(self, model_name: str) -> List[str]:
"""
Récupère la liste des champs disponibles pour un modèle.
Args:
model_name: Nom du modèle
Returns:
Liste des noms de champs
"""
params = {
"model": model_name,
"method": "fields_get",
"args": [],
"kwargs": {"attributes": ["name", "type"]}
}
result = self._rpc_call("/web/dataset/call_kw", params)
if "error" in result:
print(f"Erreur lors de la récupération des champs: {result['error']}")
return []
# Filtrer les types de champs problématiques
invalid_types = ["many2many", "binary"]
valid_fields = [
field for field, info in result.items()
if info.get("type") not in invalid_types
] ]
tickets = self.read(self.model_name, [ticket_id], ticket_fields)
return valid_fields return tickets[0] if tickets else {}
def get_ticket_messages(self, ticket_id: int) -> List[Dict[str, Any]]: def get_ticket_messages(self, ticket_id: int) -> List[Dict[str, Any]]:
messages = self.search_read(
"mail.message",
[
("res_id", "=", ticket_id),
("model", "=", self.model_name),
("message_type", "in", ["comment", "notification", "email"])
],
["id", "body", "date", "author_id", "email_from", "message_type", "parent_id", "subtype_id", "tracking_value_ids"],
order="date asc"
)
return self._clean_messages(messages)
def _clean_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
cleaned_messages = []
for message in messages:
if message.get("body"):
body = message["body"]
body = unescape(body)
# Stocker la version HTML
message["body_html"] = body
# Créer une version texte nettoyée
body_text = re.sub(r'<style.*?>.*?</style>', '', body, flags=re.DOTALL)
body_text = re.sub(r'<script.*?>.*?</script>', '', body_text, flags=re.DOTALL)
body_text = re.sub(r'<[^>]+>', ' ', body_text)
body_text = re.sub(r'\s+', ' ', body_text).strip()
message["body_text"] = body_text
# Organiser les messages en fils de discussion
if message.get("parent_id"):
parent_id = message["parent_id"][0] if isinstance(message["parent_id"], (list, tuple)) else message["parent_id"]
message["parent_id"] = parent_id
cleaned_messages.append(message)
return cleaned_messages
def get_ticket_attachments(self, ticket_id: int) -> List[Dict[str, Any]]:
""" """
Récupère les messages d'un ticket. Récupère les pièces jointes associées à un ticket.
Args: Args:
ticket_id: ID du ticket ticket_id: ID du ticket
Returns: Returns:
Liste des messages du ticket Liste des pièces jointes avec leurs métadonnées.
""" """
# D'abord récupérer les IDs des messages
ticket = self.read(
model=self.model_name,
ids=[ticket_id],
fields=["message_ids"]
)
if not ticket or "message_ids" not in ticket[0]:
print(f"Impossible de récupérer les messages pour le ticket {ticket_id}")
return []
message_ids = ticket[0]["message_ids"]
# Récupérer les détails des messages
messages = self.read(
model="mail.message",
ids=message_ids,
fields=["id", "body", "date", "author_id", "email_from", "subject", "parent_id"]
)
return messages
def get_ticket_attachments(self, ticket_id: int, download_path: Optional[str] = None) -> List[Dict[str, Any]]:
"""
Récupère les pièces jointes d'un ticket, avec option de téléchargement.
Args:
ticket_id: ID du ticket
download_path: Chemin télécharger les pièces jointes (optionnel)
Returns:
Liste des informations sur les pièces jointes
"""
# Rechercher les pièces jointes liées au ticket
attachments = self.search_read( attachments = self.search_read(
model="ir.attachment", "ir.attachment",
domain=[("res_model", "=", self.model_name), ("res_id", "=", ticket_id)], [
fields=["id", "name", "mimetype", "create_date", "datas"] ("res_id", "=", ticket_id),
("res_model", "=", self.model_name)
],
["id", "name", "mimetype", "file_size", "create_date", "create_uid", "datas", "description"]
) )
if not attachments:
print(f"Aucune pièce jointe trouvée pour le ticket {ticket_id}")
return []
if download_path:
# Créer le répertoire si nécessaire
os.makedirs(download_path, exist_ok=True)
# Télécharger chaque pièce jointe
for attachment in attachments:
if "datas" in attachment and attachment["datas"]:
# Déchiffrer les données base64
binary_data = base64.b64decode(attachment["datas"])
# Nettoyer le nom de fichier
safe_name = attachment["name"].replace("/", "_").replace("\\", "_")
file_path = os.path.join(download_path, f"{attachment['id']}_{safe_name}")
# Sauvegarder le fichier
with open(file_path, "wb") as f:
f.write(binary_data)
# Remplacer les données binaires par le chemin du fichier
attachment["file_path"] = file_path
del attachment["datas"]
return attachments return attachments
def download_attachment(self, attachment: Dict[str, Any], output_dir: str) -> str:
"""
Télécharge et sauvegarde une pièce jointe dans le répertoire spécifié.
Args:
attachment: Dictionnaire contenant les métadonnées de la pièce jointe
output_dir: Répertoire sauvegarder la pièce jointe
Returns:
Chemin du fichier sauvegardé
"""
if not attachment.get("datas"):
return ""
# Créer le dossier attachments s'il n'existe pas
attachments_dir = os.path.join(output_dir, "attachments")
os.makedirs(attachments_dir, exist_ok=True)
# Construire un nom de fichier sécurisé
filename = re.sub(r'[^\w\.-]', '_', attachment["name"])
file_path = os.path.join(attachments_dir, filename)
# Décoder et sauvegarder le contenu
try:
file_content = base64.b64decode(attachment["datas"])
with open(file_path, "wb") as f:
f.write(file_content)
return file_path
except Exception as e:
print(f"Erreur lors du téléchargement de la pièce jointe {attachment['name']}: {str(e)}")
return ""
def organize_messages_by_thread(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Organise les messages en fils de discussion.
Args:
messages: Liste des messages à organiser
Returns:
Liste des messages racines avec leurs réponses imbriquées.
"""
# Créer un dictionnaire pour stocker tous les messages par ID
messages_by_id = {msg["id"]: {**msg, "replies": []} for msg in messages}
# Identifier les messages racines et ajouter les réponses aux parents
root_messages = []
for msg_id, msg in messages_by_id.items():
if not msg.get("parent_id") or msg["parent_id"] == 0:
root_messages.append(msg)
else:
parent_id = msg["parent_id"]
if parent_id in messages_by_id:
messages_by_id[parent_id]["replies"].append(msg)
# Trier les messages racines par date
root_messages.sort(key=lambda m: m.get("date", ""))
return root_messages
def extract_ticket_data(self, ticket_id: int, output_dir: str) -> Dict[str, Any]: def extract_ticket_data(self, ticket_id: int, output_dir: str) -> Dict[str, Any]:
""" """
Extrait toutes les données d'un ticket, y compris messages et pièces jointes. Extrait toutes les données d'un ticket, y compris messages et pièces jointes.
@ -357,377 +237,93 @@ class TicketManager:
output_dir: Répertoire de sortie output_dir: Répertoire de sortie
Returns: Returns:
Dictionnaire avec toutes les données du ticket Dictionnaire contenant les chemins des fichiers créés.
""" """
# Créer le répertoire de sortie
os.makedirs(output_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True)
# Récupérer les données du ticket
ticket = self.get_ticket_by_id(ticket_id) ticket = self.get_ticket_by_id(ticket_id)
if not ticket: if not ticket:
return {"error": f"Ticket {ticket_id} non trouvé"} return {"error": f"Ticket {ticket_id} non trouvé"}
# Sauvegarder les données du ticket # Récupération des messages associés au ticket
ticket_path = os.path.join(output_dir, "ticket_info.json")
with open(ticket_path, "w", encoding="utf-8") as f:
json.dump(ticket, f, indent=2, ensure_ascii=False)
# Récupérer et sauvegarder les messages
messages = self.get_ticket_messages(ticket_id) messages = self.get_ticket_messages(ticket_id)
# Nettoyer le contenu HTML des messages # Organisation des messages en fils de discussion
cleaned_messages = self._clean_messages(messages) thread_messages = self.organize_messages_by_thread(messages)
# Récupération des pièces jointes
attachments = self.get_ticket_attachments(ticket_id)
attachment_files = []
# Téléchargement des pièces jointes
for attachment in attachments:
file_path = self.download_attachment(attachment, output_dir)
if file_path:
# Supprimer les données binaires avant de sauvegarder dans le JSON
attachment_info = {k: v for k, v in attachment.items() if k != "datas"}
attachment_info["local_path"] = file_path
attachment_files.append(attachment_info)
# Constitution des données complètes du ticket
ticket_data = {
**ticket,
"messages": messages,
"threads": thread_messages,
"attachments": attachment_files
}
# Sauvegarde des données du ticket dans un fichier JSON
ticket_path = os.path.join(output_dir, "ticket_data.json")
with open(ticket_path, "w", encoding="utf-8") as f:
json.dump(ticket_data, f, indent=2, ensure_ascii=False)
# Sauvegarder séparément les messages pour compatibilité
messages_path = os.path.join(output_dir, "messages.json") messages_path = os.path.join(output_dir, "messages.json")
with open(messages_path, "w", encoding="utf-8") as f: with open(messages_path, "w", encoding="utf-8") as f:
json.dump(cleaned_messages, f, indent=2, ensure_ascii=False) json.dump({"ticket": ticket, "messages": messages}, f, indent=2, ensure_ascii=False)
# Récupérer et sauvegarder les pièces jointes # Journal d'extraction pour référence
attachments_dir = os.path.join(output_dir, "attachments") log_path = os.path.join(output_dir, "extraction_log.txt")
attachments = self.get_ticket_attachments(ticket_id, attachments_dir) with open(log_path, "w", encoding="utf-8") as f:
attachments_path = os.path.join(output_dir, "attachments_info.json") f.write(f"Extraction du ticket {ticket_id} le {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
with open(attachments_path, "w", encoding="utf-8") as f: f.write(f"Nom du ticket: {ticket.get('name', 'N/A')}\n")
json.dump(attachments, f, indent=2, ensure_ascii=False) f.write(f"Nombre de messages: {len(messages)}\n")
f.write(f"Nombre de pièces jointes: {len(attachments)}\n")
# Compiler toutes les informations print(f"Données complètes sauvegardées dans {ticket_path}")
result = { print(f"Pièces jointes ({len(attachment_files)}) sauvegardées dans {os.path.join(output_dir, 'attachments')}")
"ticket": ticket,
"messages": cleaned_messages, # Retourner un dictionnaire contenant les informations du ticket
"attachments": [ return {
{k: v for k, v in a.items() if k != "datas"} "ticket_info": ticket,
for a in attachments "messages_file": messages_path,
], "ticket_data_file": ticket_path,
"files": { "attachments": attachment_files,
"ticket_info": ticket_path, "log_file": log_path
"messages": messages_path,
"attachments_info": attachments_path,
"attachments_dir": attachments_dir
} }
}
return result
def _clean_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Nettoie le contenu HTML des messages et filtre les messages indésirables.
Args:
messages: Liste des messages à nettoyer
Returns:
Liste des messages nettoyés
"""
import re
from html import unescape
# Restauration du contenu original du premier message
original_content = None
for message in messages:
if message.get("body") and isinstance(message.get("body"), str) and not message.get("author_id", [0])[0] == 2:
if original_content is None and "body_original" in message:
# Récupérer le corps original du message initial (généralement la demande client)
body_original = message["body_original"]
# Extraire le contenu de la question initiale
if body_original:
# Suppression des balises d'image avec leurs attributs
content = re.sub(r'<img[^>]*?>', '', body_original)
# Supprimer les balises de style et script
content = re.sub(r'<style[^>]*?>.*?</style>', '', content, flags=re.DOTALL)
content = re.sub(r'<script[^>]*?>.*?</script>', '', content, flags=re.DOTALL)
# Supprimer les attributs de style des balises
content = re.sub(r' style="[^"]*"', '', content)
content = re.sub(r' id="[^"]*"', '', content)
content = re.sub(r' class="[^"]*"', '', content)
content = re.sub(r' width="[^"]*"', '', content)
content = re.sub(r' height="[^"]*"', '', content)
content = re.sub(r' border="[^"]*"', '', content)
# Remplacer les balises <p>, <div>, <br> par des sauts de ligne
content = re.sub(r'<(?:p|div)[^>]*?>', '\n', content)
content = re.sub(r'</(?:p|div)>', '\n', content)
content = re.sub(r'<br[^>]*?>', '\n', content)
# Remplacer les listes
content = re.sub(r'<li[^>]*?>', '\n- ', content)
content = re.sub(r'</li>', '', content)
# Remplacer les liens par leur texte et URL
def replace_link(match):
link_text = re.sub(r'<[^>]*?>', '', match.group(2))
href = re.search(r'href="([^"]*)"', match.group(1))
if href and href.group(1) != link_text:
return f"{link_text} ({href.group(1)})"
return link_text
content = re.sub(r'<a([^>]*?)>(.*?)</a>', replace_link, content)
# Supprimer toutes les autres balises HTML
content = re.sub(r'<[^>]*?>', '', content)
# Convertir les entités HTML en caractères correspondants
content = unescape(content)
# Supprimer les signatures et autres textes communs des emails
signatures = [
r'Droit à la déconnexion.*',
r'Ce message électronique et tous les fichiers attachés.*',
r'Direction des Infrastructures.*',
r'Service d\'Appui aux Politiques d\'Aménagement.*',
r'tél :.*',
r'mobile :.*',
]
for sig_pattern in signatures:
content = re.sub(sig_pattern, '', content, flags=re.DOTALL | re.IGNORECASE)
# Supprimer les espaces et sauts de ligne multiples
content = re.sub(r'\n\s*\n', '\n\n', content)
content = re.sub(r' +', ' ', content)
# Supprimer les espaces en début et fin de chaîne
content = content.strip()
original_content = content
message["body"] = content
break # On arrête après avoir traité le premier message client
cleaned_messages = []
for message in messages:
# Ignorer les messages d'OdooBot
if message.get("author_id") and message["author_id"][0] == 2 and message["author_id"][1] == "OdooBot":
continue
# Ignorer les messages vides sans contenu
if not message.get("body"):
# Vérifier si c'est un message qui ne contient aucune information utile
if not message.get("subject") and not message.get("email_from"):
continue
cleaned_message = message.copy()
# Nettoyer le contenu du message si existe
if "body" in cleaned_message and cleaned_message["body"]:
# Vérifier que body est une chaîne de caractères
if isinstance(cleaned_message["body"], str):
# Conserver le corps original pour référence
cleaned_message["body_original"] = message["body"]
# Supprimer les balises HTML
body = cleaned_message["body"]
# Si ce n'est pas le premier message et qu'on n'a pas déjà nettoyé
if body != original_content:
# Supprimer les balises d'image avec leurs attributs
body = re.sub(r'<img[^>]*?>', '', body)
# Supprimer les balises de style et script
body = re.sub(r'<style[^>]*?>.*?</style>', '', body, flags=re.DOTALL)
body = re.sub(r'<script[^>]*?>.*?</script>', '', body, flags=re.DOTALL)
# Supprimer les attributs de style des balises
body = re.sub(r' style="[^"]*"', '', body)
body = re.sub(r' id="[^"]*"', '', body)
body = re.sub(r' class="[^"]*"', '', body)
body = re.sub(r' width="[^"]*"', '', body)
body = re.sub(r' height="[^"]*"', '', body)
body = re.sub(r' border="[^"]*"', '', body)
# Remplacer les balises <p>, <div>, <br> par des sauts de ligne
body = re.sub(r'<(?:p|div)[^>]*?>', '\n', body)
body = re.sub(r'</(?:p|div)>', '\n', body)
body = re.sub(r'<br[^>]*?>', '\n', body)
# Remplacer les listes
body = re.sub(r'<li[^>]*?>', '\n- ', body)
body = re.sub(r'</li>', '', body)
# Remplacer les liens par leur texte et URL
def replace_link(match):
link_text = re.sub(r'<[^>]*?>', '', match.group(2))
href = re.search(r'href="([^"]*)"', match.group(1))
if href and href.group(1) != link_text:
return f"{link_text} ({href.group(1)})"
return link_text
body = re.sub(r'<a([^>]*?)>(.*?)</a>', replace_link, body)
# Supprimer toutes les autres balises HTML
body = re.sub(r'<[^>]*?>', '', body)
# Convertir les entités HTML en caractères correspondants
body = unescape(body)
# Supprimer les parties de signature standard et de footer
signatures = [
r'---\s*\n.*Support technique.*',
r'Afin d\'assurer une meilleure traçabilité.*',
r'Confidentialité :.*',
r'Ce message électronique et tous les fichiers attachés.*',
r'Droit à la déconnexion :.*',
r'L\'objectif du Support Technique est de vous aider.*',
r'.*www\.cbao\.fr.*',
r'.*tél.*\+33.*',
r'.*\@.*\.fr.*',
r'<span style=.*',
r'Cordialement,\s*$',
r'Bonne réception\s*$',
r'Bonne journée\s*$',
r'À bientôt\s*$',
r'Bien à vous\s*$',
r'.*@.*\.com.*',
r'.*cid:image.*',
r'.*data:image.*',
r'Mobile : .*',
r'Tél : .*',
r'Téléphone : .*',
r'Mobile : .*',
r'Phone : .*',
r'E-mail : .*',
r'Email : .*',
r'Courriel : .*',
r'Responsable .*',
r'Directeur .*',
r'Directrice .*',
r'Gérant .*',
r'Chef .*',
r'Service .*',
r'Département .*'
]
for sig_pattern in signatures:
body = re.sub(sig_pattern, '', body, flags=re.DOTALL | re.IGNORECASE)
# Supprimer les sections qui contiennent des logos ou des coordonnées
lines = body.split('\n')
filtered_lines = []
signature_section = False
for line in lines:
# Détecter les débuts potentiels de signature
if re.match(r'^\s*-{2,}\s*$', line) or re.match(r'^\s*_{2,}\s*$', line):
signature_section = True
continue
# Si on est dans la signature et la ligne est vide ou contient peu de texte, on la saute
if signature_section and (not line.strip() or len(line.strip()) < 40):
continue
# Si on trouve une ligne avec beaucoup de texte, on quitte le mode signature
if signature_section and len(line.strip()) > 60:
signature_section = False
# Filtrer les lignes qui contiennent probablement une signature
if not signature_section:
filtered_lines.append(line)
body = '\n'.join(filtered_lines)
# Supprimer les espaces et sauts de ligne multiples
body = re.sub(r'\n\s*\n', '\n\n', body)
body = re.sub(r' +', ' ', body)
# Supprimer les espaces en début et fin de chaîne
body = body.strip()
cleaned_message["body"] = body
cleaned_messages.append(cleaned_message)
return cleaned_messages
if __name__ == "__main__": if __name__ == "__main__":
import sys import sys
import argparse
parser = argparse.ArgumentParser(description="Extraction de tickets Odoo") if len(sys.argv) < 2:
parser.add_argument("ticket_code", help="Code du ticket à extraire (ex: T0167)") print("Usage: python retrieve_ticket.py <ticket_code>")
parser.add_argument("--config", default="config.json", help="Chemin vers le fichier de configuration")
parser.add_argument("--output-dir", help="Répertoire de sortie (par défaut: output/ticket_CODE)")
parser.add_argument("--verbose", "-v", action="store_true", help="Afficher plus d'informations")
parser.add_argument("--keep-html", action="store_true", help="Conserver le contenu HTML dans les messages")
parser.add_argument("--no-original", action="store_true", help="Ne pas conserver le corps de message original")
args = parser.parse_args()
# Charger la configuration
try:
with open(args.config, "r", encoding="utf-8") as f:
config = json.load(f)
if args.verbose:
print(f"Configuration chargée depuis {args.config}")
except Exception as e:
print(f"Erreur lors du chargement de la configuration: {e}")
sys.exit(1) sys.exit(1)
# Extraire les informations de connexion ticket_code = sys.argv[1]
odoo_config = config.get("odoo", {}) output_dir = f"output/ticket_{ticket_code}"
url = odoo_config.get("url")
db = odoo_config.get("db")
username = odoo_config.get("username")
api_key = odoo_config.get("api_key")
if not all([url, db, username, api_key]): config = {
print("Informations de connexion Odoo manquantes dans le fichier de configuration") "url": "https://odoo.example.com",
sys.exit(1) "db": "your_db_name",
"username": "your_username",
"api_key": "your_api_key"
}
# Définir le répertoire de sortie manager = TicketManager(config["url"], config["db"], config["username"], config["api_key"])
output_dir = args.output_dir or os.path.join(config.get("output_dir", "output"), f"ticket_{args.ticket_code}") if manager.login():
ticket = manager.get_ticket_by_code(ticket_code)
# Créer et connecter le gestionnaire de tickets if ticket:
ticket_manager = TicketManager(url, db, username, api_key) result = manager.extract_ticket_data(ticket["id"], output_dir)
print(f"Extraction terminée. Données disponibles dans {output_dir}")
# Personnaliser le nettoyage des messages HTML si demandé else:
if args.keep_html: print(f"Ticket avec code {ticket_code} non trouvé.")
# Remplacer la méthode de nettoyage par une qui ne fait rien
ticket_manager._clean_messages = lambda messages: [
{**msg, "body_original": msg["body"] if isinstance(msg.get("body"), str) else msg.get("body")}
for msg in messages
]
elif args.no_original:
# Modifier la méthode pour ne pas conserver le corps original
original_clean_method = ticket_manager._clean_messages
ticket_manager._clean_messages = lambda messages: [
{k: v for k, v in msg.items() if k != "body_original"}
for msg in original_clean_method(messages)
]
if not ticket_manager.login():
print("Échec de la connexion à Odoo")
sys.exit(1)
# Récupérer le ticket
if args.verbose:
print(f"Recherche du ticket {args.ticket_code}...")
ticket = ticket_manager.get_ticket_by_code(args.ticket_code)
if not ticket:
print(f"Ticket {args.ticket_code} non trouvé")
sys.exit(1)
if args.verbose:
print(f"Ticket {args.ticket_code} trouvé (ID: {ticket.get('id')})")
print(f"Extraction des données vers {output_dir}...")
# Extraire et sauvegarder toutes les données
result = ticket_manager.extract_ticket_data(ticket["id"], output_dir)
if "error" in result:
print(f"Erreur: {result['error']}")
sys.exit(1)
print(f"Extraction terminée avec succès")
print(f"- Informations du ticket: {result['files']['ticket_info']}")
print(f"- Messages: {result['files']['messages']}")
print(f"- Pièces jointes: {result['files']['attachments_info']}")
print(f"- Dossier des pièces jointes: {result['files']['attachments_dir']}")
# Afficher un résumé
print(f"\nRésumé du ticket {args.ticket_code}:")
print(f"- Nom: {ticket.get('name', 'N/A')}")
print(f"- Messages: {len(result['messages'])}")
print(f"- Pièces jointes: {len(result['attachments'])}")