{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Start up and connect to a Spark Cluster \n",
"A new cluster will be started if no cluster exists. Otherwise, let's use the existing one"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%local\n",
"from aws.finspace.cluster import FinSpaceClusterManager\n",
"\n",
"# if this was already run, no need to run again\n",
"if 'finspace_clusters' not in globals():\n",
" finspace_clusters = FinSpaceClusterManager()\n",
" finspace_clusters.auto_connect()\n",
"\n",
"cluster_id = finspace_clusters.get_connected_cluster_id()\n",
"print(f'connected to cluster: {cluster_id}')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#%local\n",
"#import pandas as pd\n",
"#pd.DataFrame.from_dict( finspace_clusters.list()['clusters'] ) "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Set dataset and view \n",
"The demo uses the dataset: \"US Equity TAQ Sample - AMZN 6 Months - Sample\". A user needs to replace the values of the 2 parameters with what are shown in finspace"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#%local \n",
"#####----------------------------------------------------------\n",
"##### Place to change\n",
"##### REPLACE WITH CORRECT IDS!\n",
"##### Dataset: \"US Equity TAQ Sample - AMZN 6 Months - Sample\"\n",
"#####\n",
"#####----------------------------------------------------------\n",
"\n",
"dataset_id = ''\n",
"view_id = ''"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Obtain and process data using FinSpace analytical functions \n",
"Import libraries for price pattern analysis (`scipy`), plotting (`matplotlib`), data processing (`pyspark`) and aggregation (`finspace`)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"from scipy.signal import argrelextrema\n",
"import os\n",
"import matplotlib.pyplot as plt\n",
"import matplotlib\n",
"from collections import defaultdict\n",
"import time\n",
"import matplotlib.pyplot as plt\n",
"import datetime as dt\n",
"import pyspark.sql.functions as F\n",
"import pyspark.sql.types as T\n",
"import pprint \n",
"from aws.finspace.timeseries.spark.util import string_to_timestamp_micros\n",
"from aws.finspace.timeseries.spark.windows import create_time_bars, compute_analytics_on_features, compute_features_on_time_bars\n",
"from aws.finspace.timeseries.spark.spec import BarInputSpec, TimeBarSpec\n",
"from aws.finspace.timeseries.spark.summarizer import *\n",
"from aws.finspace.timeseries.spark.analytics import *\n",
"from aws.finspace.timeseries.finance.calendars import *\n",
"from aws.finspace.timeseries.spark.prepare import *\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Call the finspace library function `FinSpaceAnalyticsManager` to obtain the finspace analytics manager"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Initialize and Connect\n",
"from aws.finspace.analytics import FinSpaceAnalyticsManager\n",
"finspace = FinSpaceAnalyticsManager(spark = spark)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Set the day range for price action analysis "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Date range\n",
"start_date = dt.datetime(2019, 10, 1)\n",
"end_date = dt.datetime(2019, 12, 31)\n",
"\n",
"# debug mode\n",
"debug_mode = True"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Read data from the finspace"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#tDF = finspace.read_view_as_spark(dataset_id = dataset_id, view_id = view_id)\n",
"tDF = finspace.read_data_view(dataset_id = dataset_id, data_view_id = view_id)\n",
"if debug_mode:\n",
" tDF.printSchema() \n",
" tDF.count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next we decide how to generate \"bars\" for further technical analysis. We select the bar size to be 1 min because it is normally the highest frequency human traders can process. Further smoothing or aggregation can also be applied to multiple bars in a flexible and user configurable way."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"bw = '1 minute'\n",
"\n",
"# what columns to collect in the bar - fixed\n",
"bar_input_spec = BarInputSpec('activity', 'datetime', 'timestamp', 'price', 'quantity', 'exchange', 'conditions' )\n",
"# timebar column name - fixed\n",
"timebar_col = 'window'\n",
"# group the dataset's values by....\n",
"partitionList = [\"ticker\", \"eventtype\"]\n",
"# ticker and trade event for which the trading sign is defined. \n",
"fTicker = 'AMZN'\n",
"event_type = 'TRADE NB'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Call `finspace` library to generate bars in the form of spark dataframe. Then several number technical indicators (e.g. `std`, `vwap`, `ohlc`, `volume`) are generated using the library functions. Finally, we only select trading data within the analysis window"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"timebar_spec = TimeBarSpec(timestamp_column='datetime', window_duration=bw, slide_duration=bw)\n",
"bar_input_spec = BarInputSpec('activity', 'datetime', 'timestamp', 'price', 'quantity', 'exchange', 'conditions' )\n",
"collDF = create_time_bars(data = tDF, \n",
" timebar_column = timebar_col, \n",
" grouping_col_list = ['date', 'ticker', 'eventtype'], \n",
" input_spec = bar_input_spec, \n",
" timebar_spec = timebar_spec)\\\n",
" .withColumn( 'activity_count', F.size( F.col('activity') ) )\n",
"# free up DF\n",
"tDF.unpersist()\n",
"\n",
"sumDF = ( collDF\n",
" .withColumn( 'std', std( 'activity.price' ) )\n",
" .withColumn( 'vwap', vwap( 'activity.price', 'activity.quantity' ) )\n",
" .withColumn( 'ohlc', ohlc_func( 'activity.datetime', 'activity.price' ) ) \n",
" .withColumn( 'volume', total_volume( 'activity.quantity' ) ) \n",
" .drop( collDF.activity )\n",
" )\n",
"# free up DF\n",
"collDF.unpersist()\n",
"\n",
"ffDF = time_bar_fill_and_filter(sumDF, timebar_col, NYSECalendar20192020(), timebar_spec, start_date, end_date, fill_value=np.nan)\n",
"# free up DF\n",
"sumDF.unpersist()\n",
"\n",
"tsDF = ( ffDF \n",
" .filter(ffDF.date.between(start_date, end_date))\n",
" .filter(ffDF.eventtype==event_type)\n",
" # flatten window\n",
" .withColumn(\"start\", ffDF.window.start)\n",
" .withColumn(\"end\", ffDF.window.end)\n",
" .drop(\"window\")\n",
" # flatten ohlc\n",
" .withColumn(\"open\", ffDF.ohlc.open)\n",
" .withColumn(\"high\", ffDF.ohlc.high)\n",
" .withColumn(\"low\", ffDF.ohlc.low)\n",
" .withColumn(\"close\", ffDF.ohlc.close)\n",
" .drop(\"ohlc\")\n",
" # order \n",
" .orderBy(\"ticker\", \"eventtype\", \"start\")\n",
" )\n",
"# free up DF\n",
"ffDF.unpersist()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tsDF.count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Further smooth close price to reduce noise in head-shoulder pattern detection "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Smoothing is usually needed to reduce the noise in price action pattern identification. This demo chooses to smooth the close price using exponential moving average of past 5 bars. Finally, only 3 columns of the dataframe are kept: the start time and close price of each bar, and the smoothed close price\n",
"\n",
"Note the smoothing can be combined with bar generation but in order to make the processing more flexible, we decouple them. This also speeds up repeated tests because the bars are generated one time where data size is greatly reduced. Further smoothing is only applied to the resultant smaller dataset of 1-min bars. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 5-bars\n",
"win=5\n",
"smoothing = exponential_moving_average(win, \"end\", \"close\")\n",
"\n",
"# group the sets of values. Note need to partition by date to prevent the window spreading cross the day\n",
"partitionList = [\"ticker\", \"eventtype\", \"date\"]\n",
"\n",
"tsDF = compute_analytics_on_features(tsDF, \"smoothed_close\", smoothing, partition_col_list = partitionList)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tsDF = (tsDF \n",
" .withColumn(\"end\", tsDF.end)\n",
" .withColumn(\"close\", tsDF.close)\n",
" .withColumn(\"smoothed_close\", tsDF.smoothed_close)\n",
" # drop everything else\n",
" .drop(\"date\", \"ticker\", \"eventtype\", \"activity_count\", \"std\", \"vwap\", \"volume\", \"start\", \"open\", \"high\", \"low\")\n",
" ) \n",
"tsDF.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Pick a time period to identify head-shoulder patterns \n",
"Note: for best visual identification, the period should not be too large. 1 day is chosen for demo purpose"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pa_win_start = '2019-10-02 00:00:01'\n",
"pa_win_end = '2019-10-02 23:59:59'\n",
"tsDF=tsDF.filter(tsDF.end.between(pa_win_start, pa_win_end))\n",
"tsDF.count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Extract the data within the selected period.\n",
"\n",
"Note `argrelextrema` can't process spark dataframe. Data is passed in as a `numpy` array through pandas dataframe. Also note the head-shoulders identification algorithm works on the unsmoothed data - `prices`. As a result, `dropna()` only applies to the smoothed prices `smooth_prices`. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"window_range=3\n",
"prices = tsDF.toPandas()[['end', 'close', 'smoothed_close']].set_index('end')\n",
"smooth_prices = prices['smoothed_close'].dropna()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Find head-shoulders using local maxmins \n",
"\n",
"Scipy function `argrelextrema` is used for this purpose. Then we perform further smoothing by picking minima and maxima out of a window: the local minima/maxima must be the minimum/maximum within the window of 7 bars centered on itself. This helps dampen noise when dealing with volatile time series.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"local_max = argrelextrema(smooth_prices.values, np.greater)[0]\n",
"local_min = argrelextrema(smooth_prices.values, np.less)[0]\n",
"price_local_max_dt = []\n",
"for i in local_max:\n",
" if (i>window_range) and (iwindow_range) and (i max_span: \n",
" continue \n",
" \n",
" E = window.iloc[0:5].close\n",
" \n",
" # IHS\n",
" if E[0]E[1] and E[2]>E[0] and E[2]>E[4] and E[4]>E[3] and abs(E[1]-E[3])<=np.mean(E[[1,3]])*0.015:\n",
" patterns['HS'].append((window.index[0], window.index[-1]))\n",
" \n",
" return patterns"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"lasti = max_min.index[0]\n",
"agg_maxmin = []\n",
"\n",
"for p in max_min.index[1:]:\n",
" if max_min.loc[p].type == max_min.loc[lasti].type:\n",
" if max_min.loc[p].type == 'max' and max_min.loc[p].close > max_min.loc[lasti].close or \\\n",
" max_min.loc[p].type == 'min' and max_min.loc[p].close < max_min.loc[lasti].close: \n",
" lasti = p\n",
" if max_min.loc[p].type != max_min.loc[lasti].type:\n",
" agg_maxmin.append(lasti)\n",
" lasti = p\n",
"\n",
"agg_maxmin.append(lasti)\n",
"pat = find_patterns(max_min, 2, unit='H')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Plot head shoulders "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(pat['HS'])\n",
"timestamps = lambda dtindex: [f'{t.hour}:{t.minute}:{t.second}' for t in dtindex]\n",
"\n",
"def get_end_point(tindex, xy1, xy2, length=1):\n",
" xrange = tindex.get_loc(xy2[0]) - tindex.get_loc(xy1[0])\n",
" slope = (xy2[1] - xy1[1] )/ xrange\n",
" xe = tindex[tindex.get_loc(xy1[0]) + xrange*length]\n",
" ye = xy1[1] + slope*xrange*length\n",
" #print(f'xe={xe}, ye={ye}, slopt={slope}, xrange={xrange}, length={length}')\n",
" return (xe, ye) "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The identified head-shoulders are plotted. \n",
"* The left and right shoulders and head are red dots. As seen, they are local maxima, of which the head is the largest\n",
"* The neck points are in blue. \n",
"* The neckline is the dashline through the two neck points. In this example, the neckline tilts upward.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#%% plot max min\n",
"f, axes = plt.subplots(1, 1, figsize=(16, 5))\n",
"#timestamps = [f'{t.hour}:{t.minute}:{t.second}' for t in prices.index]\n",
"axes.plot(timestamps(prices.index), prices['close'])\n",
"#axes.plot(smooth_prices)\n",
"NT=len(prices['close'].index)\n",
"xstep=10\n",
"#axes.xaxis.set_ticks(prices['close'].index[range(0, NT, xstep)], rotation=45)\n",
"#axes.set_xticklabels(prices['close'].index, rotation=45)\n",
"axes.scatter(timestamps(max_min.index), max_min.close, s=100, alpha=.3, color='orange')\n",
"axes.xaxis.set_major_locator(plt.MaxNLocator(20))\n",
"axes.tick_params(labelrotation=45)\n",
"\n",
"pattern_color={'HS':'orange','IHS':'black'}\n",
"for pt in pat.keys():\n",
" for win in pat[pt]:\n",
" w = max_min.loc[win[0]:win[1]]\n",
" axes.scatter(timestamps(w.index), w.close, s=150, marker='o', alpha=.3, color=pattern_color[pt])\n",
" axes.plot(timestamps(w.index), w.close, color=pattern_color[pt])\n",
" (xe, ye) = get_end_point(prices.index, (w.index[1], w.close[1]), (w.index[3], w.close[3]), length=2)\n",
" axes.plot(timestamps([w.index[1], xe]), [w.close[1], ye], dashes= (5, 2, 1, 2))\n",
" #axes.plot((prices.index.get_loc(w.index[1]), w.close[1]), (xe, ye), color='blue', dashes= (5, 2, 1, 2)) \n",
"%matplot plt"
]
}
],
"metadata": {
"instance_type": "ml.t3.medium",
"kernelspec": {
"display_name": "FinSpace PySpark (finspace-sparkmagic-12539/latest)",
"language": "python",
"name": "pysparkkernel__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:695450025629:image/finspace-sparkmagic-12539"
},
"language_info": {
"codemirror_mode": {
"name": "python",
"version": 3
},
"mimetype": "text/x-python",
"name": "pyspark",
"pygments_lexer": "python3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}