Implémentation de l'algorithme SMO avec NumPy pour les machines à vecteurs de support

En apprentissage automatique, les machines à vecteurs de support (SVM) sont reconnues pour leurs performances en classification. L'algorithme de minimisation séquentielle (SMO) est un composant essentiel pour l'entraînement de ces modèles. Contrairement à de nombreux tutoriels qui présentent des versions simplifiées et des exemples bidimensionnels, cet article vous guidera à travers l'implémentation d'une version robuste de l'algorithme SMO de Platt, prête pour un usage en production. Nous l'appliquerons à des jeux de données classiques de Scikit-learn et l'encapsulerons dans une interface API de style Scikit-learn.

1. Principes clés de l'implémentation technique de l'algorithme SMO

L'algorithme SMO de Platt apporte trois améliorations majeures par rapport aux versions simplifiées : une heuristique pour la sélection des multiplicateurs de Lagrange (alpha), un mécanisme de cache d'erreurs, et une priorité aux échantillons non situés sur les bornes. Ces optimisations réduisent la complexité algorithmique de O(n²) à une complexité proche de O(n), permettant aux SVM de traiter des ensembles de données de plus grande taille.

La conception d'une structure de données centrale est la première étape. Nous allons créer une classe nommée OptimiseurSVM pour gérer tous les états intermédiaires nécessaires :

import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.svm import SVC


class OptimiseurSVM:
   def __init__(self, donnees_X, etiquettes_y, penalite_C=1.0, tolerance=0.001):
       self.caracteristiques = np.asmatrix(donnees_X)  # Matrice de caractéristiques (m × n)
       self.cibles = np.asmatrix(etiquettes_y).T  # Vecteur d'étiquettes (m × 1)
       self.penalite_C = float(penalite_C)  # Coefficient de pénalité
       self.tolerance = float(tolerance)  # Tolérance pour la convergence
       self.nombre_echantillons = donnees_X.shape[0]  # Nombre d'échantillons
       self.multiplicateurs_alpha = np.mat(np.zeros((self.nombre_echantillons, 1)))  # Multiplicateurs de Lagrange
       self.biais = 0.0  # Terme de biais
       self.cache_erreurs = np.mat(np.zeros((self.nombre_echantillons, 2)))  # Cache pour les erreurs de prédiction

La première colonne de la matrice self.cache_erreurs est un indicateur de validité (0/1), la seconde stocke l'erreur de prédiction correspondante E_i = f(x_i) - y_i. Cette approche prévient les calculs redondants et est cruciale pour l'accélération de l'algorithme.

2. Implémentation de la stratégie heuristique de sélection des alphas

L'algorithme SMO de Platt utilise une sélection heuristique en deux phases : la boucle externe choisit l'alpha_i qui viole le plus les conditions de KKT, et la boucle interne sélectionne l'alpha_j qui maximise |E_i - E_j|. Voici l'implémentation de la boucle interne :

   def _calculer_erreur_prediction(self, index):
       # f(x_index) = sum(alpha_k * y_k * K(x_k, x_index)) + biais
       prediction_valeur = float(np.multiply(self.multiplicateurs_alpha, self.cibles).T @ self.caracteristiques @ self.caracteristiques[index, :].T + self.biais)
       return prediction_valeur - float(self.cibles[index])

   def _mettre_a_jour_cache_erreurs(self, index_alpha, erreur_valeur):
       self.cache_erreurs[index_alpha] = [1, erreur_valeur]

   def _choisir_alpha_j(self, index_i, erreur_i):
       # Initialisation
       index_k_max = -1
       delta_erreur_max = 0
       erreur_j = 0

       self._mettre_a_jour_cache_erreurs(index_i, erreur_i) # Mettre à jour le cache

       # Obtenir les indices des entrées valides du cache
       indices_valides = np.where(self.cache_erreurs[:, 0] > 0)[0]

       if len(indices_valides) > 1:
           for k_idx in indices_valides:
               if k_idx == index_i: continue
               erreur_k = self._calculer_erreur_prediction(k_idx)
               delta_e = abs(erreur_i - erreur_k)
               if delta_e > delta_erreur_max:
                   index_k_max, delta_erreur_max, erreur_j = k_idx, delta_e, erreur_k
           return index_k_max, erreur_j
       else:  # Si pas de cache valide, choisir aléatoirement
           list_indices_valides = list(range(self.nombre_echantillons))
           list_indices_valides.pop(index_i) # remove current alpha index
           index_j = np.random.choice(list_indices_valides)
           erreur_j = self._calculer_erreur_prediction(index_j)
           return index_j, erreur_j

La boucle externe alterne entre deux modes de parcours :

  1. Balayage complet des échantillons : Vérifie si tous les α violent les conditions de KKT.
  2. Balayage des échantillons non-limites : Se concentre uniquement sur les échantillons où 0 < α_i < C.
   def _boucle_externe(self, max_iterations=1000):
       compteur_iterations = 0
       balayage_complet_actif = True
       alphas_modifies = 0

       while compteur_iterations < max_iterations and (alphas_modifies > 0 or balayage_complet_actif):
           alphas_modifies = 0
           if balayage_complet_actif:  # Parcours sur l'ensemble des échantillons
               for idx_i in range(self.nombre_echantillons):
                   alphas_modifies += self._boucle_interne(idx_i)
           else:  # Parcours sur les échantillons non-limites (0 < alpha < C)
               indices_non_limites = [i for i in range(self.nombre_echantillons)
                                    if 0 < self.multiplicateurs_alpha[i] < self.penalite_C]
               for idx_i in indices_non_limites:
                   alphas_modifies += self._boucle_interne(idx_i)

           compteur_iterations += 1
           balayage_complet_actif = not balayage_complet_actif  # Basculer le mode

       return self.biais, self.multiplicateurs_alpha

   def train(self, max_iter=1000):
       # Wrapper pour la boucle externe
       return self._boucle_externe(max_iter)
   
   def _clip_alpha(self, aj, H, L):
       if aj > H:
           aj = H
       if aj < L:
           aj = L
       return aj

   def _boucle_interne(self, i):
       Ei = self._calculer_erreur_prediction(i)

       if ((self.cibles[i] * Ei < -self.tolerance) and (self.multiplicateurs_alpha[i] < self.penalite_C)) or \
          ((self.cibles[i] * Ei > self.tolerance) and (self.multiplicateurs_alpha[i] > 0)):
           
           j, Ej = self._choisir_alpha_j(i, Ei)
           alpha_i_old = self.multiplicateurs_alpha[i].copy()
           alpha_j_old = self.multiplicateurs_alpha[j].copy()

           # Calcul des bornes L et H
           if self.cibles[i] != self.cibles[j]:
               L = max(0, self.multiplicateurs_alpha[j] - self.multiplicateurs_alpha[i])
               H = min(self.penalite_C, self.penalite_C + self.multiplicateurs_alpha[j] - self.multiplicateurs_alpha[i])
           else:
               L = max(0, self.multiplicateurs_alpha[j] + self.multiplicateurs_alpha[i] - self.penalite_C)
               H = min(self.penalite_C, self.multiplicateurs_alpha[j] + self.multiplicateurs_alpha[i])
           
           if L == H:
               return 0 # Pas de modification possible pour alpha_j

           eta = 2.0 * self.caracteristiques[i, :] @ self.caracteristiques[j, :].T - \
                 self.caracteristiques[i, :] @ self.caracteristiques[i, :].T - \
                 self.caracteristiques[j, :] @ self.caracteristiques[j, :].T
           
           if eta >= 0: # Should be negative for convex problem
               return 0

           # Mettre à jour alpha_j
           self.multiplicateurs_alpha[j] -= self.cibles[j] * (Ei - Ej) / eta
           self.multiplicateurs_alpha[j] = self._clip_alpha(self.multiplicateurs_alpha[j], H, L)

           self._mettre_a_jour_cache_erreurs(j, self._calculer_erreur_prediction(j)) # Update error cache for j

           if abs(self.multiplicateurs_alpha[j] - alpha_j_old) < 0.00001:
               return 0 # alpha_j n'a pas suffisamment changé

           # Mettre à jour alpha_i
           self.multiplicateurs_alpha[i] += self.cibles[j] * self.cibles[i] * (alpha_j_old - self.multiplicateurs_alpha[j])
           self._mettre_a_jour_cache_erreurs(i, self._calculer_erreur_prediction(i)) # Update error cache for i

           # Mettre à jour le biais
           b1 = self.biais - Ei - self.cibles[i] * (self.multiplicateurs_alpha[i] - alpha_i_old) * \
                (self.caracteristiques[i, :] @ self.caracteristiques[i, :].T) - \
                self.cibles[j] * (self.multiplicateurs_alpha[j] - alpha_j_old) * \
                (self.caracteristiques[i, :] @ self.caracteristiques[j, :].T)
           
           b2 = self.biais - Ej - self.cibles[i] * (self.multiplicateurs_alpha[i] - alpha_i_old) * \
                (self.caracteristiques[i, :] @ self.caracteristiques[j, :].T) - \
                self.cibles[j] * (self.multiplicateurs_alpha[j] - alpha_j_old) * \
                (self.caracteristiques[j, :] @ self.caracteristiques[j, :].T)
           
           if (0 < self.multiplicateurs_alpha[i]) and (self.multiplicateurs_alpha[i] < self.penalite_C):
               self.biais = b1
           elif (0 < self.multiplicateurs_alpha[j]) and (self.multiplicateurs_alpha[j] < self.penalite_C):
               self.biais = b2
           else:
               self.biais = (b1 + b2) / 2.0
           
           return 1 # Un couple alpha a été modifié
       else:
           return 0

3. Stratégies essentielles pour les jeux de données de haute dimension

Lorsque la dimensionnalité des caractéristiques augmente, trois aspects de l'implémentation requièrent une attention particulière :

  1. Compatibilité des fnoctions noyau : Même avec un noyau linéaire, il est judicieux de prévoir une interface pour le calcul de la matrice noyau.
  2. Stabilité numérique : Les données de haute dimension sont plus sujettes aux problèmes de débordement numérique, d'où la nécessité d'une régularisation accrue.
  3. Optimisation de la mémoire : Pour des caractéristiques N > 1000, l'utilsiation de matrices creuses est recommandée pour économiser la mémoire.

Voici des suggestions de paramètres pour le jeu de données du cancer du sein :

Paramètre Valeur Recommandée Description
C 0.6-1.2 Coefficient de pénalité ; une valeur trop élevée peut entraîner un surapprentissage.
tol 1e-3 Tolérance d'erreur, influence la vitesse de convergence.
max_iter 500-1000 Nombre maximal d'itérations.
# Charger et standardiser les données
donnees_cancer = load_breast_cancer()
X_standardise = StandardScaler().fit_transform(donnees_cancer.data)
y_standardise = donnees_cancer.target
y_standardise[y_standardise == 0] = -1  # Convertir les étiquettes en ±1

# Initialiser l'optimiseur
entraineur_svm = OptimiseurSVM(X_standardise, y_standardise, penalite_C=0.8, tolerance=0.001)
entraineur_svm.train(max_iterations=500)

4. Création d'une API de style Scikit-learn

Pour rendre notre implémentation plus conviviale, nous allons encapsuler les interfaces standard d'apprentissage automatique :

class ClassificateurSVM:
   def __init__(self, penalite_C=1.0, tolerance=1e-3, max_iterations=1000):
       self.penalite_C = penalite_C
       self.tolerance = tolerance
       self.max_iterations = max_iterations
       self.optimiseur_interne = None
       self.biais = 0.0
       self.poids_w = None
       
   def ajuster(self, X_fit, y_fit):
       self.optimiseur_interne = OptimiseurSVM(X_fit, y_fit, self.penalite_C, self.tolerance)
       self.biais, alphas_optimaux = self.optimiseur_interne.train(self.max_iterations)
       self.poids_w = self._calculer_vecteur_poids(X_fit, y_fit, alphas_optimaux)
       return self
   
   def predire(self, X_predict):
       if self.poids_w is None or self.biais is None:
           raise RuntimeError("Le modèle doit être ajusté avant de faire des prédictions.")
       X_mat_predict = np.asmatrix(X_predict)
       scores_decision = X_mat_predict @ self.poids_w.T + self.biais
       return np.where(scores_decision >= 0, 1, -1)
   
   def _calculer_vecteur_poids(self, X_data, y_data, alphas_final):
       # Calculer le vecteur de poids w = Σ(α_i * y_i * x_i)
       y_mat = np.asmatrix(y_data).T
       X_mat = np.asmatrix(X_data)
       return np.sum(np.multiply(np.multiply(alphas_final, y_mat), X_mat), axis=0).T


Nous pouvons maintenant entraîner et évaluer notre modèle comme avec Scikit-learn :

X_entrainement, X_test, y_entrainement, y_test = train_test_split(X_standardise, y_standardise, test_size=0.2, random_state=42)
classifieur_perso = ClassificateurSVM(penalite_C=0.8).ajuster(X_entrainement, y_entrainement)
predictions_perso = classifieur_perso.predire(X_test)
print(f"Précision du classifieur SMO personnalisé: {accuracy_score(y_test, predictions_perso):.2%}")

5. Visualisation de la frontière de décision et comparaison des performances

Bien que le jeu de données du cancer du sein soit de haute dimension, nous pouvons le visualiser après une réduction de dimensionnalité via PCA :

# Réduction de dimensionnalité à 2D
pca_transform = PCA(n_components=2)
X_pca_train = pca_transform.fit_transform(X_entrainement)

# Entraîner un modèle 2D
classifieur_2d = ClassificateurSVM(penalite_C=0.8).ajuster(X_pca_train, y_entrainement)

# Créer un maillage pour la visualisation
x_min, x_max = X_pca_train[:, 0].min() - 1, X_pca_train[:, 0].max() + 1
y_min, y_max = X_pca_train[:, 1].min() - 1, X_pca_train[:, 1].max() + 1
xx_grid, yy_grid = np.meshgrid(np.arange(x_min, x_max, 0.02),
                              np.arange(y_min, y_max, 0.02))

# Prédire les catégories pour chaque point du maillage
Z_grid = classifieur_2d.predire(np.c_[xx_grid.ravel(), yy_grid.ravel()])
Z_grid = Z_grid.reshape(xx_grid.shape)

# Dessiner la frontière de décision
plt.figure(figsize=(10, 7))
plt.contourf(xx_grid, yy_grid, Z_grid, alpha=0.4, cmap=plt.cm.coolwarm)
plt.scatter(X_pca_train[:, 0], X_pca_train[:, 1], c=y_entrainement.flatten(), s=20, edgecolor='k', cmap=plt.cm.coolwarm)
plt.title('Frontière de décision de l\'algorithme SMO (PCA 2D)')
plt.xlabel('Composante Principale 1')
plt.ylabel('Composante Principale 2')
plt.colorbar(label='Classe prédite')
plt.show()

Lors de la comparaison des performances avec la classe SVC de Scikit-learn, il convient de noter :

  • Assurez-vous d'utiliser les mêmes étapes de prétraitement.
  • Pour les données linéairement séparables, la précision des deux implémentations devrait être comparable.
  • Notre implémentation personnalisée peut être légèrement plus lente à l'entraînement, mais offre une plus grande flexibilité pour la personnalisation et le débogage.
# Comparaison avec Scikit-learn SVC
sk_classifieur = SVC(kernel='linear', C=0.8, random_state=42)
sk_classifieur.fit(X_entrainement, y_entrainement.ravel()) # .ravel() car SVC attend un 1D array pour y
predictions_sk = sk_classifieur.predict(X_test)

print(f"Précision du classifieur SMO personnalisé: {accuracy_score(y_test, predictions_perso):.2%}")
print(f"Précision de Scikit-learn SVC: {accuracy_score(y_test, predictions_sk):.2%}")

En cas de performances nettement inférieures de l'implémentation personnalisée par rapport à Scikit-learn dans un projet réel, il est conseillé de vérifier les points suivants :

  1. La logique de mise à jour des alphas est-elle correcte ?
  2. Le cache d'erreurs est-il mis à jour en temps voulu ?
  3. Les seuils de jugement des conditions de KKT sont-ils appropriés ?

Étiquettes: NumPy Scikit-learn svm SMO Algorithm Machine Learning

Publié le 3 juillet à 23h31