# Importing S3 Data into FinSpace

This notebook demonstrates how to use the FinSpace APIs to create a dataset and populate it with data from an S3 source that is external to FinSpace.

## Preparation
You need an S3 bucket that contains the data you wish to import, and that bucket must grant your FinSpace environment's service account access to it.

## Deutsche Börse Public Dataset (DBG PDS)
GitHub: https://github.com/Deutsche-Boerse/dbg-pds 

Copy the data into a bucket that entitles the FinSpace service account to read it. The bucket policy must grant the 
s3:GetObject and s3:ListBucket actions to the service account ARN.

FinSpace Service Account ARN (replace with your environment's service account): 
 arn:aws:iam::**INFRASTRUCTURE_ACCOUNT_ID**:role/FinSpaceServiceRole

## Entitlement Example

- The S3 bucket is externally accessible (cross-account access)
- Replace INFRASTRUCTURE_ACCOUNT_ID with the account ID from your environment's service account ARN
- Replace S3_BUCKET with the name of your S3 bucket

```
{
    "Version": "2012-10-17",
    "Id": "CrossAccountAccess",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::INFRASTRUCTURE_ACCOUNT_ID:role/FinSpaceServiceRole"
                ]
            },
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::S3_BUCKET/*"
        },
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::INFRASTRUCTURE_ACCOUNT_ID:role/FinSpaceServiceRole"
                ]
            },
            "Action": "s3:ListBucket",
            "Resource": "arn:aws:s3:::S3_BUCKET"
        }
    ]
}
```
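
If you prefer to generate the policy rather than edit the placeholders by hand, the following is a minimal sketch. The helper name `build_bucket_policy` and the account ID and bucket name in the example call are hypothetical placeholders, not values from this notebook; the policy structure mirrors the JSON shown above.

```python
import json


def build_bucket_policy(infrastructure_account_id: str, bucket: str) -> dict:
    """Build a bucket policy that lets the FinSpace service role read and list the bucket."""
    # Same principal is used in both statements
    principal = {
        "AWS": [f"arn:aws:iam::{infrastructure_account_id}:role/FinSpaceServiceRole"]
    }
    return {
        "Version": "2012-10-17",
        "Id": "CrossAccountAccess",
        "Statement": [
            {
                # Object-level access: applies to the keys inside the bucket
                "Effect": "Allow",
                "Principal": principal,
                "Action": "s3:GetObject",
                "Resource": f"arn:aws:s3:::{bucket}/*",
            },
            {
                # Bucket-level access: applies to the bucket itself
                "Effect": "Allow",
                "Principal": principal,
                "Action": "s3:ListBucket",
                "Resource": f"arn:aws:s3:::{bucket}",
            },
        ],
    }


# Placeholder values (assumptions), replace with your own
policy = build_bucket_policy("123456789012", "my-example-bucket")
print(json.dumps(policy, indent=4))
```

The serialized string from `json.dumps` can then be pasted into the bucket's policy editor or, if you use boto3, passed to the S3 client's `put_bucket_policy` call.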

In [None]:
%local
from aws.finspace.cluster import FinSpaceClusterManager

# if this was already run, no need to run again
if 'finspace_clusters' not in globals():
    finspace_clusters = FinSpaceClusterManager()
    finspace_clusters.auto_connect()
else:
    print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')

# FinSpace Environment

Please provide values from your AWS account (the S3 location) and your FinSpace environment. The group ID identifies the user group you want to associate with the dataset. This example grants that group all permissions on the dataset it creates.

## Getting the Group ID

Navigate to the Analyst group (gear menu, users and groups, select the group named Analyst). The URL follows this pattern: 
http://**ENVIRONMENT_ID**.**REGION**.amazonfinspace.com/userGroup/**GROUP_ID** 

Copy the GROUP_ID string into the **group_id** variable assignment below.
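
As an alternative to copying the ID by hand, the group ID can be extracted from the pasted URL. This is a minimal sketch that assumes the URL follows the pattern above; the helper name `group_id_from_url` and the example URL are hypothetical and used only for illustration.

```python
from urllib.parse import urlparse


def group_id_from_url(url: str) -> str:
    """Return the last path segment of a .../userGroup/GROUP_ID URL."""
    # Split the path into non-empty segments, e.g. ['userGroup', 'abc123XYZ']
    path_parts = [part for part in urlparse(url).path.split("/") if part]
    if len(path_parts) < 2 or path_parts[-2] != "userGroup":
        raise ValueError(f"Not a user group URL: {url}")
    return path_parts[-1]


# Made-up URL following the documented pattern
example_url = "http://myenv.us-east-1.amazonfinspace.com/userGroup/abc123XYZ"
example_group_id = group_id_from_url(example_url)
print(example_group_id)
```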


In [None]:
import time
import pandas as pd
from aws.finspace.analytics import FinSpaceAnalyticsManager

# location of data copied from s3://deutsche-boerse-xetra-pds 
root_folder = 's3://'

# dataset_id: if None, a new dataset is created; otherwise the bucket contents are appended to the existing dataset
dataset_id = None

# User Group to grant access to the dataset
group_id = ''

# date range to pull from S3 into FinSpace (one directory per date under root_folder)
start_date = '2021-01-04' 
end_date = '2021-01-06'

finspace_manager = FinSpaceAnalyticsManager(spark)

# Dataset Definitions
Capture the dataset's name, description, schema, attribute set, attribute set values, and the permissions to assign to the permission group.

In [None]:
# Name for the dataset
name = "Deutsche Börse Public Dataset (Xetra)"

# description for the dataset
description = """The Deutsche Börse Public Dataset (PDS) project makes near-time data derived from Deutsche Börse's trading systems available to the public for free. This is the first time that such detailed financial market data has been shared freely and continually from the source provider.
"""

# this is the attribute set to use, will search for it in system
att_name = "Capital Market Details"

# Attributes to associate, based on the definition of the attribute set
att_values = [
 { 'field' : 'AssetClass', 'type' : 'TAXONOMY', 'values' : [ 'Equity', 'CommonStocks'] },
 { 'field' : 'EventType', 'type' : 'TAXONOMY', 'values' : [ 'OHLC' ] },
 { 'field' : 'Exchange', 'type' : 'TAXONOMY', 'values' : [ ] },
 { 'field' : 'FinancialContentType', 'type' : 'TAXONOMY', 'values' : [ ] },
 { 'field' : 'RegionsAndCountries', 'type' : 'TAXONOMY', 'values' : [ 'Germany' ] }
]

# Permissions to grant the above group for the created dataset
basicPermissions = [
 "ViewDatasetDetails",
 "ReadDatasetData",
 "AddDatasetData",
 "CreateSnapshot",
 "EditDatasetMetadata",
 "ManageDatasetPermissions",
 "DeleteDataset"
]

request_dataset_permissions = [{"permission": permissionName} for permissionName in basicPermissions]

# All datasets have ownership
basicOwnerInfo = {
 "phoneNumber" : "12125551000",
 "email" : "jdoe@amazon.com",
 "name" : "Jane Doe"
}

# schema of the dataset
schema = {
 'primaryKeyColumns': [],
 'columns' : [
 {'dataType': 'STRING', 'name': 'ISIN', 'description': 'ISIN of the security'},
 {'dataType': 'STRING', 'name': 'Mnemonic', 'description': 'The product market segment, following the convention on http://www.eurexchange.com'},
 {'dataType': 'STRING', 'name': 'SecurityDesc', 'description': 'Description of the security'},
 {'dataType': 'STRING', 'name': 'SecurityType', 'description': 'Type of security'},
 {'dataType': 'STRING', 'name': 'Currency', 'description': 'Currency in which the product is traded'},
 {'dataType': 'INTEGER', 'name': 'SecurityID', 'description': 'Unique identifier for each contract'},
 {'dataType': 'DATE', 'name': 'Date', 'description': 'Date of trading period'},
 {'dataType': 'STRING', 'name': 'Time', 'description': 'Minute of trading to which this entry relates'},
 {'dataType': 'DOUBLE', 'name': 'StartPrice', 'description': 'Trading price at the start of period'},
 {'dataType': 'DOUBLE', 'name': 'MaxPrice', 'description': 'Maximum price over the period'},
 {'dataType': 'DOUBLE', 'name': 'MinPrice', 'description': 'Minimum price over the period'},
 {'dataType': 'DOUBLE', 'name': 'EndPrice', 'description': 'Trading price at the end of the period'},
 {'dataType': 'DOUBLE', 'name': 'TradedVolume', 'description': 'Total value traded'},
 {'dataType': 'INTEGER', 'name': 'NumberOfTrades', 'description': 'Number of distinct trades during the period'}
 ]
}


In [None]:
# call FinSpace to create the dataset if no ID was assigned
# if an ID was assigned, will not create a dataset but will simply add data to it
if dataset_id is None:
    # Create the dataset if it does not exist yet
    resp = finspace_manager.finspace_client.create_dataset(
        name = name, 
        description = description, 
        permissionGroupId = group_id,
        datasetPermissions = request_dataset_permissions,
        kind = "TABULAR",
        ownerInfo = basicOwnerInfo,
        schema = schema
    )

    dataset_id = resp["datasetId"]

    time.sleep(5)

print(f'Dataset ID: {dataset_id}')

In [None]:
# use pandas to generate a range of dates between start and end
dates = pd.date_range(start=start_date, end=end_date)

for d in dates:
    s3_source = f'{root_folder}/{d.strftime("%Y-%m-%d")}'
    print(f'Ingesting from: {s3_source}')

    try:
        resp = finspace_manager.finspace_client.instance.create_changeset(
            datasetId=dataset_id, changeType='APPEND',
            sourceType='S3', sourceParams={'s3SourcePath': s3_source},
            formatType='CSV',
            formatParams={'separator': ',', 'withHeader': 'true'})

        changeset_id = resp['changeset']['id']

    except Exception as e:
        print(f'No Data, Weekend? {d} {e}')
        continue

    print(f'Changeset ID: {changeset_id}')
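
The loop above relies on a failed request to skip dates that have no folder, such as weekends. A possible variation is to filter out Saturdays and Sundays before calling the API. The sketch below uses only the standard library; the helper name `weekday_folders` and the bucket path are hypothetical. Exchange holidays are not handled, so the `try`/`except` around the ingestion call is still worth keeping.

```python
from datetime import date, timedelta


def weekday_folders(root_folder: str, start: str, end: str) -> list:
    """List one folder path per Monday-to-Friday date between start and end, inclusive."""
    current = date.fromisoformat(start)
    last = date.fromisoformat(end)
    folders = []
    while current <= last:
        # weekday() is 0 for Monday through 6 for Sunday
        if current.weekday() < 5:
            folders.append(f"{root_folder}/{current.isoformat()}")
        current += timedelta(days=1)
    return folders


# Placeholder bucket path (assumption); 2021-01-02 and 2021-01-03 fall on a weekend
folders = weekday_folders("s3://my-example-bucket/xetra", "2021-01-01", "2021-01-06")
for folder in folders:
    print(folder)
```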


In [None]:
def wait_for_ingestion(client, dataset_id: str, changeset_id: str, sleep_sec=10):
    """
    Poll the changeset until it either completes or fails, then return.
    :param client: FinSpace client used to call describe_changeset
    :param dataset_id: GUID of the dataset
    :type: str
    :param changeset_id: GUID of the changeset
    :type: str
    :param sleep_sec: seconds to wait between checks
    :type: int
    """
    while True:
        resp1 = client.describe_changeset(datasetId=dataset_id, id=changeset_id)

        # default to an empty dict so the status lookup works when 'changeset' is missing
        resp2 = resp1.get('changeset', {})
        status = resp2.get('status', '')

        if status == 'SUCCESS':
            print("Changeset complete")
            break
        elif status == 'PENDING' or status == 'RUNNING':
            print(f"Changeset status is still {status}, waiting {sleep_sec} sec ...")
            time.sleep(sleep_sec)
            continue
        else:
            raise Exception(f"Bad changeset status: {status}, failing now. Response: {resp1}")
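
The polling function above loops until the changeset succeeds or fails, so it never returns if the status stays PENDING or RUNNING. A possible variation adds an overall timeout. The sketch below is not part of the FinSpace API: `poll_until_done` and its `get_status` callable are hypothetical, and the demonstration uses a fake status sequence instead of a real changeset.

```python
import time


def poll_until_done(get_status, sleep_sec=10.0, timeout_sec=600.0):
    """Call get_status() repeatedly until it returns SUCCESS, fails, or the timeout expires."""
    deadline = time.monotonic() + timeout_sec
    while True:
        status = get_status()
        if status == "SUCCESS":
            return status
        if status not in ("PENDING", "RUNNING"):
            raise RuntimeError(f"Bad changeset status: {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still {status} after {timeout_sec} sec")
        time.sleep(sleep_sec)


# Fake status source standing in for a real describe_changeset call
fake_statuses = iter(["PENDING", "RUNNING", "SUCCESS"])
final_status = poll_until_done(lambda: next(fake_statuses), sleep_sec=0)
print(final_status)
```

With the client used in this notebook, `get_status` could be a lambda that calls `describe_changeset` and reads the `status` field from the `changeset` entry of the response, as `wait_for_ingestion` does.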

In [None]:
wait_for_ingestion(finspace_manager.finspace_client, dataset_id=dataset_id, changeset_id=changeset_id)

In [None]:
print( f"dataset_id: {dataset_id}") 

resp = finspace_manager.finspace_client.list_data_views(datasetIdEquals = dataset_id, maxResults=100)

existing_views = resp['dataViews']

autoupdate_view_id = None

for ss in existing_views:
    if ss['autoUpdate']:
        autoupdate_view_id = ss['dataViewId']

# create an auto-update snapshot for this dataset if one does not already exist
if autoupdate_view_id is None:
    print("creating auto-update view")

    resp = finspace_manager.finspace_client.create_materialized_snapshot(
        destinationProperties={},
        autoUpdate=True,
        sortColumns=[],
        partitionColumns=[],
        destinationType = "GLUE_TABLE",
        datasetId=dataset_id)
    autoupdate_view_id = resp['id']
else:
    print(f"Exists: autoupdate_view_id = {autoupdate_view_id}")
 

## Associate Attribute Set

In [None]:
def list_attribute_sets(client):
    resp = client.list_dataset_types(sort='NAME')
    results = resp['datasetTypeSummaries']

    # follow nextToken until every page has been collected
    while "nextToken" in resp:
        resp = client.list_dataset_types(sort='NAME', nextToken=resp['nextToken'])
        results.extend(resp['datasetTypeSummaries'])

    return results

def attribute_set(client, name: str):
    """
    Exact (case-insensitive) name search for a dataset type of the given name
    :param name: name of the dataset type to find
    :type: str
    :return: the matching dataset type summary, or None if no match is found
    """
    all_dataset_types = list_attribute_sets(client)
    existing_dataset_type = next((c for c in all_dataset_types if c['name'].lower() == name.lower()), None)

    if existing_dataset_type:
        return existing_dataset_type

def describe_attribute_set(client, attribute_set_id: str):
    """
    Calls describe dataset type API function and only returns the dataset type portion of the response
    :param attribute_set_id: the GUID of the dataset type to get description of
    :type: str
    """
    resp = None
    dataset_type_details_resp = client.describe_dataset_type(datasetTypeId=attribute_set_id)

    if 'datasetType' in dataset_type_details_resp:
        resp = dataset_type_details_resp['datasetType']

    return resp

def associate_attribute_set(client, att_name: str, att_values: list, dataset_id: str):
    # get the attribute set by name, will need its id
    att_set = attribute_set(client, att_name)

    if att_set is None:
        raise ValueError(f'No attribute set found with name: {att_name}')

    # get the dataset's information, will need the arn
    dataset_details_resp = client.describe_dataset_details(datasetId=dataset_id)

    dataset = dataset_details_resp.get("dataset", None)

    if dataset is None:
        raise ValueError(f'No dataset found for id: {dataset_id}')

    # disassociate any existing relationship
    try:
        client.dissociate_dataset_from_attribute_set(datasetArn=dataset['arn'], attributeSetId=att_set['id'], datasetId=dataset_id)
    except Exception:
        print("Nothing to disassociate")

    arn = dataset['arn']
    attribute_set_id = att_set['id']

    client.associate_dataset_with_attribute_set(datasetArn=arn, attributeSetId=attribute_set_id, datasetId=dataset_id)

    resp = client.update_dataset_attribute_set_context(datasetArn=arn, datasetId=dataset_id, attributeSetId=attribute_set_id, values=att_values)

    # return the response only when the update did not succeed
    if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
        return resp

    return

In [None]:
# Associate an attribute set and fill its values
print(f"Associating values to attribute set: {att_name}")

associate_attribute_set(finspace_manager.finspace_client.instance, att_name=att_name, att_values=att_values, dataset_id=dataset_id)

In [None]:
import datetime
print( f"Last Run: {datetime.datetime.now()}" )