# Import Amazon Redshift tables as datasets in FinSpace
This notebook shows how to connect to Amazon Redshift from FinSpace. It also demonstrates how to create a FinSpace dataset for every table in Amazon Redshift and associate each dataset with an attribute set populated with the values needed to retrieve the table directly from Redshift (catalog, schema, and table names). The attribute set also carries the 'Source - Redshift' sub-category, which helps with data discovery: analysts can browse for data from a specific source in FinSpace by going to a category under 'Source' in the navigation menu.

## Prerequisites 
To follow the steps in this notebook, first create a sub-category 'Redshift' under the 'Source' category in FinSpace. Then create an attribute set in FinSpace to hold metadata about each table's location in Redshift: catalog (database), schema, table name, and source. In this notebook, the attribute set is named 'Redshift Table Attributes'.

## Outline
- Given the name of a database:
  - Get all tables in the database (excluding system tables)
  - For each table:
    - check whether a dataset for the table already exists in FinSpace
    - create a FinSpace dataset (tabular, with the table's schema)
    - populate the attribute set (defined above) with metadata about the table (catalog, schema, and table name)
    - associate the populated attribute set with the created dataset
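
The outline above can be sketched as a small loop. This is only an illustration of the control flow, not the notebook's actual implementation: `import_tables`, `create_dataset`, and `associate` are hypothetical names, and the two callables stand in for the FinSpace helper calls defined later. The table list is assumed to have system tables already removed.

```python
from typing import Callable, Iterable


def import_tables(
    catalog: str,
    tables: Iterable[tuple],
    existing_names: set,
    create_dataset: Callable[[str], str],
    associate: Callable[[str, dict], None],
) -> list:
    """Create one dataset per table and attach its Redshift location metadata.

    `tables` yields (schema, table) pairs with system tables already excluded.
    `create_dataset` and `associate` are hypothetical stand-ins for the
    FinSpace helper calls. Returns the ids of the datasets that were created.
    """
    created = []
    for schema, table in tables:
        if table in existing_names:
            continue  # a dataset with this name already exists, skip it
        dataset_id = create_dataset(table)
        # metadata needed to locate the table in Redshift again
        associate(dataset_id, {"catalog": catalog, "schema": schema, "table": table})
        created.append(dataset_id)
    return created


# Usage with in-memory stubs instead of real FinSpace calls
attached = {}
ids = import_tables(
    catalog="dev",
    tables=[("public", "trades"), ("public", "quotes")],
    existing_names={"quotes"},
    create_dataset=lambda name: f"id-{name}",
    associate=lambda ds_id, meta: attached.update({ds_id: meta}),
)
print(ids)
```

Checking for an existing dataset first matters because dataset names in FinSpace are not unique, so re-running the import without the check would create duplicates.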

## References
- [Spark SQL - JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)
- [Redshift JDBC](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html)


# 1. Configure Connections and Environment

## 1.1. Connect to the Spark cluster

In [None]:
%local
from aws.finspace.cluster import FinSpaceClusterManager

# if this was already run, no need to run again
if 'finspace_clusters' not in globals():
    finspace_clusters = FinSpaceClusterManager()
    finspace_clusters.auto_connect()
else:
    print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')

## 1.2. Configure JDBC driver for Amazon Redshift

Use [this page](https://docs.aws.amazon.com/redshift/latest/mgmt/jdbc20-download-driver.html) to get the latest JDBC driver for Redshift.


In [None]:
%%configure -f
{ "conf":{
 "spark.jars": "https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/2.0.0.7/redshift-jdbc42-2.0.0.7.jar"
 }
}
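
With the driver jar on the Spark classpath, a table can be read through Spark's generic JDBC data source. The sketch below only builds the reader options. The helper name `redshift_jdbc_options` and all connection values are placeholders chosen for illustration, not values from this notebook.

```python
def redshift_jdbc_options(host: str, port: int, database: str,
                          user: str, password: str, table: str) -> dict:
    """Build the option dict for spark.read.format("jdbc")."""
    return {
        "url": f"jdbc:redshift://{host}:{port}/{database}",
        "user": user,
        "password": password,
        "dbtable": table,
    }


options = redshift_jdbc_options(
    host="example-cluster.abc123.us-east-1.redshift.amazonaws.com",  # placeholder endpoint
    port=5439,
    database="dev",
    user="example_user",
    password="example_password",
    table="public.example_table",
)
print(options["url"])

# On the cluster, where the notebook session provides `spark`:
# df = spark.read.format("jdbc").options(**options).load()
```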

## 1.3. Import Python Helper Classes and Utility Functions

## 1.3.1. Python Helper
These are the FinSpace helper classes found in the FinSpace samples and examples GitHub repository.

In [2]:
# %load finspace.py
import datetime
import time
import boto3
import os
import pandas as pd
import urllib.request

from urllib.parse import urlparse
from botocore.config import Config
from boto3.session import Session


# Base FinSpace class
class FinSpace:

 def __init__(
 self,
 config=Config(retries={'max_attempts': 3, 'mode': 'standard'}),
 boto_session: Session = None,
 dev_overrides: dict = None,
 service_name = 'finspace-data'):
 """
 To configure this class object, instantiate it with no arguments when using the production endpoint, or override the endpoint:
 e.g.
 `finspace = FinSpace(dev_overrides = {'region_name': 'us-east-1',
 'hfs_endpoint': 'https://39g32x40jk.execute-api.us-east-1.amazonaws.com/alpha'})`
 """
 self.hfs_endpoint = None
 self.region_name = None

 if dev_overrides is not None:
 if 'hfs_endpoint' in dev_overrides:
 self.hfs_endpoint = dev_overrides['hfs_endpoint']

 if 'region_name' in dev_overrides:
 self.region_name = dev_overrides['region_name']
 else:
 if boto_session is not None:
 self.region_name = boto_session.region_name
 else:
 self.region_name = self.get_region_name()

 self.config = config

 self._boto3_session = boto3.session.Session(region_name=self.region_name) if boto_session is None else boto_session

 print(f"service_name: {service_name}")
 print(f"endpoint: {self.hfs_endpoint}")
 print(f"region_name: {self.region_name}")

 self.client = self._boto3_session.client(service_name, endpoint_url=self.hfs_endpoint, config=self.config)

 @staticmethod
 def get_region_name():
 req = urllib.request.Request("http://169.254.169.254/latest/meta-data/placement/region")
 with urllib.request.urlopen(req) as response:
 return response.read().decode("utf-8")

 # --------------------------------------
 # Utility Functions
 # --------------------------------------
 @staticmethod
 def get_list(all_list: dict, name: str):
 """
 Look up name in the all_list dict and return the list of items stored under it.
 Removes repetitive code found in functions that call boto APIs and then search for the expected returned items

 :param all_list: API response to search
 :type: dict

 :param name: key to look up in all_list
 :type: str

 :return: list of items found under name (empty if the key is missing)
 """
 r = []

 # if the given name is found, add its items to the list
 if name in all_list:
 for s in all_list[name]:
 r.append(s)

 # return the list
 return r

 # --------------------------------------
 # Classification Functions
 # --------------------------------------

 def list_classifications(self):
 """
 Return list of all classifications

 :return: all classifications
 """
 all_list = self.client.list_classifications(sort='NAME')

 return self.get_list(all_list, 'classifications')

 def classification_names(self):
 """
 Get the classifications names

 :return list of classifications names only
 """
 classification_names = []
 all_classifications = self.list_classifications()
 for c in all_classifications:
 classification_names.append(c['name'])
 return classification_names

 def classification(self, name: str):
 """
 Exact name search for a classification of the given name

 :param name: name of the classification to find
 :type: str

 :return
 """

 all_classifications = self.list_classifications()
 existing_classification = next((c for c in all_classifications if c['name'].lower() == name.lower()), None)
 if existing_classification:
 return existing_classification

 def describe_classification(self, classification_id: str):
 """
 Calls the describe classification API function and only returns the taxonomy portion of the response.

 :param classification_id: the GUID of the classification to get description of
 :type: str
 """
 resp = None
 taxonomy_details_resp = self.client.describe_taxonomy(taxonomyId=classification_id)

 if 'taxonomy' in taxonomy_details_resp:
 resp = taxonomy_details_resp['taxonomy']

 return (resp)

 def create_classification(self, classification_definition):
 resp = self.client.create_taxonomy(taxonomyDefinition=classification_definition)

 taxonomy_id = resp["taxonomyId"]

 return (taxonomy_id)

 def delete_classification(self, classification_id):
 resp = self.client.delete_taxonomy(taxonomyId=classification_id)

 if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
 return resp

 return True

 # --------------------------------------
 # Attribute Set Functions
 # --------------------------------------

 def list_attribute_sets(self):
 """
 Get list of all dataset_types in the system

 :return: list of dataset types
 """
 resp = self.client.list_dataset_types()
 results = resp['datasetTypeSummaries']

 while "nextToken" in resp:
 resp = self.client.list_dataset_types(nextToken=resp['nextToken'])
 results.extend(resp['datasetTypeSummaries'])

 return (results)

 def attribute_set_names(self):
 """
 Get the list of all dataset type names

 :return list of all dataset type names
 """

 dataset_type_names = []
 all_dataset_types = self.list_dataset_types()
 for c in all_dataset_types:
 dataset_type_names.append(c['name'])
 return dataset_type_names

 def attribute_set(self, name: str):
 """
 Exact name search for a dataset type of the given name

 :param name: name of the dataset type to find
 :type: str

 :return
 """

 all_dataset_types = self.list_dataset_types()
 existing_dataset_type = next((c for c in all_dataset_types if c['name'].lower() == name.lower()), None)
 if existing_dataset_type:
 return existing_dataset_type

 def describe_attribute_set(self, attribute_set_id: str):
 """
 Calls the describe dataset type API function and only returns the dataset type portion of the response.

 :param attribute_set_id: the GUID of the dataset type to get description of
 :type: str
 """
 resp = None
 dataset_type_details_resp = self.client.describe_dataset_type(datasetTypeId=attribute_set_id)

 if 'datasetType' in dataset_type_details_resp:
 resp = dataset_type_details_resp['datasetType']

 return (resp)

 def create_attribute_set(self, attribute_set_def):
 resp = self.client.create_dataset_type(datasetTypeDefinition=attribute_set_def)

 att_id = resp["datasetTypeId"]

 return (att_id)

 def delete_attribute_set(self, attribute_set_id: str):
 resp = self.client.delete_attribute_set(attributeSetId=attribute_set_id)

 if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
 return resp

 return True

 def associate_attribute_set(self, att_name: str, att_values: list, dataset_id: str):
 fncs = ['dissociate_dataset_from_attribute_set', 'associate_dataset_with_attribute_set', 'update_dataset_attribute_set_context']
 # verify the client exposes every API call used below before changing anything
 missing = [f for f in fncs if not hasattr(self.client, f)]
 if missing:
 raise Exception(f"functions not found in client: {missing}")

 # get the attribute set by name, will need its id
 att_set = self.attribute_set(att_name)

 # get the dataset's information, will need the arn
 dataset = self.describe_dataset_details(dataset_id=dataset_id)

 # disassociate any existing relationship
 try:
 self.client.dissociate_dataset_from_attribute_set(datasetArn=dataset['arn'],
 datasetTypeId=att_set['id'])
 except Exception:
 print("Nothing to disassociate")

 arn = dataset['arn']
 dataset_type_id = att_set['id']

 self.client.associate_dataset_with_attribute_set(datasetId=dataset_id, datasetArn=arn, attributeSetId=dataset_type_id)

 resp = self.client.update_dataset_attribute_set_context(datasetId=dataset_id, datasetArn=arn, attributeSetId=dataset_type_id, values=att_values)

 if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
 return resp

 return True

 # --------------------------------------
 # Permission Group Functions
 # --------------------------------------

 def list_permission_groups(self, max_results: int):
 all_perms = self.client.list_permission_groups(MaxResults=max_results)
 return (self.get_list(all_perms, 'permissionGroups'))

 def permission_group(self, name):
 all_groups = self.list_permission_groups(max_results = 100)

 existing_group = next((c for c in all_groups if c['name'].lower() == name.lower()), None)

 if existing_group:
 return existing_group

 def describe_permission_group(self, permission_group_id: str):
 resp = None

 perm_resp = self.client.describe_permission_group(permissionGroupId=permission_group_id)

 if 'permissionGroup' in perm_resp:
 resp = perm_resp['permissionGroup']

 return (resp)

 # --------------------------------------
 # Dataset Functions
 # --------------------------------------

 def describe_dataset_details(self, dataset_id: str):
 """
 Calls the describe dataset details API function and only returns the dataset details portion of the response.

 :param dataset_id: the GUID of the dataset to get description of
 :type: str
 """
 resp = None
 dataset_details_resp = self.client.describe_dataset_details(datasetId=dataset_id)

 if 'dataset' in dataset_details_resp:
 resp = dataset_details_resp["dataset"]

 return (resp)

 def create_dataset(self, name: str, description: str, permission_group_id: str, dataset_permissions: [], kind: str,
 owner_info, schema):
 """
 Create a dataset

 Warning, dataset names are not unique, be sure to check for the same name dataset before creating a new one

 :param name: Name of the dataset
 :type: str

 :param description: Description of the dataset
 :type: str

 :param permission_group_id: permission group for the dataset
 :type: str

 :param dataset_permissions: permissions for the group on the dataset

 :param kind: Kind of dataset, choices: TABULAR
 :type: str

 :param owner_info: owner information for the dataset

 :param schema: Schema of the dataset

 :return: the dataset_id of the created dataset
 """

 if dataset_permissions:
 request_dataset_permissions = [{"permission": permissionName} for permissionName in dataset_permissions]
 else:
 request_dataset_permissions = []

 response = self.client.create_dataset(name=name,
 permissionGroupId = permission_group_id,
 datasetPermissions = request_dataset_permissions,
 kind=kind,
 description = description.replace('\n', ' '),
 ownerInfo = owner_info,
 schema = schema)

 return response["datasetId"]

 def ingest_from_s3(self,
 s3_location: str,
 dataset_id: str,
 change_type: str,
 wait_for_completion: bool = True,
 format_type: str = "CSV",
 format_params: dict = {'separator': ',', 'withHeader': 'true'}):
 """
 Creates a changeset and ingests the data given in the S3 location into the changeset

 :param s3_location: the source location of the data for the changeset, will be copied into the changeset
 :type: str

 :param dataset_id: the identifier of the containing dataset for the changeset to be created for this data
 :type: str

 :param change_type: What is the kind of changetype? "APPEND", "REPLACE" are the choices
 :type: str

 :param wait_for_completion: Boolean, should the function wait for the operation to complete?
 :type: bool

 :param format_type: format type, CSV, PARQUET, XML, JSON
 :type: str

 :param format_params: dictionary of format parameters
 :type: dict

 :return: the id of the changeset created
 """
 create_changeset_response = self.client.create_changeset(
 datasetId=dataset_id,
 changeType=change_type,
 sourceType='S3',
 sourceParams={'s3SourcePath': s3_location},
 formatType=format_type.upper(),
 formatParams=format_params
 )

 changeset_id = create_changeset_response['changeset']['id']

 if wait_for_completion:
 self.wait_for_ingestion(dataset_id, changeset_id)
 return changeset_id

 def describe_changeset(self, dataset_id: str, changeset_id: str):
 """
 Function to get a description of the given changeset for the given dataset

 :param dataset_id: identifier of the dataset
 :type: str

 :param changeset_id: the identifier of the changeset
 :type: str

 :return: all information about the changeset, if found
 """
 describe_changeset_resp = self.client.describe_changeset(datasetId=dataset_id, id=changeset_id)

 return describe_changeset_resp['changeset']

 def create_as_of_view(self, dataset_id: str, as_of_date: datetime, destination_type: str,
 partition_columns: list = [], sort_columns: list = [], destination_properties: dict = {},
 wait_for_completion: bool = True):
 """
 Creates an 'as of' static view up to and including the requested 'as of' date provided.

 :param dataset_id: identifier of the dataset
 :type: str

 :param as_of_date: as of date, will include changesets up to this date/time in the view
 :type: datetime

 :param destination_type: destination type
 :type: str

 :param partition_columns: columns to partition the data by for the created view
 :type: list

 :param sort_columns: column to sort the view by
 :type: list

 :param destination_properties: destination properties
 :type: dict

 :param wait_for_completion: should the function wait for the system to create the view?
 :type: bool

 :return str: GUID of the created view if successful

 """
 create_materialized_view_resp = self.client.create_materialized_snapshot(
 datasetId=dataset_id,
 asOfTimestamp=as_of_date,
 destinationType=destination_type,
 partitionColumns=partition_columns,
 sortColumns=sort_columns,
 autoUpdate=False,
 destinationProperties=destination_properties
 )
 view_id = create_materialized_view_resp['id']
 if wait_for_completion:
 self.wait_for_view(dataset_id=dataset_id, view_id=view_id)
 return view_id

 def create_auto_update_view(self, dataset_id: str, destination_type: str,
 partition_columns=[], sort_columns=[], destination_properties={},
 wait_for_completion=True):
 """
 Creates an auto-updating view of the given dataset

 :param dataset_id: identifier of the dataset
 :type: str

 :param destination_type: destination type
 :type: str

 :param partition_columns: columns to partition the data by for the created view
 :type: list

 :param sort_columns: column to sort the view by
 :type: list

 :param destination_properties: destination properties
 :type: dict

 :param wait_for_completion: should the function wait for the system to create the view?
 :type: bool

 :return str: GUID of the created view if successful

 """
 create_materialized_view_resp = self.client.create_materialized_snapshot(
 datasetId=dataset_id,
 destinationType=destination_type,
 partitionColumns=partition_columns,
 sortColumns=sort_columns,
 autoUpdate=True,
 destinationProperties=destination_properties
 )
 view_id = create_materialized_view_resp['id']
 if wait_for_completion:
 self.wait_for_view(dataset_id=dataset_id, view_id=view_id)
 return view_id

 def wait_for_ingestion(self, dataset_id: str, changeset_id: str, sleep_sec=10):
 """
 function that will continuously poll the changeset creation to ensure it completes or fails before returning.

 :param dataset_id: GUID of the dataset
 :type: str

 :param changeset_id: GUID of the changeset
 :type: str

 :param sleep_sec: seconds to wait between checks
 :type: int

 """
 while True:
 status = self.describe_changeset(dataset_id=dataset_id, changeset_id=changeset_id)['status']
 if status == 'SUCCESS':
 print(f"Changeset complete")
 break
 elif status == 'PENDING' or status == 'RUNNING':
 print(f"Changeset status is {status}, waiting {sleep_sec} sec ...")
 time.sleep(sleep_sec)
 continue
 else:
 raise Exception(f"Bad changeset status: {status}, failing now.")

 def wait_for_view(self, dataset_id: str, view_id: str, sleep_sec=10):
 """
 function that will continuously poll the view creation to ensure it completes or fails before returning.

 :param dataset_id: GUID of the dataset
 :type: str

 :param view_id: GUID of the view
 :type: str

 :param sleep_sec: seconds to wait between checks
 :type: int

 """
 while True:
 list_views_resp = self.client.list_materialization_snapshots(datasetId=dataset_id, maxResults=100)
 matched_views = list(filter(lambda d: d['id'] == view_id, list_views_resp['materializationSnapshots']))

 if len(matched_views) != 1:
 size = len(matched_views)
 raise Exception(f"Unexpected error: found {size} views that match the view Id: {view_id}")

 status = matched_views[0]['status']
 if status == 'SUCCESS':
 print(f"View complete")
 break
 elif status == 'PENDING' or status == 'RUNNING':
 print(f"View status is {status}, waiting {sleep_sec} sec ...")
 time.sleep(sleep_sec)
 continue
 else:
 raise Exception(f"Bad view status: {status}, failing now.")

 def list_changesets(self, dataset_id: str):
 resp = self.client.list_changesets(datasetId=dataset_id, sortKey='CREATE_TIMESTAMP')
 results = resp['changesets']

 while "nextToken" in resp:
 resp = self.client.list_changesets(datasetId=dataset_id, sortKey='CREATE_TIMESTAMP',
 nextToken=resp['nextToken'])
 results.extend(resp['changesets'])

 return (results)

 def list_views(self, dataset_id: str, max_results=50):
 resp = self.client.list_materialization_snapshots(datasetId=dataset_id, maxResults=max_results)
 results = resp['materializationSnapshots']

 while "nextToken" in resp:
 resp = self.client.list_materialization_snapshots(datasetId=dataset_id, maxResults=max_results,
 nextToken=resp['nextToken'])
 results.extend(resp['materializationSnapshots'])

 return (results)

 def list_datasets(self, max_results: int):
 all_datasets = self.client.list_datasets(maxResults=max_results)
 return (self.get_list(all_datasets, 'datasets'))

 def list_dataset_types(self):
 resp = self.client.list_dataset_types(sort='NAME')
 results = resp['datasetTypeSummaries']

 while "nextToken" in resp:
 resp = self.client.list_dataset_types(sort='NAME', nextToken=resp['nextToken'])
 results.extend(resp['datasetTypeSummaries'])

 return (results)

 @staticmethod
 def get_execution_role():
 """
 Convenience function from SageMaker to get the execution role of the user of the sagemaker studio notebook

 :return: the ARN of the execution role in the sagemaker studio notebook
 """
 import sagemaker as sm

 e_role = sm.get_execution_role()
 return (f"{e_role}")

 def get_user_ingestion_info(self):
 return (self.client.get_user_ingestion_info())

 def upload_pandas(self, data_frame: pd.DataFrame):
 import awswrangler as wr
 resp = self.client.get_working_location(locationType='INGESTION')
 upload_location = resp['s3Uri']
 wr.s3.to_parquet(data_frame, f"{upload_location}data.parquet", index=False, boto3_session=self._boto3_session)
 return upload_location

 def ingest_pandas(self, data_frame: pd.DataFrame, dataset_id: str, change_type: str, wait_for_completion=True):
 print("Uploading the pandas dataframe ...")
 upload_location = self.upload_pandas(data_frame)

 print("Data upload finished. Ingesting data ...")
 return self.ingest_from_s3(upload_location, dataset_id, change_type, wait_for_completion, format_type='PARQUET')

 def read_view_as_pandas(self, dataset_id: str, view_id: str):
 """
 Returns a pandas dataframe of the view of the given dataset. Views in FinSpace can be quite large, be careful!

 :param dataset_id:
 :param view_id:

 :return: Pandas dataframe with all data of the view
 """
 import awswrangler as wr # use awswrangler to read the table

 # @todo: switch to DescribeMaterialization when available in HFS
 views = self.list_views(dataset_id=dataset_id, max_results=50)
 filtered = [v for v in views if v['id'] == view_id]

 if len(filtered) == 0:
 raise Exception('No such view found')
 if len(filtered) > 1:
 raise Exception('Internal Server error')
 view = filtered[0]

 # 0. Ensure view is ready to be read
 if (view['status'] != 'SUCCESS'):
 status = view['status']
 print(f'view run status is not ready: {status}. Returning empty.')
 return

 glue_db_name = view['destinationTypeProperties']['databaseName']
 glue_table_name = view['destinationTypeProperties']['tableName']

 # determine if the table has partitions first; partitioned tables are read differently
 p = wr.catalog.get_partitions(table=glue_table_name, database=glue_db_name, boto3_session=self._boto3_session)

 def no_filter(partitions):
 if len(partitions.keys()) > 0:
 return True

 return False

 df = None

 if len(p) == 0:
 df = wr.s3.read_parquet_table(table=glue_table_name, database=glue_db_name,
 boto3_session=self._boto3_session)
 else:
 spath = wr.catalog.get_table_location(table=glue_table_name, database=glue_db_name,
 boto3_session=self._boto3_session)
 cpath = wr.s3.list_directories(f"{spath}/*", boto3_session=self._boto3_session)

 read_path = f"{spath}/"

 # just one? Read it
 if len(cpath) == 1:
 read_path = cpath[0]

 df = wr.s3.read_parquet(read_path, dataset=True, partition_filter=no_filter,
 boto3_session=self._boto3_session)

 # Query Glue table directly with wrangler
 return df

 @staticmethod
 def get_schema_from_pandas(df: pd.DataFrame):
 """
 Returns the FinSpace schema columns from the given pandas dataframe.

 :param df: pandas dataframe to interrogate for the schema

 :return: FinSpace column schema list
 """

 # for translation to FinSpace's schema
 # 'STRING'|'CHAR'|'INTEGER'|'TINYINT'|'SMALLINT'|'BIGINT'|'FLOAT'|'DOUBLE'|'DATE'|'DATETIME'|'BOOLEAN'|'BINARY'
 DoubleType = "DOUBLE"
 FloatType = "FLOAT"
 DateType = "DATE"
 StringType = "STRING"
 IntegerType = "INTEGER"
 LongType = "BIGINT"
 BooleanType = "BOOLEAN"
 TimestampType = "DATETIME"

 hab_columns = []

 for name in dict(df.dtypes):
 p_type = df.dtypes[name]

 switcher = {
 "float64": DoubleType,
 "int64": IntegerType,
 "datetime64[ns, UTC]": TimestampType,
 "datetime64[ns]": DateType
 }

 habType = switcher.get(str(p_type), StringType)

 hab_columns.append({
 "dataType": habType,
 "name": name,
 "description": ""
 })

 return (hab_columns)

 @staticmethod
 def get_date_cols(df: pd.DataFrame):
 """
 Returns the date columns found in the pandas dataframe.
 Pandas does the hard work to figure out which of the columns can be considered to be date columns.

 :param df: pandas dataframe to interrogate for the schema

 :return: list of column names that can be parsed as dates by pandas

 """
 date_cols = []

 for name in dict(df.dtypes):

 p_type = df.dtypes[name]

 if str(p_type).startswith("date"):
 date_cols.append(name)

 return (date_cols)

 def get_best_schema_from_csv(self, path, is_s3=True, read_rows=500, sep=','):
 """
 Uses multiple reads of the file with pandas to determine schema of the referenced files.
 Files are expected to be csv.

 :param path: path to the files to read
 :type: str

 :param is_s3: True if the path is s3; False if filesystem
 :type: bool

 :param read_rows: number of rows to sample for determining schema

 :param sep:

 :return dict: schema for FinSpace
 """
 #
 # best efforts to determine the schema, sight unseen
 import awswrangler as wr

 # 1: get the base schema
 df1 = None

 if is_s3:
 df1 = wr.s3.read_csv(path, nrows=read_rows, sep=sep)
 else:
 df1 = pd.read_csv(path, nrows=read_rows, sep=sep)

 num_cols = len(df1.columns)

 # with number of columns, try to infer dates
 df2 = None

 if is_s3:
 df2 = wr.s3.read_csv(path, parse_dates=list(range(0, num_cols)), infer_datetime_format=True,
 nrows=read_rows, sep=sep)
 else:
 df2 = pd.read_csv(path, parse_dates=list(range(0, num_cols)), infer_datetime_format=True, nrows=read_rows,
 sep=sep)

 date_cols = self.get_date_cols(df2)

 # with dates known, parse the file fully
 df = None

 if is_s3:
 df = wr.s3.read_csv(path, parse_dates=date_cols, infer_datetime_format=True, nrows=read_rows, sep=sep)
 else:
 df = pd.read_csv(path, parse_dates=date_cols, infer_datetime_format=True, nrows=read_rows, sep=sep)

 schema_cols = self.get_schema_from_pandas(df)

 return (schema_cols)

 def s3_upload_file(self, source_file: str, s3_destination: str):
 """
 Uploads a local file (full path) to the s3 destination given (expected form: s3://<bucket>/<prefix>/).
 The filename will have spaces replaced with _.

 :param source_file: path of file to upload
 :param s3_destination: full path to where to save the file
 :type: str

 """
 hab_s3_client = self._boto3_session.client(service_name='s3')

 o = urlparse(s3_destination)
 bucket = o.netloc
 prefix = o.path.lstrip('/')

 fname = os.path.basename(source_file)

 hab_s3_client.upload_file(source_file, bucket, f"{prefix}{fname.replace(' ', '_')}")

 def list_objects(self, s3_location: str):
 """
 lists the objects found at the s3_location. Strips out the boto API response header,
 just returns the contents of the location. Internally uses the list_objects_v2.

 :param s3_location: path, starting with s3:// to get the list of objects from
 :type: str

 """
 o = urlparse(s3_location)
 bucket = o.netloc
 prefix = o.path.lstrip('/')

 results = []

 hab_s3_client = self._boto3_session.client(service_name='s3')

 paginator = hab_s3_client.get_paginator('list_objects_v2')
 pages = paginator.paginate(Bucket=bucket, Prefix=prefix)

 for page in pages:
 if 'Contents' in page:
 results.extend(page['Contents'])

 return (results)

 def list_clusters(self, status: str = None):
 """
 Lists current clusters and their statuses

 :param status: status to filter for

 :return dict: list of clusters
 """

 resp = self.client.list_clusters()

 clusters = []

 if 'clusters' not in resp:
 return (clusters)

 for c in resp['clusters']:
 if status is None:
 clusters.append(c)
 else:
 if c['clusterStatus']['state'] in status:
 clusters.append(c)

 return (clusters)

 def get_cluster(self, cluster_id):
 """
 Return the cluster with the given id, or None if no such cluster is found

 :param cluster_id: cluster id
 """

 clusters = self.list_clusters()

 for c in clusters:
 if c['clusterId'] == cluster_id:
 return (c)

 return (None)

 def update_cluster(self, cluster_id: str, template: str):
 """
 Resize the given cluster to desired template

 :param cluster_id: cluster id
 :param template: target template to resize to
 """

 cluster = self.get_cluster(cluster_id=cluster_id)

 if cluster['currentTemplate'] == template:
 print(f"Already using template: {template}")
 return (cluster)

 self.client.update_cluster(clusterId=cluster_id, template=template)

 return (self.get_cluster(cluster_id=cluster_id))

 def wait_for_status(self, clusterId: str, status: str, sleep_sec=10, max_wait_sec=900):
 """
 Function polls service until cluster is in desired status.

 :param clusterId: the cluster's ID
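 :param status: desired status for the cluster to reach
 :param sleep_sec: seconds to wait between checks
 :param max_wait_sec: maximum total seconds to wait; returns None if exceeded or if the cluster is not found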
 """
 total_wait = 0

 while True and total_wait < max_wait_sec:
 resp = self.client.list_clusters()

 this_cluster = None

 # is this the cluster?
 for c in resp['clusters']:
 if clusterId == c['clusterId']:
 this_cluster = c

 if this_cluster is None:
 print(f"clusterId:{clusterId} not found")
 return (None)

 this_status = this_cluster['clusterStatus']['state']

 if this_status.upper() != status.upper():
 print(f"Cluster status is {this_status}, waiting {sleep_sec} sec ...")
 time.sleep(sleep_sec)
 total_wait = total_wait + sleep_sec
 continue
 else:
 return (this_cluster)

 def get_working_location(self, locationType='SAGEMAKER'):
 resp = None
 location = self.client.get_working_location(locationType=locationType)

 if 's3Uri' in location:
 resp = location['s3Uri']

 return (resp)


In [4]:
# %load finspace_spark.py
import datetime
import time
import boto3
from botocore.config import Config

# FinSpace class with Spark bindings

class SparkFinSpace(FinSpace):
    import pyspark

    def __init__(
        self,
        spark: pyspark.sql.session.SparkSession = None,
        config = Config(retries = {'max_attempts': 0, 'mode': 'standard'}),
        dev_overrides: dict = None
    ):
        FinSpace.__init__(self, config=config, dev_overrides=dev_overrides)
        self.spark = spark # used on Spark cluster for reading views, creating changesets from DataFrames

    def upload_dataframe(self, data_frame: pyspark.sql.dataframe.DataFrame):
        resp = self.client.get_user_ingestion_info()
        upload_location = resp['ingestionPath']
        # data_frame.write.option('header', 'true').csv(upload_location)
        data_frame.write.parquet(upload_location)
        return upload_location

    def ingest_dataframe(self, data_frame: pyspark.sql.dataframe.DataFrame, dataset_id: str, change_type: str, wait_for_completion=True):
        print("Uploading data...")
        upload_location = self.upload_dataframe(data_frame)

        print("Data upload finished. Ingesting data...")

        return self.ingest_from_s3(upload_location, dataset_id, change_type, wait_for_completion, format_type='parquet', format_params={})

    def read_view_as_spark(
        self,
        dataset_id: str,
        view_id: str
    ):
        # TODO: switch to DescribeMatz when available in HFS
        views = self.list_views(dataset_id=dataset_id, max_results=50)
        filtered = [v for v in views if v['id'] == view_id]

        if len(filtered) == 0:
            raise Exception('No such view found')
        if len(filtered) > 1:
            raise Exception('Internal Server error')
        view = filtered[0]

        # 0. Ensure view is ready to be read
        if (view['status'] != 'SUCCESS'):
            status = view['status']
            print(f'view run status is not ready: {status}. Returning empty.')
            return

        glue_db_name = view['destinationTypeProperties']['databaseName']
        glue_table_name = view['destinationTypeProperties']['tableName']

        # Query Glue table directly with catalog function of spark
        return self.spark.table(f"`{glue_db_name}`.`{glue_table_name}`")

    def get_schema_from_spark(self, data_frame: pyspark.sql.dataframe.DataFrame):
        from pyspark.sql.types import StructType

        # for translation to FinSpace's schema
        # 'STRING'|'CHAR'|'INTEGER'|'TINYINT'|'SMALLINT'|'BIGINT'|'FLOAT'|'DOUBLE'|'DATE'|'DATETIME'|'BOOLEAN'|'BINARY'
        DoubleType = "DOUBLE"
        FloatType = "FLOAT"
        DateType = "DATE"
        StringType = "STRING"
        IntegerType = "INTEGER"
        LongType = "BIGINT"
        BooleanType = "BOOLEAN"
        TimestampType = "DATETIME"

        hab_columns = []

        items = [i for i in data_frame.schema]

        switcher = {
            "BinaryType" : StringType,
            "BooleanType" : BooleanType,
            "ByteType" : IntegerType,
            "DateType" : DateType,
            "DoubleType" : FloatType,
            "IntegerType" : IntegerType,
            "LongType" : IntegerType,
            "NullType" : StringType,
            "ShortType" : IntegerType,
            "StringType" : StringType,
            "TimestampType" : TimestampType,
        }

        for i in items:
            # print( f"name: {i.name} type: {i.dataType}" )

            habType = switcher.get( str(i.dataType), StringType)

            hab_columns.append({
                "dataType" : habType,
                "name" : i.name,
                "description" : ""
            })

        return( hab_columns )


In [None]:
# initialize the FinSpace helper
finspace = SparkFinSpace(spark=spark)

## 1.3.2. Add Utility Functions for dataset import
These functions help translate Redshift data to FinSpace schema.
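
As a rough illustration of that translation, the sketch below maps a few data type names that Redshift's `information_schema.columns` typically reports (for example `character varying` or `double precision`) onto FinSpace column types. The helper name `redshift_to_finspace_type` and the exact set of keys are assumptions for illustration, not part of the FinSpace API; check the keys against the `DATA_TYPE` values your own cluster returns. Unlisted types fall back to `STRING`, the same default used by `get_finspace_schema` in the next cell.

```python
# Hypothetical mapping from Redshift type names to FinSpace column types.
# The keys are assumptions; verify them against your information_schema output.
REDSHIFT_TO_FINSPACE = {
    "smallint": "SMALLINT",
    "integer": "INTEGER",
    "bigint": "BIGINT",
    "real": "FLOAT",
    "double precision": "DOUBLE",
    "numeric": "DOUBLE",  # lossy for high-precision decimals
    "boolean": "BOOLEAN",
    "character": "CHAR",
    "character varying": "STRING",
    "date": "DATE",
    "timestamp without time zone": "DATETIME",
}


def redshift_to_finspace_type(data_type, default="STRING"):
    """Normalize the type name, then look it up, falling back to `default`."""
    key = str(data_type).strip().lower()
    return REDSHIFT_TO_FINSPACE.get(key, default)


print(redshift_to_finspace_type("INTEGER"))
print(redshift_to_finspace_type("super"))
```

Normalizing case and whitespace before the lookup avoids silently falling back to `STRING` when the source reports a type in a different case than the map expects.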

In [None]:
#
# Utility functions
#------------------------------------------------------------

def get_field_by_name(f_list, title, name='name'):
    # Return property `name` of the first field whose title matches, or None
    for f in f_list:
        if f['title'] == title:
            return f[name]
    return None

def get_finspace_schema(table_schema_pdf):

    DoubleType = "DOUBLE"
    FloatType = "FLOAT"
    DateType = "DATE"
    StringType = "STRING"
    IntegerType = "INTEGER"
    LongType = "BIGINT"
    BooleanType = "BOOLEAN"
    TimestampType = "DATETIME"

    # NOTE: keys must match the DATA_TYPE strings exactly; any type not listed
    # falls back to STRING. Redshift's information_schema typically reports
    # names such as 'integer' or 'character varying', so extend this map as needed.
    switcher = {
        "DATE": DateType,
        "FLOAT": FloatType,
        "NUMBER": DoubleType,
        "TEXT": StringType
    }

    columns = []

    for _, row in table_schema_pdf.iterrows():
        name = row['COLUMN_NAME']
        description = row['COMMENT']
        data_type = row['DATA_TYPE']

        if description is None:
            description = ''

        habType = switcher.get(str(data_type), StringType)

        columns.append({'dataType': habType, 'name': name, 'description': description})

    schema = {
        'primaryKeyColumns': [],
        'columns': columns
    }

    return schema


# 2. Import Tables from Amazon Redshift database as FinSpace datasets
Get a list of tables, columns, and associated comments from Amazon Redshift. Add metadata for each table to a FinSpace attribute set. This attribute set is then associated with the FinSpace dataset. Each FinSpace dataset represents one table in the given Redshift database (excluding `information_schema` and system tables).

## 2.1. Setup parameters
Replace the parameters below with the connection information for Amazon Redshift (cluster, port, user, password). Also specify the ID of the FinSpace user group that should get access to the datasets.

In [None]:
# Amazon Redshift database name
dbName = '' ### <------ REPLACE WITH YOURS ###

# Attribute Set name in FinSpace 
att_name = 'Redshift Table Attributes'

# User Group to grant access to the dataset
group_id = ''### <------ REPLACE WITH YOURS ###

# Source name
source_name = 'Redshift'

# Redshift connection details
dbCluster = '' ### <------ REPLACE WITH YOURS ###
dbPort = '' ### <------ REPLACE WITH YOURS ###
dbUser = ''### <------ REPLACE WITH YOURS ###
dbPassword = ''### <------ REPLACE WITH YOURS ###

## 2.2. Get tables and associated metadata From Amazon Redshift

In [None]:
# SQL queries to get the list of tables and columns with their associated comments
tablesQuery = f"select tb.table_catalog, tb.table_schema, tb.table_name, ad.description as comment from {dbName}.information_schema.tables tb left join (select * from pg_catalog.pg_description mt left join (select cl.oid as table_id, cl.relname as table_name, cl.relnamespace as schema_id, np.nspname as schema_name from pg_class as cl left join pg_catalog.pg_namespace as np on np.oid = cl.relnamespace) et on mt.objoid = et.table_id where mt.objsubid = 0) ad on tb.table_name = ad.table_name and tb.table_schema = ad.schema_name where tb.table_schema <> 'information_schema' and tb.table_schema not like 'pg_%'"

schemaQuery = f"select tb.*, ad.description as comment from {dbName}.information_schema.columns tb left join (select * from pg_catalog.pg_description mt left join (select cl.oid as table_id, cl.relname as table_name, cl.relnamespace as schema_id, np.nspname as schema_name from pg_class as cl left join pg_catalog.pg_namespace as np on np.oid = cl.relnamespace) et on mt.objoid = et.table_id) ad on tb.table_name = ad.table_name and tb.table_schema = ad.schema_name and tb.ordinal_position = ad.objsubid where tb.table_schema <> 'information_schema' and tb.table_schema not like 'pg_%'"

# Redshift connection URL
urlStr = f"jdbc:redshift://{dbCluster}:{dbPort}/{dbName}?user={dbUser}&password={dbPassword}"

 # Get all the tables in a database (exclude information and system tables)
tablesDF = spark.read \
 .format("jdbc") \
 .option("driver", "com.amazon.redshift.jdbc42.Driver") \
 .option("url", urlStr) \
 .option("query", tablesQuery) \
 .load()

tablesDF.show(5)


# Get all columns in the database
schemaDF = spark.read \
 .format("jdbc") \
 .option("driver", "com.amazon.redshift.jdbc42.Driver") \
 .option("url", urlStr) \
 .option("query", schemaQuery) \
 .load() 
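
The reads above embed the user name and password in the JDBC URL. Spark's JDBC data source also accepts `user` and `password` as separate options, which keeps credentials out of the URL string. A minimal sketch, assuming a hypothetical helper named `redshift_jdbc_options` and placeholder connection values (none of these values come from this notebook):

```python
def redshift_jdbc_options(cluster, port, database, user, password, query):
    """Build the option dictionary for spark.read.format("jdbc")."""
    return {
        "driver": "com.amazon.redshift.jdbc42.Driver",
        "url": f"jdbc:redshift://{cluster}:{port}/{database}",
        "user": user,
        "password": password,
        "query": query,
    }


# Placeholder values for illustration only
opts = redshift_jdbc_options(
    "example-cluster.example.com", "5439", "dev",
    "awsuser", "not-a-real-password", "select 1",
)
print(opts["url"])

# With a live Spark session the dictionary can be passed in one call:
# df = spark.read.format("jdbc").options(**opts).load()
```

Building the options once also avoids repeating the driver and URL settings for each query.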

## 2.3. Get the Attribute Set
Retrieve the Redshift attribute set by name. Its field identifiers are used to populate the attribute set before it is associated with the datasets. The exact field IDs are required, not just the field titles.
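
To make the difference between field titles and field IDs concrete, the sketch below runs a title-based lookup against a hand-written field list. The `sample_fields` structure and its ID and type values are hypothetical; real values come from `describe_attribute_set`. The lookup mirrors the `get_field_by_name` helper defined earlier in this notebook.

```python
def get_field_by_name(f_list, title, name="name"):
    """Return property `name` of the first field whose title matches, else None."""
    for f in f_list:
        if f["title"] == title:
            return f[name]
    return None


# Hypothetical fields: 'name' holds the field ID, 'title' the label shown in the UI
sample_fields = [
    {"name": "field-0001", "title": "Catalog", "type": {"name": "STRING"}},
    {"name": "field-0002", "title": "Schema", "type": {"name": "STRING"}},
]

catalog_id = get_field_by_name(sample_fields, "Catalog")
catalog_type = get_field_by_name(sample_fields, "Catalog", "type")["name"]
print(catalog_id, catalog_type)
```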

In [None]:
# Get the attribute set
sfAttrSet = finspace.attribute_set(att_name)

att_def = None
att_fields = None

# Get the fields of the attribute set
att_resp = finspace.describe_attribute_set(sfAttrSet['id'])

if 'definition' in att_resp:
    att_def = att_resp['definition']

# guard against a response without a definition
if att_def is not None and 'fields' in att_def:
    att_fields = att_def['fields']

print(att_fields)

## 2.4. Get Amazon Redshift Source

One of the fields in the Redshift attribute set identifies the source as Redshift. The value of that field is the key of the 'Redshift' sub-category in the 'Source' classification, so we look up the key in FinSpace first and use it when populating the attribute set for the datasets created in the next steps.
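
The lookup amounts to scanning the classification's nodes for a matching name. A small sketch on hand-written data; the `sample_nodes` list and its keys are hypothetical, and the helper `find_node_key` is introduced here only for illustration. Unlike the loop in the next cell, which keeps the last matching node, this version returns the first match.

```python
def find_node_key(nodes, node_name):
    """Return the key of the first node whose fields.name matches, else None."""
    for node in nodes:
        if node["fields"]["name"] == node_name:
            return node["key"]
    return None


# Hypothetical nodes shaped like the classification definition used in this notebook
sample_nodes = [
    {"key": "key-0001", "fields": {"name": "Vendor"}},
    {"key": "key-0002", "fields": {"name": "Redshift"}},
]

print(find_node_key(sample_nodes, "Redshift"))
print(find_node_key(sample_nodes, "Unknown"))
```

A `None` result means the sub-category has not been created yet (see the prerequisites).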

In [None]:
# Get the key for "Redshift" sub-category in the classification 'Source'
source_cls = finspace.classification('Source')

source_fields = finspace.describe_classification(source_cls['id'])
source_key = None

for n in source_fields['definition']['nodes']:
    if n['fields']['name'] == source_name:
        source_key = n['key']

# this is the key for source in the Category
print(f'Source: {source_name} Key: {source_key}')

In [None]:
# Preview the tables that will be imported
tablesDF.select('TABLE_CATALOG', 'TABLE_SCHEMA', 'TABLE_NAME', 'COMMENT').show(10, False)

## 2.5. Get a list of datasets in FinSpace to avoid duplicates

In [None]:
# Get all the datasets from Redshift (classification 'Source', with value 'Redshift')
resp = finspace.client.list_dataset_metadata_by_taxonomy_node(taxonomyId=source_cls['id'], taxonomyNodeKey=source_key)

# Get a list of datasets to iterate over
datasets = resp['datasetMetadataSummaries']


# Build the lookup table for existing datasets from Redshift to avoid creating duplicates
types_list = []

for s in datasets:

    # end of the arn is the dataset ID
    dataset_id = os.path.basename(s['datasetArn'])

    # get the details of the dataset (name, description, etc)
    dataset_details_resp = finspace.client.describe_dataset_details(datasetId=dataset_id)

    dataset_details = None
    dataset_types = None
    owner_info = None
    taxonomy_info = None

    if 'dataset' in dataset_details_resp:
        dataset_details = dataset_details_resp["dataset"]

    if 'datasetTypeContexts' in dataset_details_resp:
        dataset_types = dataset_details_resp["datasetTypeContexts"]

    if 'ownerinfo' in dataset_details_resp:
        owner_info = dataset_details_resp["ownerinfo"]

    if 'taxonomyNodesinfo' in dataset_details_resp:
        taxonomy_info = dataset_details_resp["taxonomyNodesinfo"]

    # Pull Redshift attribute set from the list of dataset_types

    # first check the definition, then extract the values against the definition
    # have the keys of values/labels as the column header?
    # (skip the dataset if the response carried no attribute sets)
    for dt in dataset_types or []:
        if dt['definition']['name'] != att_name:
            continue

        dd = {
            'dataset_id': dataset_id
        }

        # used to map the field name (id) to the title seen in the UI
        field_map = {}

        # get the field titles for name
        for f in dt['definition']['fields']:
            field_map[f['name']] = f['title']

        # human readable, else the keys would be numbers
        for v in dt['values']:
            dd[field_map[v['field']]] = v['values']

        types_list.append(dd)

types_pdf = pd.DataFrame(types_list)
print(types_pdf)
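
The rows collected above can also be turned into a set of `(catalog, schema, table)` tuples, so that later duplicate checks become exact-match lookups. A sketch, assuming each attribute value is stored as a single-element list and that the field titles are 'Catalog', 'Schema', and 'Table' (the titles used later in this notebook). The helper name `existing_table_keys` and the sample rows are hypothetical.

```python
def existing_table_keys(rows):
    """Collect (catalog, schema, table) tuples from attribute-set rows."""
    keys = set()
    for row in rows:
        try:
            keys.add((row["Catalog"][0], row["Schema"][0], row["Table"][0]))
        except (KeyError, IndexError):
            # skip rows whose attribute set is only partially populated
            continue
    return keys


# Hypothetical rows shaped like the entries appended to types_list
sample_rows = [
    {"dataset_id": "abc123", "Catalog": ["dev"], "Schema": ["public"], "Table": ["trades"]},
    {"dataset_id": "def456", "Catalog": ["dev"]},  # incomplete entry is skipped
]

keys = existing_table_keys(sample_rows)
print(("dev", "public", "trades") in keys)
print(("dev", "public", "quotes") in keys)
```

A tuple lookup treats two tables with the same name in different schemas as distinct datasets.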

## 2.6. For each table in Amazon Redshift, create a dataset in FinSpace and attach an Attribute Set

In [None]:
# Permissions to grant the above group for the created dataset
basicPermissions = [
 "ViewDatasetDetails",
 "ReadDatasetData",
 "AddDatasetData",
 "CreateSnapshot",
 "EditDatasetMetadata",
 "ManageDatasetPermissions",
 "DeleteDataset"
]

# All datasets have ownership
basicOwnerInfo = {
 "phoneNumber" : "12125551000",
 "email" : "jdoe@amazon.com",
 "name" : "Jane Doe"
}

# Add the tables into a pandas dataframe to then iterate on
tablesPDF = tablesDF.select('TABLE_CATALOG', 'TABLE_SCHEMA', 'TABLE_NAME', 'COMMENT').toPandas()

c = 0
# create=False
create=True


# For each table, create a dataset with the necessary attribute set populated and associated to the dataset
for index, row in tablesPDF.iterrows():

    c = c + 1

    catalog = row.TABLE_CATALOG
    schema = row.TABLE_SCHEMA
    table = row.TABLE_NAME

    # do we already have this dataset?
    exist_i = None
    for ee_i, ee in types_pdf.iterrows():
        if catalog in ee.Catalog:
            if schema in ee.Schema:
                if table in ee.Table:
                    exist_i = ee_i

    if exist_i is not None:
        print(f"Table {table} already exists in FinSpace")
        continue

    # Attributes and their populated values
    att_values = [
        { 'field' : get_field_by_name(att_fields, 'Catalog'), 'type' : get_field_by_name(att_fields, 'Catalog', 'type')['name'], 'values' : [ catalog ] },
        { 'field' : get_field_by_name(att_fields, 'Schema'), 'type' : get_field_by_name(att_fields, 'Schema', 'type')['name'], 'values' : [ schema ] },
        { 'field' : get_field_by_name(att_fields, 'Table'), 'type' : get_field_by_name(att_fields, 'Table', 'type')['name'], 'values' : [ table ] },
        { 'field' : get_field_by_name(att_fields, 'Source'), 'type' : get_field_by_name(att_fields, 'Source', 'type')['name'], 'values' : [ source_key ] },
    ]

    # get this table's schema from Redshift
    tableSchemaPDF = schemaDF.filter(schemaDF.table_name == table).filter(schemaDF.table_schema == schema).select('ORDINAL_POSITION', 'COLUMN_NAME', 'IS_NULLABLE', 'DATA_TYPE', 'COMMENT').orderBy('ORDINAL_POSITION').toPandas()

    print(tableSchemaPDF)
    # translate Redshift schema to FinSpace Schema
    fs_schema = get_finspace_schema(tableSchemaPDF)

    # name and description of the dataset to create
    name = f'{table}'
    description = f'Redshift table from catalog: {catalog}'

    # prefer the table comment from Redshift when one exists
    if row.COMMENT is not None:
        description = row.COMMENT

    print(f'name: {name}')
    print(f'description: {description}')

    print("att_values:")
    for i in att_values:
        print(i)

    print("schema:")
    for i in fs_schema['columns']:
        print(i)

    if create:
        # create the dataset
        dataset_id = finspace.create_dataset(
            name = name,
            description = description,
            permission_group_id = group_id,
            dataset_permissions = basicPermissions,
            kind = "TABULAR",
            owner_info = basicOwnerInfo,
            schema = fs_schema
        )

        print(f'Created, dataset_id: {dataset_id}')

        time.sleep(20)

        # associate the attributes to the dataset
        if att_name is not None and att_values is not None:
            print(f"Associating values to attribute set: {att_name}")
            finspace.associate_attribute_set(att_name=att_name, att_values=att_values, dataset_id=dataset_id)