# Using the SageMaker built-in K-Means algorithm

Amazon SageMaker provides several built-in algorithms that you can use for a variety of problem types. These algorithms provide high-performance, scalable machine learning and are optimized for speed, scale, and accuracy. In this notebook, we will explore K-means, which is an unsupervised learning algorithm for clustering use cases. K-means finds k cluster centroids for a given set of records, such that all points within a cluster are closer in distance to their centroid than they are to any other centroid.

The SageMaker built-in K-means algorithm has many improvements over other state-of-the-art implementations, including (1) the ability to create good clusters with only a single pass over the dataset; (2) GPU support for blazing fast performance (e.g. train on ~37Gb of data in 7 minutes for about U.S. $1.00; (3) the ability to not only be faster, but also achieve the same accuracy as state-of-the-art multiple pass implementations. 

We’ll use this K-means algorithm on the GDELT dataset, https://registry.opendata.aws/gdelt, which monitors world news media across the world; data is stored for every second of every day. This information is freely available on Amazon S3 as part of the AWS Public Datasets program. 

**PREREQUISTES**: be sure you are running this notebook with a MXNet kernel. For example, in SageMaker Studio you could use a Python 3 (MXNet CPU Optimized) kernel, while for a SageMaker notebook instance, the conda_mxnet_p36 kernel can be used.


## Data Processing and Exploration

To begin, we'll import some libraries we'll need throughout the notebook and specify a Amazon S3 bucket for the data.

In [None]:
%matplotlib inline

import boto3
import gzip
import numpy as np
import pandas as pd
import pickle
import re
import sklearn.cluster
import sklearn
import sys
import urllib.request
from sagemaker import get_execution_role
from sagemaker.session import Session
if not sys.warnoptions:
 import warnings
 warnings.simplefilter("ignore")

role = get_execution_role()

# S3 bucket and prefix
bucket = bucket = Session().default_bucket()
prefix = 'sagemaker/DEMO-kmeans'

The GDELT data are stored as multiple files on Amazon S3, with two different formats: historical, which covers the years from 1979 to 2013, and daily updates, which cover the years from 2013 on. For this example, we’ll stick to the historical format. Let’s bring in 1979 data for the purpose of interactive exploration. 

In [None]:
def get_gdelt(filename):
 s3 = boto3.resource('s3')
 s3.Bucket('gdelt-open-data').download_file('events/' + filename, '.gdelt.csv')
 df = pd.read_csv('.gdelt.csv', sep='\t')
 header = pd.read_csv('https://www.gdeltproject.org/data/lookups/CSV.header.historical.txt', sep='\t')
 df.columns = header.columns
 return df

data = get_gdelt('1979.csv')
data

### Processing the data

We'll now prepare the data for machine learning, with a few functions to help us scale this to GDELT datasets from other years. There are 57 columns, some of which are sparsely populated, cryptically named, and in a format that’s not particularly friendly for machine learning. So, for our use case, we’ll reduce to a few core attributes. 

In [None]:
import io
import os
import sagemaker.amazon.common as smac

data = data[['EventCode', 'NumArticles', 'AvgTone', 'Actor1Geo_Lat', 'Actor1Geo_Long', 'Actor2Geo_Lat', 'Actor2Geo_Long']]
data['EventCode'] = data['EventCode'].astype(object)

events = pd.crosstab(index=data['EventCode'], columns='count').sort_values(by='count', ascending=False).index[:20]

#routine that converts the training data into protobuf format required for Sagemaker K-means.
def write_to_s3(bucket, prefix, channel, file_prefix, X):
 buf = io.BytesIO()
 smac.write_numpy_to_dense_tensor(buf, X.astype('float32'))
 buf.seek(0)
 boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, channel, file_prefix + '.data')).upload_fileobj(buf)

#filter data based on actor locations and events as described above
def transform_gdelt(df, events=None):
 df = df[['AvgTone', 'EventCode', 'NumArticles', 'Actor1Geo_Lat', 'Actor1Geo_Long', 'Actor2Geo_Lat', 'Actor2Geo_Long']]
 df['EventCode'] = df['EventCode'].astype(object)
 if events is not None:
 df = df[np.in1d(df['EventCode'], events)]
 return pd.get_dummies(df[((df['Actor1Geo_Lat'] == 0) & (df['Actor1Geo_Long'] == 0) != True) &
 ((df['Actor2Geo_Lat'] == 0) & (df['Actor2Geo_Long'] == 0) != True)])

#prepare training training and save to S3.
def prepare_gdelt(bucket, prefix, file_prefix, events=None, random_state=1729, save_to_s3=True):
 df = get_gdelt(file_prefix + '.csv')
 model_data = transform_gdelt(df, events)
 train_data = model_data.sample(frac=1, random_state=random_state).to_numpy() #.as_matrix()
 if save_to_s3:
 write_to_s3(bucket, prefix, 'train', file_prefix, train_data)
 return train_data

In [None]:
BEGIN_YEAR = 1979
END_YEAR = 1980

for year in range(BEGIN_YEAR, END_YEAR):
 train_data = prepare_gdelt(bucket, prefix, str(year), events)

### Visualizing the data

We'll now briefly explore a sample of the dataset using the t-Distributed Stochastic Neighbor Embedding (TSNE) algorithm. TSNE is a non-linear dimensionality reduction algorithm often used for exploring high-dimensional data. Here, we'll use TSNE for visualizing the first 10000 data points from 1979 dataset. From this greatly simplified view of the data, it appears that the dataset may be amenable to modeling with a clustering algorithm such as K-means.

In [None]:
import matplotlib
from matplotlib import pyplot as plt
from sklearn import manifold

train_79 = prepare_gdelt(bucket, prefix, '1979', events, save_to_s3=False)

tsne = manifold.TSNE(n_components=2, init='pca', random_state=1200)
X_tsne = tsne.fit_transform(train_79[:10000])

plt.figure(figsize=(6, 5))
X_tsne_1000 = X_tsne[:1000]
plt.scatter(X_tsne_1000[:, 0], X_tsne_1000[:, 1])
plt.show()

## SageMaker Experiments setup

SageMaker Experiments is a great way to organize your data science work. You can create experiments to organize all your model development work for: [1] a business use case you are addressing (e.g. create an experiment named “customer churn prediction”), or [2] a data science team that owns the experiment (e.g. create experiment named “marketing analytics experiment”), or [3] a specific data science and ML project. Think of it as a “folder” for organizing your “files”.

To begin, we'll install the SageMaker Experiments SDK.

In [None]:
!{sys.executable} -m pip install sagemaker-experiments

Let's track the parameters from the data preprocessing step we performed above. To do this, we'll manually add the preprocessing step to a `Tracker` object. For larger datasets and more complex preprocessing, we'd likely use SageMaker Processing to spin up a cluster of preprocessing instances separate from this notebook. 

In [None]:
from sagemaker.analytics import ExperimentAnalytics

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent
from smexperiments.tracker import Tracker

with Tracker.create(display_name="Preprocessing", sagemaker_boto_client=boto3.client('sagemaker')) as tracker:
 tracker.log_parameters({
 "begin_year": BEGIN_YEAR,
 "end_year": END_YEAR,
 })
 # we can log the s3 uri to the dataset we just uploaded
 tracker.log_input(name="kmeans-dataset", media_type="s3/uri", value="s3://{}/{}/train/".format(bucket, prefix))

The SageMaker Experiments object itself is easily created with a minimal number of parameters. 

In [None]:
import time

kmeans_experiment = Experiment.create(
 experiment_name=f"kmeans-gdelt-{int(time.time())}", 
 description="Clustering on the GDELT dataset", 
 sagemaker_boto_client=boto3.client('sagemaker'))
print(kmeans_experiment)

## Training a set of K-means models in parallel

Finding the optimal number of clusters for a particular dataset often is at least partially a subjective judgment based on visual inspection of graphs. Typically multiple training jobs are run with different values of k (the number of clusters) to generate graph data. To speed up this process, we'll use the capability of SageMaker to easily incorporate parallelization in training. In particular, we'll:

- run multiple training jobs in parallel; AND
- further parallelize training by specifying that each training job is itself parallelized on a cluster of 2 instances.

In [None]:
from time import gmtime, strftime

output_time = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
output_folder = 'kmeans-gdelt-' + output_time
K = range(2, 12, 2) 
INSTANCE_COUNT = 2
# make this false to run jobs one at a time, e.g. to avoid resource limits if the range above is increased 
run_parallel_jobs = True 
job_names = []

For each training job, we'll provide a set of training parameters that differ primarily in the number of clusters.

In [None]:
from sagemaker.amazon.amazon_estimator import get_image_uri

def get_training_parameters(k, experiment_name, trial_name):
 
 create_training_params = \
 {
 "AlgorithmSpecification": {
 "TrainingImage": get_image_uri(boto3.Session().region_name, 'kmeans'),
 "TrainingInputMode": "File"
 },
 "RoleArn": role,
 "OutputDataConfig": {
 "S3OutputPath": output_location
 },
 "ResourceConfig": {
 "InstanceCount": INSTANCE_COUNT,
 "InstanceType": "ml.c4.8xlarge",
 "VolumeSizeInGB": 50
 },
 "TrainingJobName": job_name,
 "HyperParameters": {
 "k": str(k),
 "feature_dim": "26",
 "mini_batch_size": "1000"
 },
 "StoppingCondition": {
 "MaxRuntimeInSeconds": 60 * 60
 },
 "InputDataConfig": [
 {
 "ChannelName": "train",
 "DataSource": {
 "S3DataSource": {
 "S3DataType": "S3Prefix",
 "S3Uri": "s3://{}/{}/train/".format(bucket, prefix),
 "S3DataDistributionType": "FullyReplicated"
 }
 },

 "CompressionType": "None",
 "RecordWrapperType": "None"
 }
 ],
 "ExperimentConfig": {
 "ExperimentName": experiment_name,
 "TrialName": trial_name,
 "TrialComponentDisplayName": 'Training'
 }
 }
 
 return create_training_params

Now we will launch multiple training jobs in parallel. While training the models, we'll experiment with several values for the number of hidden channel in the model. We'll create a Trial to track each training job run, and a TrialComponent from the tracker we created before to add to the Trial. This will enrich the Trial with the parameters we captured from the data preprocessing stage.

In [None]:
preprocessing_trial_component = tracker.trial_component
k_trial_name_map = {}

for k in K:
 
 # create trial
 trial_name = f"kmeans-training-job-{k}-clusters-{int(time.time())}"
 kmeans_trial = Trial.create(
 trial_name=trial_name, 
 experiment_name=kmeans_experiment.experiment_name,
 sagemaker_boto_client=boto3.client('sagemaker'),
 )
 k_trial_name_map[k] = trial_name 
 # associate the preprocessing trial component with the current trial
 kmeans_trial.add_trial_component(preprocessing_trial_component)
 
 print('Starting train job with k = ' + str(k))
 output_location = 's3://{}/kmeans_example/output/'.format(bucket) + output_folder
 print('Training artifacts will be uploaded to: {}'.format(output_location))
 job_name = output_folder + str(k)

 sagemaker = boto3.client('sagemaker')
 create_training_params = get_training_parameters(k, kmeans_experiment.experiment_name, kmeans_trial.trial_name)
 sagemaker.create_training_job(**create_training_params)

 status = sagemaker.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
 print(status)
 if not run_parallel_jobs:
 try:
 sagemaker.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=job_name)
 finally:
 status = sagemaker.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
 print("Training job ended with status: " + status)
 if status == 'Failed':
 message = sagemaker.describe_training_job(TrainingJobName=job_name)['FailureReason']
 print('Training failed with the following error: {}'.format(message))
 raise Exception('Training job failed')
 
 job_names.append(job_name)

Running the following cell will check the job status; it will finish executing when all jobs have either completed or failed. Each individual job will take about 3 minutes, however, they will not start at exactly the same time, so you might expect the entire set of jobs to complete in about 5 minutes.

In [None]:
while len(job_names):
 try:
 sagemaker.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=job_names[0])
 finally:
 status = sagemaker.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
 print("Training job ended with status: " + status)
 if status == 'Failed':
 message = sagemaker.describe_training_job(TrainingJobName=job_name)['FailureReason']
 print('Training failed with the following error: {}'.format(message))
 raise Exception('Training job failed')

 print(job_name)

 info = sagemaker.describe_training_job(TrainingJobName=job_name)
 job_names.pop(0)

## Examine results with SageMaker Experiments

Now we will use the analytics capabilities of the SageMaker Experiments Python SDK to query and compare the training runs in our experiment. You can retrieve specific trial components, such as training, by using a search expression.

In [None]:
search_expression = {
 "Filters":[
 {
 "Name": "DisplayName",
 "Operator": "Equals",
 "Value": "Training",
 }
 ],
}

We'll display the training trial components in ascending order of average Mean Square Error, which was used as a metric during training. Typically the trial components dataframe will have many columns. We can limit the number of columns displayed in various ways. For example, we can limit which metrics columns to display (here we are excluding some other metrics such as training throughput), and which parameter columns (here only k since it is the only one varying, the others such as mini-batch size were fixed).

In [None]:
sess = boto3.Session()
sm = sess.client('sagemaker')

trial_component_analytics = ExperimentAnalytics(
 sagemaker_session=Session(sess, sm), 
 experiment_name=kmeans_experiment.experiment_name,
 search_expression=search_expression,
 sort_by="metrics.train:msd.avg",
 sort_order="Ascending",
 metric_names=['train:msd'],
 parameter_names=['k']
)

In [None]:
trial_component_analytics.dataframe()

Next let's look at an example of tracing the lineage of a model by accessing the data tracked by SageMaker Experiments for the trial with k = 8. This time the query also will return the preprocessing trial component, as well as the training component, so we can get a more complete picture of the steps taken to produce the model.

In [None]:
lineage_table = ExperimentAnalytics(
 sagemaker_session=Session(sess, sm), 
 search_expression={
 "Filters":[{
 "Name": "Parents.TrialName",
 "Operator": "Equals",
 "Value": k_trial_name_map[8]
 }]
 },
 sort_by="CreationTime",
 sort_order="Ascending",
 metric_names=['train:msd'],
)

In [None]:
lineage_table.dataframe()

## Apply the elbow method to determine the optimal number of clusters

Next we'll plot the Euclidean distance to the cluster centroids. In general, the error should decrease as k gets larger. This is because when the number of clusters increases, they should be smaller, so distortion is also smaller. This produces an “elbow effect” in the graph. The idea of the elbow method is to visually select the k at which the rate of decrease sharply shifts. 

In [None]:
import mxnet as mx
from scipy.spatial.distance import cdist

plt.plot()
colors = ['b', 'g', 'r']
markers = ['o', 'v', 's']
models = {}
distortions = []

for k in K:
 s3_client = boto3.client('s3')
 key = 'kmeans_example/output/' + output_folder +'/' + output_folder + str(k) + '/output/model.tar.gz'
 s3_client.download_file(bucket, key, 'model.tar.gz')
 print("Model for k={} ({})".format(k, key))
 !tar -xvf model.tar.gz 
 kmeans_model=mx.ndarray.load('model_algo-1')
 kmeans_numpy = kmeans_model[0].asnumpy()
 distortions.append(sum(np.min(cdist(train_data, kmeans_numpy, 'euclidean'), axis=1)) / train_data.shape[0])
 models[k] = kmeans_numpy
 
# Plot the elbow
plt.plot(K, distortions, 'bx-')
plt.xlabel('k')
plt.ylabel('distortion')
plt.title('Elbow graph')
plt.show()

Based on the graph above, k = 8 could be a good cluster size for this dataset. However, again this is partially subjective so k = 7 may be just as effective or more so for the use case. Also note that even though we referred to k as a "hyperparameter," we wouldn't apply hyperparameter optimization techniques (HPO) to tune k because it is a “static” hyperparameter — in general, there is a monotonically decreasing relationship between number of centroids and the objective metrics that SageMaker K-means reports. Accordingly, tuning for k would mean you would always end up at, or near, your maximum k value.

## Retrieving the best model

If we consider the model with k = 8 the best suited for our purposes, we can now retrieve it. Let's examine the map of training trial components.

In [None]:
print(k_trial_name_map)

The fourth trial component is the one with k = 8, so we will retrieve the model from that one (note that indexing starts at zero so it is at index 3). The model artifact is simply a zipped file stored in S3.

In [None]:
best_trial_component_name = trial_component_analytics.dataframe().iloc[3]['TrialComponentName']
best_trial_component = TrialComponent.load(best_trial_component_name)
model_data = best_trial_component.output_artifacts['SageMaker.ModelArtifact'].value
print(model_data)

In [None]:
from sagemaker import KMeansModel

model = KMeansModel(model_data=model_data,
 role=role,
 sagemaker_session=Session())

From here, the model can be deployed to a SageMaker hosted endpoint and used to obtain real time predictions, or used for batch inference. For example, to get the cluster assignments of each data point in the training data, code similar to the following could be used:

```python

predictor = model.deploy(instance_type='ml.m5.xlarge',
 initial_instance_count=1)

result = predictor.predict(train_data)

```

## Cleanup

To prevent unnecessary clutter in your AWS account, you can delete all of the information tracked by the Experiment as well as the Experiment itself.

> Trial components can exist independent of trials and experiments. You might want keep them if you plan on further exploration. If so, comment out tc.delete()

In [None]:
def cleanup(experiment):
 for trial_summary in experiment.list_trials():
 trial = Trial.load(sagemaker_boto_client=sm, trial_name=trial_summary.trial_name)
 for trial_component_summary in trial.list_trial_components():
 tc = TrialComponent.load(
 sagemaker_boto_client=sm,
 trial_component_name=trial_component_summary.trial_component_name)
 trial.remove_trial_component(tc)
 try:
 # comment out to keep trial components
 tc.delete()
 except:
 # tc is associated with another trial
 continue
 # to prevent throttling
 time.sleep(.5)
 trial.delete()
 experiment.delete()

In [None]:
cleanup(kmeans_experiment)