{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# How to run what-if scenarios for trading strategies with Amazon FinSpace\n", "\n", "This notebook performs the following steps: \n", "\n", "1.\tLoad data: We load one minute granular equity price data that is provided as part of the Capital Markets Sample Data Bundle which is an additional item to install into Amazon FinSpace environments from the AWS console. \n", "\n", "2.\tPrepare data: We quickly analyze and prepare the dataset for model training and backtesting and use one of the Amazon FinSpace functions to calculate Exponential Moving Average (EMA).\n", "\n", "3.\tModel training: We use Spark ML to train multiple random forest machine learning algorithms.\n", "\n", "4.\tBacktest strategies: We use the Backtrader framework for the actual backtesting, then each strategy is executed for all scenarios.\n", "\n", "5.\tBenchmark strategies: In order to benchmark the different strategy configurations we calculate Profit and Loss (PNL) for each configuration.\n", "\n", "6.\tWhat-if output: Once the benchmark is completed, we output the best strategy configuration (according to PNL).\n", "\n", "Before executing the notebook: \n", "1. Select the FinSpace PySpark Kernel in the top right corner of this notebook\n", "2. Wait ca. 5 minutes until the FinSpace PySpark Kernel is available \n", "\n", "### Connect to FinSpace Cluster" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%local\n", "from aws.finspace.cluster import FinSpaceClusterManager\n", "\n", "# if this was already run, no need to run again\n", "if 'finspace_clusters' not in globals():\n", " finspace_clusters = FinSpaceClusterManager()\n", " finspace_clusters.auto_connect()\n", "else:\n", " print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. Load data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#####----------------------------------------------------------\n", "##### REPLACE WITH CORRECT IDS!\n", "##### Dataset: \"US Equity Time-Bar Summary - 1 min, 14 Symbols - Sample\"\n", "#####----------------------------------------------------------\n", "dataset_id = \"\"\n", "data_view_id = \"\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Use the FinSpace API to get the dataset as a Spark DataFrame" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from aws.finspace.analytics import FinSpaceAnalyticsManager\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "import datetime as dt\n", "import pyspark.sql.functions as F\n", "import pyspark.sql.types as T\n", "\n", "from aws.finspace.timeseries.spark.analytics import *\n", "from aws.finspace.timeseries.spark.windows import *\n", "from aws.finspace.timeseries.spark.util import string_to_timestamp_micros\n", "\n", "finspace_analytics = FinSpaceAnalyticsManager(spark = spark)\n", "df = finspace_analytics.read_data_view(dataset_id = dataset_id, data_view_id = data_view_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Sample a few rows of the DataFrame" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Select historical price data\n", "\n", "In this example we pick a few days in 2020 for ticker NFLX. Please adjust ticker and time frame as needed." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Just a few days for testing\n", "sDate = dt.datetime(2020, 1, 1)\n", "eDate = dt.datetime(2020, 1, 7) \n", "\n", "df2 = ( df.filter(df.eventtype == \"TRADE NB\").filter( df.date.between(sDate, eDate) ).filter(df.ticker == \"NFLX\") ).select(['start','open','high','low','close','volume','vwap'])\n", "df2.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "dfPandas=df2.toPandas()\n", "dfPandas.set_index(pd.DatetimeIndex(dfPandas['start']),inplace=True)\n", "del dfPandas['start']\n", "dfPandas[['close']].plot()\n", "%matplot plt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2. Prepare data\n", "\n", "### Add technical indicator: EMA(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# group the sets of values\n", "partitionList = []\n", "\n", "tenor = 10\n", "timeCol = 'start'\n", "priceCol = 'close'\n", "emaDef = exponential_moving_average(tenor, timeCol, priceCol)\n", "\n", "#Example: use MACD instead of simpler EMA\n", "#macdDef = moving_average_converge_diverge_hist( 12, 26, 9, timeCol, priceCol )\n", "\n", "df3 = compute_analytics_on_features(df2, \"exponential_moving_average\", emaDef, partition_col_list = partitionList)\n", "df4=df3.filter(df3.exponential_moving_average>0)\n", "df4.show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Add label: VWAP (t + 5 minutes)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "futureDF = df4.withColumn(\"futurestart\", df4.start + F.expr('INTERVAL 5 MINUTES'))\n", "futureDF = futureDF.withColumnRenamed(\"vwap\", \"label\")\n", "futureDF.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#we're adding a new column that contains volume weighted average price and \"move\" the values ahead of 5 minutes and use this as label\n", "df4 = df4.alias('df4')\n", "futureDF = futureDF.alias('futureDF')\n", "\n", "fullDF = df4.join(futureDF, df4.start == futureDF.futurestart).select('df4.*', 'futureDF.label')\n", "fullDF.show(6)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Split data in training data and test data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Get full data set\n", "data=fullDF.collect()\n", "dataLen=len(data)\n", "dataLen" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Take 70% of data for training, use 30% of data for testing\n", "trainLen=int(dataLen*0.7)\n", "trainingData=spark.createDataFrame(data[:trainLen])\n", "testData=spark.createDataFrame(data[trainLen:])\n", "(trainingData.count(),testData.count())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3. Model Training\n", "\n", "### Train multiple Random Forest models via Spark ML. This takes a few minutes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.ml.feature import VectorAssembler\n", "from pyspark.ml.regression import RandomForestRegressor\n", "from pyspark.ml.tuning import ParamGridBuilder\n", "import numpy as np\n", "from pyspark.ml.tuning import CrossValidator\n", "from pyspark.ml.evaluation import RegressionEvaluator\n", "from pyspark.ml import Pipeline\n", "from time import *\n", "\n", "feature_list = [\"vwap\", \"open\", \"high\", \"low\", \"close\", \"volume\", \"exponential_moving_average\"]\n", "assembler = VectorAssembler(inputCols=feature_list, outputCol=\"features\")\n", "rf = RandomForestRegressor().setFeaturesCol(\"features\").setLabelCol(\"label\")\n", "pipeline = Pipeline(stages=[assembler, rf])\n", "\n", "#hyperparameters values: numTrees start, numTrees stop, numTrees num, maxDepth start, maxDepth stop, maxDepth num, numFolds\n", "set1=[ 5, 25, 3, 5, 10, 3, 9]\n", "set2=[ 10, 50, 3, 5, 10, 3, 9]\n", "set3=[ 5, 25, 3, 5, 10, 3, 27]\n", "\n", "params=[set1, set2, set3]\n", "models=[]\n", "\n", "for i in params:\n", " print(\"set\" + str(i) )\n", " paramGrid = ParamGridBuilder() \\\n", " .addGrid(rf.numTrees, [int(x) for x in np.linspace(start = i[0], stop = i[1], num = i[2])]) \\\n", " .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start = i[3], stop = i[4], num = i[5])]) \\\n", " .build()\n", "\n", " crossval = CrossValidator(estimator=pipeline,\n", " estimatorParamMaps=paramGrid,\n", " evaluator=RegressionEvaluator(),\n", " numFolds=i[6])\n", "\n", " starttime = time()\n", " m=crossval.fit(trainingData)\n", " models.append(m)\n", " endtime = time()\n", " trainingtime = endtime - starttime\n", " print(\"Training time: %.3f seconds\" % (trainingtime))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 4. Backtest strategies\n", "\n", "### Install backtesting library" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sc.install_pypi_package(\"backtrader\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define helper classes for using the backtesting library. This contains code for calculating performance metrics and for reading time series data from the Spark cluster." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import backtrader as bt\n", "import backtrader.feeds as btfeeds\n", "import backtrader.analyzers as btanalyzers\n", "from backtrader.feed import DataBase\n", "from backtrader import date2num\n", "from backtrader import TimeFrame\n", "import os\n", "import pytz\n", "from pytz import timezone\n", "import json\n", "import time\n", "import itertools\n", "import datetime\n", "\n", "# More documentation about backtrader: https://www.backtrader.com/\n", "\n", "class AlgoStrategy():\n", " \n", " def __init__(self,strategy): \n", " self.cerebro = bt.Cerebro() \n", " strategy.init_broker(self.cerebro.broker)\n", " data=strategy.add_data(self.cerebro)\n", " strategy.data=data\n", " \n", " self.cerebro.addstrategy(strategy)\n", " \n", " self.portfolioStartValue=self.cerebro.broker.getvalue() \n", " self.cerebro.addanalyzer(btanalyzers.DrawDown, _name='dd')\n", " self.cerebro.addanalyzer(btanalyzers.SharpeRatio_A, _name='sharpe')\n", " self.cerebro.addanalyzer(btanalyzers.SQN, _name='sqn')\n", " self.cerebro.addanalyzer(btanalyzers.TradeAnalyzer, _name='ta')\n", " \n", " def performance(self):\n", " analyzer=self.thestrat.analyzers.ta.get_analysis()\n", " dd_analyzer=self.thestrat.analyzers.dd.get_analysis()\n", " \n", " #Get the results we are interested in\n", " total_open = analyzer.total.open\n", " total_closed = analyzer.total.closed\n", " total_won = analyzer.won.total\n", " total_lost = analyzer.lost.total\n", " win_streak = analyzer.streak.won.longest\n", " lose_streak = analyzer.streak.lost.longest\n", " pnl_net = round(analyzer.pnl.net.total,2)\n", " strike_rate=0\n", " if total_closed>0:\n", " strike_rate = (total_won / total_closed) * 100\n", " #Designate the rows\n", " h1 = ['Total Open', 'Total Closed', 'Total Won', 'Total Lost']\n", " h2 = ['Strike Rate','Win Streak', 'Losing Streak', 'PnL Net']\n", " h3 = ['DrawDown Pct','MoneyDown', '', '']\n", " self.total_closed=total_closed\n", " self.strike_rate=strike_rate\n", " self.max_drawdown=dd_analyzer.max.drawdown\n", " r1 = [total_open, total_closed,total_won,total_lost]\n", " r2 = [('%.2f%%' %(strike_rate)), win_streak, lose_streak, pnl_net]\n", " r3 = [('%.2f%%' %(dd_analyzer.max.drawdown)), dd_analyzer.max.moneydown, '', '']\n", " #Check which set of headers is the longest.\n", " header_length = max(len(h1),len(h2),len(h3))\n", " #Print the rows\n", " print_list = [h1,r1,h2,r2,h3,r3]\n", " row_format =\"{:<15}\" * (header_length + 1)\n", " print(\"Trade Analysis Results:\")\n", " for row in print_list:\n", " print(row_format.format('',*row))\n", "\n", " analyzer=self.thestrat.analyzers.sqn.get_analysis()\n", " sharpe_analyzer=self.thestrat.analyzers.sharpe.get_analysis()\n", " self.sqn = analyzer.sqn\n", " self.sharpe_ratio = sharpe_analyzer['sharperatio']\n", " if self.sharpe_ratio is None:\n", " self.sharpe_ratio=0\n", " self.pnl = self.cerebro.broker.getvalue()-self.portfolioStartValue\n", " print('[SQN:%.2f, Sharpe Ratio:%.2f, Final Portfolio:%.2f, Total PnL:%.2f]' % (self.sqn,self.sharpe_ratio,self.cerebro.broker.getvalue(),self.pnl))\n", " \n", " def run(self):\n", " thestrats = self.cerebro.run()\n", " self.thestrat = thestrats[0]\n", " self.performance()\n", "\n", "class MyFeed(DataBase):\n", " def __init__(self):\n", " super(MyFeed, self).__init__()\n", " self.list=testData.select(\"start\", \"open\", \"high\", \"low\", \"close\", \"volume\", \"vwap\", \"exponential_moving_average\").collect()\n", " self.n=0\n", " \n", " self.fromdate=self.list[0]['start']\n", " self.todate=self.list[len(self.list)-1]['start']\n", " self.timeframe=bt.TimeFrame.Minutes\n", " print(\"from=%s,to=%s\" % (self.fromdate,self.todate))\n", " \n", " self.m={}\n", " #print(self.list)\n", " \n", " def start(self):\n", " # Nothing to do for this data feed type\n", " pass\n", "\n", " def stop(self):\n", " # Nothing to do for this data feed type\n", " pass\n", " \n", " def _load(self):\n", " if self.n>=len(self.list):\n", " return False\n", " \n", " r=self.list[self.n]\n", " self.lines.datetime[0] = date2num(r['start'])\n", " \n", " self.lines.open[0] = r['open']\n", " self.lines.high[0] = r['high']\n", " self.lines.low[0] = r['low']\n", " self.lines.close[0] = r['close']\n", " self.lines.volume[0] = r['volume']\n", " self.m[r['start']]=r\n", " \n", " self.n=self.n+1\n", " return True\n", "\n", "class StrategyTemplate(bt.Strategy):\n", " \n", " def __init__(self): \n", " self.lastDay=-1\n", " self.lastMonth=-1\n", " self.dataclose = self.datas[0].close\n", " \n", " @staticmethod\n", " def init_broker(broker):\n", " pass\n", " \n", " @staticmethod\n", " def add_data(cerebro):\n", " pass\n", " \n", " def next(self):\n", " dt=self.datas[0].datetime.datetime(0)\n", " #print(\"[NEXT]:%s:close=%s\" % (dt,self.dataclose[0]))\n", " \n", " #SOM\n", " if self.lastMonth!=dt.month:\n", " if self.lastMonth!=-1:\n", " chg=self.broker.getvalue()-self.monthCash\n", " #print(\"[%s] SOM:chg=%.2f,cash=%.2f\" % (dt,chg,self.broker.getvalue()))\n", " self.lastMonth=dt.month\n", " self.monthCash=self.broker.getvalue()\n", " \n", " #SOD\n", " if self.lastDay!=dt.day:\n", " self.lastDay=dt.day\n", " #print(\"[%s] SOD:cash=%.2f\" % (dt,self.broker.getvalue()))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define the trading strategy that we want to backtest. The method \"next\" defines the trade logic for each data record." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class MyStrategy(StrategyTemplate):\n", "\n", " def __init__(self): # Initiation\n", " super(MyStrategy, self).__init__()\n", " \n", " def init_broker(broker):\n", " broker.setcash(1000000.0)\n", " broker.setcommission(commission=0.0) \n", " \n", " def add_data(cerebro):\n", " data = MyFeed()\n", " cerebro.adddata(data)\n", " return data\n", " \n", " def next(self): # Processing\n", " super(MyStrategy, self).next()\n", " dt=self.datas[0].datetime.datetime(0)\n", " r=self.data.m[dt]\n", " #print(r)\n", " size=self.cerebro.strat_params['size']\n", " threshold_PctChg=self.cerebro.strat_params['pct_chg']\n", " \n", " model=self.cerebro.strat_params['model']\n", " df=spark.createDataFrame([r])\n", " VWAP=r['vwap']\n", " predictedVWAP = model.transform(df).collect()[0]['prediction']\n", " expectedPctChg=(predictedVWAP-VWAP)/VWAP*100.0\n", " \n", " goLong=expectedPctChg>threshold_PctChg\n", " goShort=expectedPctChg<-threshold_PctChg\n", " #print(\"expectedPctChg=%s,goLong=%s,goShort=%s\" % (expectedPctChg,goLong,goShort))\n", " \n", " if not self.position:\n", " if goLong:\n", " print(\"%s:%s x BUY @ %.2f\" % (dt,size,r['close']))\n", " self.buy(size=size) # Go long\n", " else:\n", " print(\"%s:%s x SELL @ %.2f\" % (dt,size,r['close']))\n", " self.sell(size=size) # Go short\n", " elif self.position.size>0 and goShort:\n", " print(\"%s:%s x SELL @ %.2f\" % (dt,size*2,r['close']))\n", " self.sell(size=size*2)\n", " elif self.position.size<0 and goLong: \n", " print(\"%s:%s x BUY @ %.2f\" % (dt,size*2,r['close']))\n", " self.buy(size=size*2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create multiple what-if scenario configurations" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "scenarios=[]\n", "for p in range(0,len(models)):\n", " for s in range(0,1):\n", " c={'scenario':(p+1), \"size\":100,\"pct_chg\":0.01,\"model\":models[p],'model_name':'model.%s' % (p+1)}\n", " print(c)\n", " scenarios.append(c)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 5. Benchmark strategies. This takes a few minutes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# run scenarios\n", "best_config=None\n", "best_pnl=None\n", "n=0\n", "for c in scenarios:\n", " print(\"*** [%s] RUN SCENARIO:%s\" % ((n+1),c))\n", " config=c\n", " algo=AlgoStrategy(MyStrategy)\n", " algo.cerebro.strat_params=config\n", " algo.run()\n", " if best_pnl is None or best_pnl