{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Introduction\n", "\n", "Welcome to your Amazon SageMaker notebook instance! \n", "\n", "This is a fully managed AWS environment that provides you with a Jupyter notebook to work with data. To learn more about Amazon SageMaker notebook instances, check out our [documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/nbi.html).\n", "\n", "## Summary\n", "\n", "We're looking to build a linear model to help with our Wild Rydes machine learning challenge. A linear model is a supervised learning algorithm used to solve either classification or regression problems. This notebook will help us to:\n", "\n", "1. Set up serverless querying of data in S3 via [Amazon Athena](https://aws.amazon.com/athena/).\n", "2. Prepare dataframes using [pandas](https://pandas.pydata.org/) and [numpy](https://numpy.org).\n", "3. Build and train a machine learning model via the [Amazon SageMaker Python SDK](https://docs.aws.amazon.com/sagemaker/latest/dg/frameworks.html).\n", "\n", "To get started, let's input the name of the S3 bucket you created earlier in this workshop:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ACTION: provide the data bucket NAME you are using for this workshop\n", "data_bucket = '' " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll now use your S3 bucket to interact with the data.\n", "\n", "### Objective: Enable serverless querying via Amazon Athena\n", "\n", "The next thing we'll do is install [PyAthena](https://pypi.org/project/PyAthena/) so we can use Amazon Athena. Athena is a serverless query solution that will let us query data in S3 as if it were loaded into a database. Run the next cell to install the package we need to get started with Athena." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%capture \n", "## above line suppresses installation output from this cell\n", "\n", "\n", "## Install PyAthena to enable our notebook instance to use Athena\n", "import sys\n", "!{sys.executable} -m pip install PyAthena" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "PyAthena gives us the ability to connect to Amazon Athena so we can start defining data objects in S3. In the next cell, let's go ahead and load the PyAthena, pandas, and boto3 libraries so we can start sending SQL queries to Athena. Our first query will be to establish a serverless database for use in this workshop."
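, "\n", "\n", "If you ever want to send a statement to Athena without going through pandas, PyAthena also exposes a standard DB-API cursor. The snippet below is only an optional sketch of that pattern (the staging path and region shown are placeholders, not resources created in this workshop); the rest of the notebook sticks with `pd.read_sql`.\n", "\n", "```python\n", "from pyathena import connect\n", "\n", "# placeholder staging location and region -- substitute your own bucket and region\n", "cursor = connect(s3_staging_dir='s3://example-bucket/results/',\n", "                 region_name='us-east-1').cursor()\n", "cursor.execute('SELECT 1')\n", "print(cursor.fetchall())\n", "```"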
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# load necessary libraries\n", "from pyathena import connect\n", "import pandas as pd\n", "import boto3\n", "\n", "# create a place to store athena query results\n", "athena_query_results = (\"s3://%s/results/\" % data_bucket)\n", "\n", "# establish athena connection in the same region as our S3 bucket\n", "# (note: get_bucket_location returns None for buckets in us-east-1, so we fall back to that region)\n", "data_bucket_region = boto3.client('s3').get_bucket_location(Bucket=data_bucket)['LocationConstraint'] or 'us-east-1'\n", "\n", "conn = connect(s3_staging_dir=athena_query_results,\n", "               region_name=data_bucket_region)\n", "\n", "# create a serverless database\n", "athena_database = \"wildrydesworkshop\"\n", "\n", "sql_stmt_create_db = (\"CREATE DATABASE IF NOT EXISTS %s\" % athena_database)\n", "\n", "pd.read_sql(sql_stmt_create_db, conn)\n", "\n", "# list all databases to confirm our new database exists\n", "pd.read_sql((\"SHOW DATABASES LIKE '%s'\" % athena_database), conn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now have a serverless database that we can create external tables in. External tables are table definitions whose data lives outside the compute service that queries it. In this case, Athena is our query service and our data is stored separately in S3.\n", "\n", "In the Data Processing portion of this workshop, we processed some telemetry data and saved the results in S3. Let's go ahead and create an external table of this data so we can query it later." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# create an external table based on the processed ride telemetry data\n", "processed_table = \"ridetelemetry\"\n", "\n", "# drop the table if we're rerunning this cell\n", "pd.read_sql(\"DROP TABLE IF EXISTS %s.%s\" % (athena_database, processed_table),conn)\n", "\n", "# create an external table based on the schema output from our data processing module\n", "sql_stmt_create_tb = \"\"\"\n", "CREATE EXTERNAL TABLE IF NOT EXISTS %s.%s (\n", " distance double\n", ",healthpoints bigint\n", ",latitude double\n", ",longitude double\n", ",magicpoints bigint\n", ",name string\n", ",statustime string\n", ",fieldservice bigint\n", ",groundstation string\n", ")\n", "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'\n", "WITH SERDEPROPERTIES (\n", "\"field.delim\" = \",\"\n", ",\"skip.header.line.count\" = \"1\"\n", ")\n", "LOCATION 's3://%s/processed/'\n", "\"\"\" % (athena_database, processed_table, data_bucket)\n", "\n", "pd.read_sql(sql_stmt_create_tb, conn)\n", "\n", "# confirm that our new table exists\n", "pd.read_sql((\"SHOW TABLES IN %s\" % athena_database), conn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now have an external table in Athena based on our telemetry data. That's great, but here's something even better: we can also make external tables of public datasets published to Amazon S3! That means all the historical weather data that NOAA stores in S3 can also be an external table for us to use. Let's go ahead and create an external table of it."
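, "\n", "\n", "If you're curious what NOAA's raw files look like before we define a schema over them, the optional sketch below lists a few objects in the public `noaa-ghcn-pds` bucket (this assumes the bucket's `csv/` prefix layout hasn't changed and that your notebook role can read public buckets):\n", "\n", "```python\n", "import boto3\n", "\n", "# peek at a few of NOAA's public GHCN-D csv objects\n", "s3 = boto3.client('s3')\n", "resp = s3.list_objects_v2(Bucket='noaa-ghcn-pds', Prefix='csv/', MaxKeys=5)\n", "for obj in resp.get('Contents', []):\n", "    print(obj['Key'], obj['Size'], 'bytes')\n", "```"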
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# create an external table based on NOAA's weather files\n", "weather_table = \"weather\"\n", "\n", "# drop the table if we're rerunning this cell\n", "pd.read_sql(\"DROP TABLE IF EXISTS %s.%s\" % (athena_database, weather_table),conn)\n", "\n", "# create an external table based on the data schema @ https://docs.opendata.aws/noaa-ghcn-pds/readme.html\n", "# 'year_date' isn't expressed as a typical datetime type, so we'll worry about it later.\n", "sql_stmt_create_tb = \"\"\"\n", "CREATE EXTERNAL TABLE IF NOT EXISTS %s.%s (\n", " id string\n", ",year_date string\n", ",element string\n", ",data_value double\n", ",m_flag string\n", ",q_flag string\n", ",s_flag string\n", ",obs_time string\n", ")\n", "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'\n", "WITH SERDEPROPERTIES (\n", "\"field.delim\" = \",\"\n", ")\n", "LOCATION 's3://noaa-ghcn-pds/csv/'\n", "\"\"\" % (athena_database, weather_table)\n", "\n", "# send the create table query statement to our Athena connection\n", "pd.read_sql(sql_stmt_create_tb, conn)\n", "\n", "# confirm that our new table exists\n", "pd.read_sql((\"SHOW TABLES IN %s\" % athena_database), conn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Checkpoint reached: Enabled querying data in S3\n", "\n", "We now have the ability to query both our telemetry data and NOAA data directly from S3 via Athena. Let's run a couple of queries to see how large these tables are." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# count the records in each of our tables\n", "pd.read_sql(\"\"\"\n", "SELECT '%s' as tablename, COUNT(*) records FROM %s.%s\n", "UNION ALL\n", "SELECT '%s' as tablename, COUNT(*) records FROM %s.%s\n", "\"\"\" % (processed_table, athena_database, processed_table,\n", "       weather_table, athena_database, weather_table\n", "       ), conn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Objective: Create dataframe objects for our linear model\n", "\n", "Our telemetry data looks pretty manageable, but check out the size of the weather data! We probably don't need the vast majority of it. Let's set up our telemetry data as a pandas dataframe so we can figure out how much weather data we really need." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# create a pandas dataframe with our processed ride telemetry data\n", "df_telemetry = pd.read_sql(\"SELECT * FROM %s.%s\" % (athena_database, processed_table), conn)\n", "\n", "# we need to describe the datetime format in order for our telemetry data to use\n", "# a datetime data type. 
Let's convert our 'statustime' field accordingly.\n", "df_telemetry['statustime'] = pd.to_datetime(df_telemetry['statustime'], format='%Y-%m-%d %H:%M:%S.%f')\n", "\n", "# let's add a date field that doesn't have time so we can easily join to weather data later\n", "df_telemetry['year_date'] = pd.to_datetime(df_telemetry['statustime'].dt.strftime('%Y-%m-%d'))\n", "\n", "# here's the descriptive summary of the telemetry dataframe\n", "df_telemetry.describe(include='all')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Thinking about how to filter down our weather data, the above telemetry dataframe description tells us:\n", "- There are only a couple of groundstation IDs\n", "- Our ride dates start in 2017\n", "\n", "Keeping those facts in mind, let's work on creating our weather dataframe next." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# first let's grab the groundstation IDs we're interested in from our telemetry data\n", "unique_gs = tuple(df_telemetry.groundstation.unique())\n", "\n", "# weather data goes back a long way, so let's only grab data for the years we need\n", "start_year = pd.to_datetime(df_telemetry.statustime.min()).year\n", "\n", "# let's create our query statement\n", "weather_query = \"\"\"\n", "SELECT * FROM %s.%s\n", "WHERE q_flag = ''\n", "AND id IN %s\n", "AND year(date_parse(year_date, '%%Y%%m%%d')) >= %s\n", "\"\"\" % (athena_database, weather_table, unique_gs, start_year)\n", "\n", "# now we'll pass the query to Athena to get back the weather data we're interested in.\n", "# Athena will scan over 2 billion records (90+ GB) in just over 30 seconds.\n", "df_weather = pd.read_sql(weather_query, conn)\n", "\n", "# we want to make sure the 'data_value' field is a numeric column\n", "df_weather['data_value'] = pd.to_numeric(df_weather['data_value'])\n", "\n", "# let's also make sure 'year_date' is a proper date field\n", "df_weather['year_date'] = pd.to_datetime(df_weather['year_date'])\n", "\n", "# here's the descriptive summary of the weather dataframe\n", "df_weather.describe(include='all')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# we want some interesting weather facts as features, so let's pivot the data\n", "df_weather_pivot = pd.DataFrame(df_weather, columns = ['id','year_date','element', 'data_value']) \\\n", "    .query('element in (\"TMIN\", \"TMAX\", \"PRCP\")') \\\n", "    .pivot_table(index=['id','year_date'], columns='element', values='data_value') \\\n", "    .reset_index()\n", "\n", "# element definitions from https://docs.opendata.aws/noaa-ghcn-pds/readme.html\n", "## PRCP = Precipitation (tenths of mm)\n", "## TMAX = Maximum temperature (tenths of degrees C)\n", "## TMIN = Minimum temperature (tenths of degrees C)\n", "\n", "# now our weather data looks like this\n", "df_weather_pivot.describe(include='all')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Checkpoint reached: created dataframes as inputs into our linear model\n", "\n", "We crafted two dataframes: ride telemetry & pivoted weather facts. Now let's merge them into a single dataframe so we can start preparing our model."
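, "\n", "\n", "One thing to keep in mind: because the merge below uses a left join, any ride day without a matching weather record will come through with NaN in the `TMIN`, `TMAX`, and `PRCP` columns, and those NaNs would flow straight into our training arrays. After you run the merge cell, a quick sanity check along these lines (just a sketch) will tell you whether that happened:\n", "\n", "```python\n", "# count missing weather values per column after the merge\n", "print(merge_df[['TMIN', 'TMAX', 'PRCP']].isna().sum())\n", "\n", "# if anything is missing, one simple option is to drop those rows before modeling\n", "# merge_df = merge_df.dropna(subset=['TMIN', 'TMAX', 'PRCP'])\n", "```"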
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# let's merge our telemetry data with the weather data so we can include weather elements as model features\n", "merge_df = pd.merge(df_telemetry, df_weather_pivot\n", "                    , left_on=['groundstation','year_date']\n", "                    , right_on=['id', 'year_date']\n", "                    , how='left'\n", "                    )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Objective: Pre-process the data\n", "\n", "Now that we have data ready for modeling, let's shape it into inputs for our model.\n", "\n", "Our linear learner works with numpy arrays, so we'll first load the data into numpy arrays and then randomly split it into a train set and a test set with a 90/10 split." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "nbpresent": { "id": "a58fdf0d-32fb-4690-add3-433cc721773d" } }, "outputs": [], "source": [ "import numpy as np\n", "import os\n", "\n", "# pull the feature columns (with the 'fieldservice' label in the last position) into a numpy array\n", "raw = merge_df[['distance','healthpoints','magicpoints','TMIN','TMAX','PRCP','fieldservice']].to_numpy(dtype=np.float32)\n", "\n", "# split into train/test with a 90/10 split\n", "np.random.seed(0)\n", "np.random.shuffle(raw)\n", "train_size = int(0.9 * raw.shape[0])\n", "train_features = raw[:train_size, :-1]\n", "train_labels = raw[:train_size, -1]\n", "test_features = raw[train_size:, :-1]\n", "test_labels = raw[train_size:, -1]\n", "\n", "print('train_features shape = ', train_features.shape)\n", "print('train_labels shape = ', train_labels.shape)\n", "print('test_features shape = ', test_features.shape)\n", "print('test_labels shape = ', test_labels.shape)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's write the training data to Amazon S3 in recordio-protobuf format. This allows us to persist the data in case we need to return to this exact training set later.\n", "\n", "We first create an in-memory buffer wrapping the data, then upload it to Amazon S3. Note that the choice of bucket and prefix will differ for different users and different datasets." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker.amazon.common as smac\n", "import io\n", "\n", "train_prefix = 'train'\n", "key = 'recordio-pb-data'\n", "\n", "buf = io.BytesIO()\n", "smac.write_numpy_to_dense_tensor(buf, train_features, train_labels)\n", "buf.seek(0)\n", "\n", "boto3.resource('s3').Bucket(data_bucket).Object(os.path.join(train_prefix, key)).upload_fileobj(buf)\n", "s3_train_data = 's3://{}/{}/{}'.format(data_bucket, train_prefix, key)\n", "print('uploaded training data location: {}'.format(s3_train_data))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We should save our test dataset to S3, too. That way the training job can evaluate the model against held-out data and report its performance in the training logs."
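, "\n", "\n", "Once you've run the upload cell below, you can confirm that both objects landed where you expect with a quick listing. This is just an optional sketch:\n", "\n", "```python\n", "# list the uploaded recordio-protobuf objects under the train/ and test/ prefixes\n", "s3 = boto3.client('s3')\n", "for prefix in ('train/', 'test/'):\n", "    for obj in s3.list_objects_v2(Bucket=data_bucket, Prefix=prefix).get('Contents', []):\n", "        print(obj['Key'], obj['Size'], 'bytes')\n", "```"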
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_prefix = 'test'\n", "\n", "buf = io.BytesIO()\n", "smac.write_numpy_to_dense_tensor(buf, test_features, test_labels)\n", "buf.seek(0)\n", "\n", "boto3.resource('s3').Bucket(data_bucket).Object(os.path.join(test_prefix, key)).upload_fileobj(buf)\n", "s3_test_data = 's3://{}/{}/{}'.format(data_bucket, test_prefix, key)\n", "print('uploaded test data location: {}'.format(s3_test_data))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Checkpoint reached: data is ready for our linear learner model\n", "\n", "### Objective: Build and train our linear learner model\n", "\n", "Let's take a moment to explain, at a high level, how machine learning training and prediction work in Amazon SageMaker. First, we need to train a model. Training is a process that, given a labeled dataset and hyperparameters to guide it, outputs a model. Once training is done, we can set up what is called an **endpoint**. An endpoint is a web service that, given a request containing an unlabeled data point or a mini-batch of data points, returns a prediction for each one.\n", "\n", "In Amazon SageMaker, training is done via an object called an **estimator**. When setting up the estimator we specify the location (in Amazon S3) of the training data, the path (again in Amazon S3) where the trained model will be serialized, training parameters such as the instance type and count, and algorithm-specific hyperparameters such as the predictor type. Once the estimator is initialized, we can call its **fit** method to do the actual training.\n", "\n", "Now that we're ready to train, let's define a convenience function that starts a training job." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "from sagemaker import get_execution_role\n", "from sagemaker.predictor import csv_serializer, json_deserializer\n", "from sagemaker.amazon.amazon_estimator import get_image_uri\n", "\n", "\n", "def trained_estimator_from_hyperparams(s3_train_data, hyperparams, output_path, s3_test_data=None):\n", "    \"\"\"\n", "    Create an Estimator from the given hyperparams, fit it to the training data,\n", "    and return the trained estimator\n", "    \"\"\"\n", "    # set up the estimator\n", "    linear = sagemaker.estimator.Estimator(get_image_uri(boto3.Session().region_name, \"linear-learner\"),\n", "                                           get_execution_role(),\n", "                                           train_instance_count=1,\n", "                                           train_instance_type='ml.m5.2xlarge',\n", "                                           output_path=output_path,\n", "                                           sagemaker_session=sagemaker.Session())\n", "    linear.set_hyperparameters(**hyperparams)\n", "\n", "    # train a model. fit_input contains the locations of the train and test data\n", "    fit_input = {'train': s3_train_data}\n", "    if s3_test_data is not None:\n", "        fit_input['test'] = s3_test_data\n", "    linear.fit(fit_input)\n", "    return linear" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we run the actual training job. We'll set only the required hyperparameters and leave everything else at its default."
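, "\n", "\n", "(Looking ahead: the conclusion below notes that this workshop uses Lambda for inference, but that you could also test the model behind a SageMaker **endpoint**. Purely as an optional sketch -- using the version-1 SageMaker SDK imports from the cell above, and only after `linear_estimator` exists -- that flow would look roughly like this. A deployed endpoint bills until you delete it.)\n", "\n", "```python\n", "# optional sketch: deploy the trained estimator behind a real-time endpoint\n", "predictor = linear_estimator.deploy(initial_instance_count=1, instance_type='ml.t2.medium')\n", "predictor.content_type = 'text/csv'\n", "predictor.serializer = csv_serializer\n", "predictor.deserializer = json_deserializer\n", "\n", "# score a single held-out example\n", "print(predictor.predict(test_features[0]))\n", "\n", "# clean up so the endpoint doesn't keep accruing charges\n", "predictor.delete_endpoint()\n", "```"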
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hyperparams = {\n", "    'feature_dim': int(train_features.shape[1]), # number of feature columns\n", "    'mini_batch_size': int(0.1 * train_features.shape[0]), # roughly 10% of the training set per mini-batch\n", "    'predictor_type': 'binary_classifier' # 'fieldservice', our label, is treated as a binary outcome\n", "}\n", "\n", "output_path = 's3://' + data_bucket\n", "linear_estimator = trained_estimator_from_hyperparams(s3_train_data, hyperparams, output_path,\n", "                                                      s3_test_data=s3_test_data)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that we passed a test set to the training job. When a test set is provided, the training job doesn't just produce a model; it also applies the model to the test set and reports the model's accuracy in the training logs." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Conclusion\n", "\n", "We now have a trained model living in S3. Instead of creating a SageMaker endpoint, we will use Lambda to make inferences against the model.\n", "\n", "If you want to test the model using a SageMaker endpoint before moving on, check out our documentation:\n", "https://docs.aws.amazon.com/sagemaker/latest/dg/ex1-deploy-model.html#ex1-deploy-model-boto\n", "\n", "At this point you can close out of the SageMaker notebook and continue with the workshop instructions." ] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" }, "notice": "Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License." }, "nbformat": 4, "nbformat_minor": 4 }