In [None]:
%local
from aws.finspace.cluster import FinSpaceClusterManager

# if this was already run, no need to run again
if 'finspace_clusters' not in globals():
 finspace_clusters = FinSpaceClusterManager()
 finspace_clusters.auto_connect()
else:
 print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')

# Collect Timebars and Summarize
Time bars are obtained by sampling information at fixed time intervals, e.g., once every minute. 

**Series:** Time Series Data Engineering and Analysis

As part of the big-data time series processing workflow that FinSpace supports, this notebook shows how to take raw TAQ events, which arrive unevenly spaced in time, and collect them into a performant derived dataset of time bars.


### Timeseries Workflow
Raw Events → **\[Collect bars → Summarize bars\]** → Fill Missing → Prepare → Analytics

![Workflow](workflow.png)

## Getting the Group ID

Navigate to the Analyst group (gear menu → Users and Groups → select the group named Analyst). The URL follows this pattern: 
http://**ENVIRONMENT_ID**.**REGION**.amazonfinspace.com/userGroup/**GROUP_ID** 

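Copy the **GROUP_ID** string from that URL into the **basicPermissionGroupId** variable assignment below. Also fill in **source_dataset_id** and **source_view_id** with the IDs of the source TAQ dataset and its view.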

In [None]:
# REPLACE WITH CORRECT IDS!
# Source: US Equity TAQ Sample - AMZN 6 Months - Sample (dataset id, then view id)
source_dataset_id, source_view_id = '', ''

# Permission group that receives access to the new dataset (Group: Analyst)
basicPermissionGroupId = ''

In [None]:
# notebook imports
import time
import datetime as dt
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pprint

# FinSpace imports
from aws.finspace.timeseries.spark.util import string_to_timestamp_micros
from aws.finspace.timeseries.spark.windows import create_time_bars, compute_analytics_on_features, compute_features_on_time_bars
from aws.finspace.timeseries.spark.spec import BarInputSpec, TimeBarSpec
# Import the summarizers explicitly (instead of `import *`) so it is clear where
# std / vwap / ohlc_func / total_volume used by summarizeBars come from.
from aws.finspace.timeseries.spark.summarizer import ohlc_func, std, total_volume, vwap

# destination if adding to an existing dataset; None means a new dataset is created
dest_dataset_id = None

# date range to process, as YYYY-MM-DD strings
start_date = "2019-10-01"
end_date = "2019-12-31"

# bar size: a count and a unit, joined later into a duration string such as "9 minute"
barNum = 9
barUnit = "minute"

# Timestamp of when this cell ran.
# NOTE(review): `d` is not referenced in `name` below, so the dataset name is NOT
# unique per run; confirm whether it was meant to be appended to `name`.
# '%-I' (hour without zero padding) is a platform-specific strftime directive (glibc).
d = time.strftime('%Y-%m-%d %-I:%M %p %Z')

name = f"TAQ Timebar Summaries - DEMO ({barNum} {barUnit})"
description = f"TAQ data summarized into time bars of {barNum} {barUnit} containing STD, VWAP, OHLC and Total Volume. start: {start_date} end: {end_date}"

# Dataset Ownership

In [None]:
# Permissions granted to the permission group on the dataset being created.
basicPermissions = [
    "ViewDatasetDetails",
    "ReadDatasetData",
    "AddDatasetData",
    "CreateSnapshot",
    "EditDatasetMetadata",
    "ManageDatasetPermissions",
    "DeleteDataset",
]

# Each permission wrapped as {"permission": <name>}; passed to create_dataset as datasetPermissions.
request_dataset_permissions = list(map(lambda perm: {"permission": perm}, basicPermissions))

# Dataset owner contact details (placeholder values - replace with real ones).
basicOwnerInfo = dict(
    phoneNumber="12125551000",
    email="jdoe@amazon.com",
    name="John Doe",
)

## Python Helper Functions

In [None]:
# US equity market (NYSE) full-day holidays for 2019-2020; these dates are skipped.
_MARKET_HOLIDAYS = frozenset([
    dt.date(2019, 1, 1),
    dt.date(2019, 1, 21),
    dt.date(2019, 2, 18),
    dt.date(2019, 4, 19),
    dt.date(2019, 5, 27),
    dt.date(2019, 7, 4),
    dt.date(2019, 9, 2),
    dt.date(2019, 11, 28),
    dt.date(2019, 12, 25),
    dt.date(2020, 1, 1),
    dt.date(2020, 1, 20),
    dt.date(2020, 2, 17),
    dt.date(2020, 4, 10),
    dt.date(2020, 5, 25),
    dt.date(2020, 7, 3),
    dt.date(2020, 9, 7),
    dt.date(2020, 11, 26),
    dt.date(2020, 12, 25),
])


# function to generate a series of dates from a given start/stop date
def daterange(startD, endD):
    """
    Yield every calendar date from startD through endD, inclusive.

    Yields nothing when endD is earlier than startD.
    """
    for n in range((endD - startD).days + 1):
        yield startD + dt.timedelta(n)


def businessDatesBetween(startD, endD, holidays=None):
    """
    Return the weekdays between startD and endD (inclusive) that are not holidays.

    :param startD: first date to consider (datetime.date)
    :param endD: last date to consider (datetime.date)
    :param holidays: optional iterable of dates to exclude; None uses the built-in
        2019-2020 US market holiday calendar
    :return: list of datetime.date in ascending order
    """
    weekend = {6, 7}  # isoweekday(): Saturday == 6, Sunday == 7
    closed = _MARKET_HOLIDAYS if holidays is None else frozenset(holidays)

    return [
        day for day in daterange(startD, endD)
        if day.isoweekday() not in weekend and day not in closed
    ]

# Get the Data from FinSpace
Using the given dataset and view IDs, read the view as a Spark DataFrame.

In [None]:
from aws.finspace.analytics import FinSpaceAnalyticsManager
finspace_manager = FinSpaceAnalyticsManager(spark)

# Fail fast with a clear message if the placeholder IDs were never filled in,
# rather than surfacing an opaque service error from read_data_view.
if not source_dataset_id or not source_view_id:
    raise ValueError("source_dataset_id and source_view_id must be set before reading the data view")

tDF = finspace_manager.read_data_view(source_dataset_id, source_view_id)
tDF.printSchema()

## Interact with the DataFrame
As a Spark DataFrame, you can interact with the data using Spark.

In [None]:
# Peek at the first five rows of the source data.
tDF.show(n=5)

# FUNCTIONS: Collect and Summarize
The functions below process the time series data by first collecting the data into time bars and then summarizing the data captured in each bar. In the collectTimeBars function, the events for each window of time are collected into a column named 'activity'. The summarizeBars function summarizes the data collected in a bar; that bar can be of any type, not just a time bar.

Customizations
- vary the width and steps of the time-bar, collect different data from the source DataFrame
- Summarize the bar with other calculations 

Bring Your Own 
- Customers can add their own custom Spark user defined functions (UDF) into the summarizer phase

![Workflow](workflow.png)


In [None]:
#-------------------------------------------------------------------
# Collects event data into Time-Bars
#
# barWidth: number and units of time, e.g. '1 minute'
# slideWidth: how far successive bars advance; defaults to barWidth
#-------------------------------------------------------------------
def collectTimeBars(taqDF, barWidth, slideWidth=None):
    """
    Collect raw events into time bars grouped by date, ticker and eventtype.

    :param taqDF: Spark DataFrame of events; must contain the columns date, ticker,
        eventtype, datetime, timestamp, price, quantity, exchange and conditions
    :param barWidth: window duration string, e.g. '1 minute'
    :param slideWidth: slide duration string; None (the default) uses barWidth, so the
        window and slide durations are equal exactly as before this parameter existed
    :return: DataFrame with the grouping columns, a 'window' column, an 'activity'
        column holding the collected events, and 'activity_count' (number of events)
    """
    if slideWidth is None:
        slideWidth = barWidth

    # define the time-bar: column holding the event time, window length and step
    timebar_spec = TimeBarSpec(timestamp_column='datetime', window_duration=barWidth, slide_duration=slideWidth)

    # which source columns to collect into the 'activity' column of each bar
    bar_input_spec = BarInputSpec('activity', 'datetime', 'timestamp', 'price', 'quantity', 'exchange', 'conditions')

    # the result is a new DataFrame, one row per bar
    barDF = (
        create_time_bars(data=taqDF,
                         timebar_column='window',
                         grouping_col_list=['date', 'ticker', 'eventtype'],
                         input_spec=bar_input_spec,
                         timebar_spec=timebar_spec)
        .withColumn('activity_count', F.size(F.col('activity')))
    )

    return barDF

#-------------------------------------------------------------------
# Summarizes the data that was collected in the bar
#-------------------------------------------------------------------
def summarizeBars(barDF):
    """
    Add per-bar summary columns and drop the raw 'activity' column.

    The 'activity' column is a list of structs with the fields
    datetime, timestamp, price, quantity, exchange and conditions.
    Columns added: std, vwap, ohlc (struct with open/high/low/close) and volume.
    """
    # (output column, summarizer expression) pairs, applied in this order.
    # Bring your own UDF by appending e.g.
    # ('MY_RESULT', MY_SPECIAL_FUNCTION('activity.datetime', 'activity.price', 'activity.quantity'))
    summaries = [
        ('std', std('activity.price')),
        ('vwap', vwap('activity.price', 'activity.quantity')),
        ('ohlc', ohlc_func('activity.datetime', 'activity.price')),
        ('volume', total_volume('activity.quantity')),
    ]

    sumDF = barDF
    for column_name, expression in summaries:
        sumDF = sumDF.withColumn(column_name, expression)

    return sumDF.drop(barDF.activity)


# Create the Spark DataFrame
Create a Spark DataFrame that is the result of the data pipeline that collects TAQ data into time bars and then summarizes each bar.

## Outline of Processing
- for each chunk of dates in the overall range...
- collect data into time bars
- summarize the data for each bar
- save as a changeset to the dataset
 - creates a new dataset if one does not exist yet
 - uses the FinSpace APIs to simplify dataset creation from a Spark DataFrame
- continue until all dates have been processed

In [None]:
# Schema of the flattened time-bar summary dataset created below.
finalDF_schema = {
    'primaryKeyColumns': [],
    'columns': [
        {'dataType': 'DATE', 'name': 'date', 'description': 'The trade date'},
        {'dataType': 'STRING', 'name': 'ticker', 'description': 'Equity Ticker'},
        {'dataType': 'STRING', 'name': 'eventtype', 'description': 'Event type'},
        {'dataType': 'INTEGER', 'name': 'activity_count', 'description': 'Number of events in period'},
        {'dataType': 'DOUBLE', 'name': 'std', 'description': 'Standard deviation of prices in period'},
        {'dataType': 'DOUBLE', 'name': 'vwap', 'description': 'Volume Weighted Average Price in period'},
        {'dataType': 'DOUBLE', 'name': 'volume', 'description': 'Total shares during the period'},
        {'dataType': 'DATETIME', 'name': 'start', 'description': 'Period Start'},
        {'dataType': 'DATETIME', 'name': 'end', 'description': 'Period End'},
        {'dataType': 'DOUBLE', 'name': 'open', 'description': 'First/opening price over the period'},
        {'dataType': 'DOUBLE', 'name': 'high', 'description': 'High price over the period'},
        {'dataType': 'DOUBLE', 'name': 'low', 'description': 'Low price over the period'},
        {'dataType': 'DOUBLE', 'name': 'close', 'description': 'Last/Closing price over the period'},
    ]
}

In [None]:
# convert the configured date strings to datetime.date objects
start_dt = dt.datetime.strptime(start_date, '%Y-%m-%d').date()
end_dt = dt.datetime.strptime(end_date, '%Y-%m-%d').date()

# get the list of business dates between the given dates (inclusive)
processDates = businessDatesBetween(start_dt, end_dt)


def chunker(seq, size):
    """Return a generator over consecutive slices of `seq`, each holding at most `size` items."""
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))


# number of business dates processed (and written as one changeset) per pass
chunk_size = 3

# duration string required by the time bar API, e.g. "9 minute"
barWidth = f"{barNum} {barUnit}"

isFirst = True

# ids of every changeset created by this loop, in creation order;
# `changeset_id` keeps holding the most recent one for the cells that follow
changeset_ids = []

if not processDates:
    print(f"No business dates between {start_date} and {end_date}; nothing to process")

for dates in chunker(processDates, chunk_size):
    print(f"Processing {len(dates)}: {dates}")

    # filter the source data down to this chunk of dates
    dayDF = tDF.filter(tDF.date.isin(dates))

    # collect the data into time bars of the desired width
    dayDF = collectTimeBars(dayDF, barWidth)

    # summarize the bars; summarizeBars already drops 'activity', this drop is only a safeguard
    dayDF = summarizeBars(dayDF).drop('activity')

    # optional hook: add indicators using summaries
    # dayDF = addIndicators(dayDF, numSteps=10, shortStep=12, longStep=26)

    # flatten the nested window/ohlc structs into the plain columns of finalDF_schema
    finalDF = (
        dayDF
        .withColumn("start", dayDF.window.start)
        .withColumn("end", dayDF.window.end)

        .withColumn("open", dayDF.ohlc.open)
        .withColumn("high", dayDF.ohlc.high)
        .withColumn("low", dayDF.ohlc.low)
        .withColumn("close", dayDF.ohlc.close)

        .drop("window")
        .drop("ohlc")
    )

    # every changeset appends, except the first one into a dataset created by this loop
    change_type = "APPEND"

    # on the first pass with no dest_dataset_id given, create the dataset
    if isFirst and dest_dataset_id is None:
        print("creating dataset")

        resp = finspace_manager.finspace_client.create_dataset(
            name=name,
            description=description,
            permissionGroupId=basicPermissionGroupId,
            datasetPermissions=request_dataset_permissions,
            kind="TABULAR",
            ownerInfo=basicOwnerInfo,
            schema=finalDF_schema
        )

        dest_dataset_id = resp["datasetId"]
        # first changeset into the new dataset is a replace
        change_type = "REPLACE"

        print(f"Created dest_dataset_id= {dest_dataset_id}")

    print(f"Creating Changeset: {change_type}")

    # stage this chunk as parquet at the user's ingestion path, then register it as a changeset
    resp = finspace_manager.finspace_client.get_user_ingestion_info()

    upload_location = resp['ingestionPath']
    finalDF.write.parquet(upload_location)

    resp = finspace_manager.finspace_client.create_changeset(
        datasetId=dest_dataset_id, changeType=change_type,
        sourceType='S3', sourceParams={'s3SourcePath': upload_location},
        formatType='PARQUET', formatParams={})

    changeset_id = resp['changeset']['id']
    changeset_ids.append(changeset_id)

    isFirst = False

    print(f"changeset_id = {changeset_id}")

print(f"Created {len(changeset_ids)} changeset(s): {changeset_ids}")

In [None]:
def wait_for_ingestion(client, dataset_id: str, changeset_id: str, sleep_sec=10, timeout_sec=None):
    """
    Poll a changeset until it finishes; return on success, raise on failure.

    :param client: FinSpace client exposing describe_changeset(datasetId=..., id=...)
    :param dataset_id: GUID of the dataset
    :type: str
    :param changeset_id: GUID of the changeset
    :type: str
    :param sleep_sec: seconds to wait between checks
    :type: int
    :param timeout_sec: optional maximum number of seconds to keep polling; None waits indefinitely
    :type: int or None
    :raises Exception: if the status is anything other than SUCCESS, PENDING or RUNNING
    :raises TimeoutError: if timeout_sec elapses while the changeset is still PENDING/RUNNING
    """
    deadline = None if timeout_sec is None else time.monotonic() + timeout_sec

    while True:
        resp = client.describe_changeset(datasetId=dataset_id, id=changeset_id)

        # default to an empty dict so a response without 'changeset' is reported as a
        # bad status instead of failing with AttributeError on str.get
        changeset = resp.get('changeset') or {}
        status = changeset.get('status', '')

        if status == 'SUCCESS':
            print("Changeset complete")
            return

        if status not in ('PENDING', 'RUNNING'):
            raise Exception(f"Bad changeset status: {status!r}, failing now. Response: {resp}")

        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"Changeset {changeset_id} still {status} after {timeout_sec} sec")

        print(f"Changeset status is still {status}, waiting {sleep_sec} sec ...")
        time.sleep(sleep_sec)

In [None]:
# Block until the most recently created changeset finishes ingesting.
wait_for_ingestion(
    finspace_manager.finspace_client,
    dataset_id=dest_dataset_id,
    changeset_id=changeset_id,
)

# Create Views of the Dataset
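Use the FinSpace APIs to create views of the data. The cell below creates an auto-updating view if one does not already exist for the dataset. (An 'as-of' view, capturing the state of the data up to this moment, can also be created, but the cell below does not create one.)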

In [None]:
print("dest_dataset_id:", dest_dataset_id)

In [None]:
# Display the views that currently exist for the destination dataset.
resp = finspace_manager.finspace_client.list_data_views(
    datasetIdEquals=dest_dataset_id,
    maxResults=100,
)
resp

In [None]:
resp = finspace_manager.finspace_client.list_data_views(datasetIdEquals=dest_dataset_id, maxResults=100)
existing_views = list(resp['dataViews'])

# Follow pagination so views beyond the first page are not missed.
# NOTE(review): assumes list_data_views accepts nextToken when it returns one.
while resp.get('nextToken'):
    resp = finspace_manager.finspace_client.list_data_views(
        datasetIdEquals=dest_dataset_id, maxResults=100, nextToken=resp['nextToken'])
    existing_views.extend(resp['dataViews'])

# Find an existing auto-update view (the last one listed wins, as before);
# .get avoids a KeyError for entries that lack 'autoUpdate'.
autoupdate_view_id = None

for view in existing_views:
    if view.get('autoUpdate'):
        autoupdate_view_id = view.get('dataViewId', None)

# create an auto-update snapshot for this dataset if one does not already exist
if autoupdate_view_id is None:
    print("creating auto-update view")

    resp = finspace_manager.finspace_client.create_materialized_snapshot(
        destinationProperties={},
        autoUpdate=True,
        sortColumns=[],
        partitionColumns=[],
        destinationType="GLUE_TABLE",
        datasetId=dest_dataset_id)
    autoupdate_view_id = resp['id']
else:
    print(f"Exists: autoupdate_view_id = {autoupdate_view_id}")
 

## Associate Attribute Set
Associate the 'Capital Market Details' attribute set with the dataset just created, and fill in its values.

In [None]:
def list_attribute_sets(client):
    """
    Return every attribute set (dataset type) summary, following pagination.

    :param client: FinSpace client exposing list_dataset_types(sort=..., nextToken=...)
    :return: list of dataset type summary dicts, sorted by NAME as requested from the API
    """
    resp = client.list_dataset_types(sort='NAME')
    # copy so extending below never mutates the response object's own list
    results = list(resp['datasetTypeSummaries'])

    # stop on a missing OR empty/null token; checking only key presence would loop forever
    # if the last page carried "nextToken": None
    while resp.get('nextToken'):
        resp = client.list_dataset_types(sort='NAME', nextToken=resp['nextToken'])
        results.extend(resp['datasetTypeSummaries'])

    return results

def attribute_set(client, name: str):
    """
    Find an attribute set (dataset type) whose name equals `name`, ignoring case.

    :param client: FinSpace client, passed through to list_attribute_sets
    :param name: name of the dataset type to find
    :type: str
    :return: the first matching dataset type summary dict, or None when nothing matches
    """
    wanted = name.lower()
    for candidate in list_attribute_sets(client):
        if candidate['name'].lower() == wanted:
            return candidate
    return None

def describe_attribute_set(client, attribute_set_id: str):
    """
    Describe an attribute set (dataset type) and return only its 'datasetType' section.

    :param attribute_set_id: the GUID of the dataset type to describe
    :type: str
    :return: the 'datasetType' value of the response, or None if the response lacks that key
    """
    details = client.describe_dataset_type(datasetTypeId=attribute_set_id)
    return details['datasetType'] if 'datasetType' in details else None

def associate_attribute_set(client, att_name: str, att_values: list, dataset_id: str):
    """
    Attach the named attribute set to a dataset and set its attribute values.

    Any existing association between the dataset and this attribute set is removed first.

    :param client: FinSpace client
    :param att_name: name of the attribute set (matched case-insensitively by attribute_set)
    :param att_values: values passed to update_dataset_attribute_set_context
    :param dataset_id: GUID of the dataset
    :return: None when the context update returns HTTP 200, otherwise that update's response
    :raises ValueError: if the attribute set or the dataset cannot be found
    """
    # get the attribute set by name, will need its id
    att_set = attribute_set(client, att_name)
    if att_set is None:
        raise ValueError(f'No attribute set found with name: {att_name}')

    # get the dataset's information, will need the arn
    dataset_details_resp = client.describe_dataset_details(datasetId=dataset_id)

    dataset = dataset_details_resp.get("dataset", None)

    if dataset is None:
        raise ValueError(f'No dataset found for id: {dataset_id}')

    arn = dataset['arn']
    attribute_set_id = att_set['id']

    # disassociate any existing relationship; a failure here presumably means none existed,
    # so report it and continue (without swallowing KeyboardInterrupt/SystemExit)
    try:
        client.dissociate_dataset_from_attribute_set(datasetArn=arn, attributeSetId=attribute_set_id, datasetId=dataset_id)
    except Exception as err:
        print(f"Nothing to disassociate ({err})")

    client.associate_dataset_with_attribute_set(datasetArn=arn, attributeSetId=attribute_set_id, datasetId=dataset_id)

    resp = client.update_dataset_attribute_set_context(datasetArn=arn, datasetId=dataset_id, attributeSetId=attribute_set_id, values=att_values)

    if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
        return resp

    return

In [None]:
# Attribute set to attach; it is looked up by name in the system.
att_name = "Capital Market Details"

# Values for the attribute set's fields: only AssetClass is populated,
# the other taxonomy fields are left empty.
att_values = [
    {'field': 'AssetClass', 'type': 'TAXONOMY', 'values': ['Equity', 'CommonStocks', 'ETFs']},
] + [
    {'field': field, 'type': 'TAXONOMY', 'values': []}
    for field in ('EventType', 'Exchange', 'FinancialContentType', 'RegionsAndCountries')
]

# Associate an attribute set and fill its values
print(f"Associating values to attribute set: {att_name}")

associate_attribute_set(
    finspace_manager.finspace_client,
    att_name=att_name,
    att_values=att_values,
    dataset_id=dest_dataset_id,
)

In [None]:
print("dataset_id = '{}'".format(dest_dataset_id))

In [None]:
import datetime

# Record when the notebook last finished running.
print("Last Run:", datetime.datetime.now())