llm_lab_perso/utils/ollama_manager.py
2025-04-21 17:36:30 +02:00

239 lines
10 KiB
Python

"""
Gestionnaire de modèles Ollama pour optimiser le chargement et la bascule entre modèles
"""
import requests
import subprocess
import logging
import platform
import time
import os
import threading
import sys
logger = logging.getLogger("ollama_manager")
class OllamaModelManager:
"""
Classe utilitaire pour gérer efficacement les modèles Ollama:
- Préchargement des modèles
- Basculement entre modèles
- Vérification de l'état des modèles
- Libération des ressources
"""
def __init__(self):
self.base_url = "http://localhost:11434/api"
self.available_models = []
self.running_model = None
self.lock = threading.Lock()
self.last_model_switch_time = 0
self.model_switching_in_progress = False
self.refresh_available_models()
def refresh_available_models(self):
"""Actualise la liste des modèles disponibles dans Ollama"""
try:
response = requests.get(f"{self.base_url}/tags", timeout=5)
if response.status_code == 200:
data = response.json()
self.available_models = [model.get("name", "") for model in data.get("models", [])]
logger.info(f"Modèles Ollama disponibles: {self.available_models}")
else:
logger.warning(f"Impossible de récupérer les modèles: {response.status_code}")
except Exception as e:
logger.error(f"Erreur lors de la récupération des modèles: {str(e)}")
def get_running_model(self):
"""Détecte le modèle Ollama actuellement chargé"""
try:
# Méthode simplifiée et plus fiable - via appel à l'API Ollama
# Les méthodes précédentes avec subprocess peuvent causer des erreurs d'encodage
for model in self.available_models:
try:
response = requests.post(
f"{self.base_url}/generate",
json={"model": model, "prompt": "Hi", "stream": False},
timeout=1.0
)
if response.status_code == 200:
# Un modèle répond rapidement s'il est déjà chargé
self.running_model = model
return model
except Exception:
# Si timeout ou erreur, on continue avec le modèle suivant
pass
except Exception as e:
logger.error(f"Erreur lors de la détection du modèle en cours: {str(e)}")
return None
def ensure_model_loaded(self, model_name, max_wait=120):
"""
S'assure qu'un modèle est chargé en mémoire, prêt à répondre rapidement.
Retourne True si le modèle est chargé avec succès.
"""
with self.lock: # Éviter les conflits entre threads
# Si ce modèle n'est pas disponible, on ne peut pas le charger
if model_name not in self.available_models:
logger.warning(f"Modèle {model_name} non disponible dans Ollama")
return False
# Si le modèle est déjà chargé, rien à faire
current_model = self.get_running_model()
if current_model == model_name:
logger.info(f"Modèle {model_name} déjà chargé et prêt")
return True
# Marquer le début du chargement
self.model_switching_in_progress = True
self.last_model_switch_time = time.time()
# Charger le modèle avec un petit prompt pour forcer le chargement
logger.info(f"Chargement du modèle {model_name}... (délai max: {max_wait}s)")
try:
# Prompt minimal pour charger le modèle
response = requests.post(
f"{self.base_url}/generate",
json={"model": model_name, "prompt": "Hello", "stream": False},
timeout=max_wait
)
if response.status_code == 200:
self.running_model = model_name
logger.info(f"Modèle {model_name} chargé avec succès")
self.model_switching_in_progress = False
return True
else:
logger.error(f"Erreur lors du chargement du modèle {model_name}: {response.status_code}")
self.model_switching_in_progress = False
return False
except requests.exceptions.Timeout:
logger.warning(f"Délai dépassé lors du chargement du modèle {model_name} (timeout: {max_wait}s)")
self.model_switching_in_progress = False
return False
except Exception as e:
logger.error(f"Erreur lors du chargement du modèle {model_name}: {str(e)}")
self.model_switching_in_progress = False
return False
def switch_model(self, model_name, max_wait=60):
"""
Bascule vers un autre modèle.
Pour éviter les délais, cette méthode est non bloquante par défaut
et retourne True si le processus de changement a démarré.
"""
# Vérifier si un basculement est déjà en cours
if self.model_switching_in_progress:
logger.info(f"Basculement vers {model_name} ignoré - un autre basculement est déjà en cours")
return False
# Vérifier si le modèle est déjà chargé
if self.running_model == model_name:
logger.info(f"Modèle {model_name} déjà actif, pas besoin de basculer")
return True
# Éviter les basculements trop fréquents (moins de 10 secondes entre les basculements)
if time.time() - self.last_model_switch_time < 10:
logger.info(f"Basculement vers {model_name} reporté - dernier basculement trop récent")
return False
# Démarrer le chargement dans un thread séparé pour ne pas bloquer l'API
self.model_switching_in_progress = True
threading.Thread(
target=self.ensure_model_loaded,
args=(model_name, max_wait),
daemon=True
).start()
logger.info(f"Basculement vers le modèle {model_name} démarré en arrière-plan")
return True
def preload_models(self, models_list):
"""
Précharge une liste de modèles pour accélérer leur disponibilité.
Utile au démarrage du serveur.
"""
logger.info(f"Démarrage du préchargement des modèles: {', '.join(models_list)}")
# Vérifier quels modèles sont disponibles
self.refresh_available_models()
# Filtrer pour n'inclure que les modèles disponibles
available_models = [model for model in models_list if model in self.available_models]
if len(available_models) != len(models_list):
missing = set(models_list) - set(available_models)
logger.warning(f"Certains modèles demandés ne sont pas disponibles: {', '.join(missing)}")
for model in available_models:
threading.Thread(
target=self.ensure_model_loaded,
args=(model, 180), # Temps d'attente plus long pour le préchargement (3 minutes)
daemon=True
).start()
# Pause pour éviter de surcharger Ollama avec plusieurs chargements simultanés
time.sleep(5)
def is_model_switch_needed(self, requested_model):
"""
Détermine si un basculement de modèle est nécessaire et approprié
"""
# Si le modèle demandé n'est pas disponible, pas besoin de basculer
if requested_model not in self.available_models:
return False
# Si c'est le modèle actuel, pas besoin de basculer
if self.running_model == requested_model:
return False
# Si un basculement est déjà en cours, ne pas en lancer un autre
if self.model_switching_in_progress:
return False
# Si le dernier basculement était récent, éviter de créer du thrashing
if time.time() - self.last_model_switch_time < 10:
return False
# Dans tous les autres cas, le basculement est nécessaire
return True
def _is_server_available(self):
"""Check if Ollama server is available"""
try:
response = requests.get(f"{self.base_url}/tags", timeout=5) # Timeout augmenté
return response.status_code == 200
except requests.exceptions.RequestException:
return False
def is_ollama_available(self):
"""Vérifie si le serveur Ollama est disponible et répond aux requêtes"""
try:
response = requests.get(f"{self.base_url}/tags", timeout=5) # Timeout augmenté
return response.status_code == 200
except Exception:
return False
# Singleton pour utilisation dans l'application
ollama_manager = OllamaModelManager()
if __name__ == "__main__":
# Configuration du logging pour les tests
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Test de la classe
manager = OllamaModelManager()
print(f"Modèles disponibles: {manager.available_models}")
current_model = manager.get_running_model()
print(f"Modèle actuellement chargé: {current_model or 'Aucun'}")
if manager.available_models:
test_model = manager.available_models[0]
print(f"Test de chargement du modèle {test_model}...")
if manager.ensure_model_loaded(test_model):
print(f"Modèle {test_model} chargé avec succès!")
else:
print(f"Échec du chargement du modèle {test_model}")