Source code for kluster_fudge.init

from enum import Enum
import numpy.typing as npt
import numpy as np
from numba import njit, prange
from kluster_fudge.dist import distance, DistanceMetrics


[docs] class InitMethod(Enum): RAND = 0 HUANG = 1 CAO = 2
def init_centroids( X: npt.NDArray[np.float64], n_clusters: int, method: InitMethod, random_state: int | None = None, ) -> np.ndarray: """ Initialize centroids using the specified method. Args: X: (npt.NDArray[np.float64]) Data array (n_samples, n_features) n_clusters: (int) Number of clusters method: (InitMethod) Initialization method random_state: (int | None) Random seed for reproducibility Returns: (npt.NDArray[np.float64]) Centroids array (n_clusters, n_features) """ if random_state is not None: np.random.seed(random_state) if method == InitMethod.RAND: n_samples = X.shape[0] if n_clusters > n_samples: raise ValueError( f"n_clusters ({n_clusters}) cannot exceed n_samples ({n_samples})" ) random_indices = np.random.choice(n_samples, n_clusters, replace=False) return X[random_indices] elif method == InitMethod.HUANG: return _init_centroids_huang(X, n_clusters) elif method == InitMethod.CAO: return _init_centroids_cao(X, n_clusters) else: raise ValueError(f"Unknown init method: {method}") def _freq_sort_categories(X: npt.ArrayLike) -> np.ndarray: """ Sort the unique values in X by frequency in descending order. Args: X: (npt.ArrayLike) Data array (n_samples,) Returns: (npt.NDArray[np.float64]) Sorted unique values """ unique_vals, counts = np.unique(X, return_counts=True) sort_indices = np.argsort(counts) return unique_vals[sort_indices[::-1]] def _init_centroids_huang(X: npt.ArrayLike, n_clusters: int) -> np.ndarray: """ Initialize centroids using the Huang algorithm. Args: X: (npt.ArrayLike) Data array (n_samples, n_features) n_clusters: (int) Number of clusters Returns: (npt.NDArray[np.float64]) Centroids array (n_clusters, n_features) """ X = np.asarray(X) _, n_features = X.shape if X.ndim != 2: raise ValueError("Input array must be 2-dimensional") sorted_cols = [_freq_sort_categories(X[:, j]) for j in range(n_features)] synthetic_centroids = np.empty((n_clusters, n_features), dtype=X.dtype) for i in range(n_clusters): for j in range(n_features): unique_vals = sorted_cols[j] synthetic_centroids[i, j] = unique_vals[i % len(unique_vals)] final_centroids = np.empty((n_clusters, n_features), dtype=X.dtype) taken_indices = set() for i in range(n_clusters): best_idx = -1 distances = np.sum(X != synthetic_centroids[i], axis=1) candidate_indices = np.argsort(distances) for idx in candidate_indices: if idx not in taken_indices: best_idx = idx break if best_idx == -1: best_idx = candidate_indices[0] taken_indices.add(best_idx) final_centroids[i] = X[best_idx] return final_centroids @njit(parallel=True, fastmath=True) def _compute_X_density(X: npt.ArrayLike) -> np.ndarray: """ Compute the density of each data point in X. Args: X: (npt.ArrayLike) Data array (n_samples, n_features) Returns: (npt.NDArray[np.float64]) Density array (n_samples,) """ X = np.asarray(X) U, A = X.shape densities = np.zeros(U, dtype=np.float64) for a in range(A): col = X[:, a] max_val = np.max(col) counts = np.zeros(max_val + 1, dtype=np.int64) for u in range(U): counts[col[u]] += 1 for u in prange(U): val = col[u] densities[u] += counts[val] return densities / (U * A) def _init_centroids_cao(X: npt.NDArray[np.float64], n_clusters: int) -> np.ndarray: """ Initialize centroids using the Cao algorithm. Args: X: (npt.NDArray[np.float64]) Data array (n_samples, n_features) n_clusters: (int) Number of clusters Returns: (npt.NDArray[np.float64]) Centroids array (n_clusters, n_features) """ densities = _compute_X_density(X) first_centroid_index = np.argmax(densities) centroid_set = [X[first_centroid_index]] n_samples, _ = X.shape min_dists = distance( X, X[first_centroid_index].reshape(1, -1), DistanceMetrics.HAMMING ).flatten() for _ in range(n_clusters - 1): # compute score scores = densities * min_dists best_idx = np.argmax(scores) new_centroid = X[best_idx] centroid_set.append(new_centroid) # compute distance to new centroid dist_to_new = distance( X, new_centroid.reshape(1, -1), DistanceMetrics.HAMMING ).flatten() min_dists = np.minimum(min_dists, dist_to_new) return np.array(centroid_set)