Tutorial: K-Means Clustering on Spark

Analytics is the discovery of insights from data. Traditionally, statistical and visual techniques dominated the field, but advances in Machine Learning and AI have greatly extended what Analytics can do. To know more about AI/ML in Analytics, read this article: AI in Analytics vs AI in Automation.

In this article, we will dive a little deeper into Discovery Analytics using Clustering. It is a Machine Learning/Data Mining technique that groups similar data points and can reveal hidden patterns in data, aiding data discovery. There are multiple forms of clustering, such as distance-based, density-based, and hierarchical clustering, and multiple algorithms, such as K-Means and DBSCAN. Since this is a tutorial, we won’t go into the theory. This article is a hands-on guide to implementing the K-Means algorithm.

There are multiple libraries that implement the K-Means algorithm. The most popular amongst them is scikit-learn. However, scikit-learn has a major disadvantage: it works on a single node, so it does not scale well to larger datasets.

To run ML in a distributed way, Spark has its own library called MLlib. Here is the step-by-step implementation:

Step 1: Load Iris Dataset

For this tutorial, we will use the scikit-learn Iris dataset. Please note that this is for demonstration. In the real world, we would not use Spark for such a tiny dataset.

import pandas as pd
from sklearn.datasets import load_iris
from pyspark.sql import SparkSession

df_iris = load_iris(as_frame=True)

Convert it to a pandas DataFrame. Again, this is only for demonstration.

pd_df_iris = pd.DataFrame(df_iris.data, columns = df_iris.feature_names)
pd_df_iris['target'] = pd.Series(df_iris.target)

Next, convert it to a Spark DataFrame and drop the ‘target’ column, since clustering is unsupervised learning.

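# Create (or reuse) a Spark session; createDataFrame needs one
spark = SparkSession.builder.appName("kmeans-iris").getOrCreate()
spark_df_iris = spark.createDataFrame(pd_df_iris)
spark_df_iris = spark_df_iris.drop("target")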

Step 2: Find the optimal number of clusters using the silhouette method.

The silhouette score is an evaluation metric for clustering algorithms. It measures how similar a data point is to the other points in its own cluster compared with the points in other clusters, and it ranges from -1 to 1. Read more about it here.
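To make the metric concrete, here is a minimal pure-Python sketch of the mean silhouette coefficient. It is a brute-force illustration and not Spark's implementation; the function names `squared_euclidean` and `silhouette_score` are hypothetical, and squared Euclidean distance is assumed.

```python
def squared_euclidean(p, q):
    """Squared Euclidean distance between two equal-length points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def silhouette_score(points, labels):
    """Mean silhouette coefficient over all points (brute force, O(n^2))."""
    # Group the points by their cluster label
    clusters = {}
    for point, label in zip(points, labels):
        clusters.setdefault(label, []).append(point)

    scores = []
    for point, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            scores.append(0.0)  # common convention for singleton clusters
            continue
        # a: mean distance to the other members of the point's own cluster
        # (the point's distance to itself is 0, so it does not affect the sum)
        a = sum(squared_euclidean(point, q) for q in own) / (len(own) - 1)
        # b: smallest mean distance to the members of any other cluster
        b = min(
            sum(squared_euclidean(point, q) for q in members) / len(members)
            for other, members in clusters.items()
            if other != label
        )
        scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)
```

A point scores close to 1 when it is much nearer to its own cluster than to any other, and below 0 when it sits closer to a neighbouring cluster.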

The higher the silhouette score, the better the clustering. In the K-Means algorithm, the number of clusters (k) is the hyper-parameter to be tuned. But before that, let’s create a vector assembler to combine the raw feature columns into a single vector column.

from pyspark.ml.feature import VectorAssembler
assemble=VectorAssembler(inputCols=[
'sepal length (cm)',
'sepal width (cm)',
'petal length (cm)',
'petal width (cm)'],outputCol = 'iris_features')

assembled_data=assemble.transform(spark_df_iris)

Next, we will run the silhouette method.

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

silhouette_scores=[]
evaluator = ClusteringEvaluator(
    featuresCol='iris_features',
    metricName='silhouette',
    distanceMeasure='squaredEuclidean')

for K in range(2,11):
    # Fit k-means with K clusters, then score the resulting assignments
    KMeans_=KMeans(featuresCol='iris_features', k=K)
    KMeans_fit=KMeans_.fit(assembled_data)
    KMeans_transform=KMeans_fit.transform(assembled_data)
    evaluation_score=evaluator.evaluate(KMeans_transform)
    silhouette_scores.append(evaluation_score)

Next, plot the silhouette scores against the number of clusters.

import matplotlib.pyplot as plt
fig, ax = plt.subplots(1,1, figsize =(10,8))
ax.plot(range(2,11),silhouette_scores)
ax.set_xlabel('Number of Clusters')
ax.set_ylabel('Silhouette Score')

In the plot, the silhouette score has a local maximum at K=3, so we will use 3 clusters (which is also the number of labels in the original dataset).
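Reading a local maximum off a plot can be error-prone, so it may help to list the candidates programmatically. The helper below is a small sketch; the name `local_maxima` is hypothetical and not part of any library.

```python
def local_maxima(ks, scores):
    """Return the k values whose score is strictly higher than both neighbours.

    The first and last entries are never reported, because they have
    only one neighbour.
    """
    ks = list(ks)
    return [
        ks[i]
        for i in range(1, len(scores) - 1)
        if scores[i] > scores[i - 1] and scores[i] > scores[i + 1]
    ]
```

With the list built in the loop above, it could be called as `local_maxima(range(2, 11), silhouette_scores)`. Since the end points are skipped, it is worth checking the overall maximum separately as well.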

Step 3: Build the K-Means Clustering model

Now, let’s build the model with 3 clusters.

KMeans_=KMeans(featuresCol='iris_features', k=3) 
KMeans_Model=KMeans_.fit(assembled_data)
KMeans_Assignments=KMeans_Model.transform(assembled_data)
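For intuition about what fitting a K-Means model involves, here is a minimal NumPy sketch of the classic Lloyd's iteration (assign each point to its nearest centre, then move each centre to the mean of its points). This is an illustration only: the function name `lloyd_kmeans` is hypothetical, the initialisation is plain random sampling, and Spark's distributed implementation differs in its details.

```python
import numpy as np


def lloyd_kmeans(X, k, n_iter=20, seed=0):
    """Cluster the rows of X into k groups with Lloyd's algorithm."""
    X = np.asarray(X, dtype=float)
    rng = np.random.default_rng(seed)
    # Initialise the centres with k distinct data points chosen at random
    centers = X[rng.choice(len(X), size=k, replace=False)]
    for _ in range(n_iter):
        # Assignment step: squared distance from every point to every centre
        distances = ((X[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
        labels = distances.argmin(axis=1)
        # Update step: move each centre to the mean of its assigned points,
        # keeping the old centre if a cluster happens to be empty
        new_centers = np.array([
            X[labels == j].mean(axis=0) if np.any(labels == j) else centers[j]
            for j in range(k)
        ])
        if np.allclose(new_centers, centers):
            break  # centres stopped moving, so the assignments are stable
        centers = new_centers
    return centers, labels
```

Lloyd's algorithm only finds a local optimum, so the result can depend on the initial centres.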

Step 4: Visualize Clustering using the PCA

To visualize the 4-dimensional data in 2 dimensions, we will use a dimensionality reduction technique, namely PCA. Spark has its own flavour of PCA.
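As a rough illustration of what PCA does, the NumPy sketch below centres the data and projects it onto the top k principal directions obtained from an SVD. The function name `pca_project` is hypothetical, and Spark's implementation may differ in details such as centring and the sign of the components.

```python
import numpy as np


def pca_project(X, k=2):
    """Project the rows of X onto the top-k principal components."""
    X = np.asarray(X, dtype=float)
    # Centre each feature so the components describe variance around the mean
    X_centered = X - X.mean(axis=0)
    # Rows of Vt are the principal directions, ordered by decreasing singular value
    _, singular_values, Vt = np.linalg.svd(X_centered, full_matrices=False)
    components = Vt[:k]
    # Sample variance captured by each retained component
    explained_variance = singular_values[:k] ** 2 / (X.shape[0] - 1)
    return X_centered @ components.T, explained_variance
```

The first returned array has one row per data point and k columns, which is the shape needed for a 2-D scatter plot when k=2.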

First, perform PCA. Here, k=2 is the number of principal components.

from pyspark.ml.feature import PCA as PCAml
pca = PCAml(k=2, inputCol="iris_features", outputCol="pca")
pca_model = pca.fit(assembled_data)
pca_transformed = pca_model.transform(assembled_data)

Next, extract the principal components.

import numpy as np
x_pca = np.array(pca_transformed.rdd.map(lambda row: row.pca).collect())

Next, retrieve the cluster assignments from the K-Means output.

cluster_assignment = np.array(KMeans_Assignments.rdd.map(lambda row: row.prediction).collect()).reshape(-1,1)

Finally, plot the principal components.

import seaborn as sns
import matplotlib.pyplot as plt

pca_data = np.hstack((x_pca,cluster_assignment))

pca_df = pd.DataFrame(data=pca_data, columns=("1st_principal", "2nd_principal","cluster_assignment"))
sns.FacetGrid(pca_df,hue="cluster_assignment", height=6).map(plt.scatter, '1st_principal', '2nd_principal' ).add_legend()

plt.show()

Conclusion

We hope this article helps. This is only for information. We make no guarantees regarding its completeness or accuracy.



I am a Data Scientist with 6+ years of experience.

