Chapter 2: K-Means

Let’s build a K-Means model from scratch.

First, we’ll define a KMeans class whose constructor takes the following hyperparameters:

  • The number of desired clusters (k)
  • The maximum number of iterations per run (max_iters)
  • The number of times the algorithm will be run (n_runs)
  • A seed to fix the randomness of the model (seed)

import numpy as np

from utils.distances import euclidean

class KMeans:

    def __init__(self, k=3, seed=None, n_runs=10, max_iters=300):

        self.max_iters = max_iters
        self.k = k
        self.rnd = np.random.RandomState(seed)
        self.n_runs = n_runs
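
To make these hyperparameters concrete, instantiating the class could look like this (the values shown are purely illustrative):

# Illustrative instantiation; the hyperparameter values are arbitrary
model = KMeans(k=3, seed=42, n_runs=10, max_iters=300)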

K-Means requires a notion of distance. It uses the Euclidean distance because it is the distance for which the algorithm is guaranteed to converge. Indeed, each step reduces (or leaves unchanged) the within-cluster sum of squares: reassigning a point to its closest centroid can only lower its squared distance, and the mean of a cluster is precisely the point that minimizes the sum of squared Euclidean distances to that cluster’s points.

This Cross-Validated post explains this in more detail.

    def _distance(self, a, b):

        return euclidean(a, b)
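
The euclidean function comes from a small utils module that isn’t shown here; a minimal sketch, assuming the inputs are 1D NumPy arrays, could be:

# Minimal sketch of the imported euclidean helper,
# assuming a and b are 1D NumPy arrays
def euclidean(a, b):
    return np.sqrt(np.sum((a - b) ** 2))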

Once this distance measure is defined, the within-cluster sum of squares, also called inertia, can be computed: it is the sum of the squared distances from each point to the centroid of its cluster.
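
In symbols, with centroids \(\mu_1, \dots, \mu_K\) and clusters \(C_1, \dots, C_K\):

\[
\text{inertia} = \sum_{i=1}^{K} \sum_{x \in C_i} \lVert x - \mu_i \rVert^2
\]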

    def _inertia(self, X, centroids, labels):

        distances = []

        for i, centroid in enumerate(centroids):

            distances.extend([self._distance(x, centroid)**2 for x in X[labels == i]])

        return np.sum(distances)

One of the key ideas behind the K-Means algorithm is the relationship between the clusters and the centroids: if the clustering is defined, the centroids can be inferred, and conversely, if the centroids are defined, the clustering can be inferred.

This is why we’ll write two functions: one that assigns labels to data points given the centroids, and one that computes the centroids given the labels.

_compute_labels will assign each data point to its closest centroid, and _compute_centroids will set each centroid to the mean of the points assigned to its cluster.

    def _compute_centroids(self, X, labels):

        centroids = []

        for i in range(self.k):

            centroid = np.mean(X[labels == i], axis=0)
            centroids.append(centroid)

        return np.array(centroids)

    def _compute_labels(self, X, centroids):

        labels = []

        for x in X:

            distances = [self._distance(x, centroid) for centroid in centroids]
            label = np.argmin(distances)
            labels.append(label)

        return np.array(labels)
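
The loop above is easy to follow but slow on large datasets. As an aside, a vectorized variant using NumPy broadcasting (a sketch, not part of the class) could look like this:

# Hypothetical vectorized alternative to _compute_labels,
# assuming X has shape (n_samples, n_features)
def compute_labels_vectorized(X, centroids):

    # Pairwise squared distances, shape (n_samples, k)
    diffs = X[:, None, :] - centroids[None, :, :]
    sq_distances = np.sum(diffs ** 2, axis=2)

    # argmin over squared distances equals argmin over distances
    return np.argmin(sq_distances, axis=1)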

Once the _compute_labels function is defined, we can now easily define a predict function. Indeed, for a clustering algorithm, “predicting” comes down to assigning clusters to data points.

Therefore, given some data points, one can reuse the _compute_labels function, which assigns each data point to its closest centroid.

    def predict(self, X):

        return self._compute_labels(X, self.centroids_)

Once we have centroids, we can assign data points to clusters. But first, we need to find these centroids. This will be done in the fit function.

First, we need to initialize some centroids. Once we have those, the iterative process starts: given the centroids, labels are assigned to the data points, and given these labels, the centroids are updated.

This process is repeated until the labels stop changing, at which point the centroids stop moving as well. This is what we call convergence. As a safeguard, we also cap the number of iterations at max_iters.

    def fit(self, X, y=None):

        centroids = self._initialize_centroids(X)
        labels = self._compute_labels(X, centroids)
        old_labels = np.full_like(labels, -1)
        n_iter = 0

        # Alternate between the two steps until the labels stop
        # changing, or until the iteration budget is exhausted
        while np.any(old_labels != labels) and n_iter < self.max_iters:

            old_labels = labels
            centroids = self._compute_centroids(X, labels)
            labels = self._compute_labels(X, centroids)
            n_iter += 1

        # Store the results so that predict can reuse them
        self.centroids_ = centroids
        self.labels_ = labels
        self.inertia_ = self._inertia(X, centroids, labels)

        return self
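
Note that fit performs a single run. Since the outcome depends on the random initialization, the n_runs hyperparameter is meant to repeat the whole procedure and keep the best result; here is a minimal sketch of such a wrapper (fit_best is a hypothetical helper, not part of the class):

# Hypothetical wrapper: refit n_runs times and keep the run
# with the lowest inertia
def fit_best(model, X):

    best = (np.inf, None, None)

    for _ in range(model.n_runs):
        model.fit(X)
        if model.inertia_ < best[0]:
            best = (model.inertia_, model.centroids_, model.labels_)

    model.inertia_, model.centroids_, model.labels_ = best
    return model

Each call to fit draws fresh initial centroids from the seeded random state, so the runs genuinely differ while remaining reproducible.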

The only thing left to define is how to initialize the centroids. There exist multiple ways to do this, but a popular one is to pick \(K\) random data points.

Another popular initialization technique is k-means++, sketched just below.
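
The idea behind k-means++ is to pick the first centroid uniformly at random, and then to pick each subsequent centroid among the data points with probability proportional to its squared distance to the closest centroid chosen so far. A minimal sketch, where kmeans_plus_plus is a hypothetical standalone helper rather than a method of our class:

# Hedged sketch of k-means++ seeding; kmeans_plus_plus is a
# hypothetical helper, not part of the KMeans class above
def kmeans_plus_plus(X, k, rnd):

    centroids = [X[rnd.randint(len(X))]]

    for _ in range(k - 1):

        # Squared distance from each point to its nearest chosen centroid
        d2 = np.min([np.sum((X - c) ** 2, axis=1) for c in centroids], axis=0)

        # Sample the next centroid proportionally to these squared distances
        probs = d2 / d2.sum()
        centroids.append(X[rnd.choice(len(X), p=probs)])

    return np.array(centroids)

Our implementation sticks with the simpler approach: shuffle a copy of the data and keep the first \(K\) points.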

    def _initialize_centroids(self, X):

        X_ = X.copy()
        self.rnd.shuffle(X_)
        return X_[:self.k]
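
Finally, here is what using the class end to end might look like on a made-up two-dimensional dataset (the data and hyperparameter values are purely illustrative):

# Illustrative usage on a synthetic 2D dataset; all values are made up
rng = np.random.RandomState(0)

blob_1 = rng.normal(loc=(0, 0), scale=0.5, size=(50, 2))
blob_2 = rng.normal(loc=(5, 5), scale=0.5, size=(50, 2))
blob_3 = rng.normal(loc=(0, 5), scale=0.5, size=(50, 2))
X = np.vstack([blob_1, blob_2, blob_3])

model = KMeans(k=3, seed=17)
model.fit(X)
labels = model.predict(X)  # equivalent to model.labels_ on the training data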