# Source code for kluster_fudge.dist

from __future__ import annotations
import numpy as np
import numpy.typing as npt
from numba import prange, njit
from enum import Enum


__all__ = ["DistanceMetrics", "distance"]


class DistanceMetrics(Enum):
    """Supported distance metrics for comparing data rows to centroids."""

    HAMMING = "hamming"
    JACCARD = "jaccard"
    NG = "ng"
# Numba-compiled kernels. Each builds an (n_samples, n_clusters) matrix of
# distances between integer-coded data rows (X) and cluster centroids.


@njit(parallel=True, fastmath=True)
def _hamming(
    X: npt.NDArray[np.int64], centroids: npt.NDArray[np.int64]
) -> npt.NDArray[np.float64]:
    """
    Compute Hamming distance between X and centroids.

    Args:
        X: (npt.NDArray[np.int64]) Data array (n_samples, n_features)
        centroids: (npt.NDArray[np.int64]) Centroids array (n_clusters, n_features)

    Returns:
        (npt.NDArray[np.float64]) Distance matrix (n_samples, n_clusters);
        entry (i, j) counts the positions where X[i] and centroids[j] differ.
    """
    if X.shape[1] != centroids.shape[1]:
        raise ValueError("X and centroids must have the same number of features")
    n_samples = X.shape[0]
    n_clusters = centroids.shape[0]
    out: npt.NDArray[np.float64] = np.zeros((n_samples, n_clusters), dtype=np.float64)
    for row in prange(n_samples):  # parallel over data rows
        for k in range(n_clusters):
            mismatches = 0
            for a, b in zip(X[row], centroids[k]):
                if a != b:
                    mismatches += 1
            out[row, k] = mismatches
    return out


@njit(parallel=True, fastmath=True)
def _jaccard(
    X: npt.NDArray[np.int64], centroids: npt.NDArray[np.int64]
) -> npt.NDArray[np.float64]:
    """
    Compute Jaccard distance between X and centroids.

    Args:
        X: (npt.NDArray[np.int64]) Data array (n_samples, n_features)
        centroids: (npt.NDArray[np.int64]) Centroids array (n_clusters, n_features)

    Returns:
        (npt.NDArray[np.float64]) Distance matrix (n_samples, n_clusters),
        where each entry is 1 - matches / (2 * n_features - matches).
    """
    if X.shape[1] != centroids.shape[1]:
        raise ValueError("X and centroids must have the same number of features")
    n_samples, n_features = X.shape
    n_clusters = centroids.shape[0]
    out: npt.NDArray[np.float64] = np.zeros((n_samples, n_clusters), dtype=np.float64)
    for row in prange(n_samples):  # parallel over data rows
        for k in range(n_clusters):
            matches = 0
            for a, b in zip(X[row], centroids[k]):
                if a == b:
                    matches += 1
            # Union counts every position of both vectors (2 * n_features)
            # minus the overlap; intersection is the match count.
            out[row, k] = 1 - (matches / (n_features * 2 - matches))
    return out


@njit(parallel=True, fastmath=True)
def _ng(
    X: npt.NDArray[np.int64],
    centroids: npt.NDArray[np.int64],
    labels: npt.NDArray[np.int64],
) -> npt.NDArray[np.float64]:
    """
    Compute NG Distance between X and centroids.

    Args:
        X: (npt.NDArray[np.int64]) Data array (n_samples, n_features)
        centroids: (npt.NDArray[np.int64]) Centroids array (n_clusters, n_features)
        labels: (npt.NDArray[np.int64]) Labels array (n_samples,)

    Returns:
        (npt.NDArray[np.float64]) Distance matrix (n_samples, n_clusters);
        entry (r, k) is sum over features of 1 - P(X[r, c] | cluster k).
    """
    if X.shape[1] != centroids.shape[1]:
        raise ValueError("X and centroids must have the same number of features")
    n_samples, n_features = X.shape
    n_clusters = centroids.shape[0]

    # Largest category code in X; sizes the per-cluster frequency table.
    # NOTE(review): assumes codes are non-negative — a negative value would
    # index the table out of range; confirm upstream encoding.
    top = 0
    for r in range(n_samples):
        for c in range(n_features):
            if X[r, c] > top:
                top = X[r, c]

    # freq[k, c, v] = how often value v appears in feature c of cluster k.
    freq = np.zeros((n_clusters, n_features, top + 1), dtype=np.int32)
    members = np.zeros(n_clusters, dtype=np.int32)
    for r in range(n_samples):
        k = labels[r]
        if k < 0 or k >= n_clusters:
            continue  # defensively skip out-of-range labels
        members[k] += 1
        for c in range(n_features):
            freq[k, c, X[r, c]] += 1

    out = np.zeros((n_samples, n_clusters), dtype=np.float64)
    for r in prange(n_samples):  # parallel over data rows
        for k in range(n_clusters):
            if members[k] == 0:
                # Empty cluster: every feature contributes maximal dissimilarity.
                out[r, k] = float(n_features)
                continue
            total = 0.0
            for c in range(n_features):
                v = X[r, c]
                if v <= top:
                    # Contribution per feature: 1 - P(x_c | cluster k).
                    total += 1.0 - freq[k, c, v] / members[k]
                else:
                    total += 1.0  # unreachable: `top` is the global max of X
            out[r, k] = total
    return out
def distance(
    X: np.ndarray,
    centroids: np.ndarray,
    metric: DistanceMetrics,
    labels: npt.NDArray[np.int64] | None = None,
) -> npt.NDArray[np.float64]:
    """
    Compute distance between X and centroids using the specified metric.

    Args:
        X: (npt.NDArray[np.int64]) Data array (n_samples, n_features)
        centroids: (npt.NDArray[np.int64]) Centroids array (n_clusters, n_features)
        metric: (DistanceMetrics) Distance metric to use
        labels: (npt.NDArray[np.int64] | None) Labels array (n_samples,),
            required only for the NG metric

    Returns:
        (npt.NDArray[np.float64]) Distance matrix (n_samples, n_clusters)

    Raises:
        ValueError: If the metric is unsupported, or labels is None for NG.
    """
    # Guard-style dispatch; enum members are singletons, so identity checks
    # are equivalent to equality here.
    if metric is DistanceMetrics.HAMMING:
        return _hamming(X, centroids)
    if metric is DistanceMetrics.JACCARD:
        return _jaccard(X, centroids)
    if metric is DistanceMetrics.NG:
        if labels is None:
            raise ValueError("Labels are required for NG distance")
        return _ng(X, centroids, labels)
    raise ValueError(f"Unsupported distance metric: {metric}")