# Step 1) Load external data into S3 bucket and run DMS task

# Load Daily Data into S3 Bucket

### Obtaining Data


We use the dataset generated by [Chi Zhang](https://github.com/vermouth1992/drl-portfolio-management/tree/master/src/utils/datasets). It contains the historical prices of 16 target stocks from the NASDAQ-100, including open, close, high and low prices from 2012-08-13 to 2017-08-11. Specifically, those stocks are: “AAPL”, “ATVI”, “CMCSA”, “COST”, “CSX”, “DISH”, “EA”, “EBAY”, “FB”, “GOOGL”, “HAS”, “ILMN”, “INTC”, “MAR”, “REGN” and “SBUX”.

**This dataset is licensed under the MIT License**

Copyright (c) 2017 Chi Zhang

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

### Output dataset 

- Contains EOD data for one of the stocks, drawn from a **5 years** date range (the code below keeps only the final year before writing the CSV)
- The data is saved into the specified S3 bucket as CSV.

```
hist_data_daily/{sym}.csv (columns: dt,sym,open,high,low,close,vol)
```

In [None]:
# Name of the S3 bucket that receives the generated CSV (used by the
# `aws s3 cp` cell further down). Replace the placeholder before running.
s3bucket="<< SPECIFY YOUR S3 BUCKET HERE >>"
s3bucket

### Load Dataset to S3 Bucket

In [None]:
!rm stocks_history_target.h5
!wget https://github.com/aws-samples/algorithmic-trading/raw/master/1_Data/stocks_history_target.h5 

In [None]:
import h5py
import datetime
import pandas as pd
import sys

# Calendar window assigned to the downloaded history: index_to_date() labels
# row i of a stock's history as START_DATE + i days.
START_DATE = '2016-07-01' # Start of the 5-year window (last 5 years of data)
# Upper bound of the window; save_stock_data() keeps only the year ending here.
END_DATE = '2021-07-01'
# Format used both to parse START_DATE and to render the generated dates.
DATE_FORMAT = '%Y-%m-%d'
START_DATETIME = datetime.datetime.strptime(START_DATE, DATE_FORMAT)

def read_stock_history(filepath):
    """Load the price history and the ticker list from an HDF5 file.

    Args:
        filepath: path of the .h5 file containing the 'history' and
            'abbreviation' datasets.

    Returns:
        history: the full contents of the 'history' dataset.
        abbreviation: list of ticker symbols, decoded from UTF-8 bytes.
    """
    with h5py.File(filepath, 'r') as h5_file:
        # Slice with [:] so both datasets are fully read before the file closes.
        history = h5_file['history'][:]
        raw_symbols = h5_file['abbreviation'][:].tolist()

    abbreviation = []
    for raw_symbol in raw_symbols:
        abbreviation.append(raw_symbol.decode('utf-8'))
    return history, abbreviation

def index_to_date(index):
    """Return the date string that lies `index` days after START_DATETIME.

    The result is formatted with DATE_FORMAT.
    """
    offset = datetime.timedelta(days=index)
    shifted = START_DATETIME + offset
    return shifted.strftime(DATE_FORMAT)

def save_stock_data(stk, history, abbreviation):
    """Write the last year of one stock's history to ``<stk>.csv``.

    Args:
        stk: ticker symbol; must be present in ``abbreviation``.
        history: array indexed first by stock position, then by row (day).
            Each row's values are mapped positionally onto the columns
            open, high, low, close, vol.
            NOTE(review): assumes every row holds exactly five values in
            that order - confirm against the dataset.
        abbreviation: list of ticker symbols, aligned with ``history``.

    Returns:
        The DataFrame that was written: indexed by date ('dt'), restricted
        to the year ending at END_DATE.

    Raises:
        ValueError: if ``stk`` is not in ``abbreviation``.
    """
    position = abbreviation.index(stk)
    prices = history[position]
    columns = ['dt', 'sym', 'open', 'high', 'low', 'close', 'vol']

    # One output row per history row: generated date, symbol, then the values.
    rows = [
        [index_to_date(day), stk, *values]
        for day, values in enumerate(prices)
    ]

    df = pd.DataFrame(rows, columns=columns)
    df.set_index(pd.DatetimeIndex(df['dt']), inplace=True)
    del df['dt']

    # We train the model based on 1 year of data.
    window_end = pd.Timestamp(END_DATE)
    df = df.truncate(before=window_end - pd.DateOffset(years=1), after=window_end)

    # Name the file after the requested symbol rather than a notebook global,
    # so the function works for any ticker passed in.
    df.to_csv(stk + ".csv")
    print("store:" + stk)
    return df

# Ticker to export; later cells reuse `sym` for the CSV name and the
# Kinesis record filter.
sym='INTC' # stock symbol
# Load the downloaded HDF5 file and write <sym>.csv to the working directory.
history,abbreviation=read_stock_history('stocks_history_target.h5')
save_stock_data(sym,history,abbreviation)

In [None]:
import pandas as pd
# Re-read the CSV written above to check its contents.
filepath = sym+".csv"
# parse_dates already converts 'dt' to datetimes; the infer_datetime_format
# flag is deprecated in pandas 2.x and is therefore not passed.
df = pd.read_csv(filepath, parse_dates=['dt'], index_col=['dt'])
df.tail()

In [None]:
!aws s3 cp {sym}.csv s3://{s3bucket}/marketData/intc/
!rm {sym}.csv

### Before moving on to Step 2, please run DMS task via AWS console.

# Step 2) Data Preparation

In [None]:
%run init_model.py 'algo-kinesis-ema-hpo'

In [None]:
import os
import sagemaker as sage
from sagemaker import get_execution_role
import datetime
from sagemaker.tensorflow import TensorFlow
import json

# SageMaker execution role and session for this notebook; the region is
# taken from the session's underlying boto session.
role = get_execution_role()
sess = sage.Session()
region = sess.boto_session.region_name

In [None]:
kinesis_streams=!(aws kinesis list-streams --output text | grep 'kinesis-algo-blog' | awk '{print $2}')
kinesis_stream=kinesis_streams[0]
kinesis_stream

In [None]:
import boto3
import time
from dateutil import parser

# Read every record for `sym` from the first shard of the stream, starting
# at the oldest record still retained (TRIM_HORIZON).
kinesis_client = client = boto3.client('kinesis')
streamName = kinesis_stream
response = kinesis_client.describe_stream(StreamName=streamName)
# NOTE: only the first shard listed is read; other shards are ignored.
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
shard_it = kinesis_client.get_shard_iterator(
    StreamName=streamName,
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]

l = []
c = 0
while c < 1:
    out = kinesis_client.get_records(ShardIterator=shard_it, Limit=1000)
    for o in out["Records"]:
        jdat = json.loads(o["Data"])
        # Records without a 'data' payload are skipped.
        if 'data' in jdat:
            data = jdat['data']
            if data['sym'] == sym:
                data['dt'] = parser.parse(data['dt']).date()
                l.append(data)
    # A closed shard that has been fully read returns no next iterator;
    # stop there instead of passing None back into get_records.
    shard_it = out.get("NextShardIterator")
    if shard_it is None or out['MillisBehindLatest'] == 0:
        c = c + 1

df = pd.DataFrame(l, columns=['dt', 'sym', 'open', 'high', 'low', 'close', 'vol'])
df = df.set_index('dt')
del df['sym']
df.head()

In [None]:
# Positional 70/30 split of the rows read from Kinesis: the first 70% of rows
# is the training slice, the remainder the test slice.
trainCount=int(len(df)*0.7)
# NOTE(review): dfTrain is not referenced again in the cells visible here.
dfTrain = df.iloc[:trainCount]

dfTest = df.iloc[trainCount:]
# The test slice is what gets written to the local 'training' input folder.
dfTest.to_csv('local/'+algo_name+'/input/data/training/data.csv')
dfTest.head()

In [None]:
%matplotlib notebook
dfTest["close"].plot()

# Step 3) Strategy Configuration

* `fast_period` = Fast period for the moving-average indicator, in bars (days for this daily data; e.g. 50)
* `slow_period` = Slow period for the moving-average indicator, in bars (days for this daily data; e.g. 200)
* `size` = The number of shares for a transaction

Set some default parameters

In [None]:
%%writefile local/{algo_name}/input/config/hyperparameters.json
{ 
 "fast_period" : "8",
 "slow_period" : "21",
 "size" : "100"
}

In [None]:
%run update_config_kinesis.py $algo_name $kinesis_stream $sym

# Step 4) Strategy Definition

In the following cell, you can modify the strategy code.

Here are some helpful links:
* Backtrader Documentation: https://www.backtrader.com/docu/strategy/
* TA-Lib Indicator Reference: https://www.backtrader.com/docu/talibindautoref/
* Backtrader Indicator Reference: https://www.backtrader.com/docu/indautoref/

Load data directly from Kinesis in KinesisFeed class.

In [None]:
%%writefile model/{algo_name}.py
import backtrader as bt
import backtrader.feeds as btfeeds
import backtrader.analyzers as btanalyzers
from backtrader.feed import DataBase
from backtrader import date2num
from backtrader import TimeFrame
from algo_base import *
import time
from dateutil import parser
import boto3

class KinesisFeed(DataBase):
    """Backtrader data feed that replays bars read from a Kinesis stream.

    On construction the first shard of the stream is read from its oldest
    retained record; records whose payload symbol matches ``sym`` are kept
    in arrival order. The first 70% of those bars form the training slice
    and the remaining 30% the test slice; ``test_data`` selects which slice
    the feed replays.

    Args:
        region: AWS region of the Kinesis stream.
        streamName: name of the Kinesis stream.
        sym: ticker symbol to keep; other records are ignored.
        test_data: truthy to replay the test slice (last 30%), falsy to
            replay the training slice (first 70%).

    Raises:
        IndexError: if the selected slice is empty.
    """

    def __init__(self, region, streamName, sym, test_data):
        super(KinesisFeed, self).__init__()
        # Imported locally because the module header does not import json
        # explicitly.
        import json

        kinesis_client = boto3.client('kinesis', region_name=region)
        response = kinesis_client.describe_stream(StreamName=streamName)
        # NOTE: only the first shard listed is read.
        shard_id = response['StreamDescription']['Shards'][0]['ShardId']
        shard_it = kinesis_client.get_shard_iterator(
            StreamName=streamName,
            ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON",
        )["ShardIterator"]

        bars = []
        # A closed, fully-read shard yields no next iterator; stop there
        # rather than calling get_records with None.
        while shard_it is not None:
            out = kinesis_client.get_records(ShardIterator=shard_it, Limit=1000)
            for record in out["Records"]:
                payload = json.loads(record["Data"])
                if 'data' not in payload:
                    continue
                bar = payload['data']
                if bar['sym'] == sym:
                    bar['dt'] = parser.parse(bar['dt']).date()
                    bars.append(bar)
            if out['MillisBehindLatest'] == 0:
                # Caught up with the tip of the stream.
                break
            shard_it = out.get("NextShardIterator")

        self.list = bars
        self.n = 0
        self.listLen = len(self.list)
        trainCount = int(self.listLen * 0.7)
        # The caller passes a bool, so test truthiness: an `is not None`
        # check would always succeed and always select the training slice.
        if test_data:
            self.list = self.list[trainCount:]
        else:
            self.list = self.list[:trainCount]

        self.fromdate = self.list[0]['dt']
        self.todate = self.list[len(self.list) - 1]['dt']
        self.timeframe = bt.TimeFrame.Days
        print("from=%s,to=%s" % (self.fromdate, self.todate))

    def start(self):
        # Nothing to do for this data feed type
        pass

    def stop(self):
        # Nothing to do for this data feed type
        pass

    def _load(self):
        """Copy the next stored bar into the feed's lines.

        Returns False once all bars have been delivered, True otherwise.
        """
        if self.n >= len(self.list):
            return False

        r = self.list[self.n]
        self.lines.datetime[0] = date2num(r['dt'])

        self.lines.open[0] = r['open']
        self.lines.high[0] = r['high']
        self.lines.low[0] = r['low']
        self.lines.close[0] = r['close']
        self.lines.volume[0] = r['vol']

        self.n = self.n + 1
        return True

class MyStrategy(StrategyTemplate):
    """EMA crossover strategy that is always long or short once started.

    Reads ``fast_period``, ``slow_period`` and ``size`` from ``self.config``
    and trades ``size`` shares in the direction of the fast/slow EMA
    relationship, reversing the position when the relationship flips.
    """

    def __init__(self):  # Initiation
        super(MyStrategy, self).__init__()
        # Cast the configured values to int before they are used as
        # indicator periods and order size.
        for key in ("fast_period", "slow_period", "size"):
            self.config[key] = int(self.config[key])

        self.emaFast = bt.ind.ExponentialMovingAverage(period=self.config["fast_period"])
        self.emaSlow = bt.ind.ExponentialMovingAverage(period=self.config["slow_period"])
        self.size = self.config["size"]

    # The three hooks below take no `self`; marking them static makes that
    # explicit and keeps them callable through the class or an instance.
    @staticmethod
    def init_broker(broker):
        """Set the starting cash to 100000 and the commission to zero."""
        broker.setcash(100000.0)
        broker.setcommission(commission=0.0)

    @staticmethod
    def add_data(cerebro):
        """Attach a KinesisFeed built from the class-level config.

        The presence of a 'test_data' key in the config is passed to the
        feed as its ``test_data`` flag.
        """
        test_data = ('test_data' in MyStrategy.config)
        data = KinesisFeed(
            MyStrategy.config['region'],
            MyStrategy.config['kinesis_stream'],
            MyStrategy.config['sym'],
            test_data,
        )
        cerebro.adddata(data)

    @staticmethod
    def add_data_csv(cerebro):
        """Attach the CSV at MyStrategy.TRAIN_FILE as a daily data feed.

        Column positions: 0=date, 1=open, 2=high, 3=low, 4=close, 5=volume.
        """
        data = btfeeds.GenericCSVData(
            dataname=MyStrategy.TRAIN_FILE,
            dtformat=('%Y-%m-%d'),
            timeframe=bt.TimeFrame.Days,
            datetime=0,
            time=-1,
            high=2,
            low=3,
            open=1,
            close=4,
            volume=5,
            openinterest=-1
        )
        cerebro.adddata(data)

    def next(self):  # Processing
        super(MyStrategy, self).next()
        fast = self.emaFast[0]
        slow = self.emaSlow[0]
        if not self.position:
            # No position yet: open one in the direction of the fast EMA.
            if fast > slow:
                self.buy(size=self.size)  # Go long
            else:
                self.sell(size=self.size)  # Go short
        elif self.position.size > 0 and fast < slow:
            # Twice the size: close the long and open an equal short.
            self.sell(size=2 * self.size)  # Go short
        elif self.position.size < 0 and fast > slow:
            # Twice the size: close the short and open an equal long.
            self.buy(size=2 * self.size)  # Go long

# Step 5) Strategy Docker Image Creation

**Please note that the initial docker build may take a few minutes. Subsequent runs are fast.**

In [None]:
#Build Local Algo Image
!docker build -t $algo_name .
#!docker run -v $(pwd)/local/$algo_name:/opt/ml --rm $algo_name train

In [None]:
!chmod 777 *.sh
!./build_and_push.sh $algo_name

# Step 6) Hyperparameter Optimization with SageMaker on Training Data

Find the optimal strategy configuration based on PNL.

In [None]:
import sagemaker as sage
from sagemaker import get_execution_role
from sagemaker.estimator import Estimator 

role = get_execution_role()
sess = sage.Session()

# Upload the local training input folder to S3 under the 'data' key prefix;
# the returned S3 location is passed to fit() in later cells.
WORK_DIRECTORY = 'local/'+algo_name+'/input/data/training'
data_location = sess.upload_data(WORK_DIRECTORY, key_prefix='data')
print(data_location)

# Default hyperparameters written in Step 3.
conf_file='local/'+algo_name+'/input/config/hyperparameters.json'
with open(conf_file, 'r') as f:
 config = json.load(f)
print(config)

# The job name is the algorithm name with underscores replaced by hyphens.
prefix=algo_name
job_name=prefix.replace('_','-')

# ECR URI of the ':latest' image in this account and region, in the
# repository named after the algorithm.
account = sess.boto_session.client('sts').get_caller_identity()['Account']
region = sess.boto_session.region_name
image = f'{account}.dkr.ecr.{region}.amazonaws.com/{prefix}:latest'

# Estimator running the strategy image on a single ml.m4.xlarge instance.
# The metric regexes extract values from the job's log output.
algo = sage.estimator.Estimator(
 image_uri=image,
 role=role,
 instance_count=1,
 instance_type='ml.m4.xlarge',
 output_path="s3://{}/output".format(sess.default_bucket()),
 sagemaker_session=sess,
 base_job_name=job_name,
 hyperparameters=config,
 metric_definitions=[
 {
 "Name": "algo:pnl",
 "Regex": "Total PnL:(.*?)]"
 },
 {
 "Name": "algo:sharpe_ratio",
 "Regex": "Sharpe Ratio:(.*?),"
 }
 ])

In [None]:
from sagemaker.tuner import (
 IntegerParameter,
 CategoricalParameter,
 ContinuousParameter,
 HyperparameterTuner,
)

# Integer search ranges for the two EMA periods; the ranges do not overlap,
# so the fast period is always shorter than the slow period.
hyperparameter_ranges = {
 "fast_period": IntegerParameter(5, 10),
 "slow_period": IntegerParameter(21, 31)
}
# The tuner optimizes the PnL metric. No objective_type is passed, so the
# SDK default applies - presumably "Maximize"; confirm against the SDK docs.
objective_metric_name= "algo:pnl"
# At most 6 training jobs in total, up to 3 running at the same time.
tuner = HyperparameterTuner(algo,
 objective_metric_name,
 hyperparameter_ranges,
 max_jobs=6,
 max_parallel_jobs=3,
 metric_definitions=[
 {
 "Name": "algo:pnl",
 "Regex": "Total PnL:(.*?)]"
 }
 ]
 )

In [None]:
# Start the tuning job, passing the uploaded S3 data location as input.
tuner.fit(data_location)

In [None]:
# Look up the tuned hyperparameter values of the best training job of the
# tuning job that was just run.
best_params=boto3.client('sagemaker').describe_hyper_parameter_tuning_job(
HyperParameterTuningJobName=tuner.latest_tuning_job.job_name)['BestTrainingJob']['TunedHyperParameters']
best_params

In [None]:
from sagemaker.analytics import TrainingJobAnalytics
# Fetch the metrics recorded for the best training job as a DataFrame.
bestjob=tuner.best_training_job()
metrics_dataframe = TrainingJobAnalytics(training_job_name=bestjob).dataframe()
metrics_dataframe

# Step 7) Run strategy with SageMaker with optimal hyperparameters on Test Data

In [None]:
#Run Remote test via SageMaker
import sagemaker as sage
from sagemaker import get_execution_role
from sagemaker.estimator import Estimator 

role = get_execution_role()
sess = sage.Session()

# Use optimal hyperparameter and test data
conf_file='local/'+algo_name+'/input/config/hyperparameters.json'
with open(conf_file, 'r') as f:
 config = json.load(f)
# Override the default periods with the values found by the tuning job.
config['fast_period']=best_params['fast_period']
config['slow_period']=best_params['slow_period']
# MyStrategy.add_data checks only whether the 'test_data' key is present.
config['test_data']='true'
print(config)

# The job name is the algorithm name with underscores replaced by hyphens.
prefix=algo_name
job_name=prefix.replace('_','-')

# ECR URI of the ':latest' image in this account and region.
account = sess.boto_session.client('sts').get_caller_identity()['Account']
region = sess.boto_session.region_name
image = f'{account}.dkr.ecr.{region}.amazonaws.com/{prefix}:latest'

# Same estimator setup as in the tuning step, with the updated config.
algo = sage.estimator.Estimator(
 image_uri=image,
 role=role,
 instance_count=1,
 instance_type='ml.m4.xlarge',
 output_path="s3://{}/output".format(sess.default_bucket()),
 sagemaker_session=sess,
 base_job_name=job_name,
 hyperparameters=config,
 metric_definitions=[
 {
 "Name": "algo:pnl",
 "Regex": "Total PnL:(.*?)]"
 },
 {
 "Name": "algo:sharpe_ratio",
 "Regex": "Sharpe Ratio:(.*?),"
 }
 ])
# Reuses the S3 data location uploaded in Step 6.
algo.fit(data_location)

In [None]:
from sagemaker.analytics import TrainingJobAnalytics

# Fetch the metrics recorded for the job that was just run as a DataFrame.
latest_job_name = algo.latest_training_job.job_name
metrics_dataframe = TrainingJobAnalytics(training_job_name=latest_job_name).dataframe()
metrics_dataframe

In [None]:
# Get the algo chart from S3: download the job's model archive from the
# session's default bucket to ./model.tar.gz.
# Strip the 's3://<bucket>/' prefix so only the object key remains.
model_name=algo.model_data.replace('s3://'+sess.default_bucket()+'/','')
import boto3
s3 = boto3.resource('s3')
my_bucket = s3.Bucket(sess.default_bucket())
my_bucket.download_file(model_name,'model.tar.gz')
!tar -xzf model.tar.gz
!rm model.tar.gz
from IPython.display import Image
Image(filename='chart.png') 