Source code for kluster_fudge.core

from __future__ import annotations
import numpy.typing as npt
import numpy as np
from kluster_fudge.dist import DistanceMetrics, distance
from kluster_fudge.init import init_centroids, InitMethod
from kluster_fudge.utils import update_centroids


class KModes:
    def __init__(
        self,
        n_clusters: int = 8,
        n_init: int = 10,
        max_iter: int = 100,
        init_method: str = "cao",
        dist_metric: str = "hamming",
        random_state: int = 42,
    ) -> None:
        self.n_clusters = n_clusters
        self.n_init = n_init
        self.max_iter = max_iter
        self.centroids = None
        self.labels = None
        self.encodings = []
        self.decoded_centroids = None
        self.is_df = False
        self.random_state = random_state
        self.cost_ = 0.0

        if init_method == "random":
            self.init_method = InitMethod.RAND
        elif init_method == "huang":
            self.init_method = InitMethod.HUANG
        elif init_method == "cao":
            self.init_method = InitMethod.CAO
        else:
            raise ValueError(f"Unknown init method: {init_method}")

        if dist_metric == "hamming":
            self.dist_metric = DistanceMetrics.HAMMING
        elif dist_metric == "jaccard":
            self.dist_metric = DistanceMetrics.JACCARD
        elif dist_metric == "ng":
            self.dist_metric = DistanceMetrics.NG
        else:
            raise ValueError(f"Unknown distance metric: {dist_metric}")

    def _encode(self, X: npt.NDArray) -> npt.NDArray:
        self.encodings = []
        # Encode into a temporary int64 array first to handle arbitrary category counts.
        X_encoded_temp = np.zeros(X.shape, dtype=np.int64)
        # Every column gets its own integer encoding.
        for i in range(X.shape[1]):
            unique_vals, encoded = np.unique(X[:, i], return_inverse=True)
            self.encodings.append(unique_vals)
            X_encoded_temp[:, i] = encoded

        # Pick the smallest unsigned dtype that can hold the largest category index.
        # The largest value stored in X_encoded is len(unique_vals) - 1 for the
        # column with the most categories.
        max_cat = 0
        for enc in self.encodings:
            if len(enc) > max_cat:
                max_cat = len(enc)
        max_val = max_cat - 1
        if max_val < 256:
            dtype = np.uint8
        elif max_val < 65536:
            dtype = np.uint16
        elif max_val < 4294967296:
            dtype = np.uint32
        else:
            dtype = np.int64  # Fallback, though unlikely for categorical data
        return X_encoded_temp.astype(dtype)

    def _decode(self, centroids_encoded: npt.NDArray[np.int64]) -> npt.NDArray[np.str_]:
        """
        Decode the centroids from integer encoding back to the original values.

        Args:
            centroids_encoded: (npt.NDArray[np.int64]) Encoded centroids (n_clusters, n_features)

        Returns:
            (npt.NDArray[np.str_]) Decoded centroids (n_clusters, n_features)
        """
        decoded = []
        for i in range(len(self.encodings)):
            col_map = self.encodings[i]
            decoded_col = col_map[centroids_encoded[:, i].astype(int)]
            decoded.append(decoded_col)
        # `decoded` is a list of column arrays; transpose so each row is a centroid.
        return np.array(decoded).T

    def _compute_cost(
        self,
        X: npt.NDArray[np.int64],
        centroids: npt.NDArray[np.int64],
        labels: npt.NDArray[np.int64],
    ) -> float:
        """
        Compute the total cost (sum of distances) of the clustering.

        Args:
            X: (npt.NDArray[np.int64]) Data array (n_samples, n_features)
            centroids: (npt.NDArray[np.int64]) Centroids array (n_clusters, n_features)
            labels: (npt.NDArray[np.int64]) Labels array (n_samples,)

        Returns:
            (float) Total cost
        """
        dist_mat = distance(X, centroids, self.dist_metric, labels=labels)
        rows = np.arange(X.shape[0])
        return float(np.sum(dist_mat[rows, labels]))
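
The per-column encoding above is built on numpy's np.unique(..., return_inverse=True), which returns the sorted unique categories of a column together with the integer code of each row. The following standalone sketch (illustrative only, not part of kluster_fudge) shows the idea on a single column; with a maximum code of 2 the column would fit comfortably in the uint8 case chosen by _encode.

import numpy as np

# One categorical column with three distinct values.
col = np.array(["red", "blue", "red", "green"])
unique_vals, codes = np.unique(col, return_inverse=True)
print(unique_vals)  # ['blue' 'green' 'red']  -> stored in self.encodings for decoding
print(codes)        # [2 0 2 1]               -> the integer-encoded column
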
    def fit(self, X: npt.ArrayLike) -> None:
        """
        Fit the model to the input data.

        Args:
            X: (npt.ArrayLike) Data array (n_samples, n_features)

        Returns:
            None
        """
        # If X is a pandas DataFrame, convert it to a numpy array.
        if hasattr(X, "values"):
            X = X.values
            self.is_df = True
        else:
            X = np.asarray(X)

        # Encode X into an integer array for efficiency.
        X = self._encode(X)

        if self.n_init < 1:
            raise ValueError(f"n_init must be at least 1, got {self.n_init}")

        best_cost = np.inf
        best_centroids = None
        best_labels = None

        for init_idx in range(self.n_init):
            # Use a different random state for each initialization.
            current_random_state = (
                self.random_state + init_idx if self.random_state is not None else None
            )
            centroids = init_centroids(
                X, self.n_clusters, self.init_method, random_state=current_random_state
            )
            labels = np.zeros(X.shape[0], dtype=int)

            # Iteration loop
            for i in range(self.max_iter):
                # Use Hamming on the first iteration if the metric is NG,
                # to generate the initial labels.
                current_metric = self.dist_metric
                if i == 0 and self.dist_metric == DistanceMetrics.NG:
                    current_metric = DistanceMetrics.HAMMING

                # Compute distances and assign each sample to its nearest centroid.
                dist = distance(X, centroids, current_metric, labels=labels)
                labels = np.argmin(dist, axis=1)

                # Update centroids and stop once they no longer change.
                new_centroids = update_centroids(X, labels, self.n_clusters)
                if np.array_equal(centroids, new_centroids):
                    break
                centroids = new_centroids

            # Compute the cost for this run and the final label assignments.
            final_dist = distance(X, centroids, self.dist_metric, labels=labels)
            labels = np.argmin(final_dist, axis=1)
            cost = self._compute_cost(X, centroids, labels)

            # Keep this run if it is the best one so far.
            if cost < best_cost:
                best_cost = cost
                best_centroids = centroids
                best_labels = labels

        self.centroids = best_centroids
        self.labels = best_labels
        self.cost_ = best_cost
        self.decoded_centroids = self._decode(self.centroids)
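
A minimal usage sketch of fit, assuming KModes is importable from kluster_fudge.core and using toy categorical data; the numbers of clusters and restarts below are arbitrary choices for illustration, not recommendations from the library.

import numpy as np
from kluster_fudge.core import KModes  # assumed import path

# Toy categorical data: 6 samples, 2 features.
X = np.array([
    ["red",   "small"],
    ["red",   "small"],
    ["blue",  "large"],
    ["blue",  "large"],
    ["green", "small"],
    ["blue",  "small"],
])

km = KModes(n_clusters=2, n_init=5, max_iter=50, init_method="cao", dist_metric="hamming")
km.fit(X)
print(km.labels)             # cluster index per sample
print(km.cost_)              # total distance of the best run
print(km.decoded_centroids)  # centroids mapped back to the original category values
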
    def predict(self, X: npt.ArrayLike) -> npt.NDArray[np.int64]:
        """
        Predict the cluster labels for the input data.

        Args:
            X: (npt.ArrayLike) Data array (n_samples, n_features)

        Returns:
            (npt.NDArray[np.int64]) Labels array (n_samples,)
        """
        if hasattr(X, "values"):
            X = X.values
        else:
            X = np.asarray(X)

        # Encode X and assign each sample to its nearest centroid.
        X = self._encode(X)
        dist = distance(X, self.centroids, self.dist_metric)
        return np.argmin(dist, axis=1)
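
A hedged prediction sketch (again assuming the kluster_fudge.core import path). Because predict re-runs _encode on its input, the new rows below deliberately use only categories that appear in the training data, so the integer codes line up with the fitted centroids.

import numpy as np
from kluster_fudge.core import KModes  # assumed import path

X_train = np.array([["red", "small"], ["red", "large"],
                    ["blue", "large"], ["blue", "small"]])
km = KModes(n_clusters=2, n_init=3, dist_metric="hamming")
km.fit(X_train)

# New rows whose per-column categories match those seen during fit.
X_new = np.array([["red", "small"], ["blue", "large"]])
print(km.predict(X_new))  # cluster index for each new row
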
    def fit_predict(self, X: npt.ArrayLike) -> npt.NDArray[np.int64]:
        """
        Fit the model to the input data and return the cluster labels.

        Args:
            X: (npt.ArrayLike) Data array (n_samples, n_features)

        Returns:
            (npt.NDArray[np.int64]) Labels array (n_samples,)
        """
        self.fit(X)
        return self.labels
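
Since fit_predict simply calls fit and returns the stored labels, the fit-then-read pattern collapses to one call; a tiny sketch under the same import assumption:

import numpy as np
from kluster_fudge.core import KModes  # assumed import path

X = np.array([["a", "x"], ["a", "x"], ["b", "y"], ["b", "y"]])
labels = KModes(n_clusters=2, n_init=3).fit_predict(X)
print(labels)  # one cluster index per row
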