{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Stock Price Prediction, using SageMaker DeepAR\n", "\n", "\n", "DeepAR is a supervised learning algorithm for forecasting scalar time series. This notebook demonstrates:\n", "- How to format Deutsche Börse maintained stock market data to be used as training input and prediction on DeepAR\n", "- Usage of `Dynamic Features` on DeepAR, with metrices within the Deutsche Börse dataset used as dynamic features\n", "- How to easily train an DeepAR Estimator and create a Predictor using DeepAR container image\n", "\n", "Running the model training using the highest memory CPU based [ML instance type](https://aws.amazon.com/sagemaker/pricing/instance-types/) available on AWS at this time (`ml.c5.18xlarge`), it should take about 15 minutes to train the model if you use data sampled at daily interval.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import datetime\n", "import sagemaker\n", "import pandas as pd\n", "import numpy as np\n", "from __future__ import print_function\n", "from ipywidgets import interact, interactive, fixed, interact_manual\n", "import ipywidgets as widgets\n", "from ipywidgets import IntSlider, FloatSlider, Checkbox\n", "import matplotlib\n", "import deepar_util as util\n", "from deepar_util import DeepARPredictor" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Choice of Series\n", "\n", "As we observed during data analysis phase that there are certain stocks that are clustered relatively more, based on their past price trends. Naturally, using these similarly moving stock would increase overall model performance, in absence of any other available external time series.\n", "\n", "Therefore, like before, use the stocks of companies in automobile industry, and tjhe closing price of these series as the time series' of choice, for the model to build upon. THe difference here however is that, we do not have to specify which is the main series, and which ones are exogenous. DeepAR algorithm works to build a model that accounts for trends in all provided time series, and can generate forecast on any of those. This saves you the hassle of building different models for different time series.\n", "\n", "Just like we used covariate time series in our custom RNN example, we can use such series on DeepAR, by using the Dynamic Features. The data formatting function works to take all covariate columns, and format those as dynamic features for each time series.\n", "\n", "So, in effect, following the configuration below, you end up having 6 time series' - the closing prices of the 6 stocks, each with 3 dynamic feature series' - opening price, and minimum and maximum prices." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Define parameters\n", "interval = 'D' #Use D or H\n", "\n", "assert interval == 'D' or interval == 'H'\n", "\n", "if interval == 'D':\n", " prediction_length = 91 #when interval = D \n", " context_length = 91\n", "elif interval == 'H':\n", " prediction_length = 2184 #when interval = H\n", " context_length = 2184\n", " \n", " \n", "mnemonics = ['CON','DAI','PAH3','BMW','VOW3']\n", "target_column = 'EndPrice'\n", "covariate_columns = ['StartPrice', 'MinPrice', 'MaxPrice']\n", "\n", "train_test_split = 0.8\n", "num_test_windows = 4\n", " \n", "hyperparameters = {\n", " \"prediction_length\": str(prediction_length), #number of time-steps model is trained to predict, always generates forecasts with this length\n", " \"context_length\": str(context_length), #number of time-points that the model gets to see before making the prediction, should be about same as the prediction_length\n", " \"time_freq\": interval, #granularity of the time series in the dataset\n", " \"epochs\": \"200\", #maximum number of passes over the training data\n", " \"early_stopping_patience\": \"40\", #training stops when no progress is made within the specified number of epochs\n", " \"num_layers\": \"2\", #number of hidden layers in the RNN, typically range from 1 to 4 \n", " \"num_cells\": \"40\", #number of cells to use in each hidden layer of the RNN, typically range from 30 to 100\n", " \"mini_batch_size\": \"128\", #size of mini-batches used during training, typically values range from 32 to 512\n", " \"learning_rate\": \"1e-3\", #learning rate used in training. Typical values range from 1e-4 to 1e-1\n", " \"dropout_rate\": \"0.1\", # dropout rate to use for regularization, typically less than 0.2. \n", " \"likelihood\": \"gaussian\" #noise model used for uncertainty estimates - gaussian/beta/negative-binomial/student-T/deterministic-L1\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Hyperparameters\n", "\n", "In our custom RNN example, we implemented the training code, so that certain aspects of training can be customized using hyperparamaters. In case of DeepAR, like any AWS provided algorithms, you can use similar hyparparameters configuration to get the most out of your model.\n", "\n", "[DeepAR Hhyperparameters](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar_hyperparameters.html) allows you to control certain aspects of your NN architecture, such as number of layers, number of recurrent units in each layer, learning rate, prediction length and context (horizon and lag), dropout rate etc. In the cell above we configured some, so as to get good accuracy, within a reasonable training time, as suitable or this workshop.\n", "\n", "In practice, you can use another valuable feature of SageMaker - [Hyperparameter tuning](https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning.html) to conduct a sort of grid search over your hyperparameters space and find the best combination for your use case." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Define IAM role and session\n", "role = sagemaker.get_execution_role()\n", "session = sagemaker.Session()\n", "\n", "#Obtain container image URI for SageMaker-DeepAR algorithm, based on region\n", "region = session.boto_region_name\n", "image_name = sagemaker.amazon.amazon_estimator.get_image_uri(region, \"forecasting-deepar\", \"latest\")\n", "print(\"Model will be trained using container image : {}\".format(image_name))\n", "\n", "#Define training data location\n", "s3_data_key = 'dbg-stockdata/source'\n", "s3_bucket = session.default_bucket()\n", "s3_output_path = \"s3://{}/{}/{}/output\".format(s3_bucket, s3_data_key, interval)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Training Data\n", "\n", "During data preparation steps, you uploaded the resampled data to your S3 bucket, attached to your SageMaker session, under an appropriate prefix, depending on resampling interval. \n", "\n", "In order to use the data for DeepAr however, you'll need to do some preprocessing to have the data formatted following [DeepAR Input/Output interface](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar.html#deepar-inputoutput). \n", "The function named `deeparize` in the utility file - [deepar_util.py](./deepar_util.py) does this transformation." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "metrics=[]\n", "metrics.extend(covariate_columns)\n", "metrics.append(target_column)\n", "stock_data_series, symbols = util.load_resampled_from_s3(interval, s3_bucket, s3_data_key, mnemonics, metrics)\n", "print(stock_data_series.shape)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally you split the formatted data into training and test channels and host on your S3 bucket, to be fed into estimator at training phase.\n", "\n", "Notice here, unlike our custom RNN code, where the train-test split was done inside the container, DeepAR expects you to do the split outside and provide the data as separate channels." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "train_data, test_data, train_start_dt, train_end_dt = util.generate_train_test_set(stock_data_series, target_column, covariate_columns, interval, train_test_split, num_test_windows)\n", "end_training = pd.Timestamp(datetime.datetime.strptime(str(train_end_dt), \"%Y-%m-%d %H:%M:%S\").strftime(\"%Y-%m-%d %H:%M:%S\"), freq = interval)\n", "\n", "train_channel = util.write_dicts_to_file(train_data, interval, s3_bucket, s3_data_key, \"train\")\n", "test_channel = util.write_dicts_to_file(test_data, interval, s3_bucket, s3_data_key, \"test\")\n", "print(\"Train channel {}\".format(train_channel))\n", "print(\"Test channel {}\".format(test_channel))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To validate that the series chosen indeed contains the correlated features that we hoped for, and that the deparize-transformation is working as expected, you can plot the series values - on separate plots, and on single plots.\n", "\n", "This is an optional step, but it often is a agood idea to have the data sanity checked before feeding to the model, to avoid costly mistakes later on." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "util.metrics_plot(mnemonics, metrics, stock_data_series)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "matplotlib.rcParams['figure.figsize'] = (25, 17) # use bigger graphs\n", "util.timeseries_plot(mnemonics, metrics, stock_data_series)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Model Training\n", "\n", "Just like in the previous module, you'll use SageMaker's high level [Estimator API](https://sagemaker.readthedocs.io/en/latest/estimators.html) to submit a training job, with only difference being, here you use the training image for SageMaker DeepAR that you obtained for the AWS region that you are running thhis workshop in. \n", "\n", "Following are the necessary inputs while submitting a training job:\n", "- Uniquely identifiable job name\n", "- SageMaker algorithm image path where the DeepAR training code is available\n", "- URLs of the Amazon S3 bucket where you have the training and test data stored\n", "- URL of the S3 bucket where you want to store the output of the job (upon training completion SageMaker archives the model artifacts and makes those available as a tar-file named `model.tar.gz` at the specified location on S3)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "# Instantiate estimator with container image for DeepAR and backend EC2 instance(s)\n", "estimator = sagemaker.estimator.Estimator(\n", " sagemaker_session=session,\n", " image_name=image_name,\n", " role=role,\n", " train_instance_count=1,\n", " train_instance_type='ml.c5.18xlarge',\n", " base_job_name='dbg-deepar-{}'.format(interval),\n", " output_path=s3_output_path\n", ")\n", "\n", "# Set the hyperparamters\n", "estimator.set_hyperparameters(**hyperparameters)\n", "\n", "# Specify data channels\n", "data_channels = {\n", " \"train\": train_channel,\n", " \"test\": test_channel\n", "}\n", "\n", "# Train the model\n", "estimator.fit(inputs=data_channels, wait=True)\n", "\n", "#Wait for training to finish\n", "estimator_job = estimator.latest_training_job.job_name\n", "print(\"Estimator created at completion of training job {}\".format(estimator_job))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "While the training job is running, you can observe the output and notice that DeepAR outputs the average epoch loss obtained after each epoch and reports on the best epoch loss obtained so far.\n", "\n", "At the end of the training, the algorithm chooses the best loss over all epochs, obtains a test score using the data available throrugh test channel, and reports the RMSE and quantile loss at various conofidence levels raning from `0.1` through `0.9`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Model Deployment\n", "\n", "As before, you'll use the `deploy` method on your trained `Estimator` to create a hosted model definition,confgure the endpoint, and deploy the model to the endpoint - all in one go.\n", "\n", "Another nice feature of Estimator API is that you can extend the [RealTimePredictor](https://sagemaker.readthedocs.io/en/latest/predictors.html) class and create a custom class, with methods to do prep-processing on data. 
This feature is useful in this case, as DeepAR prediction endpoint expects JSON serialized strings, and in Python we use DataFrames to load the data and work with it.\n", "\n", "Therefore, you use a custom predictor class - `DeepARPredictor`, as defined in [deepar_util.py](./deepar_util.py) that allows you to query the endpoint and perform predictionsusing `pandas.Series` objects rather than raw JSON strings." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "predictor = estimator.deploy(\n", " initial_instance_count=1,\n", " instance_type='ml.m4.xlarge',\n", " predictor_cls=DeepARPredictor)\n", "print(\"Pedictor attached to Endpoint: {}\".format(predictor.endpoint))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Forecasting and Plotting\n", "\n", "Upon completion of deployment, your predictor will have an handle to the endpoint you just provisioned. \n", "\n", "If using this notebook at any later time, or with a previously deployed endpoint, you can simply obtain a handle to the the endpoint, by specifying the right estimator job name, and instantiating a `DeepARPredictor` with the job name." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Use a job name from previous training, \n", "#if you need to connect to a pre-deployed endpoint.\n", "\n", "#When running in one session, predictor is already initialized when 'deploy' method returns. \n", "\n", "#estimator_job = 'dbg-deepar-D-2018-11-20-03-48-43-169'\n", "\n", "predictor = DeepARPredictor(estimator_job)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To make forecast for a particular stock, you simply use another utility method to extract time series and dynamic features for the chosen stock that you want to forecast, between the dates chosen.\n", "\n", "Also, notice that while requesting prediction forecast to a DeepAR endpoint, you can specify the quantiles, as needed, and the endpoint return results at various confidence levels, as specified in the request." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ts, dynamic_feat, observed = util.query_for_stock('BMW', target_column, covariate_columns, stock_data_series, prediction_length)\n", "prediction = predictor.predict(ts=ts, dynamic_feat = dynamic_feat, quantiles=[0.10, 0.5, 0.90], return_samples=False)\n", "prediction.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "util.plot_predicted_observed_at_quantile(ts, observed, prediction, prediction.columns[-1])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Since with DeepAR, you are not limited to geenrating prediction with just one main series, feel free to use the widget to choose different stocks, and generate forecasts at various confidence levels, with varying forecast horizons.\n", "\n", "On each execution, you also use an utility method to to plot the observed values, overlayed with the forecasted spread." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "style = {'description_width': 'initial'}\n", "@interact_manual(\n", " stock_id=IntSlider(min=0, max=len(mnemonics)-1, value=0, style=style), \n", " forecast_horizon=IntSlider(min=0, max=int(context_length*0.75), steps = int(np.log10(context_length))+1, value=int(np.log10(context_length))+1, style=style),\n", " confidence=IntSlider(min=55, max=95, value=80, step=5, style=style),\n", " history_plot=IntSlider(min=1, max=20, value=1, style=style),\n", " show_samples=Checkbox(value=False),\n", " continuous_update=False\n", ")\n", "def plot_interact(stock_id, forecast_horizon, confidence, history_plot, show_samples):\n", " util.plot(\n", " predictor,\n", " stock_id,\n", " mnemonics,\n", " stock_data_series,\n", " target_column,\n", " covariate_columns,\n", " prediction_length, \n", " history_plot*prediction_length,\n", " forecast_date=end_training \n", " + datetime.timedelta(days=forecast_horizon) if interval == \"D\"\n", " else datetime.timedelta(hours=forecast_horizon) if interval == \"H\"\n", " else datetime.timedelta(weeks=forecast_horizon) if interval == \"W\"\n", " else datetime.timedelta(days=forecast_horizon*30) if interval == \"M\"\n", " else 0,\n", " show_samples=show_samples, \n", " confidence=confidence\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Some Final Thoughts\n", "\n", "1. We made improvements from our custom RNN code, by using stocks identified to belong to a cluster - `BMW`, `Chrysler`, `Volkswagon`, `Contiental` and `Porsche`, together as time series' in DeepAR model. Having a ARIMAx model trained across the cross sections of these individiual series' helped us improve upon the prediction, and allowe dus to generate forecast for any stocks within the collection.\n", "\n", "1. We used closing price as target series to predict, and to demonstrate the dynamic feature, used opening, minimum and maximum price as covariates. However this is not possible in reality. The values of series' used as dynamic features need to be know across the forecast horizon. Therfore the accuracy that we experience above is somewhat artificial, in that we are forecasting closing prices, knowing the opening, minimum and maximum prices, which is impossible.\n", "\n", "1. To use dynamic features, in reality, you need to find data sources that can be known before, These could be:\n", " - Companies' forward looking sales/revenue targets\n", " - Analysts guidances on stock prices\n", " - Economic forecasts from other sources\n", " - Price trends of future options, base don the underlying stock\n", "\n", "Be creative, find data sources that can effectively be used, and feel free to share your findings by emailing the author of the workshop at [binoyd@amazon.com](mailto:binoyd@amazon.com)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Delete EndPoint\n", "\n", "Keep in mind that a running EndPoint has the chosen compute instances running, regardless of whether or not requests are being sent.\n", "\n", "Therefore, in order to contain cost overrun, you should always delete the unused EndPoints. 
This doesn't delete the trained model artifacts, and the endpoint can be re-provisioned at any point simply by using the corresponding training job name.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor.delete_endpoint()" ] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 2 }