# Time Series Blog Notebook with Custom Calendar
This notebook runs an end-to-end time series analysis using the Amazon FinSpace time series framework and its included analytics functions. It processes Equity TAQ data (provided to FinSpace by AlgoSeek LLC) through the framework to generate volatility and Bollinger Band data and plots.

This notebook goes beyond the original blog notebook by using the [pandas_market_calendars](https://pypi.org/project/pandas-market-calendars/) library to create a custom calendar for the 'Fill and Filter' stage of the time series data pipeline.

In [None]:
%local
from aws.finspace.cluster import FinSpaceClusterManager

# if this was already run, no need to run again
if 'finspace_clusters' not in globals():
 finspace_clusters = FinSpaceClusterManager()
 finspace_clusters.auto_connect()
else:
 print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')

# Collect and Summarize Timebars
Time bars are obtained by sampling information at fixed time intervals, e.g., once every minute. 
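
To make the idea concrete, here is a minimal pure-Python sketch of fixed-interval bucketing. It is only an illustration, not how the FinSpace library is implemented; the helper names `bar_start` and `collect_bars` and the sample events are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta

def bar_start(ts: datetime, width: timedelta) -> datetime:
    """Floor a timestamp to the start of the fixed-width bar that contains it."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    bars_since_midnight = (ts - midnight) // width
    return midnight + bars_since_midnight * width

def collect_bars(events, width=timedelta(minutes=1)):
    """Group (timestamp, price, quantity) events by the bar they fall into."""
    bars = defaultdict(list)
    for ts, price, qty in events:
        bars[bar_start(ts, width)].append((ts, price, qty))
    return dict(bars)

# Hypothetical, unevenly spaced trade events (not real market data).
events = [
    (datetime(2019, 10, 1, 9, 30, 1), 100.0, 10),
    (datetime(2019, 10, 1, 9, 30, 45), 100.5, 5),
    (datetime(2019, 10, 1, 9, 32, 10), 101.0, 20),
]

bars = collect_bars(events)
for start, activity in sorted(bars.items()):
    print(start, len(activity))
```

Note that no bar is produced for 09:31 because no event fell into that minute. Gaps like this are what the 'Fill and Filter' stage later in the notebook addresses.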

**Series:** Time Series Data Engineering and Analysis

This notebook is part of the big data time series processing workflow that FinSpace supports. It shows how to take raw TAQ events, which are unevenly spaced in time, and collect them into a performant derived dataset of time bars.

## Timeseries Workflow
Raw Events → **\[Collect bars → Summarize bars → Fill Missing → Prepare → Analytics\]**

![Workflow](workflow.png)

In [None]:
#####----------------------------------------------------------
##### REPLACE WITH CORRECT IDS!
##### Dataset: "US Equity TAQ - AMZN 6 Months"
#####
#####----------------------------------------------------------
dataset_id = ''
view_id = ''

In [None]:
# python imports
import time
import datetime
import pprint 

import pandas as pd
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
import pyspark.sql.types as T

# FinSpace imports
from aws.finspace.timeseries.spark.util import string_to_timestamp_micros
from aws.finspace.timeseries.spark.windows import create_time_bars, compute_analytics_on_features, compute_features_on_time_bars
from aws.finspace.timeseries.spark.spec import BarInputSpec, TimeBarSpec
from aws.finspace.timeseries.spark.summarizer import *
from aws.finspace.timeseries.spark.analytics import *
from aws.finspace.timeseries.finance.calendars import *
from aws.finspace.timeseries.spark.prepare import *

# Date range
start_date = datetime.datetime(2019, 10, 1)
end_date = datetime.datetime(2019, 12, 31)

barNum = 1
barUnit = "minute"

barWidth = f"{barNum} {barUnit}"

# Get the Data from FinSpace
Using the given dataset and view IDs, get the view as a Spark DataFrame.

In [None]:
from aws.finspace.analytics import FinSpaceAnalyticsManager
finspace_manager = FinSpaceAnalyticsManager(spark)

tDF = finspace_manager.read_data_view(dataset_id, view_id)
tDF.printSchema()

# Interact with Spark DataFrame
Because the view is a Spark DataFrame, you can work with the data using Spark functions.

In [None]:
tDF.printSchema()
tDF.show(5)
print(f'Rows: {tDF.count():,}')

# Time Series Framework Stages
The functions below process the time series data by first collecting the data into time bars and then summarizing the data captured in each bar. The `create_time_bars` function collects the events for each window of time into a column named 'activity'. The summarize functions then reduce the data collected in each bar. A bar can be of any type, not just time.

Customizations
- vary the width and steps of the time-bar, collect different data from the source DataFrame
- Summarize the bar with other calculations 

Bring Your Own 
- Customers can add their own custom Spark user defined functions (UDF) into the summarizer phase
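
As an illustration of the 'Bring Your Own' idea, the sketch below defines a custom summary as a plain Python function. The function name `median_trade_size` is hypothetical, and the note about wrapping it as a Spark UDF is an assumption that is not executed here.

```python
from statistics import median

def median_trade_size(quantities):
    """Median quantity of the trades collected in one bar; None for an empty bar."""
    sizes = [q for q in quantities if q is not None]
    return float(median(sizes)) if sizes else None

# On the cluster, a function like this might be wrapped with
# F.udf(median_trade_size, T.DoubleType()) and applied to 'activity.quantity'
# in the summarize stage (an assumption, not run in this sketch).
busy_bar = median_trade_size([100, 5, 20])
empty_bar = median_trade_size([])
print(busy_bar, empty_bar)
```

Keeping the logic in a plain function makes it easy to test locally before running it against the full dataset.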

![Workflow](workflow.png)


# Stage: Collect Bars

Collect raw TAQ events into time bars using FinSpace time series functions.

In [None]:
# define the time-bar, column for time and how much time to collect
timebar_spec = TimeBarSpec(timestamp_column='datetime', window_duration=barWidth, slide_duration=barWidth)

# what columns to collect in the bar
bar_input_spec = BarInputSpec('activity', 'datetime', 'timestamp', 'price', 'quantity', 'exchange', 'conditions' )

# timebar column name
timebar_col = 'window'

# The result is a new DataFrame; also add a column for the number of activity items collected in the bar
collDF = create_time_bars(data = tDF, 
 timebar_column = timebar_col, 
 grouping_col_list = ['date', 'ticker', 'eventtype'], 
 input_spec = bar_input_spec, 
 timebar_spec = timebar_spec)\
 .withColumn( 'activity_count', F.size( F.col('activity') ) )

In [None]:
# schema at end of this stage
collDF.printSchema()

# sample 5 rows, truncate results (activity can get big)
collDF.filter( collDF.date == start_date ).show(5, True)

# Stage: Summarize Bars

Summarize the bars, then drop the 'activity' column, since it is no longer needed once the bars are summarized.
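
The summaries computed in this stage can be sketched in plain Python for a single bar. This is an illustration under stated assumptions, not the FinSpace implementation: `summarize_bar` is a hypothetical helper, the standard deviation is the sample form (the library may use a different convention), and timestamps are plain integers for brevity.

```python
import math

def summarize_bar(activity):
    """Summarize one non-empty bar of (timestamp, price, quantity) tuples."""
    ordered = sorted(activity, key=lambda e: e[0])   # order events by time
    prices = [p for _, p, _ in ordered]
    quantities = [q for _, _, q in ordered]

    volume = sum(quantities)
    vwap = sum(p * q for p, q in zip(prices, quantities)) / volume
    mean = sum(prices) / len(prices)
    # Sample standard deviation; undefined for a single observation.
    std = (math.sqrt(sum((p - mean) ** 2 for p in prices) / (len(prices) - 1))
           if len(prices) > 1 else None)

    return {
        "open": prices[0], "high": max(prices),
        "low": min(prices), "close": prices[-1],
        "vwap": vwap, "std": std, "volume": volume,
    }

# Hypothetical events, deliberately out of time order.
summary = summarize_bar([(2, 101.0, 10), (1, 100.0, 30), (3, 99.0, 10)])
print(summary)
```

Once these values exist, the raw events in the bar are redundant, which is why the 'activity' column can be dropped.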


In [None]:
# Bar data is in a column that is a list of structs named 'activity'
# values collected in 'activity': datetime, timestamp, price, quantity, exchange, conditions

# Spark Way
sumDF = ( collDF
 .withColumn( 'std', std( 'activity.price' ) )
 .withColumn( 'vwap', vwap( 'activity.price', 'activity.quantity' ) )
 .withColumn( 'ohlc', ohlc_func( 'activity.datetime', 'activity.price' ) ) 
 .withColumn( 'volume', total_volume( 'activity.quantity' ) )
# .withColumn('MY_RESULT', MY_SPECIAL_FUNCTION( 'activity.datetime', 'activity.price', 'activity.quantity' ) )
 .drop( collDF.activity )
)

# Library Way
sumDF = compute_features_on_time_bars(collDF, "std", std( 'activity.price' ), True, "window")
sumDF = compute_features_on_time_bars(sumDF, "vwap", vwap( 'activity.price', 'activity.quantity' ), True, "window")
sumDF = compute_features_on_time_bars(sumDF, "ohlc", ohlc_func( 'activity.datetime', 'activity.price' ), True, "window")
sumDF = compute_features_on_time_bars(sumDF, "volume", total_volume( 'activity.quantity' ), True, "window")

sumDF = sumDF.drop(sumDF.activity)

In [None]:
# schema at end of this stage
sumDF.printSchema()

# sample 5 rows, don't truncate so we can see full values
sumDF.show(5, False)

# Stage: Fill and Filter

## Custom Calendar
Define a calendar that uses the pandas_market_calendars package.

See [PyPI](https://pypi.org/project/pandas-market-calendars), [GitHub](https://github.com/rsheftel/pandas_market_calendars), and the [documentation](http://pandas_market_calendars.readthedocs.io/en/latest/) for more information, and [here](https://pandas-market-calendars.readthedocs.io/en/latest/calendars.html) for all the calendars covered.
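
To show what a calendar contributes to this stage, here is a simplified standard-library sketch that builds the expected bar times for a date range. It assumes regular 09:30 to 16:00 weekday sessions and a hand-supplied holiday set, and it ignores early closes and time zones, all of which a real market calendar handles. The function name `session_bar_starts` is hypothetical and is not part of pandas_market_calendars or FinSpace.

```python
from datetime import date, datetime, time, timedelta

def session_bar_starts(from_date, to_date, bar_width, holidays=frozenset(),
                       open_time=time(9, 30), close_time=time(16, 0)):
    """Return naive local bar start times for each weekday session in [from_date, to_date]."""
    starts = []
    day = from_date
    while day <= to_date:
        if day.weekday() < 5 and day not in holidays:   # Monday to Friday, not a holiday
            t = datetime.combine(day, open_time)
            session_end = datetime.combine(day, close_time)
            while t < session_end:
                starts.append(t)
                t += bar_width
        day += timedelta(days=1)
    return starts

# Thanksgiving week 2019, with the holiday listed by hand.
schedule = session_bar_starts(date(2019, 11, 27), date(2019, 12, 1),
                              timedelta(minutes=1),
                              holidays={date(2019, 11, 28)})
print(len(schedule), schedule[0], schedule[-1])
```

Maintaining holidays and special sessions by hand does not scale, which is the motivation for delegating the schedule to a maintained calendar library.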

In [None]:
# install the library on your cluster if not already installed
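try:
    sc.install_pypi_package('pandas_market_calendars==1.2')
except Exception as e:
    # the install can fail because the package is already present, or for other reasons; show why
    print(f'Package not installed (it may already be present): {e}')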

In [None]:
import typing
import datetime

import numpy
import pytz
import pandas_market_calendars as mcal

class MarketCalendar(AbstractCalendar):
    def __init__(self, calendar_name: str = 'NYSE'):
        super().__init__()
        self.calendar = mcal.get_calendar(calendar_name)

    def raw_calendar_data(self) -> typing.Dict[str, typing.Any]:
        return {AbstractCalendar.TZINFO: self.calendar.tz.info}

    def create_schedule_from_to(self, from_date: datetime.date, to_date: datetime.date, time_bar_spec_window_duration: str,
                                from_time: typing.Optional[datetime.time] = None,
                                to_time: typing.Optional[datetime.time] = None,
                                ) -> numpy.ndarray:
        """
        Create an array of datetimes from the given start and end dates, with a
        frequency given by time_bar_spec_window_duration.

        The from_time and to_time values are not used by this class but are
        required to maintain the interface of the abstract class.

        :param from_date: from date
        :param to_date: to date
        :param time_bar_spec_window_duration: frequency of the bars
        :param from_time: NOT USED
        :param to_time: NOT USED

        :return: array of datetime
        """

        tz = self.calendar.tz.zone

        # expand the dates to cover the full first and last days
        if isinstance(from_date, datetime.date) or isinstance(to_date, datetime.date):
            from_date = datetime.datetime(from_date.year, from_date.month, from_date.day, 0)
            to_date = datetime.datetime(to_date.year, to_date.month, to_date.day, 23, 59, 59, 999999)
        # localize naive datetimes to the calendar's timezone
        if not from_date.tzinfo and not to_date.tzinfo:
            from_date = pytz.timezone(tz).localize(from_date)
            to_date = pytz.timezone(tz).localize(to_date)
        elif from_date.tzinfo != to_date.tzinfo:
            raise RuntimeError("invalid input for timezones in create schedule")

        # trading sessions (market open and close) for each valid day in the range
        data = self.calendar.schedule(start_date=from_date, end_date=to_date)

        #aa = pytz.timezone(nyse.tz.zone).localize( mcal.date_range(all_days, frequency=timebar_spec.windowDuration, closed=None) )
        valid_dates = mcal.date_range(data, frequency=time_bar_spec_window_duration, closed=None).to_pydatetime()

        return valid_dates


In [None]:
# fill and filter, use the timebar defined in collect stage
ffDF = time_bar_fill_and_filter(sumDF, timebar_col, MarketCalendar('NYSE'), timebar_spec, start_date, end_date)

# Stage: Prepare Feature Dataset
Simplify the schema by selecting the needed items and dropping the rest.

In [None]:
prepDF = ( ffDF
 .filter( ffDF.date.between(start_date, end_date) )

 # flatten window
 .withColumn("start", ffDF.window.start)
 .withColumn("end", ffDF.window.end)
 .drop("window")

 # flatten ohlc
 .withColumn("open", ffDF.ohlc.open)
 .withColumn("high", ffDF.ohlc.high)
 .withColumn("low", ffDF.ohlc.low)
 .withColumn("close", ffDF.ohlc.close)
 .drop("ohlc")
)

In [None]:
prepDF.printSchema()

# sample the data
prepDF.show(10, False)

# Stage: Analytics

Now apply analytics to the data. In this case, calculate realized volatility and Bollinger Bands.
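
For intuition, here is a minimal sketch of the textbook definitions. It assumes realized volatility is the square root of the sum of squared log returns over the window, and that the bands are a simple moving average plus or minus a number of population standard deviations. The FinSpace `realized_volatility` and `bollinger_bands` functions may use different conventions (for example, `bollinger_bands` also takes high and low columns), so `help()` remains the authoritative source. The names `rolling_realized_vol` and `rolling_bollinger` are hypothetical.

```python
import math

def rolling_realized_vol(prices, tenor):
    """Square root of the sum of squared log returns over the last `tenor` returns."""
    returns = [math.log(b / a) for a, b in zip(prices, prices[1:])]
    out = [None] * len(prices)
    for i in range(tenor, len(prices)):
        window = returns[i - tenor:i]          # the `tenor` returns ending at price i
        out[i] = math.sqrt(sum(r * r for r in window))
    return out

def rolling_bollinger(prices, tenor, num_std):
    """(lower, middle, upper) bands from a simple moving average and population std."""
    out = [None] * len(prices)
    for i in range(tenor - 1, len(prices)):
        window = prices[i - tenor + 1:i + 1]   # the `tenor` prices ending at price i
        mean = sum(window) / tenor
        std = math.sqrt(sum((p - mean) ** 2 for p in window) / tenor)
        out[i] = (mean - num_std * std, mean, mean + num_std * std)
    return out

# Hypothetical price series (not real market data).
prices = [100.0, 102.0, 101.0, 103.0, 102.0]
vol = rolling_realized_vol(prices, tenor=3)
bands = rolling_bollinger(prices, tenor=3, num_std=2)
print(vol)
print(bands)
```

Both functions return `None` until enough observations are available, which mirrors why the notebook later filters out null `realized_volatility` values before showing results.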

In [None]:
# See help for the function
help(realized_volatility)

In [None]:
# See help for the function
help(bollinger_bands)

In [None]:
# Arguments to the functions
tenor = 15
numStd = 2

# analytics to calculate
realVolDef = realized_volatility( tenor, "end", "vwap" )
bbandsDef = bollinger_bands(tenor, numStd, "end", "vwap", "high", "low")

# group the dataset's values by....
partitionList = ["ticker", "eventtype"]

# Prepare the dataframe
tsDF = prepDF

tsDF = compute_analytics_on_features(tsDF, "realized_volatility", realVolDef, partition_col_list = partitionList)
tsDF = compute_analytics_on_features(tsDF, "bollinger_band", bbandsDef, partition_col_list = partitionList)


In [None]:
tsDF.printSchema()

# sample the first few rows, but let's be sure to filter out the null values as well
# Times are UTC; realized_volatility is not null after the given tenor
tsDF.drop( "date", "activity_count" ).filter( tsDF.realized_volatility.isNotNull() ).sort(tsDF.end).show( 10, False )

# Plots 
## Realized Volatility Graph
Calculate and plot realized volatility

When plotting with Spark, the work happens on the cluster: the data is collected to the driver, the plot image is created there, and the image is then shipped to the local notebook for display. This is all done for you.

In [None]:
# ticker and event to filter for
fTicker = 'AMZN'
event_type = 'TRADE NB'

# filter and bring data into a pandas dataframe for plotting
pltDF = ( tsDF
 .filter(tsDF.eventtype == event_type)
 .filter(tsDF.ticker == fTicker)
 .select( 'end', 'realized_volatility' )
).toPandas()

pltDF = pltDF.set_index('end')
pltDF.index = pltDF.index.strftime("%Y-%m-%d %H:%M")  # %M is minutes (%m would repeat the month)

fig, ax = plt.subplots(1, 1, figsize=(12, 6))

#ax.get_yaxis().set_major_formatter( matplotlib.ticker.FuncFormatter(lambda x, p: format(int(x), ',')) )

# Realized Volatility
pltDF[[ 'realized_volatility' ]].plot(ax=ax)  # draw on the axes created above

# labels and other items to make the plot readable
plt.title(f"{fTicker} Realized Vol (tenor: {tenor}, {barWidth} bars)")
plt.ylabel('Realized Vol')
plt.xlabel('Date/Time')
plt.xticks(rotation=30)
plt.subplots_adjust(bottom=0.2)

%matplot plt

## Bollinger Bands
Bollinger Bands were calculated as well.

In [None]:
# filter the bollinger band data
pltDF = ( tsDF
 .filter(tsDF.eventtype == event_type)
 .withColumn('upper_band', tsDF.bollinger_band.upper_band)
 .withColumn('middle_band', tsDF.bollinger_band.middle_band)
 .withColumn('lower_band', tsDF.bollinger_band.lower_band)
 .filter(tsDF.ticker == fTicker)
 .select( 'end', 'close', 'upper_band', 'middle_band', 'lower_band' )
).toPandas()

pltDF = pltDF.set_index('end')
pltDF.index = pltDF.index.strftime("%Y-%m-%d %H:%M")  # %M is minutes (%m would repeat the month)

# Simple Bollinger Band
pltDF[['close', 'middle_band', 'upper_band', 'lower_band']].plot(figsize=(12,6))

plt.title(f"{fTicker} Bollinger Bands (tenor: {tenor}, {barWidth} bars, n-std: {numStd})")
plt.ylabel('Price (USD)')
plt.xlabel('Date/Time')
plt.xticks(rotation=30)
plt.subplots_adjust(bottom=0.2)

%matplot plt

In [None]:
# What is the date range for the data?
tsDF.select( F.min(tsDF.start).alias("MIN"), F.max(tsDF.end).alias("MAX")).show()

In [None]:
# What tickers are in this dataset?
tsDF.groupBy("ticker").count().orderBy('ticker').show()

In [None]:
print( f"Last Run: {datetime.datetime.now()}" )