{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 1) Load external data into S3 bucket and run DMS task" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Load Daily Data into S3 Bucket\n", "\n", "### Obtaining Data\n", "\n", "\n", "We use the dataset generated by [Chi Zhang](https://github.com/vermouth1992/drl-portfolio-management/tree/master/src/utils/datasets). It contains the historic price of 16 target stocks from NASDAQ100, including open, close, high and low prices from 2012-08-13 to 2017-08-11. Specifically, those stocks are: “AAPL”, “ATVI”, “CMCSA”, “COST”, “CSX”, “DISH”, “EA”, “EBAY”, “FB”, “GOOGL”, “HAS”, “ILMN”, “INTC”, “MAR”, “REGN” and “SBUX”.\n", "\n", "**This dataset is licensed under a MIT License**\n", "\n", "Copyright (c) 2017 Chi Zhang\n", "\n", "Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the \"Software\"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:\n", "\n", "The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.\n", "\n", "THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\n", "\n", "### Output dataset \n", "\n", "- Contains **5 years** of EOD data for one of the stocks\n", "- The data is saved into the specified S3 bucket as CSV.\n", "\n", "```\n", "hist_data_daily/{sym}.csv (columns: dt,sym,open,high,low,close,vol)\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# specify S3 bucket\n", "s3bucket=\"<< SPECIFY YOUR S3 BUCKET HERE >>\"\n", "s3bucket" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load Dataset to S3 Bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!rm stocks_history_target.h5\n", "!wget https://github.com/aws-samples/algorithmic-trading/raw/master/1_Data/stocks_history_target.h5 " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import h5py\n", "import datetime\n", "import pandas as pd\n", "import sys\n", "\n", "START_DATE = '2016-07-01' # Last 5 years of data\n", "END_DATE = '2021-07-01'\n", "DATE_FORMAT = '%Y-%m-%d'\n", "START_DATETIME = datetime.datetime.strptime(START_DATE, DATE_FORMAT)\n", "\n", "def read_stock_history(filepath):\n", " \"\"\" Read data from extracted h5\n", " Args:\n", " filepath: path of file\n", " Returns:\n", " history:\n", " abbreviation:\n", " \"\"\"\n", " with h5py.File(filepath, 'r') as f:\n", " history = f['history'][:]\n", " abbreviation = f['abbreviation'][:].tolist()\n", " abbreviation = [abbr.decode('utf-8') for abbr in abbreviation]\n", " return history, abbreviation\n", "\n", "def index_to_date(index):\n", " return (START_DATETIME + datetime.timedelta(index)).strftime(DATE_FORMAT)\n", "\n", "def save_stock_data(stk,history,abbreviation):\n", " p=abbreviation.index(stk)\n", " h=history[p]\n", " tData=[]\n", " hData=['dt','sym','open','high','low','close','vol']\n", " for x in range(0,h.shape[0]):\n", " row=[]\n", " row.append(index_to_date(x))\n", " row.append(stk)\n", " v=h[x]\n", " for y in range(0,len(v)):\n", " row.append(v[y])\n", " tData.append(row) \n", " df=pd.DataFrame(tData,columns=hData)\n", " df.set_index(pd.DatetimeIndex(df['dt']), inplace=True)\n", " del df['dt']\n", " df = df.truncate(before=(pd.Timestamp(END_DATE)-pd.DateOffset(years=1)),after=pd.Timestamp(END_DATE)) # We Train the model based on 1 year of data\n", " df.to_csv(sym+\".csv\")\n", " print(\"store:\"+stk)\n", " return df\n", "\n", "sym='INTC' # stock symbol \n", "history,abbreviation=read_stock_history('stocks_history_target.h5')\n", "save_stock_data(sym,history,abbreviation)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "filepath = sym+\".csv\"\n", "df = pd.read_csv(filepath,infer_datetime_format=True, parse_dates=['dt'], index_col=['dt'])\n", "df.tail()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 cp {sym}.csv s3://{s3bucket}/marketData/intc/\n", "!rm {sym}.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Before moving on to Step 2, please run DMS task via AWS console." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 2) Data Preparation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%run init_model.py 'algo-kinesis-ema-hpo'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import sagemaker as sage\n", "from sagemaker import get_execution_role\n", "import datetime\n", "from sagemaker.tensorflow import TensorFlow\n", "import json\n", "\n", "role = get_execution_role()\n", "sess = sage.Session()\n", "region = sess.boto_session.region_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kinesis_streams=!(aws kinesis list-streams --output text | grep 'kinesis-algo-blog' | awk '{print $2}')\n", "kinesis_stream=kinesis_streams[0]\n", "kinesis_stream" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "import boto3\n", "import time\n", "from dateutil import parser\n", "\n", "kinesis_client = client = boto3.client('kinesis')\n", "streamName=kinesis_stream\n", "response=kinesis_client.describe_stream(StreamName=streamName)\n", "shard_id = response['StreamDescription']['Shards'][0]['ShardId']\n", "shard_it = kinesis_client.get_shard_iterator(StreamName=streamName, ShardId=shard_id, ShardIteratorType=\"TRIM_HORIZON\")[\"ShardIterator\"]\n", "\n", "l=[]\n", "c=0\n", "while c<1:\n", " out = kinesis_client.get_records(ShardIterator=shard_it, Limit=1000)\n", " shard_it = out[\"NextShardIterator\"]\n", " for o in out[\"Records\"]:\n", " jdat = json.loads(o[\"Data\"])\n", " if 'data' in jdat:\n", " data=jdat['data']\n", " if data['sym']==sym:\n", " data['dt']=parser.parse(data['dt']).date() \n", " l.append(data)\n", " if out['MillisBehindLatest']==0:\n", " c=c+1\n", " \n", "df=pd.DataFrame(l, columns =['dt','sym','open','high','low','close','vol'])\n", "df=df.set_index('dt')\n", "del df['sym']\n", "df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "trainCount=int(len(df)*0.7)\n", "dfTrain = df.iloc[:trainCount]\n", "\n", "dfTest = df.iloc[trainCount:]\n", "dfTest.to_csv('local/'+algo_name+'/input/data/training/data.csv')\n", "dfTest.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "%matplotlib notebook\n", "dfTest[\"close\"].plot()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 3) Strategy Configuration\n", "\n", "* `fast_period` = Fast Period for Moving Average Indicator in min (e.g. 50)\n", "* `slow_period` = Slow Period for Moving Average Indicator in min (e.g. 200)\n", "* `size` = The number of shares for a transaction\n", "\n", "Set some default parameters" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile local/{algo_name}/input/config/hyperparameters.json\n", "{ \n", " \"fast_period\" : \"8\",\n", " \"slow_period\" : \"21\",\n", " \"size\" : \"100\"\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%run update_config_kinesis.py $algo_name $kinesis_stream $sym" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 4) Strategy Definition\n", "\n", "In the following cell, you can modify the strategy code.\n", "\n", "Here are some helpful links:\n", "* Backtrader Documentation: https://www.backtrader.com/docu/strategy/\n", "* TA-Lib Indicator Reference: https://www.backtrader.com/docu/talibindautoref/\n", "* Backtrader Indicator Reference: https://www.backtrader.com/docu/indautoref/\n", "\n", "Load data directly from Kinesis in KinesisFeed class." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile model/{algo_name}.py\n", "import backtrader as bt\n", "import backtrader.feeds as btfeeds\n", "import backtrader.analyzers as btanalyzers\n", "from backtrader.feed import DataBase\n", "from backtrader import date2num\n", "from backtrader import TimeFrame\n", "from algo_base import *\n", "import time\n", "from dateutil import parser\n", "import boto3\n", "\n", "class KinesisFeed(DataBase):\n", " def __init__(self,region,streamName,sym,test_data):\n", " global kinesis\n", " super(KinesisFeed, self).__init__()\n", " \n", " kinesis_client = client = boto3.client('kinesis', region_name=region)\n", " response=kinesis_client.describe_stream(StreamName=streamName)\n", " shard_id = response['StreamDescription']['Shards'][0]['ShardId']\n", " shard_it = kinesis_client.get_shard_iterator(StreamName=streamName, ShardId=shard_id, ShardIteratorType=\"TRIM_HORIZON\")[\"ShardIterator\"]\n", "\n", " c=0\n", " l=[]\n", " while c<1:\n", " out = kinesis_client.get_records(ShardIterator=shard_it, Limit=1000)\n", " shard_it = out[\"NextShardIterator\"]\n", " for o in out[\"Records\"]:\n", " jdat = json.loads(o[\"Data\"])\n", " if 'data' in jdat:\n", " data=jdat['data']\n", " if data['sym']==sym:\n", " data['dt']=parser.parse(data['dt']).date() \n", " l.append(data)\n", " if out['MillisBehindLatest']==0:\n", " c=c+1\n", " self.list=l\n", " self.n=0\n", " self.listLen=len(self.list)\n", " trainCount=int(self.listLen*0.7)\n", " if test_data is not None:\n", " self.list = self.list[:trainCount]\n", " else:\n", " self.list = self.list[trainCount:]\n", " \n", " self.fromdate=self.list[0]['dt']\n", " self.todate=self.list[len(self.list)-1]['dt']\n", " self.timeframe=bt.TimeFrame.Days\n", " print(\"from=%s,to=%s\" % (self.fromdate,self.todate))\n", " \n", " def start(self):\n", " # Nothing to do for this data feed type\n", " pass\n", "\n", " def stop(self):\n", " # Nothing to do for this data feed type\n", " pass\n", " \n", " def _load(self):\n", " if self.n>=len(self.list):\n", " return False\n", " \n", " r=self.list[self.n]\n", " self.lines.datetime[0] = date2num(r['dt'])\n", " \n", " self.lines.open[0] = r['open']\n", " self.lines.high[0] = r['high']\n", " self.lines.low[0] = r['low']\n", " self.lines.close[0] = r['close']\n", " self.lines.volume[0] = r['vol']\n", " \n", " self.n=self.n+1\n", " return True\n", "\n", "class MyStrategy(StrategyTemplate):\n", "\n", " def __init__(self): # Initiation\n", " super(MyStrategy, self).__init__()\n", " self.config[\"fast_period\"]=int(self.config[\"fast_period\"])\n", " self.config[\"slow_period\"]=int(self.config[\"slow_period\"])\n", " self.config[\"size\"]=int(self.config[\"size\"])\n", "\n", " self.emaFast = bt.ind.ExponentialMovingAverage(period=self.config[\"fast_period\"])\n", " self.emaSlow = bt.ind.ExponentialMovingAverage(period=self.config[\"slow_period\"])\n", " self.size = self.config[\"size\"]\n", "\n", " def init_broker(broker):\n", " broker.setcash(100000.0)\n", " broker.setcommission(commission=0.0) \n", " \n", " def add_data(cerebro):\n", " test_data=('test_data' in MyStrategy.config)\n", " data = KinesisFeed(MyStrategy.config['region'],MyStrategy.config['kinesis_stream'],MyStrategy.config['sym'],test_data)\n", " cerebro.adddata(data)\n", " \n", " def add_data_csv(cerebro):\n", " data = btfeeds.GenericCSVData(\n", " dataname=MyStrategy.TRAIN_FILE,\n", " dtformat=('%Y-%m-%d'),\n", " timeframe=bt.TimeFrame.Days,\n", " datetime=0,\n", " time=-1,\n", " high=2,\n", " low=3,\n", " open=1,\n", " close=4,\n", " volume=5,\n", " openinterest=-1\n", " )\n", " cerebro.adddata(data)\n", "\n", " def next(self): # Processing\n", " super(MyStrategy, self).next()\n", " dt=self.datas[0].datetime.datetime(0)\n", " if not self.position:\n", " if self.emaFast[0] > self.emaSlow[0]:\n", " self.buy(size=self.size) # Go long\n", " else:\n", " self.sell(size=self.size) # Go short\n", " elif self.position.size>0 and self.emaFast[0] < self.emaSlow[0]:\n", " self.sell(size=2*self.size) # Go short\n", " elif self.position.size<0 and self.emaFast[0] > self.emaSlow[0]: \n", " self.buy(size=2*self.size) # Go long" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 5) Strategy Docker Image Creation\n", "\n", "**Please note that the initial docker build may take a few minutes. Subsequent runs are fast.**" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "#Build Local Algo Image\n", "!docker build -t $algo_name .\n", "#!docker run -v $(pwd)/local/$algo_name:/opt/ml --rm $algo_name train" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!chmod 777 *.sh\n", "!./build_and_push.sh $algo_name" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 6) Hyperparameter Optimization with SageMaker on Training Data\n", "\n", "Find the optimal strategy configuration based on PNL." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker as sage\n", "from sagemaker import get_execution_role\n", "from sagemaker.estimator import Estimator \n", "\n", "role = get_execution_role()\n", "sess = sage.Session()\n", "\n", "WORK_DIRECTORY = 'local/'+algo_name+'/input/data/training'\n", "data_location = sess.upload_data(WORK_DIRECTORY, key_prefix='data')\n", "print(data_location)\n", "\n", "conf_file='local/'+algo_name+'/input/config/hyperparameters.json'\n", "with open(conf_file, 'r') as f:\n", " config = json.load(f)\n", "print(config)\n", "\n", "prefix=algo_name\n", "job_name=prefix.replace('_','-')\n", "\n", "account = sess.boto_session.client('sts').get_caller_identity()['Account']\n", "region = sess.boto_session.region_name\n", "image = f'{account}.dkr.ecr.{region}.amazonaws.com/{prefix}:latest'\n", "\n", "algo = sage.estimator.Estimator(\n", " image_uri=image,\n", " role=role,\n", " instance_count=1,\n", " instance_type='ml.m4.xlarge',\n", " output_path=\"s3://{}/output\".format(sess.default_bucket()),\n", " sagemaker_session=sess,\n", " base_job_name=job_name,\n", " hyperparameters=config,\n", " metric_definitions=[\n", " {\n", " \"Name\": \"algo:pnl\",\n", " \"Regex\": \"Total PnL:(.*?)]\"\n", " },\n", " {\n", " \"Name\": \"algo:sharpe_ratio\",\n", " \"Regex\": \"Sharpe Ratio:(.*?),\"\n", " }\n", " ])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.tuner import (\n", " IntegerParameter,\n", " CategoricalParameter,\n", " ContinuousParameter,\n", " HyperparameterTuner,\n", ")\n", "\n", "hyperparameter_ranges = {\n", " \"fast_period\": IntegerParameter(5, 10),\n", " \"slow_period\": IntegerParameter(21, 31)\n", "}\n", "objective_metric_name= \"algo:pnl\"\n", "tuner = HyperparameterTuner(algo,\n", " objective_metric_name,\n", " hyperparameter_ranges,\n", " max_jobs=6,\n", " max_parallel_jobs=3,\n", " metric_definitions=[\n", " {\n", " \"Name\": \"algo:pnl\",\n", " \"Regex\": \"Total PnL:(.*?)]\"\n", " }\n", " ]\n", " )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tuner.fit(data_location)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "best_params=boto3.client('sagemaker').describe_hyper_parameter_tuning_job(\n", "HyperParameterTuningJobName=tuner.latest_tuning_job.job_name)['BestTrainingJob']['TunedHyperParameters']\n", "best_params" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.analytics import TrainingJobAnalytics\n", "bestjob=tuner.best_training_job()\n", "metrics_dataframe = TrainingJobAnalytics(training_job_name=bestjob).dataframe()\n", "metrics_dataframe" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 7) Run strategy with SageMaker with optimal hyperparameters on Test Data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Run Remote test via SageMaker\n", "import sagemaker as sage\n", "from sagemaker import get_execution_role\n", "from sagemaker.estimator import Estimator \n", "\n", "role = get_execution_role()\n", "sess = sage.Session()\n", "\n", "# Use optimal hyperparameter and test data\n", "conf_file='local/'+algo_name+'/input/config/hyperparameters.json'\n", "with open(conf_file, 'r') as f:\n", " config = json.load(f)\n", "config['fast_period']=best_params['fast_period']\n", "config['slow_period']=best_params['slow_period']\n", "config['test_data']='true'\n", "print(config)\n", "\n", "prefix=algo_name\n", "job_name=prefix.replace('_','-')\n", "\n", "account = sess.boto_session.client('sts').get_caller_identity()['Account']\n", "region = sess.boto_session.region_name\n", "image = f'{account}.dkr.ecr.{region}.amazonaws.com/{prefix}:latest'\n", "\n", "algo = sage.estimator.Estimator(\n", " image_uri=image,\n", " role=role,\n", " instance_count=1,\n", " instance_type='ml.m4.xlarge',\n", " output_path=\"s3://{}/output\".format(sess.default_bucket()),\n", " sagemaker_session=sess,\n", " base_job_name=job_name,\n", " hyperparameters=config,\n", " metric_definitions=[\n", " {\n", " \"Name\": \"algo:pnl\",\n", " \"Regex\": \"Total PnL:(.*?)]\"\n", " },\n", " {\n", " \"Name\": \"algo:sharpe_ratio\",\n", " \"Regex\": \"Sharpe Ratio:(.*?),\"\n", " }\n", " ])\n", "algo.fit(data_location)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.analytics import TrainingJobAnalytics\n", "\n", "latest_job_name = algo.latest_training_job.job_name\n", "metrics_dataframe = TrainingJobAnalytics(training_job_name=latest_job_name).dataframe()\n", "metrics_dataframe" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Get Algo Chart from S3\n", "model_name=algo.model_data.replace('s3://'+sess.default_bucket()+'/','')\n", "import boto3\n", "s3 = boto3.resource('s3')\n", "my_bucket = s3.Bucket(sess.default_bucket())\n", "my_bucket.download_file(model_name,'model.tar.gz')\n", "!tar -xzf model.tar.gz\n", "!rm model.tar.gz\n", "from IPython.display import Image\n", "Image(filename='chart.png') " ] } ], "metadata": { "kernelspec": { "display_name": "conda_tensorflow_p36", "language": "python", "name": "conda_tensorflow_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 2 }