{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Importing S3 Data into FinSpace\n", "\n", "This notebook demonstrates how to use the FinSpace APIs to create a dataset and populate it with data from an S3 bucket outside of FinSpace.\n", "\n", "## Preparation\n", "You need an S3 bucket that contains the data you wish to import, and that bucket must grant your FinSpace environment's service account access to it.\n", "\n", "## Deutsche Börse Public Dataset (DBG PDS)\n", "GitHub: https://github.com/Deutsche-Boerse/dbg-pds\n", "\n", "Copy the data into a bucket that grants the FinSpace service account access. The bucket must allow the s3:GetObject and s3:ListBucket actions for the service account ARN.\n", "\n", "FinSpace service account ARN (replace with your environment's service account):\n", "\n", "arn:aws:iam::**INFRASTRUCTURE_ACCOUNT_ID**:role/FinSpaceServiceRole\n", "\n", "## Entitlement Example\n", "\n", "- The S3 bucket is shared cross-account with the FinSpace service role\n", "- Replace INFRASTRUCTURE_ACCOUNT_ID with the account ID from your environment's service account ARN\n", "- Replace S3_BUCKET with the name of your S3 bucket\n", "\n", "```\n", "{\n", "    \"Version\": \"2012-10-17\",\n", "    \"Id\": \"CrossAccountAccess\",\n", "    \"Statement\": [\n", "        {\n", "            \"Effect\": \"Allow\",\n", "            \"Principal\": {\n", "                \"AWS\": [\n", "                    \"arn:aws:iam::INFRASTRUCTURE_ACCOUNT_ID:role/FinSpaceServiceRole\"\n", "                ]\n", "            },\n", "            \"Action\": \"s3:GetObject\",\n", "            \"Resource\": \"arn:aws:s3:::S3_BUCKET/*\"\n", "        },\n", "        {\n", "            \"Effect\": \"Allow\",\n", "            \"Principal\": {\n", "                \"AWS\": [\n", "                    \"arn:aws:iam::INFRASTRUCTURE_ACCOUNT_ID:role/FinSpaceServiceRole\"\n", "                ]\n", "            },\n", "            \"Action\": \"s3:ListBucket\",\n", "            \"Resource\": \"arn:aws:s3:::S3_BUCKET\"\n", "        }\n", "    ]\n", "}\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%local\n", "from aws.finspace.cluster import FinSpaceClusterManager\n", "\n", "# Connect to a cluster only if this cell has not already been run\n", "if 
'finspace_clusters' not in globals():\n", "    finspace_clusters = FinSpaceClusterManager()\n", "    finspace_clusters.auto_connect()\n", "else:\n", "    print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# FinSpace Environment\n", "\n", "Provide values from your AWS account (the S3 location) and from your FinSpace environment. The group ID identifies the user group to associate the dataset with. This example grants that group all permissions on the dataset it creates.\n", "\n", "## Getting the Group ID\n", "\n", "Navigate to the Analyst group (gear menu, then users and groups, then select the group named Analyst). The URL follows this pattern:\n", "http://**ENVIRONMENT_ID**.**REGION**.amazonfinspace.com/userGroup/**GROUP_ID**\n", "\n", "Copy the GROUP_ID part of the URL into the **group_id** variable assignment below.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "import pandas as pd\n", "from aws.finspace.analytics import FinSpaceAnalyticsManager\n", "\n", "# S3 location you copied the data to from s3://deutsche-boerse-xetra-pds\n", "root_folder = 's3://'\n", "\n", "# Existing dataset ID: if None, a new dataset is created; otherwise the bucket contents are appended to that dataset\n", "dataset_id = None\n", "\n", "# User group to grant access to the dataset\n", "group_id = ''\n", "\n", "# Date range to pull from S3 into FinSpace (date-named folders under root_folder)\n", "start_date = '2021-01-04'\n", "end_date = '2021-01-06'\n", "\n", "finspace_manager = FinSpaceAnalyticsManager(spark)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Dataset Definitions\n", "Define the dataset's name, description, schema, attribute set and attribute set values, and the permissions to grant to the permission group." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Name of the dataset\n", "name = \"Deutsche Börse Public Dataset (Xetra)\"\n", "\n", "# Description of the dataset\n", "description = \"\"\"The Deutsche Börse Public Dataset (PDS) project makes near-time data derived from Deutsche Börse's trading systems available to the public for free. This is the first time that such detailed financial market data has been shared freely and continually from the source provider.\n", "\"\"\"\n", "\n", "# Attribute set to use; it is looked up by name in FinSpace\n", "att_name = \"Capital Market Details\"\n", "\n", "# Attribute values to associate, based on the attribute set's definition\n", "att_values = [\n", "    { 'field' : 'AssetClass', 'type' : 'TAXONOMY', 'values' : [ 'Equity', 'CommonStocks'] },\n", "    { 'field' : 'EventType', 'type' : 'TAXONOMY', 'values' : [ 'OHLC' ] },\n", "    { 'field' : 'Exchange', 'type' : 'TAXONOMY', 'values' : [ ] },\n", "    { 'field' : 'FinancialContentType', 'type' : 'TAXONOMY', 'values' : [ ] },\n", "    { 'field' : 'RegionsAndCountries', 'type' : 'TAXONOMY', 'values' : [ 'Germany' ] }\n", "]\n", "\n", "# Permissions to grant the group above on the created dataset\n", "basicPermissions = [\n", "    \"ViewDatasetDetails\",\n", "    \"ReadDatasetData\",\n", "    \"AddDatasetData\",\n", "    \"CreateSnapshot\",\n", "    \"EditDatasetMetadata\",\n", "    \"ManageDatasetPermissions\",\n", "    \"DeleteDataset\"\n", "]\n", "\n", "request_dataset_permissions = [{\"permission\": permissionName} for permissionName in basicPermissions]\n", "\n", "# Owner information (every dataset has an owner)\n", "basicOwnerInfo = {\n", "    \"phoneNumber\" : \"12125551000\",\n", "    \"email\" : \"jdoe@amazon.com\",\n", "    \"name\" : \"Jane Doe\"\n", "}\n", "\n", "# Schema of the dataset\n", "schema = {\n", "    'primaryKeyColumns': [],\n", "    'columns' : [\n", "        {'dataType': 'STRING', 'name': 'ISIN', 'description': 'ISIN of the security'},\n", "        {'dataType': 'STRING', 'name': 
'Mnemonic', 'description': 'The product market segment, following the convention on http://www.eurexchange.com'},\n", " {'dataType': 'STRING', 'name': 'SecurityDesc', 'description': 'Description of the security'},\n", " {'dataType': 'STRING', 'name': 'SecurityType', 'description': 'Type of security'},\n", " {'dataType': 'STRING', 'name': 'Currency', 'description': 'Currency in which the product is traded'},\n", " {'dataType': 'INTEGER', 'name': 'SecurityID', 'description': 'Unique identifier for each contract'},\n", " {'dataType': 'DATE', 'name': 'Date', 'description': 'Date of trading period'},\n", " {'dataType': 'STRING', 'name': 'Time', 'description': 'Minute of trading to which this entry relates'},\n", " {'dataType': 'DOUBLE', 'name': 'StartPrice', 'description': 'Trading price at the start of period'},\n", " {'dataType': 'DOUBLE', 'name': 'MaxPrice', 'description': 'Maximum price over the period'},\n", " {'dataType': 'DOUBLE', 'name': 'MinPrice', 'description': 'Minimum price over the period'},\n", " {'dataType': 'DOUBLE', 'name': 'EndPrice', 'description': 'Trading price at the end of the period'},\n", " {'dataType': 'DOUBLE', 'name': 'TradedVolume', 'description': 'Total value traded'},\n", " {'dataType': 'INTEGER', 'name': 'NumberOfTrades', 'description': 'Number of distinct trades during the period'}\n", " ]\n", "}\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# call FinSpace to create the dataset if no ID was assigned\n", "# if an ID was assigned, will not create a dataset but will simply add data to it\n", "if dataset_id is None:\n", " # Create the dataset if it does not exist yet\n", " resp = finspace_manager.finspace_client.create_dataset(\n", " name = name, \n", " description = description, \n", " permissionGroupId = group_id,\n", " datasetPermissions = request_dataset_permissions,\n", " kind = \"TABULAR\",\n", " ownerInfo = basicOwnerInfo,\n", " schema = schema\n", " )\n", "\n", " dataset_id = 
resp[\"datasetId\"]\n", "\n", "    time.sleep(5)\n", "\n", "print(f'Dataset ID: {dataset_id}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Generate every calendar date between start_date and end_date (inclusive)\n", "dates = pd.date_range(start=start_date, end=end_date)\n", "\n", "# Keep the ID of every changeset created so that each one can be awaited\n", "changeset_ids = []\n", "\n", "for d in dates:\n", "    s3_source = f'{root_folder.rstrip(\"/\")}/{d.strftime(\"%Y-%m-%d\")}'\n", "    print(f'Ingesting from: {s3_source}')\n", "\n", "    try:\n", "        resp = finspace_manager.finspace_client.instance.create_changeset(\n", "            datasetId=dataset_id, changeType='APPEND',\n", "            sourceType='S3', sourceParams={'s3SourcePath': s3_source},\n", "            formatType='CSV',\n", "            formatParams={'separator': ',', 'withHeader': 'true'})\n", "\n", "        changeset_id = resp['changeset']['id']\n", "    except Exception as e:\n", "        # Xetra does not trade on weekends, so those folders may hold no data\n", "        print(f'No data (weekend?) for {d.date()}: {e}')\n", "        continue\n", "\n", "    changeset_ids.append(changeset_id)\n", "    print(f'Changeset ID: {changeset_id}')\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def wait_for_ingestion(client, dataset_id: str, changeset_id: str, sleep_sec=10):\n", "    \"\"\"\n", "    Poll a changeset until it either completes or fails, then return\n", "    :param client: FinSpace client used to describe the changeset\n", "    :param dataset_id: GUID of the dataset\n", "    :type dataset_id: str\n", "    :param changeset_id: GUID of the changeset\n", "    :type changeset_id: str\n", "    :param sleep_sec: seconds to wait between checks\n", "    :type sleep_sec: int\n", "    :raises Exception: if the changeset ends in any status other than SUCCESS\n", "    \"\"\"\n", "    while True:\n", "        resp = client.describe_changeset(datasetId=dataset_id, id=changeset_id)\n", "\n", "        changeset = resp.get('changeset', {})\n", "        status = changeset.get('status', '')\n", "\n", "        if status == 'SUCCESS':\n", "            print(f'Changeset {changeset_id} complete')\n", "            break\n", "        elif status in ('PENDING', 'RUNNING'):\n", "            print(f'Changeset {changeset_id} status is {status}, waiting {sleep_sec} sec ...')\n", "            time.sleep(sleep_sec)\n", "        else:\n", "            raise Exception(f'Bad changeset status: {status}, failing now. Response: {resp}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Wait for every changeset created above to finish\n", "for changeset_id in changeset_ids:\n", "    wait_for_ingestion(finspace_manager.finspace_client, dataset_id=dataset_id, changeset_id=changeset_id)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f'dataset_id: {dataset_id}')\n", "\n", "resp = finspace_manager.finspace_client.list_data_views(datasetIdEquals=dataset_id, maxResults=100)\n", "\n", "existing_views = resp['dataViews']\n", "\n", "# Look for an existing auto-update view of this dataset\n", "autoupdate_view_id = None\n", "\n", "for view in existing_views:\n", "    if view['autoUpdate']:\n", "        autoupdate_view_id = view['dataViewId']\n", "\n", "# Create an auto-update view (materialized snapshot) of this dataset if one does not already exist\n", "if autoupdate_view_id is None:\n", "    print('creating auto-update view')\n", "\n", "    resp = finspace_manager.finspace_client.create_materialized_snapshot(\n", "        destinationProperties={},\n", "        autoUpdate=True,\n", "        sortColumns=[],\n", "        partitionColumns=[],\n", "        destinationType='GLUE_TABLE',\n", "        datasetId=dataset_id)\n", "    autoupdate_view_id = resp['id']\n", "else:\n", "    print(f'Exists: autoupdate_view_id = {autoupdate_view_id}')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Associate Attribute Set" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def list_attribute_sets(client):\n", "    \"\"\"\n", "    Return summaries of all attribute sets (dataset types), following nextToken pagination\n", "    \"\"\"\n", "    resp = client.list_dataset_types(sort='NAME')\n", "    results = resp['datasetTypeSummaries']\n", "\n", "    while resp.get('nextToken'):\n", "        resp = client.list_dataset_types(sort='NAME', nextToken=resp['nextToken'])\n", "        results.extend(resp['datasetTypeSummaries'])\n", "\n", "    return results\n", "\n", "def attribute_set(client, name: str):\n", "    \"\"\"\n", "    Exact (case-insensitive) name search for a dataset type of the given name\n", "    :param name: name of the dataset type to find\n", "    :type name: str\n", "    :return: the matching dataset type summary, or None if there is no match\n", "    \"\"\"\n", "    
all_dataset_types = list_attribute_sets(client)\n", "    return next((c for c in all_dataset_types if c['name'].lower() == name.lower()), None)\n", "\n", "def describe_attribute_set(client, attribute_set_id: str):\n", "    \"\"\"\n", "    Call the describe dataset type API and return only the dataset type portion of the response\n", "    :param attribute_set_id: GUID of the dataset type to describe\n", "    :type attribute_set_id: str\n", "    :return: the dataset type description, or None if the response contains none\n", "    \"\"\"\n", "    dataset_type_details_resp = client.describe_dataset_type(datasetTypeId=attribute_set_id)\n", "\n", "    return dataset_type_details_resp.get('datasetType')\n", "\n", "def associate_attribute_set(client, att_name: str, att_values: list, dataset_id: str):\n", "    \"\"\"\n", "    Associate the named attribute set with the dataset and set its attribute values\n", "    \"\"\"\n", "    # Get the attribute set by name; its ID is needed below\n", "    att_set = attribute_set(client, att_name)\n", "\n", "    if att_set is None:\n", "        raise ValueError(f'No attribute set found with name: {att_name}')\n", "\n", "    # Get the dataset's details; its ARN is needed below\n", "    dataset_details_resp = client.describe_dataset_details(datasetId=dataset_id)\n", "\n", "    dataset = dataset_details_resp.get('dataset', None)\n", "\n", "    if dataset is None:\n", "        raise ValueError(f'No dataset found for id: {dataset_id}')\n", "\n", "    arn = dataset['arn']\n", "    attribute_set_id = att_set['id']\n", "\n", "    # Dissociate any existing relationship; the error raised when there is none is ignored\n", "    try:\n", "        client.dissociate_dataset_from_attribute_set(datasetArn=arn, attributeSetId=attribute_set_id, datasetId=dataset_id)\n", "    except Exception:\n", "        print('Nothing to dissociate')\n", "\n", "    client.associate_dataset_with_attribute_set(datasetArn=arn, attributeSetId=attribute_set_id, datasetId=dataset_id)\n", "\n", "    resp = client.update_dataset_attribute_set_context(datasetArn=arn, datasetId=dataset_id, attributeSetId=attribute_set_id, values=att_values)\n", "\n", "    # Return the raw response only when the update did not return HTTP 200, so the caller can inspect it\n", "    if resp['ResponseMetadata']['HTTPStatusCode'] != 200:\n", "        return resp\n", "\n", "    return" ] 
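}, { "cell_type": "markdown", "metadata": {}, "source": [ "Optionally, inspect the attribute set before associating values with it. The `describe_attribute_set` helper above is not otherwise called in this notebook. The cell below is a minimal sketch that looks up the set named by `att_name` and prints its definition, which lets you check the fields that `att_values` must match. It assumes the same `finspace_manager.finspace_client.instance` client used by the association cell. The exact structure of the printed response depends on the API." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch: look up the attribute set by name and print its full definition\n", "# (assumes the helper functions from the previous cell have been run)\n", "fs_client = finspace_manager.finspace_client.instance\n", "\n", "att_set = attribute_set(fs_client, att_name)\n", "\n", "if att_set is None:\n", "    print(f'No attribute set named: {att_name}')\n", "else:\n", "    print(describe_attribute_set(fs_client, att_set['id']))" ] 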
}, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Associate an attribute set and fill its values\n", "print(f\"Associating values to attribute set: {att_name}\")\n", "\n", "associate_attribute_set(finspace_manager.finspace_client.instance, att_name=att_name, att_values=att_values, dataset_id=dataset_id)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import datetime\n", "print( f\"Last Run: {datetime.datetime.now()}\" )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "FinSpace PySpark (finspace-sparkmagic-84084/latest)", "language": "python", "name": "pysparkkernel__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:489461498020:image/finspace-sparkmagic-84084" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }