![logo](./finspace_logo.png)

In [None]:
%local
from aws.finspace.cluster import FinSpaceClusterManager

# if this was already run, no need to run again
if 'finspace_clusters' not in globals():
 finspace_clusters = FinSpaceClusterManager()
 finspace_clusters.auto_connect()
else:
 print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')

In [None]:
# Report the version exposed by `sc` (not defined in this notebook;
# presumably the SparkContext supplied by the session -- TODO confirm).
print(f'Spark Version: {sc.version}')

# Realized Volatility
This notebook will pull summarized data from FinSpace's catalog and then use the analytic function realized_volatility to compute realized volatility for a group of tickers and exchange event types.

In [None]:
# import needed libraries
import pandas as pd
import matplotlib.pyplot as plt
import datetime as dt
import pyspark.sql.functions as F
import pyspark.sql.types as T

from aws.finspace.timeseries.spark.analytics import *
from aws.finspace.timeseries.spark.windows import *

from aws.finspace.timeseries.spark.util import string_to_timestamp_micros

In [None]:
#####----------------------------------------------------------
##### REPLACE WITH CORRECT IDS!
##### Dataset: "US Equity Time-Bar Summary - 1 min, 14 Symbols - Sample"
#####
##### Both values are blank placeholders. Fill them in before running
##### the next cell, which passes them to read_data_view().
#####----------------------------------------------------------
dataset_id = '' 
view_id = ''

In [None]:
from aws.finspace.analytics import FinSpaceAnalyticsManager

# Fail fast with an actionable message when the placeholder IDs were never
# filled in, instead of handing empty strings to read_data_view().
if not dataset_id or not view_id:
    raise ValueError(
        "dataset_id and view_id are empty: set them to the IDs of the dataset "
        "and data view to load before running this cell"
    )

finspace_manager = FinSpaceAnalyticsManager(spark = spark)

# Load the data view; every later cell queries it through `sumDF`.
sumDF = finspace_manager.read_data_view(dataset_id = dataset_id, data_view_id = view_id)

In [None]:
# Earliest and latest value of the `date` column across the whole view.
date_bounds = sumDF.select(
    F.min("date").alias("MIN"),
    F.max("date").alias("MAX"),
)
date_bounds.show()

In [None]:
# Number of rows per ticker, sorted by ticker.
ticker_counts = sumDF.groupBy("ticker").count()
ticker_counts.orderBy("ticker").show()

In [None]:
# Date window to analyse; both bounds are inclusive.
sDate = dt.datetime(2020, 1, 15)
eDate = dt.datetime(2020, 2, 15)

# Keep only the rows whose date falls inside the window.
# (To also restrict to one event type, add: .filter(sumDF.eventtype == "TRADE NB"))
in_window = (sumDF.date >= sDate) & (sumDF.date <= eDate)
df = sumDF.where(in_window)

# Preview the first rows of the filtered data.
df.show(10)

# Spark Analytics
All of our analytic functions have built-in help. Let's look at the signature of one we will use.

![Workflow](./workflow.png)

In [None]:
# Show the built-in help for realized_volatility, which the next cell calls.
help(realized_volatility)

In [None]:
# Parameters passed to the analytic definitions below.
tenor = 15
numStd = 2

# Columns used to partition the data when the analytics are computed.
partitionList = ["ticker", "eventtype"]

# Definitions of the analytics to calculate.
realVolDef = realized_volatility(tenor, "end", "vwap")
bbandsDef = bollinger_bands(tenor, numStd, "end", "vwap", "high", "low")

# Apply the analytics one after the other, starting from the filtered data.
# The "realized_volatility" name is what the plotting cells select later.
tsDF = compute_analytics_on_features(
    df, "realized_volatility", realVolDef, partition_col_list=partitionList
)
tsDF = compute_analytics_on_features(
    tsDF, "bollinger_band", bbandsDef, partition_col_list=partitionList
)

# tsDF is reused by several later cells; uncomment to cache it once calculated.
#tsDF = tsDF.cache()

tsDF.printSchema()

# Realized Volatility Graph
Calculate and plot realized volatility

When plotting with Spark, the calculations are performed on the cluster. Specifically, the data is collected to the driver, the plot image is created there, and the image is then shipped to the local notebook to be displayed. This is all done for you.

In [None]:
fTicker = 'AMZN'

# filter and bring data into a pandas dataframe for plotting
pltDF = ( tsDF
 .filter(sumDF.eventtype == "TRADE NB")
 .filter(df.ticker == fTicker)
 .select( 'end', 'realized_volatility' )
).toPandas()

pltDF = pltDF.set_index('end')
pltDF.index = pltDF.index.strftime("%Y-%m-%d %H:%m")

fig, ax = plt.subplots(1, 1, figsize=(12, 6))

#ax.get_yaxis().set_major_formatter( matplotlib.ticker.FuncFormatter(lambda x, p: format(int(x), ',')) )

# Realized Volatility
pltDF[[ 'realized_volatility' ]].plot(figsize=(12,6))

# labels and other items to make the plot readable
plt.title(f"{fTicker} Realized Vol (tenor: {tenor}, 1 min bars)")
plt.ylabel('Realized Vol')
plt.xlabel('Date/Time')
plt.xticks(rotation=30)
plt.subplots_adjust(bottom=0.2)

%matplot plt

# So why that spike?

[Amazon soars after huge earnings beat](https://www.cnbc.com/2020/01/30/amazon-amzn-q4-2019-earnings.html) (CNBC). 
- Amazon reported fourth-quarter results on Thursday that smashed analysts’ expectations. 
- The company’s profits rebounded during the quarter, while revenue climbed 21% year over year. 
- The outperforming results show Amazon’s big investments in one-day delivery are paying off. 


# Keep Iterating
The data wasn't calculated for just one ticker; Spark did this for every ticker in the DataFrame...

In [None]:
fTicker = 'GOOG'

pltDF = ( tsDF
 .filter(sumDF.eventtype == "TRADE NB")
 .filter(df.ticker == fTicker)
 .select( 'end', 'realized_volatility' )
).toPandas()

pltDF = pltDF.set_index('end')
pltDF.index = pltDF.index.strftime("%Y-%m-%d %H:%m")

# Realized Vol
pltDF[[ 'realized_volatility' ]].plot(figsize=(12,6))

plt.title(f"{fTicker} Realized Vol (tenor: {tenor}, 1 min bars)")
plt.ylabel('Realized Vol')
plt.xlabel('Date/Time')
plt.xticks(rotation=30)
plt.subplots_adjust(bottom=0.2)

%matplot plt

In [None]:
fTicker = 'AAPL'

pltDF = ( tsDF
 .filter(sumDF.eventtype == "TRADE NB")
 .filter(df.ticker == fTicker)
 .select( 'end', 'realized_volatility' )
).toPandas()

pltDF = pltDF.set_index('end')
pltDF.index = pltDF.index.strftime("%Y-%m-%d %H:%m")

# Realized Vol
pltDF[[ 'realized_volatility' ]].plot(figsize=(12,6))

plt.title(f"{fTicker} Realized Vol (tenor: {tenor}, 1 min bars)")
plt.ylabel('Realized Vol')
plt.xlabel('Date/Time')
plt.xticks(rotation=30)
plt.subplots_adjust(bottom=0.2)

%matplot plt

In [None]:
import datetime
# Stamp the output with the current time returned by datetime.datetime.now()
# when this cell is executed.
print( f"Last Run: {datetime.datetime.now()}" )