{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Time Series Blog Notebook with Custom Calendar\n", "This notebook executes an end-to-end time series analysis using the Amazon FinSpace time series framework and its included analytics functions. The notebook processes Equity TAQ data (provided to FinSpace by AlgoSeek LLC) and runs that data through the framework to generate volatility and Bollinger Band data and plots.\n", "\n", "This notebook goes beyond the original blog notebook in that it uses the [pandas_market_calendars](https://pypi.org/project/pandas-market-calendars/) library to create a custom calendar for use in the 'Fill and Filter' stage of the time series data pipeline. " ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%local\n", "from aws.finspace.cluster import FinSpaceClusterManager\n", "\n", "# if this was already run, no need to run again\n", "if 'finspace_clusters' not in globals():\n", "    finspace_clusters = FinSpaceClusterManager()\n", "    finspace_clusters.auto_connect()\n", "else:\n", "    print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Collect and Summarize Timebars\n", "Time bars are obtained by sampling information at fixed time intervals, e.g., once every minute. \n", "\n", "**Series:** Time Series Data Engineering and Analysis\n", "\n", "As part of the big data time series processing workflow that FinSpace supports, this notebook shows how to take raw, unevenly spaced TAQ events and collect them into a performant derived dataset of time bars.\n", "\n", "## Timeseries Workflow\n", "Raw Events → **\\[Collect bars → Summarize bars → Fill Missing → Prepare → Analytics\\]**\n", "\n", "![Workflow](workflow.png)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#####----------------------------------------------------------\n", "##### REPLACE WITH CORRECT IDS!\n", "##### Dataset: \"US Equity TAQ - AMZN 6 Months\"\n", "#####\n", "#####----------------------------------------------------------\n", "dataset_id = ''\n", "view_id = ''" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# python imports\n", "import time\n", "import datetime\n", "import pprint\n", "\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "import pyspark.sql.functions as F\n", "import pyspark.sql.types as T\n", "\n", "# FinSpace imports\n", "from aws.finspace.timeseries.spark.util import string_to_timestamp_micros\n", "from aws.finspace.timeseries.spark.windows import create_time_bars, compute_analytics_on_features, compute_features_on_time_bars\n", "from aws.finspace.timeseries.spark.spec import BarInputSpec, TimeBarSpec\n", "from aws.finspace.timeseries.spark.summarizer import *\n", "from aws.finspace.timeseries.spark.analytics import *\n", "from aws.finspace.timeseries.finance.calendars import *\n", "from aws.finspace.timeseries.spark.prepare import *\n", "\n", "# Date range\n", "start_date = datetime.datetime(2019, 10, 1)\n", "end_date = datetime.datetime(2019, 12, 31)\n", "\n", "barNum = 1\n", "barUnit = \"minute\"\n", "\n", "barWidth = f\"{barNum} {barUnit}\"" ] },
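{ "cell_type": "markdown", "metadata": {}, "source": [ "Before pulling any data, it can help to see what a `1 minute` bar width means in practice: each bar covers a fixed, non-overlapping window of time. The next cell is only a quick illustration using plain pandas (it is not part of the FinSpace framework), and the 09:30 session open is an assumption chosen for the example." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Illustration only: the first few 1-minute bar start times after an assumed 09:30 open\n", "example_bar_starts = pd.date_range(\"2019-10-01 09:30\", periods=5, freq=f\"{barNum}min\")\n", "print(example_bar_starts)" ] },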
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Get the Data from FinSpace\n", "Using the given dataset and view ids, get the view as a Spark DataFrame." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from aws.finspace.analytics import FinSpaceAnalyticsManager\n", "finspace_manager = FinSpaceAnalyticsManager(spark)\n", "\n", "tDF = finspace_manager.read_data_view(dataset_id, view_id)\n", "tDF.printSchema()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Interact with Spark DataFrame\n", "As a Spark DataFrame, you can interact with the data using Spark functions." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tDF.printSchema()\n", "tDF.show(5)\n", "print(f'Rows: {tDF.count():,}')" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Time Series Framework Stages\n", "The functions below process the time series data by first collecting the data into time bars and then summarizing the data captured in each bar. The bars are collected into a column named 'activity' for the window of time by the create_time_bars function. The summarizer functions then summarize the data collected in the bar; the bar can be of any type, not just time.\n", "\n", "Customizations\n", "- Vary the width and steps of the time bar, and collect different data from the source DataFrame\n", "- Summarize the bar with other calculations\n", "\n", "Bring Your Own\n", "- Customers can add their own custom Spark user defined functions (UDFs) into the summarizer phase (a hypothetical sketch is shown in the Summarize Bars stage below)\n", "\n", "![Workflow](workflow.png)\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Stage: Collect Bars\n", "\n", "Collect raw TAQ events into time bars using FinSpace time series functions." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# define the time-bar: the column for time and how much time to collect\n", "timebar_spec = TimeBarSpec(timestamp_column='datetime', window_duration=barWidth, slide_duration=barWidth)\n", "\n", "# what columns to collect in the bar\n", "bar_input_spec = BarInputSpec('activity', 'datetime', 'timestamp', 'price', 'quantity', 'exchange', 'conditions' )\n", "\n", "# timebar column name\n", "timebar_col = 'window'\n", "\n", "# The result is a new DataFrame; also add a column for the number of activity items collected in the bar\n", "collDF = create_time_bars(data = tDF, \n", "                          timebar_column = timebar_col, \n", "                          grouping_col_list = ['date', 'ticker', 'eventtype'], \n", "                          input_spec = bar_input_spec, \n", "                          timebar_spec = timebar_spec)\\\n", "    .withColumn( 'activity_count', F.size( F.col('activity') ) )" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# schema at end of this stage\n", "collDF.printSchema()\n", "\n", "# sample 5 rows, truncate results (activity can get big)\n", "collDF.filter( collDF.date == start_date ).show(5, True)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Stage: Summarize Bars\n", "\n", "Summarize the bars and, once summarized, drop the activity column since it is no longer needed.\n" ] },
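{ "cell_type": "markdown", "metadata": {}, "source": [ "As noted under 'Bring Your Own' above, you can also summarize a bar with your own Spark UDF. The next cell is a minimal, hypothetical sketch (the UDF name and the median-price metric are made up for illustration); the framework's built-in summarizers are used in the cell after it." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import statistics\n", "\n", "# hypothetical custom summarizer: median trade price within each bar\n", "median_price_udf = F.udf(lambda prices: float(statistics.median(prices)) if prices else None, T.DoubleType())\n", "\n", "# 'activity.price' is the array of prices collected in each bar\n", "( collDF\n", "    .withColumn('median_price', median_price_udf(F.col('activity.price')))\n", "    .select('window', 'ticker', 'median_price')\n", ").show(5, False)" ] },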
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Bar data is in a column that is a list of structs named 'activity'\n", "# values collected in 'activity': datetime, timestamp, price, quantity, exchange, conditions\n", "\n", "# Spark Way\n", "sumDF = ( collDF\n", "    .withColumn( 'std', std( 'activity.price' ) )\n", "    .withColumn( 'vwap', vwap( 'activity.price', 'activity.quantity' ) )\n", "    .withColumn( 'ohlc', ohlc_func( 'activity.datetime', 'activity.price' ) )\n", "    .withColumn( 'volume', total_volume( 'activity.quantity' ) )\n", "#    .withColumn('MY_RESULT', MY_SPECIAL_FUNCTION( 'activity.datetime', 'activity.price', 'activity.quantity' ) )\n", "    .drop( collDF.activity )\n", ")\n", "\n", "# Library Way (recomputes the same summaries from collDF; this is the sumDF used below)\n", "sumDF = compute_features_on_time_bars(collDF, \"std\", std( 'activity.price' ), True, \"window\")\n", "sumDF = compute_features_on_time_bars(sumDF, \"vwap\", vwap( 'activity.price', 'activity.quantity' ), True, \"window\")\n", "sumDF = compute_features_on_time_bars(sumDF, \"ohlc\", ohlc_func( 'activity.datetime', 'activity.price' ), True, \"window\")\n", "sumDF = compute_features_on_time_bars(sumDF, \"volume\", total_volume( 'activity.quantity' ), True, \"window\")\n", "\n", "sumDF = sumDF.drop(sumDF.activity)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# schema at end of this stage\n", "sumDF.printSchema()\n", "\n", "# sample 5 rows, don't truncate so we can see full values\n", "sumDF.show(5, False)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Stage: Fill and Filter\n", "\n", "## Custom Calendar\n", "Define a calendar that uses the pandas_market_calendars package.\n", "\n", "See [PyPI](https://pypi.org/project/pandas-market-calendars), [GitHub](https://github.com/rsheftel/pandas_market_calendars), and the [documentation](http://pandas_market_calendars.readthedocs.io/en/latest/) for more information, and [here](https://pandas-market-calendars.readthedocs.io/en/latest/calendars.html) for the full list of supported calendars." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# install the library on your cluster if not already installed\n", "try:\n", "    sc.install_pypi_package('pandas_market_calendars==1.2')\n", "except Exception as e:\n", "    print(f'Package already installed (or install failed): {e}')" ] },
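{ "cell_type": "markdown", "metadata": {}, "source": [ "Before wrapping the library in a FinSpace calendar class, it can help to see what it returns. The next cell is a quick, illustration-only preview (it assumes the package installed above and uses a one-week date range chosen just for the example): `schedule` gives one row per trading day with open/close times, and `mcal.date_range` expands that schedule into bar-aligned timestamps." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Illustration only: preview what pandas_market_calendars provides\n", "import pandas_market_calendars as mcal\n", "\n", "nyse = mcal.get_calendar('NYSE')\n", "\n", "# one row per trading day, with market_open / market_close timestamps\n", "schedule = nyse.schedule(start_date='2019-10-01', end_date='2019-10-07')\n", "print(schedule)\n", "\n", "# expand the schedule into 1-minute timestamps within trading hours\n", "print(mcal.date_range(schedule, frequency='1min')[:5])" ] },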
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import datetime\n", "import typing\n", "\n", "import numpy\n", "import pytz\n", "import pandas_market_calendars as mcal\n", "\n", "\n", "class MarketCalendar(AbstractCalendar):\n", "    def __init__(self, calendar_name: str = 'NYSE'):\n", "        super().__init__()\n", "        self.calendar = mcal.get_calendar(calendar_name)\n", "\n", "    def raw_calendar_data(self) -> typing.Dict[str, typing.Any]:\n", "        return {AbstractCalendar.TZINFO: self.calendar.tz}\n", "\n", "    def create_schedule_from_to(self, from_date: datetime.date, to_date: datetime.date, time_bar_spec_window_duration: str,\n", "                                from_time: typing.Optional[datetime.time] = None,\n", "                                to_time: typing.Optional[datetime.time] = None,\n", "                                ) -> numpy.array:\n", "        \"\"\"\n", "        Create a list of datetimes from the given start and end dates, with a frequency given by time_bar_spec_window_duration.\n", "        The from_time and to_time values are not used by this class but are required to maintain the interface of the abstract class.\n", "\n", "        :param from_date: from date\n", "        :param to_date: to date\n", "        :param time_bar_spec_window_duration: bar width, e.g. '1 minute'\n", "        :param from_time: NOT USED\n", "        :param to_time: NOT USED\n", "\n", "        :return: list of datetimes\n", "        \"\"\"\n", "\n", "        tz = self.calendar.tz.zone\n", "\n", "        # widen plain dates to full-day datetimes, then localize naive datetimes to the calendar's timezone\n", "        if isinstance(from_date, datetime.date) or isinstance(to_date, datetime.date):\n", "            from_date = datetime.datetime(from_date.year, from_date.month, from_date.day, 0)\n", "            to_date = datetime.datetime(to_date.year, to_date.month, to_date.day, 23, 59, 59, 999999)\n", "        if not from_date.tzinfo and not to_date.tzinfo:\n", "            from_date = pytz.timezone(tz).localize(from_date)\n", "            to_date = pytz.timezone(tz).localize(to_date)\n", "        elif from_date.tzinfo != to_date.tzinfo:\n", "            raise RuntimeError(\"invalid input for timezones in create schedule\")\n", "\n", "        # trading-day schedule, expanded into bar-aligned timestamps within trading hours\n", "        data = self.calendar.schedule(start_date=from_date, end_date=to_date)\n", "        valid_dates = mcal.date_range(data, frequency=time_bar_spec_window_duration, closed=None).to_pydatetime()\n", "\n", "        return valid_dates\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# fill and filter, using the time bar defined in the collect stage\n", "ffDF = time_bar_fill_and_filter(sumDF, timebar_col, MarketCalendar('NYSE'), timebar_spec, start_date, end_date)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Stage: Prepare Feature Dataset\n", "Simplify the schema by selecting the needed items and dropping what is not needed." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "prepDF = ( ffDF\n", "    .filter( ffDF.date.between(start_date, end_date) )\n", "\n", "    # flatten window\n", "    .withColumn(\"start\", ffDF.window.start)\n", "    .withColumn(\"end\", ffDF.window.end)\n", "    .drop(\"window\")\n", "\n", "    # flatten ohlc\n", "    .withColumn(\"open\", ffDF.ohlc.open)\n", "    .withColumn(\"high\", ffDF.ohlc.high)\n", "    .withColumn(\"low\", ffDF.ohlc.low)\n", "    .withColumn(\"close\", ffDF.ohlc.close)\n", "    .drop(\"ohlc\")\n", ")" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "prepDF.printSchema()\n", "\n", "# sample the data\n", "prepDF.show(10, False)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Stage: Analytics\n", "\n", "Now apply analytics to the data; in our case, calculate realized volatility and Bollinger Bands." ] },
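{ "cell_type": "markdown", "metadata": {}, "source": [ "For orientation, the conventional definitions of these two quantities over a rolling window of $n$ bars are given below (the notebook uses $n$ = `tenor` and $k$ = `numStd`, set a few cells down, on the `vwap` price series $p_t$). These are the textbook formulas only; the exact FinSpace implementations (windowing, log vs. simple returns, annualization) may differ, so check the `help()` output in the next cells.\n", "\n", "Realized volatility from bar-to-bar log returns $r_t = \\ln(p_t / p_{t-1})$:\n", "\n", "$$\\mathrm{RV}_t = \\sqrt{\\sum_{i=t-n+1}^{t} r_i^2}$$\n", "\n", "Bollinger Bands with $k$ standard deviations:\n", "\n", "$$\\mathrm{middle}_t = \\mathrm{SMA}_n(p_t), \\qquad \\mathrm{upper}_t = \\mathrm{middle}_t + k \\sigma_n(p_t), \\qquad \\mathrm{lower}_t = \\mathrm{middle}_t - k \\sigma_n(p_t)$$" ] },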
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# See help for the function\n", "help(realized_volatility)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# See help for the function\n", "help(bollinger_bands)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Arguments to the functions\n", "tenor = 15\n", "numStd = 2\n", "\n", "# analytics to calculate\n", "realVolDef = realized_volatility( tenor, \"end\", \"vwap\" )\n", "bbandsDef = bollinger_bands(tenor, numStd, \"end\", \"vwap\", \"high\", \"low\")\n", "\n", "# group the dataset's values by...\n", "partitionList = [\"ticker\", \"eventtype\"]\n", "\n", "# Prepare the dataframe\n", "tsDF = prepDF\n", "\n", "tsDF = compute_analytics_on_features(tsDF, \"realized_volatility\", realVolDef, partition_col_list = partitionList)\n", "tsDF = compute_analytics_on_features(tsDF, \"bollinger_band\", bbandsDef, partition_col_list = partitionList)\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tsDF.printSchema()\n", "\n", "# sample the first few rows, but let's be sure to filter out the null values as well\n", "# Times are UTC; realized_volatility is non-null only after the given tenor\n", "tsDF.drop( \"date\", \"activity_count\" ).filter( tsDF.realized_volatility.isNotNull() ).sort(tsDF.end).show( 10, False )" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Plots\n", "## Realized Volatility Graph\n", "Calculate and plot realized volatility.\n", "\n", "When plotting with Spark, the calculations are performed on the cluster: the data is collected to the driver, the plot image is created there, and the image is then shipped over to the local notebook to be shown. This is all done for you." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# ticker and event to filter for\n", "fTicker = 'AMZN'\n", "event_type = 'TRADE NB'\n", "\n", "# filter and bring data into a pandas dataframe for plotting\n", "pltDF = ( tsDF\n", "    .filter(tsDF.eventtype == event_type)\n", "    .filter(tsDF.ticker == fTicker)\n", "    .select( 'end', 'realized_volatility' )\n", ").toPandas()\n", "\n", "pltDF = pltDF.set_index('end')\n", "pltDF.index = pltDF.index.strftime(\"%Y-%m-%d %H:%M\")\n", "\n", "fig, ax = plt.subplots(1, 1, figsize=(12, 6))\n", "\n", "#ax.get_yaxis().set_major_formatter( matplotlib.ticker.FuncFormatter(lambda x, p: format(int(x), ',')) )\n", "\n", "# Realized Volatility\n", "pltDF[[ 'realized_volatility' ]].plot(figsize=(12,6))\n", "\n", "# labels and other items to make the plot readable\n", "plt.title(f\"{fTicker} Realized Vol (tenor: {tenor}, {barWidth} bars)\")\n", "plt.ylabel('Realized Vol')\n", "plt.xlabel('Date/Time')\n", "plt.xticks(rotation=30)\n", "plt.subplots_adjust(bottom=0.2)\n", "\n", "%matplot plt" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Bollinger Bands\n", "Bollinger Bands were calculated as well; plot them against the close price." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# filter the bollinger band data\n", "pltDF = ( tsDF\n", "    .filter(tsDF.eventtype == event_type)\n", "    .withColumn('upper_band', tsDF.bollinger_band.upper_band)\n", "    .withColumn('middle_band', tsDF.bollinger_band.middle_band)\n", "    .withColumn('lower_band', tsDF.bollinger_band.lower_band)\n", "    .filter(tsDF.ticker == fTicker)\n", "    .select( 'end', 'close', 'upper_band', 'middle_band', 'lower_band' )\n", ").toPandas()\n", "\n", "pltDF = pltDF.set_index('end')\n", "pltDF.index = pltDF.index.strftime(\"%Y-%m-%d %H:%M\")\n", "\n", "# Simple Bollinger Band\n", "pltDF[['close', 'middle_band', 'upper_band', 'lower_band']].plot(figsize=(12,6))\n", "\n", "plt.title(f\"{fTicker} Bollinger Bands (tenor: {tenor}, {barWidth} bars, n-std: {numStd})\")\n", "plt.ylabel('Price (USD)')\n", "plt.xlabel('Date/Time')\n", "plt.xticks(rotation=30)\n", "plt.subplots_adjust(bottom=0.2)\n", "\n", "%matplot plt" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# What is the date range for the data?\n", "tsDF.select( F.min(tsDF.start).alias(\"MIN\"), F.max(tsDF.end).alias(\"MAX\")).show()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# What tickers are in this dataset?\n", "tsDF.groupBy(\"ticker\").count().orderBy('ticker').show()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print( f\"Last Run: {datetime.datetime.now()}\" )" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ],
"metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "FinSpace PySpark (finspace-sparkmagic-84084/latest)", "language": "python", "name": "pysparkkernel__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:489461498020:image/finspace-sparkmagic-84084" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }