{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Train Forecast Model - Daily\n", "\n", "In this notebook we'll train a deep learning model that learns if a target price or stop loss would be hit for a long/short trade in the next days based on historical price data.\n", "\n", "Model:\n", "* Multilayer Perceptron (MLP) (Feedforward neural network)\n", "* 3 layers: input, hidden, output\n", "* Binary Classification\n", "* `Input`: Close, SMA(2 to 16), ROC(2 to 16)\n", "* `Output`: Does a long or short trade hit the profit target (2%) without hitting a stop loss (1.5%) in the next five days?" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%run ../2_Strategies/init_model.py 'model_long_short_predict'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile local/{model_name}/input/config/hyperparameters.json\n", "{ \n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 1) Get Data from Athena and S3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# get S3 bucket\n", "s3bucket=!(aws s3 ls | grep algotrading- | awk '{print $3}')\n", "s3bucket=s3bucket[0]\n", "s3bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "!{sys.executable} -m pip install PyAthena" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import sagemaker as sage\n", "from sagemaker import get_execution_role\n", "import datetime\n", "from sagemaker.tensorflow import TensorFlow\n", "import json\n", "\n", "role = get_execution_role()\n", "sess = sage.Session()\n", "region = sess.boto_session.region_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "from pyathena import connect\n", "conn = connect(s3_staging_dir='s3://'+s3bucket+'/results/',\n", " region_name=region)\n", "\n", "df = pd.read_sql(\"SELECT * FROM algo_data.hist_data_daily;\", conn)\n", "df.set_index(pd.DatetimeIndex(df['dt']),inplace=True)\n", "del df['dt']\n", "df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.to_csv('local/'+model_name+'/input/data/training/data_orig.csv')\n", "print(\"count=%s\" % len(df))\n", "df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib notebook\n", "df[\"close\"].plot()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 2) Run Data Preparation Locally" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Modify Data Preparation Code\n", "\n", "In the following cell, you can modify the data preparation code or leave it as is." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile model/{model_name}_prep.py\n", "#!/usr/bin/env python\n", "\n", "import numpy as np\n", "import pandas as pd\n", "import talib as ta\n", "from talib.abstract import *\n", "import math\n", "\n", "prefix = '/opt/ml/'\n", "input_path = prefix + 'input/data/training'\n", "\n", "data_orig_file = input_path+'/data_orig.csv'\n", "data_file = input_path+'/data.csv'\n", "\n", "d = pd.read_csv(data_orig_file,infer_datetime_format=True, parse_dates=['dt'], index_col=['dt'])\n", "print(d.head())\n", "\n", "repeatCount=15\n", "repeatStep=1\n", "lookBack=repeatCount*repeatStep\n", "forwardWindow=5\n", "\n", "profitTarget=2.0/100.0\n", "stopTarget=1.5/100.0\n", "\n", "iCount=lookBack\n", "\n", "# header\n", "hData=[\"dt\"]\n", "hData.append(\"close\")\n", "for a in range(0,repeatCount):\n", " hData.append(\"sma\"+str((a+2)*repeatStep))\n", "for a in range(0,repeatCount):\n", " hData.append(\"roc\"+str((a+2)*repeatStep))\n", "hData.append(\"long\")\n", "hData.append(\"short\")\n", "\n", "# data\n", "tData=[]\n", "\n", "inputs = {\n", " 'close': np.array(d[\"close\"])\n", "}\n", "sma=[]\n", "for a in range(0,repeatCount):\n", " sma.append(SMA(inputs,timeperiod=(a+1)*repeatStep+1))\n", "roc=[]\n", "for a in range(0,repeatCount):\n", " roc.append(ROC(inputs,timeperiod=(a+1)*repeatStep+1))\n", "\n", "closeList=d[\"close\"]\n", "dLen=len(d)\n", "n=0\n", "lCount=0\n", "sCount=0\n", "nCount=0\n", "n=0\n", "for idx,row in d.iterrows():\n", " if n=cl+cl*profitTarget and low>=cl-cl*stopTarget:\n", " long=1\n", " lCount=lCount+1\n", " inputRec.append(long)\n", " \n", " #short\n", " short=0\n", " if low<=cl-cl*profitTarget and high<=cl+cl*stopTarget:\n", " short=1\n", " sCount=sCount+1\n", " inputRec.append(short)\n", "\n", " tData.append(inputRec)\n", " n=n+1\n", " \n", "print(\"lCount=%s,sCount=%s\" % (lCount,sCount))\n", "df1=pd.DataFrame(tData,columns=hData)\n", "df1.set_index(pd.DatetimeIndex(df1['dt']), inplace=True)\n", "del df1['dt']\n", " \n", "df1.to_csv(data_file)\n", "print(df1.head())\n", "print(\"count=%s\" % (len(df1)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run Data Preparation Locally in a Docker Container" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!cp model/{model_name}_prep.py model/train\n", "!chmod 777 model/train\n", "!docker build -t {model_name}_prep .\n", "!docker run -v $(pwd)/local/$model_name:/opt/ml --rm {model_name}_prep train" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Training and Test Data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "df = pd.read_csv(\"local/\"+model_name+\"/input/data/training/data.csv\", infer_datetime_format=True, parse_dates=['dt'], index_col=['dt'])\n", "print(\"totalCount=%s\" % len(df))\n", "\n", "trainCount=int(len(df)*0.4)\n", "dfTrain = df.iloc[:trainCount]\n", "dfTrain.to_csv(\"local/\"+model_name+\"/input/data/training/data_train.csv\")\n", "print(\"trainCount=%s\" % len(dfTrain))\n", "\n", "dfTest = df.iloc[trainCount:]\n", "dfTest.to_csv(\"local/\"+model_name+\"/input/data/training/data_test.csv\")\n", "print(\"testCount=%s\" % len(dfTest))\n", "dfTest.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Step 3) Train the Model\n", "\n", "In the following cell, you can modify the model training code or leave it as is." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile model/{model_name}.py\n", "#!/usr/bin/env python\n", "from __future__ import print_function\n", "\n", "import os\n", "import sys\n", "import traceback\n", "import math\n", "import numpy as np\n", "import pandas as pd\n", "import tensorflow as tf\n", "\n", "from keras.layers import Dropout, Dense\n", "from keras.wrappers.scikit_learn import KerasClassifier\n", "from keras.models import Sequential\n", "from keras.wrappers.scikit_learn import KerasRegressor\n", "\n", "yLen=2\n", "b=0\n", "\n", "# Optional\n", "os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'\n", "\n", "# These are the paths to where SageMaker mounts interesting things in your\n", "# container.\n", "prefix = '/opt/ml/'\n", "\n", "input_path = prefix + 'input/data/training/data_train.csv'\n", "test_path = prefix + 'input/data/training/data_test.csv'\n", "\n", "output_path = os.path.join(prefix, 'output')\n", "model_path = os.path.join(prefix, 'model')\n", "\n", "# Process and prepare the data\n", "def data_process(df):\n", " global yLen\n", " global b\n", " dataX=[]\n", " dataY=[]\n", " for idx,row in df.iterrows():\n", " row1=[]\n", " r=row[1:len(row)-yLen]\n", " for a in r:\n", " row1.append(a)\n", " x=np.array(row1)\n", " y=np.array(row[len(row)-yLen:])\n", " b=len(x)\n", " dataX.append(x)\n", " dataY.append(y)\n", " dataX=np.array(dataX).astype(np.float32)\n", " dataY=np.array(dataY).astype(np.float32)\n", " return dataX,dataY,b\n", "\n", "def build_classifier():\n", " global b\n", " global yLen\n", " print(\"build_classifier:b=%s,yLen=%s\" % (b,yLen))\n", " model = Sequential()\n", " model.add(Dense(b, input_dim=b, kernel_initializer='normal', activation='relu'))\n", " model.add(Dropout(0.2))\n", " model.add(Dense(int(b/2), kernel_initializer='normal', activation='relu'))\n", " model.add(Dropout(0.2))\n", " model.add(Dense(yLen,kernel_initializer='normal', activation='sigmoid'))\n", " model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])\n", " return model\n", "\n", "def generate_model(dataX, dataY, b):\n", " model=build_classifier()\n", " model.fit(dataX, dataY, epochs=100, batch_size=1)\n", " scores = model.evaluate(dataX, dataY, verbose=0)\n", " print(\"Training Data %s: %.2f%%\" % (model.metrics_names[1], scores[1]*100))\n", " return model\n", " \n", "def train():\n", " print('Starting the training.')\n", " try:\n", " raw_data = pd.read_csv(input_path)\n", " #print(raw_data)\n", " X, y, b = data_process(raw_data)\n", " model = generate_model(X, y, b)\n", " model.save(os.path.join(model_path, 'model.h5'))\n", " \n", " print('Training is complete. Model saved.')\n", " \n", " raw_data = pd.read_csv(test_path)\n", " testX, testY, b = data_process(raw_data)\n", " scores = model.evaluate(testX, testY, verbose=0)\n", " print(\"Test Data %s: %.2f%%\" % (model.metrics_names[1], scores[1]*100))\n", " \n", " except Exception as e:\n", " # Write out an error file. This will be returned as the failure\n", " # Reason in the DescribeTrainingJob result.\n", " trc = traceback.format_exc()\n", " with open(os.path.join(output_path, 'failure'), 'w') as s:\n", " s.write('Exception during training: ' + str(e) + '\\n' + trc)\n", " # Printing this causes the exception to be in the training job logs\n", " print(\n", " 'Exception during training: ' + str(e) + '\\n' + trc,\n", " file=sys.stderr)\n", " # A non-zero exit code causes the training job to be marked as Failed.\n", " sys.exit(255)\n", "\n", "if __name__ == '__main__':\n", " train()\n", "\n", " # A zero exit code causes the job to be marked a Succeeded.\n", " sys.exit(0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Option 1: Train Locally\n", "\n", "You can choose if you want to do the training locally (Option 1) or remote via SageMaker (Option 2)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Build Local ML Image\n", "!cp model/{model_name}.py model/train\n", "!chmod 777 model/train\n", "!docker build -t {model_name} .\n", "!docker run -v $(pwd)/local/$model_name:/opt/ml --rm {model_name} train" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Copy Model Artifact to Strategies Folder" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!ls -la local/{model_name}/model/model.h5\n", "!cp local/{model_name}/model/model.h5 ../2_Strategies/model/{model_name}.h5\n", "!ls -la ../2_Strategies/model/model_*.h5" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Option 2: Remote Training via SageMaker\n", "\n", "You can choose if you want to do the training locally (Option 1) or remote via SageMaker (Option 2)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Deploy ML Image to ECS\n", "!./build_and_push.sh $model_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import sagemaker as sage\n", "from sagemaker import get_execution_role\n", "import datetime\n", "from sagemaker.tensorflow import TensorFlow\n", "import json\n", "\n", "role = get_execution_role()\n", "sess = sage.Session()\n", "\n", "WORK_DIRECTORY = 'local/'+model_name+'/input/data/training'\n", "data_location = sess.upload_data(WORK_DIRECTORY, key_prefix='data')\n", "print(data_location)\n", "\n", "conf_file='local/'+model_name+'/input/config/hyperparameters.json'\n", "with open(conf_file, 'r') as f:\n", " config = json.load(f)\n", "print(config)\n", "\n", "prefix=model_name\n", "job_name=prefix.replace('_','-')\n", "\n", "account = sess.boto_session.client('sts').get_caller_identity()['Account']\n", "region = sess.boto_session.region_name\n", "image = f'{account}.dkr.ecr.{region}.amazonaws.com/{prefix}:latest'\n", "\n", "classifier = sage.estimator.Estimator(\n", " image_uri=image,\n", " role=role,\n", " instance_count=1,\n", " instance_type='ml.m4.xlarge',\n", " output_path=\"s3://{}/output\".format(sess.default_bucket()),\n", " sagemaker_session=sess,\n", " base_job_name=job_name)\n", "classifier.fit(data_location)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Download Model Artifact from Amazon S3 and copy it to Strategies Folder" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Get Model from S3\n", "model_name_s3=classifier.model_data.replace('s3://'+sess.default_bucket()+'/','')\n", "import boto3\n", "s3 = boto3.resource('s3')\n", "my_bucket = s3.Bucket(sess.default_bucket())\n", "my_bucket.download_file(model_name_s3,'model.tar.gz')\n", "!tar -xzf model.tar.gz\n", "!rm model.tar.gz\n", "!cp model.h5 ../2_Strategies/model/{model_name}.h5\n", "!ls -la model.h5\n", "!ls -la ../2_Strategies/model/model_*.h5" ] } ], "metadata": { "kernelspec": { "display_name": "conda_tensorflow_p36", "language": "python", "name": "conda_tensorflow_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" } }, "nbformat": 4, "nbformat_minor": 2 }