{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Using the SageMaker built-in K-Means algorithm\n", "\n", "Amazon SageMaker provides several built-in algorithms that you can use for a variety of problem types. These algorithms provide high-performance, scalable machine learning and are optimized for speed, scale, and accuracy. In this notebook, we will explore K-means, which is an unsupervised learning algorithm for clustering use cases. K-means finds k cluster centroids for a given set of records, such that all points within a cluster are closer in distance to their centroid than they are to any other centroid.\n", "\n", "The SageMaker built-in K-means algorithm has many improvements over other state-of-the-art implementations, including (1) the ability to create good clusters with only a single pass over the dataset; (2) GPU support for blazing fast performance (e.g. train on ~37Gb of data in 7 minutes for about U.S. $1.00; (3) the ability to not only be faster, but also achieve the same accuracy as state-of-the-art multiple pass implementations. \n", "\n", "We’ll use this K-means algorithm on the GDELT dataset, https://registry.opendata.aws/gdelt, which monitors world news media across the world; data is stored for every second of every day. This information is freely available on Amazon S3 as part of the AWS Public Datasets program. \n", "\n", "**PREREQUISTES**: be sure you are running this notebook with a MXNet kernel. For example, in SageMaker Studio you could use a Python 3 (MXNet CPU Optimized) kernel, while for a SageMaker notebook instance, the conda_mxnet_p36 kernel can be used.\n", "\n", "\n", "## Data Processing and Exploration\n", "\n", "To begin, we'll import some libraries we'll need throughout the notebook and specify a Amazon S3 bucket for the data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "\n", "import boto3\n", "import gzip\n", "import numpy as np\n", "import pandas as pd\n", "import pickle\n", "import re\n", "import sklearn.cluster\n", "import sklearn\n", "import sys\n", "import urllib.request\n", "from sagemaker import get_execution_role\n", "from sagemaker.session import Session\n", "if not sys.warnoptions:\n", " import warnings\n", " warnings.simplefilter(\"ignore\")\n", "\n", "role = get_execution_role()\n", "\n", "# S3 bucket and prefix\n", "bucket = bucket = Session().default_bucket()\n", "prefix = 'sagemaker/DEMO-kmeans'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The GDELT data are stored as multiple files on Amazon S3, with two different formats: historical, which covers the years from 1979 to 2013, and daily updates, which cover the years from 2013 on. For this example, we’ll stick to the historical format. Let’s bring in 1979 data for the purpose of interactive exploration. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def get_gdelt(filename):\n", " s3 = boto3.resource('s3')\n", " s3.Bucket('gdelt-open-data').download_file('events/' + filename, '.gdelt.csv')\n", " df = pd.read_csv('.gdelt.csv', sep='\\t')\n", " header = pd.read_csv('https://www.gdeltproject.org/data/lookups/CSV.header.historical.txt', sep='\\t')\n", " df.columns = header.columns\n", " return df\n", "\n", "data = get_gdelt('1979.csv')\n", "data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Processing the data\n", "\n", "We'll now prepare the data for machine learning, with a few functions to help us scale this to GDELT datasets from other years. There are 57 columns, some of which are sparsely populated, cryptically named, and in a format that’s not particularly friendly for machine learning. So, for our use case, we’ll reduce to a few core attributes. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import io\n", "import os\n", "import sagemaker.amazon.common as smac\n", "\n", "data = data[['EventCode', 'NumArticles', 'AvgTone', 'Actor1Geo_Lat', 'Actor1Geo_Long', 'Actor2Geo_Lat', 'Actor2Geo_Long']]\n", "data['EventCode'] = data['EventCode'].astype(object)\n", "\n", "events = pd.crosstab(index=data['EventCode'], columns='count').sort_values(by='count', ascending=False).index[:20]\n", "\n", "#routine that converts the training data into protobuf format required for Sagemaker K-means.\n", "def write_to_s3(bucket, prefix, channel, file_prefix, X):\n", " buf = io.BytesIO()\n", " smac.write_numpy_to_dense_tensor(buf, X.astype('float32'))\n", " buf.seek(0)\n", " boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, channel, file_prefix + '.data')).upload_fileobj(buf)\n", "\n", "#filter data based on actor locations and events as described above\n", "def transform_gdelt(df, events=None):\n", " df = df[['AvgTone', 'EventCode', 'NumArticles', 'Actor1Geo_Lat', 'Actor1Geo_Long', 'Actor2Geo_Lat', 'Actor2Geo_Long']]\n", " df['EventCode'] = df['EventCode'].astype(object)\n", " if events is not None:\n", " df = df[np.in1d(df['EventCode'], events)]\n", " return pd.get_dummies(df[((df['Actor1Geo_Lat'] == 0) & (df['Actor1Geo_Long'] == 0) != True) &\n", " ((df['Actor2Geo_Lat'] == 0) & (df['Actor2Geo_Long'] == 0) != True)])\n", "\n", "#prepare training training and save to S3.\n", "def prepare_gdelt(bucket, prefix, file_prefix, events=None, random_state=1729, save_to_s3=True):\n", " df = get_gdelt(file_prefix + '.csv')\n", " model_data = transform_gdelt(df, events)\n", " train_data = model_data.sample(frac=1, random_state=random_state).to_numpy() #.as_matrix()\n", " if save_to_s3:\n", " write_to_s3(bucket, prefix, 'train', file_prefix, train_data)\n", " return train_data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "BEGIN_YEAR = 1979\n", "END_YEAR = 1980\n", "\n", "for year in range(BEGIN_YEAR, END_YEAR):\n", " train_data = prepare_gdelt(bucket, prefix, str(year), events)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Visualizing the data\n", "\n", "We'll now briefly explore a sample of the dataset using the t-Distributed Stochastic Neighbor Embedding (TSNE) algorithm. TSNE is a non-linear dimensionality reduction algorithm often used for exploring high-dimensional data. Here, we'll use TSNE for visualizing the first 10000 data points from 1979 dataset. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Visualizing the data\n", "\n", "We'll now briefly explore a sample of the dataset using the t-Distributed Stochastic Neighbor Embedding (TSNE) algorithm. TSNE is a non-linear dimensionality reduction algorithm often used for exploring high-dimensional data. Here, we'll use TSNE to visualize the first 10000 data points from the 1979 dataset. From this greatly simplified view of the data, it appears that the dataset may be amenable to modeling with a clustering algorithm such as K-means." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import matplotlib\n", "from matplotlib import pyplot as plt\n", "from sklearn import manifold\n", "\n", "train_79 = prepare_gdelt(bucket, prefix, '1979', events, save_to_s3=False)\n", "\n", "tsne = manifold.TSNE(n_components=2, init='pca', random_state=1200)\n", "X_tsne = tsne.fit_transform(train_79[:10000])\n", "\n", "plt.figure(figsize=(6, 5))\n", "X_tsne_1000 = X_tsne[:1000]\n", "plt.scatter(X_tsne_1000[:, 0], X_tsne_1000[:, 1])\n", "plt.show()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## SageMaker Experiments setup\n", "\n", "SageMaker Experiments is a great way to organize your data science work. You can create experiments to organize all your model development work for: [1] a business use case you are addressing (e.g. create an experiment named “customer churn prediction”), or [2] a data science team that owns the experiment (e.g. create an experiment named “marketing analytics experiment”), or [3] a specific data science and ML project. Think of it as a “folder” for organizing your “files”.\n", "\n", "To begin, we'll install the SageMaker Experiments SDK." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!{sys.executable} -m pip install sagemaker-experiments" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Let's track the parameters from the data preprocessing step we performed above. To do this, we'll manually add the preprocessing step to a `Tracker` object. For larger datasets and more complex preprocessing, we'd likely use SageMaker Processing to spin up a cluster of preprocessing instances separate from this notebook." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.analytics import ExperimentAnalytics\n", "\n", "from smexperiments.experiment import Experiment\n", "from smexperiments.trial import Trial\n", "from smexperiments.trial_component import TrialComponent\n", "from smexperiments.tracker import Tracker\n", "\n", "with Tracker.create(display_name=\"Preprocessing\", sagemaker_boto_client=boto3.client('sagemaker')) as tracker:\n", " tracker.log_parameters({\n", " \"begin_year\": BEGIN_YEAR,\n", " \"end_year\": END_YEAR,\n", " })\n", " # we can log the s3 uri to the dataset we just uploaded\n", " tracker.log_input(name=\"kmeans-dataset\", media_type=\"s3/uri\", value=\"s3://{}/{}/train/\".format(bucket, prefix))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "The SageMaker Experiments object itself is easily created with a minimal number of parameters." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "\n", "kmeans_experiment = Experiment.create(\n", " experiment_name=f\"kmeans-gdelt-{int(time.time())}\", \n", " description=\"Clustering on the GDELT dataset\", \n", " sagemaker_boto_client=boto3.client('sagemaker'))\n", "print(kmeans_experiment)" ] },
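{ "cell_type": "markdown", "metadata": {}, "source": [ "If you restart the kernel or come back to this work later, the experiment does not need to be recreated; the `smexperiments` SDK can reload it by name. A minimal sketch, assuming the SDK is installed and substituting the experiment name printed above for the placeholder:\n", "\n", "```python\n", "from smexperiments.experiment import Experiment\n", "\n", "# Reload an existing experiment instead of creating a new one\n", "kmeans_experiment = Experiment.load(\n", " experiment_name='kmeans-gdelt-1234567890', # replace with the name printed above\n", " sagemaker_boto_client=boto3.client('sagemaker'))\n", "```" ] },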
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Training a set of K-means models in parallel\n", "\n", "Finding the optimal number of clusters for a particular dataset is often at least partially a subjective judgment based on visual inspection of graphs. Typically, multiple training jobs are run with different values of k (the number of clusters) to generate the data for such graphs. To speed up this process, we'll use SageMaker's built-in support for parallelization in training. In particular, we'll:\n", "\n", "- run multiple training jobs in parallel; and\n", "- further parallelize training by specifying that each training job itself runs on a cluster of 2 instances." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from time import gmtime, strftime\n", "\n", "output_time = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "output_folder = 'kmeans-gdelt-' + output_time\n", "K = range(2, 12, 2) \n", "INSTANCE_COUNT = 2\n", "# make this False to run jobs one at a time, e.g. to avoid resource limits if the range above is increased \n", "run_parallel_jobs = True \n", "job_names = []" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "For each training job, we'll provide a set of training parameters that differ primarily in the number of clusters." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.amazon.amazon_estimator import get_image_uri\n", "\n", "def get_training_parameters(k, experiment_name, trial_name):\n", " \n", " # output_location and job_name are globals set in the training loop below before this function is called\n", " create_training_params = \\\n", " {\n", " \"AlgorithmSpecification\": {\n", " \"TrainingImage\": get_image_uri(boto3.Session().region_name, 'kmeans'),\n", " \"TrainingInputMode\": \"File\"\n", " },\n", " \"RoleArn\": role,\n", " \"OutputDataConfig\": {\n", " \"S3OutputPath\": output_location\n", " },\n", " \"ResourceConfig\": {\n", " \"InstanceCount\": INSTANCE_COUNT,\n", " \"InstanceType\": \"ml.c4.8xlarge\",\n", " \"VolumeSizeInGB\": 50\n", " },\n", " \"TrainingJobName\": job_name,\n", " \"HyperParameters\": {\n", " \"k\": str(k),\n", " \"feature_dim\": \"26\",\n", " \"mini_batch_size\": \"1000\"\n", " },\n", " \"StoppingCondition\": {\n", " \"MaxRuntimeInSeconds\": 60 * 60\n", " },\n", " \"InputDataConfig\": [\n", " {\n", " \"ChannelName\": \"train\",\n", " \"DataSource\": {\n", " \"S3DataSource\": {\n", " \"S3DataType\": \"S3Prefix\",\n", " \"S3Uri\": \"s3://{}/{}/train/\".format(bucket, prefix),\n", " \"S3DataDistributionType\": \"FullyReplicated\"\n", " }\n", " },\n", "\n", " \"CompressionType\": \"None\",\n", " \"RecordWrapperType\": \"None\"\n", " }\n", " ],\n", " \"ExperimentConfig\": {\n", " \"ExperimentName\": experiment_name,\n", " \"TrialName\": trial_name,\n", " \"TrialComponentDisplayName\": 'Training'\n", " }\n", " }\n", " \n", " return create_training_params" ] },
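{ "cell_type": "markdown", "metadata": {}, "source": [ "As an aside, the same kind of job could be expressed with the higher-level `sagemaker.KMeans` estimator instead of the low-level `create_training_job` request built above. The following is only a sketch against the v1 SageMaker Python SDK that this notebook otherwise assumes; the value of k and the output path shown are illustrative:\n", "\n", "```python\n", "from sagemaker import KMeans\n", "\n", "kmeans_estimator = KMeans(role=role,\n", " train_instance_count=INSTANCE_COUNT,\n", " train_instance_type='ml.c4.8xlarge',\n", " k=8,\n", " output_path='s3://{}/kmeans_example/output/'.format(bucket))\n", "\n", "# record_set converts the numpy array to protobuf recordIO and stages it in S3\n", "kmeans_estimator.fit(kmeans_estimator.record_set(train_data.astype('float32')))\n", "```\n", "\n", "We use the low-level API here because it gives explicit control over the request, including the `ExperimentConfig` used to associate each job with a trial." ] },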
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "preprocessing_trial_component = tracker.trial_component\n", "k_trial_name_map = {}\n", "\n", "for k in K:\n", " \n", " # create trial\n", " trial_name = f\"kmeans-training-job-{k}-clusters-{int(time.time())}\"\n", " kmeans_trial = Trial.create(\n", " trial_name=trial_name, \n", " experiment_name=kmeans_experiment.experiment_name,\n", " sagemaker_boto_client=boto3.client('sagemaker'),\n", " )\n", " k_trial_name_map[k] = trial_name \n", " # associate the preprocessing trial component with the current trial\n", " kmeans_trial.add_trial_component(preprocessing_trial_component)\n", " \n", " print('Starting train job with k = ' + str(k))\n", " output_location = 's3://{}/kmeans_example/output/'.format(bucket) + output_folder\n", " print('Training artifacts will be uploaded to: {}'.format(output_location))\n", " job_name = output_folder + str(k)\n", "\n", " sagemaker = boto3.client('sagemaker')\n", " create_training_params = get_training_parameters(k, kmeans_experiment.experiment_name, kmeans_trial.trial_name)\n", " sagemaker.create_training_job(**create_training_params)\n", "\n", " status = sagemaker.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']\n", " print(status)\n", " if not run_parallel_jobs:\n", " try:\n", " sagemaker.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=job_name)\n", " finally:\n", " status = sagemaker.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']\n", " print(\"Training job ended with status: \" + status)\n", " if status == 'Failed':\n", " message = sagemaker.describe_training_job(TrainingJobName=job_name)['FailureReason']\n", " print('Training failed with the following error: {}'.format(message))\n", " raise Exception('Training job failed')\n", " \n", " job_names.append(job_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Running the following cell will check the job status; it will finish executing when all jobs have either completed or failed. Each individual job will take about 3 minutes, however, they will not start at exactly the same time, so you might expect the entire set of jobs to complete in about 5 minutes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "while len(job_names):\n", " try:\n", " sagemaker.get_waiter('training_job_completed_or_stopped').wait(TrainingJobName=job_names[0])\n", " finally:\n", " status = sagemaker.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']\n", " print(\"Training job ended with status: \" + status)\n", " if status == 'Failed':\n", " message = sagemaker.describe_training_job(TrainingJobName=job_name)['FailureReason']\n", " print('Training failed with the following error: {}'.format(message))\n", " raise Exception('Training job failed')\n", "\n", " print(job_name)\n", "\n", " info = sagemaker.describe_training_job(TrainingJobName=job_name)\n", " job_names.pop(0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Examine results with SageMaker Experiments\n", "\n", "Now we will use the analytics capabilities of the SageMaker Experiments Python SDK to query and compare the training runs in our experiment. You can retrieve specific trial components, such as training, by using a search expression." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "search_expression = {\n", " \"Filters\":[\n", " {\n", " \"Name\": \"DisplayName\",\n", " \"Operator\": \"Equals\",\n", " \"Value\": \"Training\",\n", " }\n", " ],\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll display the training trial components in ascending order of average Mean Square Error, which was used as a metric during training. Typically the trial components dataframe will have many columns. We can limit the number of columns displayed in various ways. For example, we can limit which metrics columns to display (here we are excluding some other metrics such as training throughput), and which parameter columns (here only k since it is the only one varying, the others such as mini-batch size were fixed)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sess = boto3.Session()\n", "sm = sess.client('sagemaker')\n", "\n", "trial_component_analytics = ExperimentAnalytics(\n", " sagemaker_session=Session(sess, sm), \n", " experiment_name=kmeans_experiment.experiment_name,\n", " search_expression=search_expression,\n", " sort_by=\"metrics.train:msd.avg\",\n", " sort_order=\"Ascending\",\n", " metric_names=['train:msd'],\n", " parameter_names=['k']\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "trial_component_analytics.dataframe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next let's look at an example of tracing the lineage of a model by accessing the data tracked by SageMaker Experiments for the trial with k = 8. This time the query also will return the preprocessing trial component, as well as the training component, so we can get a more complete picture of the steps taken to produce the model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "lineage_table = ExperimentAnalytics(\n", " sagemaker_session=Session(sess, sm), \n", " search_expression={\n", " \"Filters\":[{\n", " \"Name\": \"Parents.TrialName\",\n", " \"Operator\": \"Equals\",\n", " \"Value\": k_trial_name_map[8]\n", " }]\n", " },\n", " sort_by=\"CreationTime\",\n", " sort_order=\"Ascending\",\n", " metric_names=['train:msd'],\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "lineage_table.dataframe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Apply the elbow method to determine the optimal number of clusters\n", "\n", "Next we'll plot the Euclidean distance to the cluster centroids. In general, the error should decrease as k gets larger. This is because when the number of clusters increases, they should be smaller, so distortion is also smaller. This produces an “elbow effect” in the graph. The idea of the elbow method is to visually select the k at which the rate of decrease sharply shifts. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import mxnet as mx\n", "from scipy.spatial.distance import cdist\n", "\n", "plt.plot()\n", "colors = ['b', 'g', 'r']\n", "markers = ['o', 'v', 's']\n", "models = {}\n", "distortions = []\n", "\n", "for k in K:\n", " s3_client = boto3.client('s3')\n", " key = 'kmeans_example/output/' + output_folder +'/' + output_folder + str(k) + '/output/model.tar.gz'\n", " s3_client.download_file(bucket, key, 'model.tar.gz')\n", " print(\"Model for k={} ({})\".format(k, key))\n", " !tar -xvf model.tar.gz \n", " kmeans_model=mx.ndarray.load('model_algo-1')\n", " kmeans_numpy = kmeans_model[0].asnumpy()\n", " distortions.append(sum(np.min(cdist(train_data, kmeans_numpy, 'euclidean'), axis=1)) / train_data.shape[0])\n", " models[k] = kmeans_numpy\n", " \n", "# Plot the elbow\n", "plt.plot(K, distortions, 'bx-')\n", "plt.xlabel('k')\n", "plt.ylabel('distortion')\n", "plt.title('Elbow graph')\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Based on the graph above, k = 8 could be a good cluster size for this dataset. However, again this is partially subjective so k = 7 may be just as effective or more so for the use case. Also note that even though we referred to k as a \"hyperparameter,\" we wouldn't apply hyperparameter optimization techniques (HPO) to tune k because it is a “static” hyperparameter — in general, there is a monotonically decreasing relationship between number of centroids and the objective metrics that SageMaker K-means reports. Accordingly, tuning for k would mean you would always end up at, or near, your maximum k value." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Retrieving the best model\n", "\n", "If we consider the model with k = 8 the best suited for our purposes, we can now retrieve it. Let's examine the map of training trial components." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(k_trial_name_map)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The fourth trial component is the one with k = 8, so we will retrieve the model from that one (note that indexing starts at zero so it is at index 3). The model artifact is simply a zipped file stored in S3." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "best_trial_component_name = trial_component_analytics.dataframe().iloc[3]['TrialComponentName']\n", "best_trial_component = TrialComponent.load(best_trial_component_name)\n", "model_data = best_trial_component.output_artifacts['SageMaker.ModelArtifact'].value\n", "print(model_data)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker import KMeansModel\n", "\n", "model = KMeansModel(model_data=model_data,\n", " role=role,\n", " sagemaker_session=Session())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "From here, the model can be deployed to a SageMaker hosted endpoint and used to obtain real time predictions, or used for batch inference. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "From here, the model can be deployed to a SageMaker hosted endpoint and used to obtain real-time predictions, or used for batch inference. For example, to get the cluster assignments of each data point in the training data, code similar to the following could be used:\n", "\n", "```python\n", "\n", "predictor = model.deploy(instance_type='ml.m5.xlarge',\n", " initial_instance_count=1)\n", "\n", "result = predictor.predict(train_data)\n", "\n", "```\n", "\n", "If you do deploy an endpoint this way, remember to delete it when you are finished (for example with `predictor.delete_endpoint()`) to avoid ongoing charges." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Cleanup\n", "\n", "To prevent unnecessary clutter in your AWS account, you can delete all of the information tracked by the Experiment as well as the Experiment itself.\n", "\n", "> Trial components can exist independently of trials and experiments. You might want to keep them if you plan on further exploration. If so, comment out `tc.delete()` in the function below." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def cleanup(experiment):\n", " for trial_summary in experiment.list_trials():\n", " trial = Trial.load(sagemaker_boto_client=sm, trial_name=trial_summary.trial_name)\n", " for trial_component_summary in trial.list_trial_components():\n", " tc = TrialComponent.load(\n", " sagemaker_boto_client=sm,\n", " trial_component_name=trial_component_summary.trial_component_name)\n", " trial.remove_trial_component(tc)\n", " try:\n", " # comment out to keep trial components\n", " tc.delete()\n", " except:\n", " # tc is associated with another trial\n", " continue\n", " # to prevent throttling\n", " time.sleep(.5)\n", " trial.delete()\n", " experiment.delete()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cleanup(kmeans_experiment)" ] } ], "metadata": { "kernelspec": { "display_name": "conda_mxnet_p36", "language": "python", "name": "conda_mxnet_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" } }, "nbformat": 4, "nbformat_minor": 4 }