{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "![logo](./finspace_logo.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%local\n", "from aws.finspace.cluster import FinSpaceClusterManager\n", "\n", "# if this was already run, no need to run again\n", "if 'finspace_clusters' not in globals():\n", " finspace_clusters = FinSpaceClusterManager()\n", " finspace_clusters.auto_connect()\n", "else:\n", " print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f'Spark Version: {sc.version}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Realized Volatility\n", "This notebook will pull summarized data from FinSpace's catalog and then use the analytic function realized_volatility to compute realized volatility for a group of tickers and exchange event types." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# import needed libraries\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "import datetime as dt\n", "import pyspark.sql.functions as F\n", "import pyspark.sql.types as T\n", "\n", "from aws.finspace.timeseries.spark.analytics import *\n", "from aws.finspace.timeseries.spark.windows import *\n", "\n", "from aws.finspace.timeseries.spark.util import string_to_timestamp_micros" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#####----------------------------------------------------------\n", "##### REPLACE WITH CORRECT IDS!\n", "##### Dataset: \"US Equity Time-Bar Summary - 1 min, 14 Symbols - Sample\"\n", "#####\n", "#####----------------------------------------------------------\n", "dataset_id = '' \n", "view_id = ''" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from aws.finspace.analytics import FinSpaceAnalyticsManager\n", "finspace_manager = FinSpaceAnalyticsManager(spark = spark)\n", "\n", "sumDF = finspace_manager.read_data_view(dataset_id = dataset_id, data_view_id = view_id)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# What is the date range for the data?\n", "sumDF.select(F.min(sumDF.date).alias(\"MIN\"), F.max(sumDF.date).alias(\"MAX\")).show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# What tickers are in this dataset?\n", "sumDF.groupBy(\"ticker\").count().orderBy('ticker').show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Filter and select\n", "sDate = dt.datetime(2020, 1, 15)\n", "eDate = dt.datetime(2020, 2, 15)\n", "\n", "#df = ( sumDF.filter(sumDF.eventtype == \"TRADE NB\").filter( sumDF.date.between(sDate, eDate) ) )\n", "df = ( sumDF.filter( sumDF.date.between(sDate, eDate) ) )\n", "\n", "# sample the data\n", "df.show(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Spark Analytics\n", "All our analytic functions have help, lets look at the signatures for the functions we will use\n", "\n", "![Workflow](./workflow.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "help(realized_volatility)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tenor = 15\n", "numStd = 2\n", "\n", "# analytics to calculate\n", "realVolDef = realized_volatility( tenor, \"end\", \"vwap\" )\n", "bbandsDef = bollinger_bands(tenor, numStd, \"end\", \"vwap\", \"high\", \"low\")\n", "\n", "# group the sets of values\n", "partitionList = [\"ticker\", \"eventtype\"]\n", "\n", "tsDF = df\n", "\n", "tsDF = compute_analytics_on_features(tsDF, \"realized_volatility\", realVolDef, partition_col_list = partitionList)\n", "tsDF = compute_analytics_on_features(tsDF, \"bollinger_band\", bbandsDef, partition_col_list = partitionList)\n", "\n", "# will be working with the once calculated, lets cache it\n", "#tsDF = tsDF.cache()\n", "\n", "tsDF.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Realized Volatility Graph\n", "Calculate and plot realized volatility\n", "\n", "When plotting with Spark, the calculations are performed on the cluster, specifically, the data is collected to the driver, the plot image created, then the image is shipped over to the local notebook to be shown. This is all done for you." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fTicker = 'AMZN'\n", "\n", "# filter and bring data into a pandas dataframe for plotting\n", "pltDF = ( tsDF\n", " .filter(sumDF.eventtype == \"TRADE NB\")\n", " .filter(df.ticker == fTicker)\n", " .select( 'end', 'realized_volatility' )\n", ").toPandas()\n", "\n", "pltDF = pltDF.set_index('end')\n", "pltDF.index = pltDF.index.strftime(\"%Y-%m-%d %H:%m\")\n", "\n", "fig, ax = plt.subplots(1, 1, figsize=(12, 6))\n", "\n", "#ax.get_yaxis().set_major_formatter( matplotlib.ticker.FuncFormatter(lambda x, p: format(int(x), ',')) )\n", "\n", "# Realized Volatility\n", "pltDF[[ 'realized_volatility' ]].plot(figsize=(12,6))\n", "\n", "# labels and other items to make the plot readable\n", "plt.title(f\"{fTicker} Realized Vol (tenor: {tenor}, 1 min bars)\")\n", "plt.ylabel('Realized Vol')\n", "plt.xlabel('Date/Time')\n", "plt.xticks(rotation=30)\n", "plt.subplots_adjust(bottom=0.2)\n", "\n", "%matplot plt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# So why that spike?\n", "\n", "[Amazon soars after huge earnings beat](https://www.cnbc.com/2020/01/30/amazon-amzn-q4-2019-earnings.html) (CNBC). \n", "- Amazon reported fourth-quarter results on Thursday that smashed analysts’ expectations. \n", "- The company’s profits rebounded during the quarter, while revenue climbed 21% year over year. \n", "- The outperforming results show Amazon’s big investments in one-day delivery are paying off. \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Keep Iterating\n", "The data wasn't just calculated for one ticker, Spark did this for every ticker in the DataFrame..." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fTicker = 'GOOG'\n", "\n", "pltDF = ( tsDF\n", " .filter(sumDF.eventtype == \"TRADE NB\")\n", " .filter(df.ticker == fTicker)\n", " .select( 'end', 'realized_volatility' )\n", ").toPandas()\n", "\n", "pltDF = pltDF.set_index('end')\n", "pltDF.index = pltDF.index.strftime(\"%Y-%m-%d %H:%m\")\n", "\n", "# Realized Vol\n", "pltDF[[ 'realized_volatility' ]].plot(figsize=(12,6))\n", "\n", "plt.title(f\"{fTicker} Realized Vol (tenor: {tenor}, 1 min bars)\")\n", "plt.ylabel('Realized Vol')\n", "plt.xlabel('Date/Time')\n", "plt.xticks(rotation=30)\n", "plt.subplots_adjust(bottom=0.2)\n", "\n", "%matplot plt" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fTicker = 'AAPL'\n", "\n", "pltDF = ( tsDF\n", " .filter(sumDF.eventtype == \"TRADE NB\")\n", " .filter(df.ticker == fTicker)\n", " .select( 'end', 'realized_volatility' )\n", ").toPandas()\n", "\n", "pltDF = pltDF.set_index('end')\n", "pltDF.index = pltDF.index.strftime(\"%Y-%m-%d %H:%m\")\n", "\n", "# Realized Vol\n", "pltDF[[ 'realized_volatility' ]].plot(figsize=(12,6))\n", "\n", "plt.title(f\"{fTicker} Realized Vol (tenor: {tenor}, 1 min bars)\")\n", "plt.ylabel('Realized Vol')\n", "plt.xlabel('Date/Time')\n", "plt.xticks(rotation=30)\n", "plt.subplots_adjust(bottom=0.2)\n", "\n", "%matplot plt" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import datetime\n", "print( f\"Last Run: {datetime.datetime.now()}\" )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "FinSpace PySpark (finspace-sparkmagic-84084/latest)", "language": "python", "name": "pysparkkernel__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:489461498020:image/finspace-sparkmagic-84084" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }