{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Time Series Analysis\n", "\n", "Forecasting time series, that contains information on sequential events, can be carried out by fitting the data to a probability distribution. This enables us to formulate past and future trends as the results of a parametric equation.\n", "\n", "In the ideal scenario, if we know the probability function for quantities we aim to forecast, for a given initial value, we could find values of all future states. However, in reality, such as when predicting a stock value, we do not all factors influencing the stock, and therefore cannot formaulate a probability function that governs relationship between all these factors.\n", "\n", "Therefore, we use a numeric approximation by discretizing time into small intervals and computing the evolution of values based on available information, such as trends within the past evolution of states, and some other boundary conditions, such as news media articles, strike price of an option, etc.\n", "\n", "![Discretization of a dynamic system into finite states and formulation of future trends as a function of past trends](../images/financial-time-series.gif)\n", "\n", "Deep learning approaches used in predicting such trends in stock prices uses trends in multiple correlated variables, such as price trends of similar stocks, market sentiment etc., without any assumption on their probability distribution. Deep learning techniques leverage short and long term memory of micro/macro cycles observed in the past by training the model to learn what matters and what doesn't.\n", "\n", "In the original **Recurrent Neural Network (RNN)** design, a delay unit is used, which memorizes the state at a given time step and re-injects it into the deep layers at the next time step, recurrently over a chosen period of time (called lags). The algorithm learns to infer a sequence of future values (called horizon) based on a given lag by learning over multiple pairs of lag-horizon taken across the available timeline. This is referred to as sequence to sequence learning.\n", "\n", "![Architecture of recurrent neural networks and gated recurrent units (GRU)](../images/financial-time-series-rnn-gru.gif)\n", "\n", "The recurrence equation can make the network very deep because it replicates the entire hidden network at every step and leads to vanishing memory (i.e. the influence of the deepest layers vanishes due to repeated multiplications of small number derivatives needed to reach these layers during the process of gradient optimization). More advanced gated systems such as **Gated Recurrent Units (GRU)** help with the vanishing memory issue, by re-injecting prior time-step learning into more recent time-step learning to benefit from both long term and short term memory.\n", "\n", "In this workshop, you'll create a dynamic network based on Gated Recurrent Units for target and covariates. As target, you'll use closing price of a chosen stock, with some other metrices such as opening price, and maximum and minimum prices of the same stock, as well as few other stocks, whose movement show some similarity (based on the clustering analysis you did in the previous module).\n", "\n", "As described in the readme section of the workshop repository, you'll use code written in separate [training](../container/rnn/train) and [prediction](../container/rnn/predictor.py) files." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Stock Price Prediction of a single stock, using custom RNN based model\n", "\n", "The training and prediciton algorithm used for this exercise is pre-written for you, using TensorFlow with Keras binding, and is available under the container folder of the repository you cloned through notebook configuration. The code to be used for [training](../container/rnn/train) and [prediction](../container/rnn/predictor.py) is already available, and within the scope of this exercise, you'll simply package the available code within a container image, and use SageMaker's high level Estimator API for training the model and hosting the trained model behind an HTTP endpoint.\n", "\n", "The following diagram provides an overview of the container architecture, to be used in this scenario.\n", "![BYOA Container Architecture](../images/byoa-container-architecture.png)\n", "\n", "The container model used here, is derived from the [blog post](https://aws.amazon.com/blogs/machine-learning/train-and-host-scikit-learn-models-in-amazon-sagemaker-by-building-a-scikit-docker-container/) describing how to train and host Sciki-Learn models on SageMaker. With the only difference being yuou'll be using Keras with TensorFlow, instead of Scikit-Learn, the container architecture remains the same.\n", "\n", "This also serves to demonstrate that SageMaker capabilities can be extend to virtually any algorithm you use, using any popular Machine Leanrning libraries, with ease." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import boto3\n", "import pandas as pd\n", "import sagemaker\n", "from sagemaker.estimator import Estimator\n", "from sagemaker.predictor import RealTimePredictor\n", "from sagemaker.predictor import csv_serializer\n", "\n", "from ipywidgets import interact_manual, SelectionSlider\n", "from IPython.display import display\n", "\n", "import custom_rnn_util as util" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Hyperparameters\n", "\n", "The training code is written, so that it is customizable via several hyperparamaters. These are passed to the `Estimator` simply as a JSON variable. You can investigate the training code and see that the code is written to assume default values for all parameters, if not specified. \n", "\n", "Following is an overview of few, that you can change if you like, before executing training job:\n", "\n", "- interval : Use only values `D`(for day) or `H`(for hours). During data preparation we had raw data resampled at various interval levels and saved in S3 buckets. This parameter would tell you which to use. Using smaller interval, such as minute will cause the training to take much longer, whereas with larger interval, number of observed samples will be too low to extract any meaningful patterns.\n", "- lag, horizon : These specifies how far back the model reaches out while generating prediction, and how far forward it can forecast.\n", "- target_stock : You can use any stock symbol out of the ones that are available in traingin data file.\n", "- covariate_stocks : This allows you to provide a list of stocks, preferably use some stocks that you found clustered within the same group during the clustering analysis you did in previous module.\n", "- target_column : You can choose any metrics available within training data, by default, the algorithm works to use `EndPrice` as target variable\n", "- covariate_columns : Optionally you can specify some additional series that have correlation with the main time series, such as opening price, and maximum and minimum price during past intervals. \n", "\n", "**In a real world stock prediction example, you'd ideally want to use covariates based on some broader metrices, such as news analysis, options market related to underlying securities and so on.** Keep in mind though analyzing what supporting metrices to use to increase model accuracy is out of scope of this workshop. As such, the model you'll train here would have limited value in actually predicting the stock values in real world." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Define parameters\n", "interval = 'D' #Use D or H\n", "\n", "assert interval == 'D' or interval == 'H'\n", "\n", "if interval == 'D':\n", " lag = 10 # Use 10 for D, 80 for H\n", " horizon = 5 #Use 5 for D, 40 for H\n", " dateformat = '%Y-%m-%d'\n", "elif interval == 'H':\n", " lag = 80 # Use 10 for D, 80 for H\n", " horizon = 40 #Use 5 for D, 40 for H \n", " dateformat = '%Y-%m-%d %H:%M:%S'\n", " \n", "target_stock = \"BMW\"\n", "covariate_stocks = \"CON, DAI, PAH3, VOW3\"\n", "target_column = \"EndPrice\"\n", "covariate_columns = \"StartPrice, MinPrice, MaxPrice\"\n", "num_epochs = 1000\n", "percent_train = 85.0\n", "num_units = 256\n", "batch_size = 4096\n", "dropout_ratio = 0.1\n", "\n", "hyperparameters = {\n", " \"interval\": interval,\n", " \"lag\": str(lag),\n", " \"horizon\": str(horizon),\n", " \"num_epochs\": str(num_epochs),\n", " \"batch_size\": str(batch_size),\n", " \"percent_train\": str(percent_train),\n", " \"num_units\": str(num_units),\n", " \"target_stock\": target_stock,\n", " \"covariate_stocks\": covariate_stocks,\n", " \"target_column\": target_column,\n", " \"covariate_columns\": covariate_columns,\n", " \"dropout_ratio\": str(dropout_ratio)\n", " \n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Training Data\n", "\n", "During data preparation steps, you uploaded the resampled data to your S3 bucket, attached to your SageMaker session, under an appropriate prefix, depending on resampling interval. Here you refer to the data in the corresponding location, based on the interval you choose now, before submitting the training job." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "#Define training data location\n", "\n", "artifactname = 'dbg-custom-rnn'\n", "base_job_name = \"{}-{}-{}\".format(artifactname, interval, target_stock)\n", "\n", "role = sagemaker.get_execution_role()\n", "session = sagemaker.Session()\n", "s3_bucket = session.default_bucket()\n", "s3_data_key = 'dbg-stockdata/source'\n", "data_location = \"s3://{}/{}/{}/resampled_stockdata.csv\".format(s3_bucket, s3_data_key, interval)\n", "output_location = \"s3://{}/{}/{}/output\".format(s3_bucket, artifactname, interval)\n", "s3 = boto3.client('s3')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## ECR Repository\n", "\n", "With the code ready at hand, proceed to create a repository on Amazon ECR.\n", "\n", "We provide an utility [script](../container/build_and_push.sh) that:\n", "- Identifies the AWS region you are using\n", "- Creates a repository with if it doesn't exist already\n", "- Run Docker locally to create the container image\n", "- Retrieves ECR Login command using the credential you are using with SageMaker\n", "- Pushes the image, as latest, to the ECR repository" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "# Define model artifact name and image\n", "account = session.boto_session.client('sts').get_caller_identity()['Account']\n", "region = session.boto_session.region_name\n", "image = '{}.dkr.ecr.{}.amazonaws.com/{}:latest'.format(account, region, artifactname)\n", "os.chdir(\"../container\")\n", "!sh build_and_push.sh $artifactname" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once the repository is created, you can check your [ECR Console](https://console.aws.amazon.com/ecs/home#/repositories) to verify that the repository named `dbg-custom-rnn` have been created." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Model Training\n", "\n", "To train a model in [Amazon SageMaker](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-training.html), you create a training job. In this workshop, you'll do so using SageMaker's high level [Estimator API](https://sagemaker.readthedocs.io/en/latest/estimators.html).\n", "\n", "Estimator is a generic interface that allows you to train using any supplied algorithm, which in this case is the algorithm you just packaged and pulished to your Amazon ECR repository.\n", "\n", "Following are the necessary inputs while submitting a training job:\n", "- Uniquely identifiable job name\n", "- Amazon ECR registry path where the training code is stored\n", "- URL of the Amazon S3 bucket where you have the training data stored\n", "- URL of the S3 bucket where you want to store the output of the job (upon training completion SageMaker archives whatever files your code stores under the path `/opt/ml/model` within the container, and makes those available as a tar-file named `model.tar.gz` at the specified location on S3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "# Instantiate estimator with container image of artifact and backend EC2 instance(s)\n", "rnn = Estimator(image,\n", " role, 1, 'ml.c5.18xlarge',\n", " output_path=output_location,\n", " base_job_name = base_job_name,\n", " sagemaker_session=session)\n", "\n", "rnn.set_hyperparameters(**hyperparameters)\n", "# Train the model\n", "rnn.fit(data_location)\n", "estimator_job = rnn.latest_training_job.job_name\n", "model_archive = \"{}/{}/output/{}/output/model.tar.gz\".format(artifactname,interval,estimator_job)\n", "print(\"Estimator created at completion of training job {}\".format(estimator_job))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can observe in the [training](../container/rnn/train) code, that loss obtained after each epoch is stored alongwith the trained model, in the output location.\n", "\n", "Following utility function, as implemented in [custom_rnn_util](./custom_rnn_util.py) Python file, extracts the loss history file and plots the loss. You should verify that loss is progressively lower, as trainign progresses, ths validating that gradient descent converges over the epochs and the loss is minimized." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "util.plot_loss(s3, s3_bucket, model_archive, \"loss_history.csv\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Model Deployment\n", "\n", "Deploying a model using Amazon SageMaker hosting services is a three-step process:\n", "- Creating a model : tell SageMaker where it can find model components\n", "- Creating configuration for an HTTPS endpoint: specify name of one or more model variants and the number and size of compute instances to use in serving prediction requests\n", "- Creating an HTTPS endpoint: launch required ML compute instances and deploy the model onto those instances\n", "\n", "While you can execute all of these steps from within your AWS Console, using the high level `Estimator` API, provides a quick way to execute all pf these steps at once." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "# Create an endpoint on a web server\n", "predictor = rnn.deploy(1, 'ml.m4.xlarge', serializer=csv_serializer)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "While the model is being deployed, you can verify that Model and Endpoint configuration is created, and that the Endpoint is being provisioned, from [SageMaker console](https://console.aws.amazon.com/sagemaker/home#/endpoints).\n", "\n", "Once the deployment is complete, the status of the endpoint would change to `In Service`, at which point you are ready to use the endpoint to generate predictions." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Forecasting and Plotting\n", "\n", "Upon completion of deployment, your predictor will have an handle to the endpoint you just provisioned. \n", "\n", "If using this notebook at any later time, or with a previously deployed endpoint, you can simply obtain a handle to the the endpoint, by specifying the right estimator job name, and instantiating a `RealTimePredictor` with the job name and content type." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Use a job name from previous training, \n", "#if you need to connect to a pre-deployed endpoint.\n", "\n", "#When running in one session, predictor is already initialized when 'deploy' method returns. \n", "\n", "#estimator_job = 'dbg-custom-rnn-D-BMW-2018-11-19-23-11-02-036' \n", "\n", "predictor = RealTimePredictor(estimator_job, content_type = \"text/csv\")\n", "model_archive = \"{}/{}/output/{}/output/model.tar.gz\".format(artifactname,interval,estimator_job)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Within the training code, we split the test data into few samples of size spanning the combined lag and horizon interval, and stored the CSV files for the whole test set, as well as the split test set, in model output location. These files are therefore available within the model archive.\n", "\n", "First you use utility methods, as implemented in [custom_rnn_util](./custom_rnn_util.py) Python file to extract and load these files. The functions also saves uncompressed CSV files in the output location on S3, alongside the model archive." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "filepaths = util.extract_matching_csv_files_from_s3_tarfile(s3, s3_bucket, model_archive, \"test[0-9]+.csv\", model_archive[:model_archive.find(\"/model\")], 0)\n", "\n", "testfilename = \"testdata.csv\"\n", "testdata = util.load_csv_from_s3_tarfile(s3, s3_bucket, model_archive, testfilename, model_archive[:model_archive.find(\"/model\")], 0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you follow the prediction code at [predictor.py](../container/rnn/predictor.py), you'll notice that the `transform` method, that gets invoked when a request is sent, is written to handle either an S3 location containing the CSV file for the data to be used for forecasting, or serialized CSV data directly.\n", "\n", "While sending serialized CSV as request payload is convenient, for very large size models requiring data spanning long interval, might increase the latency and network traffic. As opposed to that, if you have your data already stored in S3, ability to just specify the data lcoation on S3 helps in containing network traffic and this faster turnaround.\n", "\n", "As a first test, you invoke prediction by simply passing the extracted S3 location of test sample files to the predictor. The utility function also plots the forecasted data, alongside the observed data.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "util.plot_sample_predictions(predictor, filepaths, target_stock, target_column, lag)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a second test, to verify the ability to generate predictions by passing serialized CSV data, you use the following utility function, with a specified increment, that generates a series of predictions at the incremental level specified, and plots similar graphs." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "util.plot_sample_test_performance(predictor, testdata, target_stock, covariate_stocks, target_column, lag, horizon, horizon)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As you can observe in the above plots, the predictor does a good job in predicting on certain dates, not so good on others.\n", "\n", "You can also use another utility function, as follows, to plot the forecasts slightly differently, superimposing all the predicitons upon an end to end plot of test data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "util.plot_overall_test_performance(predictor, testdata, target_stock, covariate_stocks, target_column, lag, horizon, horizon)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you chose to use data sampled at daily intervals, you should see that the forecasts closely followed the observed values in slightly more than 50% of cases. This indicates that our rudimentary model does a better job than at least a random guess, in predicting stock price movements." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dateoptions = util.get_date_range(testdata, target_stock, dateformat, lag, horizon, interval)\n", "style = {'description_width': 'initial'}\n", "@interact_manual(\n", " forecast_date = SelectionSlider(options=dateoptions,style=style) \n", ")\n", "def plot_interact(forecast_date):\n", " try:\n", " forecast_date_index = list(testdata.index).index(forecast_date.strftime(dateformat))\n", " print(\"Generating forecast for {} onwards\".format(forecast_date.strftime(dateformat))) \n", " util.predict_and_plot(predictor, testdata, forecast_date_index, target_stock, covariate_stocks, target_column, lag, horizon) \n", " except ValueError:\n", " print(\"Data for {} doesn't exist\".format(forecast_date.strftime(dateformat)))\n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Some Final Thoughts\n", "\n", "1. In our custom RNN based code, we used a particular stock's price as main time series, and optionally used some others as exogenous time series. As you might have noticed, for example if you trained your model using `BMW` as main series, and `Chrysler`, `Volkswagon`, `Contiental` and `Porsche` as covariates, you cannot use the same model to generate predictions for any of those other stocks. In order to forecast using this approach, you'll have to train one model each for every stock you want to forecast.\n", "\n", "1. Our choice of using opening and maximum and minimum price may not be a good choice in actually forecasting stock price movement, simply because the correlation observed in thse metrices may just be an effect of some external factor, and metrices among themselves might not have any causality.\n", "\n", "1. We used clustering in an attempt to find stocks whose movement might affect each other. Following the same argument however, there might be no causality, instead the stocks move together due to some external factor influencing all such stocks in a similar fashion, because they are somehow related, either due to to being in same industry, or otherwise.\n", "\n", "\n", "Feel free to repeat these experiments using other data sources, in conjuntion with the DBG data set, such as options market data, news headlines and such. \n", "\n", "You can share your findings by emailing the author of the workshop at [binoyd@amazon.com](mailto:binoyd@amazon.com)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Delete EndPoint\n", "\n", "Keep in mind that a running EndPoint has the chosen compute instances running, regardless of whether or not requests are being sent.\n", "\n", "Therefore, in order to contain cost overrun, you should always delete the unused EndPoints. This doesn't delete the trained model artefacts, and at any point, the EndPoint can be reprovisioned, simply by using the correspodning training job name." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor.delete_endpoint()" ] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 2 }