{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%local\n", "from aws.finspace.cluster import FinSpaceClusterManager\n", "\n", "# if this was already run, no need to run again\n", "if 'finspace_clusters' not in globals():\n", " finspace_clusters = FinSpaceClusterManager()\n", " finspace_clusters.auto_connect()\n", "else:\n", " print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Collect Timebars and Summarize\n", "Time bars are obtained by sampling information at fixed time intervals, e.g., once every minute. \n", "\n", "**Series:** Time Series Data Engineering and Analysis\n", "\n", "As part of the big data timeseries processing workflow FinSpace supports, show how one takes raw, uneven in time events of TAQ data and collect them into a performant derived dataset of collected bars of data.\n", "\n", "\n", "### Timeseries Workflow\n", "Raw Events → **\\[Collect bars → Summarize bars\\]** → Fill Missing → Prepare → Analytics\n", "\n", "![Workflow](workflow.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Getting the Group ID\n", "\n", "Navigate to the Analyst group (gear menu, users and groups, select group named Analyst). The URL is of this pattern: \n", "http://**ENVIRONMENT_ID**.**REGION**.amazonfinspace.com/userGroup/**GROUP_ID** \n", "\n", "Copy the string for Group ID into the **basicPermissionGroupId** variable assignment below" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# REPLACE WITH CORRECT IDS!\n", "# US Equity TAQ Sample - AMZN 6 Months - Sample\n", "source_dataset_id = ''\n", "source_view_id = ''\n", "\n", "# Group: Analyst\n", "basicPermissionGroupId = ''" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# notebook imports\n", "import time\n", "import datetime as dt\n", "import pyspark.sql.functions as F\n", "import pyspark.sql.types as T\n", "import pprint \n", "\n", "# FinSpace imports\n", "from aws.finspace.timeseries.spark.util import string_to_timestamp_micros\n", "from aws.finspace.timeseries.spark.windows import create_time_bars, compute_analytics_on_features, compute_features_on_time_bars\n", "from aws.finspace.timeseries.spark.spec import BarInputSpec, TimeBarSpec\n", "from aws.finspace.timeseries.spark.summarizer import *\n", "\n", "# destination if adding to an existing dataset\n", "dest_dataset_id = None\n", "\n", "start_date = \"2019-10-01\"\n", "end_date = \"2019-12-31\"\n", "\n", "barNum = 9\n", "barUnit = \"minute\"\n", "\n", "# \n", "d = time.strftime('%Y-%m-%d %-I:%M %p %Z') # name is unique to date and time created\n", "\n", "name = f\"TAQ Timebar Summaries - DEMO ({barNum} {barUnit})\"\n", "description = f\"TAQ data summarized into time bars of {barNum} {barUnit} containing STD, VWAP, OHLC and Total Volume. 
start: {start_date} end: {end_date}\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Dataset Ownership" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# permissions that will be given on the dataset\n", "basicPermissions = [\n", " \"ViewDatasetDetails\" \n", " ,\"ReadDatasetData\" \n", " ,\"AddDatasetData\" \n", " ,\"CreateSnapshot\" \n", " ,\"EditDatasetMetadata\"\n", " ,\"ManageDatasetPermissions\"\n", " ,\"DeleteDataset\"\n", "]\n", "\n", "request_dataset_permissions = [{\"permission\": permissionName} for permissionName in basicPermissions]\n", "\n", "basicOwnerInfo = {\n", " \"phoneNumber\" : \"12125551000\",\n", " \"email\" : \"jdoe@amazon.com\",\n", " \"name\" : \"John Doe\"\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Python Helper Functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# function to generate a series of dates from a given start/stop date\n", "def daterange(startD, endD):\n", " for n in range(int((endD - startD).days)+1):\n", " yield startD + dt.timedelta(n)\n", "\n", "# returns the business dates between the given dates, excluding weekends and the listed holidays\n", "def businessDatesBetween(startD, endD):\n", " weekend_days = [6, 7] # isoweekday values for Saturday and Sunday\n", "\n", " holidays = [ dt.date(2019, 11, 28), \n", " dt.date(2019, 12, 25), \n", " dt.date(2020, 1, 1), \n", " dt.date(2020, 1, 20), \n", " dt.date(2020, 2, 17),\n", " dt.date(2020, 4, 10),\n", " dt.date(2020, 5, 25),\n", " dt.date(2020, 7, 3), \n", " dt.date(2020, 9, 7),\n", " dt.date(2020, 11, 26) ]\n", "\n", " processDates = list()\n", "\n", " for aDate in daterange(startD, endD):\n", " if (aDate.isoweekday() not in weekend_days) and (aDate not in holidays): \n", " processDates.append( aDate )\n", " \n", " return( processDates )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Get the Data from FinSpace\n", "Using the given dataset and view IDs, get the view as a Spark DataFrame." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from aws.finspace.analytics import FinSpaceAnalyticsManager\n", "finspace_manager = FinSpaceAnalyticsManager(spark)\n", "\n", "tDF = finspace_manager.read_data_view(source_dataset_id, source_view_id)\n", "tDF.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Interact with the DataFrame\n", "As a Spark DataFrame, you can interact with the data using Spark." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tDF.show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# FUNCTIONS: Collect and Summarize\n", "The functions below process the time series data by first collecting the data into time bars and then summarizing the data captured in each bar. The bars are collected into a column 'activity' for the window of time in the collectTimeBars function. 
The summarize bar function summarizes the data collected in the bar; the bar can be of any type, not just time.\n", "\n", "Customizations\n", "- vary the width and step of the time bar, or collect different data from the source DataFrame\n", "- Summarize the bar with other calculations \n", "\n", "Bring Your Own \n", "- Customers can add their own custom Spark user-defined functions (UDFs) into the summarizer phase\n", "\n", "![Workflow](workflow.png)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#-------------------------------------------------------------------\n", "# Collects event data into Time-Bars\n", "#\n", "# barWidth: number and unit of time, e.g. '1 minute'\n", "#-------------------------------------------------------------------\n", "def collectTimeBars( taqDF, barWidth ): \n", "\n", " # define the time-bar: the column for time and how much time to collect\n", " timebar_spec = TimeBarSpec(timestamp_column='datetime', window_duration=barWidth, slide_duration=barWidth)\n", " \n", " # which columns from the source DataFrame to collect in the bar\n", " bar_input_spec = BarInputSpec('activity', 'datetime', 'timestamp', 'price', 'quantity', 'exchange', 'conditions' )\n", "\n", " # the result is a new DataFrame\n", " barDF = ( create_time_bars(data=taqDF, \n", " timebar_column='window', \n", " grouping_col_list=['date', 'ticker', 'eventtype'], \n", " input_spec=bar_input_spec, \n", " timebar_spec=timebar_spec)\n", " .withColumn('activity_count', F.size(F.col('activity'))) )\n", "\n", " return( barDF )\n", "\n", "#-------------------------------------------------------------------\n", "# Summarizes the data that was collected in the bar\n", "#-------------------------------------------------------------------\n", "def summarizeBars( barDF ):\n", "\n", "# Bar data is in a column that is a list of structs named 'activity'\n", "# values collected in 'activity': datetime, timestamp, price, quantity, exchange, conditions\n", " \n", " sumDF = ( barDF\n", " .withColumn( 'std', std( 'activity.price' ) )\n", " .withColumn( 'vwap', vwap( 'activity.price', 'activity.quantity' ) )\n", " .withColumn( 'ohlc', ohlc_func( 'activity.datetime', 'activity.price' ) ) \n", " .withColumn( 'volume', total_volume( 'activity.quantity' ) )\n", "# .withColumn('MY_RESULT', MY_SPECIAL_FUNCTION( 'activity.datetime', 'activity.price', 'activity.quantity' ) )\n", " .drop( barDF.activity )\n", " )\n", "\n", " return( sumDF )\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create the Spark DataFrame\n", "Create a Spark DataFrame that is the result of the data pipeline that collects TAQ data into time bars and then summarizes each bar.\n", "\n", "## Outline of Processing\n", "- for each set of dates in the overall range...\n", "- collect data into time bars\n", "- summarize the data for each bar\n", "- save as a changeset to the dataset\n", " - creates a new dataset if one does not exist yet\n", " - uses the FinSpace APIs to simplify dataset creation from a Spark DataFrame\n", "- continue until all dates have been processed" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "finalDF_schema = {\n", " 'primaryKeyColumns': [],\n", " 'columns' : [\n", " {'dataType': 'DATE', 'name': 'date', 'description': 'The trade date'},\n", " {'dataType': 'STRING', 'name': 'ticker', 'description': 'Equity Ticker'},\n", " {'dataType': 'STRING', 'name': 'eventtype', 'description': 'Event type'},\n", " {'dataType': 
'INTEGER', 'name': 'activity_count', 'description': 'Number of events in period'},\n", " {'dataType': 'DOUBLE', 'name': 'std', 'description': 'Standard deviation of prices in period'},\n", " {'dataType': 'DOUBLE', 'name': 'vwap', 'description': 'Volume Weighted Average Price in period'},\n", " {'dataType': 'DOUBLE', 'name': 'volume', 'description': 'Total shares during the period'},\n", " {'dataType': 'DATETIME', 'name': 'start', 'description': 'Period Start'},\n", " {'dataType': 'DATETIME', 'name': 'end', 'description': 'Period End'},\n", " {'dataType': 'DOUBLE', 'name': 'open', 'description': 'First/opening price over the period'},\n", " {'dataType': 'DOUBLE', 'name': 'high', 'description': 'High price over the period'},\n", " {'dataType': 'DOUBLE', 'name': 'low', 'description': 'Low price over the period'},\n", " {'dataType': 'DOUBLE', 'name': 'close', 'description': 'Last/Closing price over the period'}\n", " ]\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# convert strings to dates\n", "start_dt = dt.datetime.strptime(start_date, '%Y-%m-%d').date()\n", "end_dt = dt.datetime.strptime(end_date, '%Y-%m-%d').date()\n", "\n", "# get the list of business dates between given dates\n", "processDates = businessDatesBetween( start_dt, end_dt )\n", "\n", "# grabs a set of items from the list, allows us to iterate over a set of dates at a time\n", "def chunker(seq, size):\n", " return (seq[pos:pos + size] for pos in range(0, len(seq), size))\n", "\n", "chunk_size = 3\n", "\n", "# necessary for time bar API\n", "barWidth = f\"{barNum} {barUnit}\"\n", "\n", "isFirst = True\n", "\n", "for dates in chunker(processDates, chunk_size):\n", " print(f\"Processing {len(dates)}: {dates}\")\n", "\n", " # filter the data for these dates\n", " dayDF = tDF.filter(tDF.date.isin(dates))\n", "\n", " # collect the data into time bars of the desired width\n", " dayDF = collectTimeBars( dayDF, barWidth )\n", "\n", " # summarize the bars, drop activity since it's no longer needed\n", " dayDF = summarizeBars( dayDF ).drop('activity')\n", "\n", " # add indicators using summaries\n", " #dayDF = addIndicators( dayDF, numSteps = 10, shortStep = 12, longStep = 26)\n", "\n", " # flatten the complex schema into a simple one, drop columns no longer needed\n", " finalDF = ( dayDF\n", " .withColumn(\"start\", dayDF.window.start)\n", " .withColumn(\"end\", dayDF.window.end)\n", "\n", " .withColumn(\"open\", dayDF.ohlc.open)\n", " .withColumn(\"high\", dayDF.ohlc.high)\n", " .withColumn(\"low\", dayDF.ohlc.low)\n", " .withColumn(\"close\", dayDF.ohlc.close)\n", "\n", " .drop(\"window\")\n", " .drop(\"ohlc\")\n", " )\n", " \n", " # create the changeset\n", " change_type = \"APPEND\"\n", " \n", " # if this is the first pass and no dest_dataset_id was given, create the dataset\n", " if (isFirst and dest_dataset_id is None): \n", " \n", " print(\"creating dataset\")\n", "\n", " # Create the dataset if it does not exist yet\n", " resp = finspace_manager.finspace_client.create_dataset(\n", " name = name, \n", " description = description, \n", " permissionGroupId = basicPermissionGroupId,\n", " datasetPermissions = request_dataset_permissions,\n", " kind = \"TABULAR\",\n", " ownerInfo = basicOwnerInfo,\n", " schema = finalDF_schema\n", " )\n", "\n", " dest_dataset_id = resp[\"datasetId\"]\n", " # first changeset will be a replace\n", " change_type = \"REPLACE\"\n", "\n", " print( f\"Created dest_dataset_id= {dest_dataset_id}\") \n", "\n", " print(f\"Creating Changeset: {change_type}\")\n", " 
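# upload flow: ask FinSpace for a staging S3 location (get_user_ingestion_info), write the\n", " # summarized DataFrame there as Parquet, then register a changeset on the dataset that\n", " # points at the uploaded files (create_changeset with sourceType='S3')\n", " 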
\n", " resp = finspace_manager.finspace_client.get_user_ingestion_info()\n", " \n", " upload_location = resp['ingestionPath']\n", " finalDF.write.parquet(upload_location)\n", " \n", " resp = finspace_manager.finspace_client.create_changeset(datasetId=dest_dataset_id, changeType=change_type, \n", " sourceType='S3', sourceParams={'s3SourcePath': upload_location}, formatType='PARQUET', formatParams={})\n", " \n", " changeset_id = resp['changeset']['id']\n", " \n", " isFirst = False\n", " \n", " print(f\"changeset_id = {changeset_id}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def wait_for_ingestion(client, dataset_id: str, changeset_id: str, sleep_sec=10):\n", " \"\"\"\n", " function that will continuously poll the changeset creation to ensure it completes or fails before returning\n", " :param dataset_id: GUID of the dataset\n", " :type: str\n", " :param changeset_id: GUID of the changeset\n", " :type: str\n", " :param sleep_sec: seconds to wait between checks\n", " :type: int\n", " \"\"\"\n", " while True:\n", " resp1 = client.describe_changeset(datasetId=dataset_id, id=changeset_id)\n", "\n", " resp2 = resp1.get('changeset', '')\n", " status = resp2.get('status', '')\n", "\n", " if status == 'SUCCESS':\n", " print(f\"Changeset complete\")\n", " break\n", " elif status == 'PENDING' or status == 'RUNNING':\n", " print(f\"Changeset status is still PENDING, waiting {sleep_sec} sec ...\")\n", " time.sleep(sleep_sec)\n", " continue\n", " else:\n", " raise Exception(f\"Bad changeset status: {resp1}{status}, failing now.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wait_for_ingestion(finspace_manager.finspace_client, dataset_id=dest_dataset_id, changeset_id=changeset_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create Views of the Dataset\n", "use the FinSpace APIs to create 2 views of the data, an 'as-of' view for state up to this moment, and an additional auto-updating view if one does not exist for the dataset." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print( f\"dest_dataset_id: {dest_dataset_id}\") " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "resp = finspace_manager.finspace_client.list_data_views(datasetIdEquals = dest_dataset_id, maxResults=100)\n", "resp" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "resp = finspace_manager.finspace_client.list_data_views(datasetIdEquals = dest_dataset_id, maxResults=100)\n", "\n", "existing_views = resp['dataViews']\n", "\n", "autoupdate_view_id = None\n", "\n", "for ss in existing_views:\n", " if ss['autoUpdate'] == True: \n", " autoupdate_view_id = ss.get('dataViewId', None)\n", " \n", "# create a an auto-update snapshot for this dataset if one does not already exist\n", "if autoupdate_view_id is None:\n", " print(\"creating auto-update view\")\n", "\n", " resp = finspace_manager.finspace_client.create_materialized_snapshot(\n", " destinationProperties={},\n", " autoUpdate=True,\n", " sortColumns=[],\n", " partitionColumns=[],\n", " destinationType = \"GLUE_TABLE\",\n", " datasetId=dest_dataset_id)\n", " autoupdate_view_id = resp['id']\n", "else:\n", " print(f\"Exists: autoupdate_view_id = {autoupdate_view_id}\")\n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Associate Attribute Set\n", "Associate the 'Sample Data Attribute Set' to the data just created" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def list_attribute_sets(client):\n", " resp = client.list_dataset_types(sort='NAME')\n", " results = resp['datasetTypeSummaries']\n", "\n", " while \"nextToken\" in resp:\n", " resp = client.list_dataset_types(sort='NAME', nextToken=resp['nextToken'])\n", " results.extend(resp['datasetTypeSummaries'])\n", "\n", " return results\n", "\n", "def attribute_set(client, name: str):\n", " \"\"\"\n", " Exact name search for a dataset type of the given name\n", " :param name: name of the dataset type to find\n", " :param name: name of the dataset type to find\n", " :type: str\n", " :return\n", " \"\"\"\n", " all_dataset_types = list_attribute_sets(client)\n", " existing_dataset_type = next((c for c in all_dataset_types if c['name'].lower() == name.lower()), None)\n", "\n", " if existing_dataset_type:\n", " return existing_dataset_type\n", "\n", "def describe_attribute_set(client, attribute_set_id: str):\n", " \"\"\"\n", " Calls describe dataset type API function and only returns the dataset type portion of the response\n", " :param attribute_set_id: the GUID of the dataset type to get description of\n", " :type: str\n", " \"\"\"\n", " resp = None\n", " dataset_type_details_resp = client.describe_dataset_type(datasetTypeId=attribute_set_id)\n", "\n", " if 'datasetType' in dataset_type_details_resp:\n", " resp = dataset_type_details_resp['datasetType']\n", "\n", " return resp\n", "\n", "def associate_attribute_set(client, att_name: str, att_values: list, dataset_id: str):\n", " # get the attribute set by name, will need its id\n", " att_set = attribute_set(client, att_name)\n", "\n", " # get the dataset's information, will need the arn\n", " dataset_details_resp = client.describe_dataset_details(datasetId=dataset_id)\n", "\n", " dataset = dataset_details_resp.get(\"dataset\", None)\n", "\n", " if dataset is None:\n", " raise ValueError(f'No dataset found for id: {dataset_id}')\n", "\n", " # disassociate any existing relationship\n", " 
try:\n", " client.dissociate_dataset_from_attribute_set(datasetArn=dataset['arn'], attributeSetId=att_set['id'], datasetId=dataset_id)\n", " except:\n", " print(\"Nothing to disassociate\")\n", "\n", " arn = dataset['arn']\n", " attribute_set_id = att_set['id']\n", "\n", " client.associate_dataset_with_attribute_set(datasetArn=arn, attributeSetId=attribute_set_id, datasetId=dataset_id)\n", "\n", " resp = client.update_dataset_attribute_set_context(datasetArn=arn, datasetId=dataset_id, attributeSetId=attribute_set_id, values=att_values)\n", "\n", " if resp['ResponseMetadata']['HTTPStatusCode'] != 200:\n", " return resp\n", "\n", " return" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# this is the attribute set to use, will search for it in system\n", "att_name = \"Capital Market Details\"\n", "\n", "# Attributes to associate, based on the definition of the attribute set\n", "att_values = [\n", " { 'field' : 'AssetClass', 'type' : 'TAXONOMY', 'values' : [ 'Equity', 'CommonStocks', 'ETFs'] },\n", " { 'field' : 'EventType', 'type' : 'TAXONOMY', 'values' : [ ] },\n", " { 'field' : 'Exchange', 'type' : 'TAXONOMY', 'values' : [ ] },\n", " { 'field' : 'FinancialContentType', 'type' : 'TAXONOMY', 'values' : [ ] },\n", " { 'field' : 'RegionsAndCountries', 'type' : 'TAXONOMY', 'values' : [ ] }\n", "]\n", "\n", "\n", "# Associate an attribute set and fill its values\n", "print(f\"Associating values to attribute set: {att_name}\")\n", "\n", "associate_attribute_set(finspace_manager.finspace_client, att_name=att_name, att_values=att_values, dataset_id=dest_dataset_id)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f\"dataset_id = '{dest_dataset_id}'\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import datetime\n", "print( f\"Last Run: {datetime.datetime.now()}\" )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "FinSpace PySpark (finspace-sparkmagic-84084/latest)", "language": "python", "name": "pysparkkernel__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:489461498020:image/finspace-sparkmagic-84084" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }