# Polygon.io
Notebook to use polygon.io with Spark and FinSpace

## Reference
[Polygon.io](https://polygon.io)
[Python Client](https://github.com/polygon-io/client-python)

In [None]:
%local
from aws.finspace.cluster import FinSpaceClusterManager

# connect only once per notebook session; on a re-run just report the cluster already in use
if 'finspace_clusters' in globals():
    print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')
else:
    finspace_clusters = FinSpaceClusterManager()
    finspace_clusters.auto_connect()

In [None]:
# install the Polygon client on the cluster; a failure is reported instead of
# raised so the cell can be re-run in the same session
try:
    sc.install_pypi_package('polygon-api-client==0.2.11')
except Exception as e:
    # the cell assumes a failure means the package is already present; include
    # the error text so a genuine install failure is not hidden behind that message
    print(f'Packages already Installed (or install failed): {e}')

# Variables and Libraries

You will need a Polygon client_key; fill in its value below.

**IMPORTANT** Use a group ID from your FinSpace to grant permissions for the dataset.

In [None]:
import time
import pandas as pd
import urllib.parse as urlparse

from aws.finspace.analytics import FinSpaceAnalyticsManager

from polygon import RESTClient
from urllib.parse import parse_qs

#
# User Group to grant access to the dataset
#
# FILL IN VALUES BELOW FOR YOUR GROUP ID AND CLIENT KEY
# ----------------------------------------------------------------------
# FinSpace group ID; passed as permissionGroupId when the dataset is created
group_id = '' 
# ID of an existing dataset to add data to; leave as None to have a new dataset created further down
dataset_id = None

# Polygon client key, handed to RESTClient below
client_key = ''

# Polygon REST client used for the ticker requests
client = RESTClient(client_key)

# FinSpace manager; its finspace_client attribute is used for the FinSpace API calls in later cells
# NOTE(review): `spark` is not defined in this notebook — presumably supplied by the notebook session; confirm
finspace_manager = FinSpaceAnalyticsManager(spark)

# Get Tickers

Using the Polygon APIs, create a table of all Tickers.

In [None]:
# helper that pulls the pagination cursor out of a "next page" URL
def get_cursor(url):
    """
    Return the values of the ``cursor`` query parameter of a URL
    :param url: URL whose query string carries a cursor parameter
    :type: str
    :return: list of cursor values as produced by parse_qs (KeyError if the URL has no cursor parameter)
    """
    query_params = parse_qs(urlparse.urlparse(url).query)
    return query_params['cursor']

# first page of tickers
resp = client.reference_tickers_v3(limit=1000)

all_tickers = []

if resp.status == 'OK':
    all_tickers.extend(resp.results)

# keep requesting pages for as long as the latest response carries a next_url
while True:
    if not hasattr(resp, 'next_url'):
        break

    print("Next, but first sleeping...")
    time.sleep((60/5) + 1) # 5 calls per minute

    cursor = get_cursor(resp.next_url)
    resp = client.reference_tickers_v3(limit=1000, cursor=cursor)
    all_tickers.extend(resp.results)

# create pandas dataframe from the responses
tickers_df = pd.DataFrame.from_records(all_tickers)
tickers_df

# Convert to Spark DataFrame

In [None]:
from pyspark.sql.types import *

# Auxiliary functions
def equivalent_type(f):
    """
    Map a pandas column dtype to the Spark SQL type used for that column
    :param f: dtype of a pandas column, compared against dtype names
    :return: instance of a pyspark.sql.types data type; any dtype not listed falls back to StringType
    """
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    # float64 is double precision; FloatType is single precision and would lose digits
    elif f == 'float64': return DoubleType()
    elif f == 'float32': return FloatType()
    elif f == 'bool': return BooleanType()
    else: return StringType()

def define_structure(string, format_type):
    """
    Build the Spark StructField for one pandas column
    :param string: name of the column
    :param format_type: pandas dtype of the column
    :return: StructField named ``string`` typed via equivalent_type, or StringType if that mapping raises
    """
    try:
        spark_type = equivalent_type(format_type)
    except Exception:
        # best effort: a dtype that cannot be mapped is stored as a string column;
        # Exception (not a bare except) so interrupts still propagate
        spark_type = StringType()
    return StructField(string, spark_type)

def get_schema(pandas_df):
    """
    Build the Spark schema matching a pandas DataFrame
    :param pandas_df: pandas DataFrame whose columns and dtypes are read
    :return: StructType with one StructField per column, in column order
    """
    column_names = list(pandas_df.columns)
    column_dtypes = list(pandas_df.dtypes)

    fields = [
        define_structure(column_name, column_dtype)
        for column_name, column_dtype in zip(column_names, column_dtypes)
    ]

    return StructType(fields)

# Turns a pandas dataframe into a Spark dataframe.
def pandas_to_spark(pandas_df):
    """
    Convert a pandas DataFrame to a Spark DataFrame
    :param pandas_df: pandas DataFrame to convert
    :return: Spark DataFrame created through sqlContext with the schema from get_schema
    """
    spark_schema = get_schema(pandas_df)
    return sqlContext.createDataFrame(pandas_df, schema=spark_schema)


In [None]:
# convert the pandas ticker table into a Spark DataFrame, with the schema derived from its dtypes
tickersDF = pandas_to_spark(tickers_df)

# print the resulting Spark schema for inspection
tickersDF.printSchema()

In [None]:
from pyspark.sql.functions import *

# convert the string datetime column to a proper timestamp type:
# move the raw strings aside, parse them into a new last_updated_utc column, then drop the raw column
tickersDF = tickersDF.withColumnRenamed('last_updated_utc', 'input_timestamp_str')
tickersDF = tickersDF.withColumn("last_updated_utc", to_timestamp("input_timestamp_str"))
tickersDF = tickersDF.drop('input_timestamp_str')

In [None]:
# sample the table: display the first 5 rows of the Spark DataFrame
tickersDF.show(5)

# Create FinSpace Dataset

Using the FinSpace APIs, we will define the dataset, add the changeset, create an auto-updating view, and associate and populate attributes for the dataset.

## Definitions

Here are the various data elements we need for creating the dataset.

In [None]:
# Name for the dataset
name = "Ticker Universe"

# Description for the dataset
description = "All ticker symbols which are supported by Polygon.io"

# Name of the attribute set to use; it is looked up by name in the system.
# This name assumes the Capital Markets Sample Data Bundle was installed.
att_name = "Capital Market Details"

# Attributes to associate, based on the definition of the attribute set.
# Only AssetClass is given values here; the other fields are left empty.
att_values = [
 { 'field' : 'AssetClass', 'type' : 'TAXONOMY', 'values' : [ 'Equity', 'CommonStocks', 'Currencies', 'FXSpot', 'Crypto'] },
 { 'field' : 'EventType', 'type' : 'TAXONOMY', 'values' : [ ] },
 { 'field' : 'Exchange', 'type' : 'TAXONOMY', 'values' : [ ] },
 { 'field' : 'FinancialContentType', 'type' : 'TAXONOMY', 'values' : [ ] },
 { 'field' : 'RegionsAndCountries', 'type' : 'TAXONOMY', 'values' : [ ] }
]

# Permissions to grant the above group for the created dataset
basicPermissions = [
 "ViewDatasetDetails",
 "ReadDatasetData",
 "AddDatasetData",
 "CreateSnapshot",
 "EditDatasetMetadata",
 "ManageDatasetPermissions",
 "DeleteDataset"
]

# Wrap each permission name as {"permission": <name>}, the form passed as datasetPermissions to create_dataset
request_dataset_permissions = [{"permission": permissionName} for permissionName in basicPermissions]

# All datasets have ownership
# NOTE(review): these look like sample contact details — confirm and replace before real use
basicOwnerInfo = {
 "phoneNumber" : "12125551000",
 "email" : "jdoe@amazon.com",
 "name" : "Jane Doe"
}

# Schema of the dataset: no primary key columns are declared; one entry per column with its type and description
schema = {
 'primaryKeyColumns': [],
 'columns' : [
 {'dataType': 'STRING', 'name': 'ticker', 'description': 'The exchange symbol that this item is traded under'},
 {'dataType': 'STRING', 'name': 'name', 'description': 'The name of the asset. For stocks equities this will be the companies registered name. For crypto/fx this will be the name of the currency or coin pair'},
 {'dataType': 'STRING', 'name': 'market', 'description': 'The market type of the asset'},
 {'dataType': 'STRING', 'name': 'locale', 'description': 'The locale of the asset'},
 {'dataType': 'STRING', 'name': 'primary_exchange', 'description': 'The ISO code of the primary listing exchange for this asset'},
 {'dataType': 'STRING', 'name': 'type', 'description': 'The type of the asset'},
 {'dataType': 'BOOLEAN', 'name': 'active', 'description': 'Whether or not the asset is actively traded. False means the asset has been delisted'},
 {'dataType': 'STRING', 'name': 'currency_name', 'description': 'The name of the currency that this asset is traded with'},
 {'dataType': 'STRING', 'name': 'cik', 'description': 'The CIK number for this ticker'},
 {'dataType': 'STRING', 'name': 'composite_figi', 'description': 'The composite OpenFIGI number for this ticker'},
 {'dataType': 'STRING', 'name': 'share_class_figi', 'description': 'The share Class OpenFIGI number for this ticker'},
 {'dataType': 'STRING', 'name': 'currency_symbol', 'description': ''},
 {'dataType': 'STRING', 'name': 'base_currency_symbol', 'description': ''},
 {'dataType': 'STRING', 'name': 'base_currency_name', 'description': ''},
 {'dataType': 'DATETIME', 'name': 'last_updated_utc', 'description': 'The last time this asset record was updated'}
 ]
}


In [None]:
# Call FinSpace to create the dataset if no ID was assigned.
# If an ID was assigned, no dataset is created and the data is simply added to the existing one.
if dataset_id is None:
 # Create the dataset if it does not exist yet, using the name, description,
 # permissions, owner info and schema defined in the previous cell
 resp = finspace_manager.finspace_client.create_dataset(
 name = name, 
 description = description, 
 permissionGroupId = group_id,
 datasetPermissions = request_dataset_permissions,
 kind = "TABULAR",
 ownerInfo = basicOwnerInfo,
 schema = schema
 )

 # remember the new dataset's ID for the cells that follow
 dataset_id = resp["datasetId"]

 # NOTE(review): presumably gives FinSpace a moment to finish creating the dataset before it is used — confirm
 time.sleep(5)

print(f'Dataset ID: {dataset_id}')

In [None]:
# ask FinSpace for the ingestion location to upload to
resp = finspace_manager.finspace_client.instance.get_user_ingestion_info()

# write the ticker table as parquet to that ingestion path
upload_location = resp['ingestionPath']
tickersDF.write.parquet(upload_location)

# create a REPLACE changeset on the dataset, sourced from the parquet files just written
resp = finspace_manager.finspace_client.instance.create_changeset(datasetId=dataset_id, changeType='REPLACE', 
 sourceType='S3', sourceParams={'s3SourcePath': upload_location}, formatType='PARQUET', formatParams={})

# keep the changeset ID so its ingestion status can be polled
changeset_id = resp['changeset']['id']

print(f"changeset_id = {changeset_id}")

In [None]:
def wait_for_ingestion(client, dataset_id: str, changeset_id: str, sleep_sec=10):
    """
    function that will continuously poll the changeset creation to ensure it completes or fails before returning
    :param client: client object exposing describe_changeset(datasetId=..., id=...)
    :param dataset_id: GUID of the dataset
    :type: str
    :param changeset_id: GUID of the changeset
    :type: str
    :param sleep_sec: seconds to wait between checks
    :type: int
    :raises RuntimeError: when the reported status is anything other than SUCCESS, PENDING or RUNNING
        (including a response that carries no changeset or no status)
    """
    while True:
        resp = client.describe_changeset(datasetId=dataset_id, id=changeset_id)

        # default to an empty dict so a response without 'changeset' reaches the
        # bad-status error below instead of failing on ''.get(...)
        status = resp.get('changeset', {}).get('status', '')

        if status == 'SUCCESS':
            print("Changeset complete")
            return

        if status in ('PENDING', 'RUNNING'):
            # report the status actually returned rather than always saying PENDING
            print(f"Changeset status is still {status}, waiting {sleep_sec} sec ...")
            time.sleep(sleep_sec)
            continue

        raise RuntimeError(f"Bad changeset status: {status!r}, failing now. Response: {resp}")

In [None]:
# block until the changeset created above reports SUCCESS (raises on any status other than PENDING/RUNNING)
# NOTE(review): create_changeset was called on finspace_client.instance, while describe_changeset
# here goes through finspace_client itself — confirm both expose the same API
wait_for_ingestion(finspace_manager.finspace_client, dataset_id=dataset_id, changeset_id=changeset_id)

# Create View of the Dataset

In [None]:
print(f"dataset_id: {dataset_id}")

# look at the views that already exist for this dataset
resp = finspace_manager.finspace_client.list_data_views(datasetIdEquals=dataset_id, maxResults=100)
existing_views = resp['dataViews']

# remember the ID of an auto-updating view, if any (the last one listed wins)
autoupdate_view_id = None
for ss in existing_views:
    if ss['autoUpdate'] == True:
        autoupdate_view_id = ss.get('dataViewId', None)

# create an auto-update snapshot for this dataset only if one does not already exist
if autoupdate_view_id is not None:
    print(f"Exists: autoupdate_view_id = {autoupdate_view_id}")
else:
    print("creating auto-update view")

    resp = finspace_manager.finspace_client.create_materialized_snapshot(
        destinationProperties={},
        autoUpdate=True,
        sortColumns=[],
        partitionColumns=[],
        destinationType="GLUE_TABLE",
        datasetId=dataset_id,
    )
    autoupdate_view_id = resp['id']
 

## Associate Attribute Set

In [None]:
def list_attribute_sets(client):
    """
    Collect the dataset type summaries from every page of list_dataset_types
    :param client: client object exposing list_dataset_types(sort=..., nextToken=...)
    :return: list of all 'datasetTypeSummaries' entries, requested sorted by NAME
    """
    page = client.list_dataset_types(sort='NAME')
    summaries = page['datasetTypeSummaries']

    # follow the pagination token until a page arrives without one
    while True:
        if "nextToken" not in page:
            return summaries

        page = client.list_dataset_types(sort='NAME', nextToken=page['nextToken'])
        summaries.extend(page['datasetTypeSummaries'])

def attribute_set(client, name: str):
    """
    Case-insensitive exact-name search for a dataset type
    :param client: client object passed on to list_attribute_sets
    :param name: name of the dataset type to find
    :type: str
    :return: the first dataset type summary whose name matches, or None when there is no match
    """
    for candidate in list_attribute_sets(client):
        if candidate['name'].lower() == name.lower():
            # only the first match is considered
            return candidate or None

    return None

def describe_attribute_set(client, attribute_set_id: str):
    """
    Describe a dataset type and return only the 'datasetType' part of the response
    :param client: client object exposing describe_dataset_type(datasetTypeId=...)
    :param attribute_set_id: the GUID of the dataset type to get description of
    :type: str
    :return: the 'datasetType' entry of the response, or None when the response has no such entry
    """
    details = client.describe_dataset_type(datasetTypeId=attribute_set_id)
    return details['datasetType'] if 'datasetType' in details else None

def associate_attribute_set(client, att_name: str, att_values: list, dataset_id: str):
    """
    Associate the named attribute set with a dataset and fill in its values
    :param client: client object used for the attribute set and dataset calls
    :param att_name: name of the attribute set to associate
    :type: str
    :param att_values: attribute values passed to update_dataset_attribute_set_context
    :type: list
    :param dataset_id: GUID of the dataset
    :type: str
    :return: the update response when its HTTP status code is not 200, otherwise None
    :raises ValueError: when the dataset or the attribute set cannot be found
    """
    # get the attribute set by name, will need its id
    att_set = attribute_set(client, att_name)

    # get the dataset's information, will need the arn
    dataset_details_resp = client.describe_dataset_details(datasetId=dataset_id)
    dataset = dataset_details_resp.get("dataset", None)

    if dataset is None:
        raise ValueError(f'No dataset found for id: {dataset_id}')

    # fail with a clear message instead of a TypeError on None['id'] further down
    if att_set is None:
        raise ValueError(f'No attribute set found for name: {att_name}')

    arn = dataset['arn']
    attribute_set_id = att_set['id']

    # disassociate any existing relationship; best effort — a failure here is
    # reported and the association below is still attempted
    try:
        client.dissociate_dataset_from_attribute_set(datasetArn=arn, attributeSetId=attribute_set_id, datasetId=dataset_id)
    except Exception:
        print("Nothing to disassociate")

    client.associate_dataset_with_attribute_set(datasetArn=arn, attributeSetId=attribute_set_id, datasetId=dataset_id)

    resp = client.update_dataset_attribute_set_context(datasetArn=arn, datasetId=dataset_id, attributeSetId=attribute_set_id, values=att_values)

    # only a non-200 response is handed back to the caller
    if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
        return resp

    return None

In [None]:
# Associate an attribute set and fill its values
print(f"Associating values to attribute set: {att_name}")

# looks up the attribute set by name, (re)associates it with the dataset and applies att_values
associate_attribute_set(finspace_manager.finspace_client, att_name=att_name, att_values=att_values, dataset_id=dataset_id)

In [None]:
# print the dataset and view IDs as ready-to-copy assignment lines
print(f"""
ticker_dataset_id = '{dataset_id}'
ticker_view_id = '{autoupdate_view_id}'
""")

In [None]:
import datetime
# record when the notebook was last executed (datetime.now() without a tz argument, i.e. naive local time)
print( f"Last Run: {datetime.datetime.now()}" )