mirror of
https://github.com/Ladebeze66/llm_lab.git
synced 2025-12-13 10:46:50 +01:00
177 lines
6.7 KiB
Python
177 lines
6.7 KiB
Python
import json
|
|
import time
|
|
from typing import Dict, List, Any, Optional
|
|
from datetime import datetime
|
|
|
|
from agents.base_agent import Agent
|
|
from core.factory import LLMFactory
|
|
|
|
class AgentAnalyseJSON(Agent):
|
|
"""
|
|
Agent pour analyser des données JSON
|
|
"""
|
|
def __init__(self, nom: str = "AgentAnalyseJSON", modele: str = "mistral7b"):
|
|
"""
|
|
Initialisation de l'agent d'analyse JSON
|
|
|
|
Args:
|
|
nom: Nom de l'agent
|
|
modele: Identifiant du modèle LLM à utiliser (par défaut: mistral7b)
|
|
"""
|
|
super().__init__(nom)
|
|
|
|
# Création du modèle via la factory
|
|
self.llm = LLMFactory.create(modele)
|
|
|
|
# Configuration du modèle
|
|
self.llm.set_role("formateur", {
|
|
"system_prompt": "Tu es un expert en analyse de données JSON. Tu dois extraire des informations pertinentes, identifier des tendances et répondre à des questions sur les données.",
|
|
"params": {
|
|
"temperature": 0.4,
|
|
"top_p": 0.9
|
|
}
|
|
})
|
|
|
|
def executer(self, json_data: Dict[str, Any],
|
|
question: str = "Analyse ces données et extrait les informations principales.") -> Dict[str, Any]:
|
|
"""
|
|
Analyse des données JSON
|
|
|
|
Args:
|
|
json_data: Données JSON à analyser
|
|
question: Question à poser au modèle
|
|
|
|
Returns:
|
|
Résultats de l'analyse sous forme de dictionnaire
|
|
"""
|
|
# Conversion du JSON en chaîne formatée
|
|
json_str = json.dumps(json_data, ensure_ascii=False, indent=2)
|
|
|
|
# Construction du prompt avec le JSON et la question
|
|
prompt = f"{question}\n\nDonnées JSON à analyser:\n```json\n{json_str}\n```"
|
|
|
|
# Interrogation du modèle
|
|
reponse = self.llm.generate(prompt)
|
|
|
|
# Construction du résultat
|
|
resultats = {
|
|
"question": question,
|
|
"reponse": reponse,
|
|
"timestamp": datetime.now().isoformat(),
|
|
"taille_json": len(json_str)
|
|
}
|
|
|
|
# Ajout à l'historique
|
|
self.ajouter_historique("analyse_json", prompt[:200] + "...", reponse[:200] + "...")
|
|
|
|
return resultats
|
|
|
|
def extraire_structure(self, json_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Extrait la structure d'un JSON (clés, types, profondeur)
|
|
|
|
Args:
|
|
json_data: Données JSON à analyser
|
|
|
|
Returns:
|
|
Dictionnaire contenant la structure et des statistiques
|
|
"""
|
|
resultat = {
|
|
"structure": {},
|
|
"statistiques": {
|
|
"nb_cles": 0,
|
|
"profondeur_max": 0,
|
|
"types": {}
|
|
}
|
|
}
|
|
|
|
def explorer_structure(data, chemin="", profondeur=0):
|
|
nonlocal resultat
|
|
|
|
# Mise à jour de la profondeur max
|
|
resultat["statistiques"]["profondeur_max"] = max(resultat["statistiques"]["profondeur_max"], profondeur)
|
|
|
|
if isinstance(data, dict):
|
|
structure = {}
|
|
for cle, valeur in data.items():
|
|
nouveau_chemin = f"{chemin}.{cle}" if chemin else cle
|
|
resultat["statistiques"]["nb_cles"] += 1
|
|
|
|
type_valeur = type(valeur).__name__
|
|
if type_valeur not in resultat["statistiques"]["types"]:
|
|
resultat["statistiques"]["types"][type_valeur] = 0
|
|
resultat["statistiques"]["types"][type_valeur] += 1
|
|
|
|
if isinstance(valeur, (dict, list)):
|
|
structure[cle] = explorer_structure(valeur, nouveau_chemin, profondeur + 1)
|
|
else:
|
|
structure[cle] = type_valeur
|
|
return structure
|
|
|
|
elif isinstance(data, list):
|
|
if data and isinstance(data[0], (dict, list)):
|
|
# Pour les listes de structures complexes, on analyse le premier élément
|
|
return [explorer_structure(data[0], f"{chemin}[0]", profondeur + 1)]
|
|
else:
|
|
# Pour les listes de valeurs simples
|
|
type_elements = "vide" if not data else type(data[0]).__name__
|
|
resultat["statistiques"]["nb_cles"] += 1
|
|
|
|
if "list" not in resultat["statistiques"]["types"]:
|
|
resultat["statistiques"]["types"]["list"] = 0
|
|
resultat["statistiques"]["types"]["list"] += 1
|
|
|
|
return f"list[{type_elements}]"
|
|
else:
|
|
return type(data).__name__
|
|
|
|
resultat["structure"] = explorer_structure(json_data)
|
|
|
|
self.ajouter_historique("extraire_structure", "JSON", resultat)
|
|
return resultat
|
|
|
|
def fusionner_jsons(self, json1: Dict[str, Any], json2: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Fusionne deux structures JSON en conservant les informations des deux
|
|
|
|
Args:
|
|
json1: Premier dictionnaire JSON
|
|
json2: Second dictionnaire JSON
|
|
|
|
Returns:
|
|
Dictionnaire fusionné
|
|
"""
|
|
if not json1:
|
|
return json2
|
|
|
|
if not json2:
|
|
return json1
|
|
|
|
resultat = json1.copy()
|
|
|
|
# Fonction récursive pour fusionner
|
|
def fusionner(dict1, dict2):
|
|
for cle, valeur in dict2.items():
|
|
if cle in dict1:
|
|
# Si les deux sont des dictionnaires, fusion récursive
|
|
if isinstance(dict1[cle], dict) and isinstance(valeur, dict):
|
|
fusionner(dict1[cle], valeur)
|
|
# Si les deux sont des listes, concaténation
|
|
elif isinstance(dict1[cle], list) and isinstance(valeur, list):
|
|
dict1[cle].extend(valeur)
|
|
# Sinon, on garde les deux valeurs dans une liste
|
|
else:
|
|
if not isinstance(dict1[cle], list):
|
|
dict1[cle] = [dict1[cle]]
|
|
if isinstance(valeur, list):
|
|
dict1[cle].extend(valeur)
|
|
else:
|
|
dict1[cle].append(valeur)
|
|
else:
|
|
# Si la clé n'existe pas dans dict1, on l'ajoute simplement
|
|
dict1[cle] = valeur
|
|
|
|
fusionner(resultat, json2)
|
|
self.ajouter_historique("fusionner_jsons", "Fusion de deux JSON", "Fusion réussie")
|
|
|
|
return resultat |