{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Polygon.io\n", "Notebook to use polygon.io with Spark and FinSpace\n", "\n", "## Reference\n", "[Polygon.io](https://polygon.io)\n", "[Python Client](https://github.com/polygon-io/client-python)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%local\n", "from aws.finspace.cluster import FinSpaceClusterManager\n", "\n", "# if this was already run, no need to run again\n", "if 'finspace_clusters' not in globals():\n", " finspace_clusters = FinSpaceClusterManager()\n", " finspace_clusters.auto_connect()\n", "else:\n", " print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " sc.install_pypi_package('polygon-api-client==0.2.11')\n", "except Exception as e:\n", " print('Packages already Installed')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Variables and Libraries\n", "\n", "You will need a Polygon client_key, fill its value below.\n", "\n", "**IMPORTANT** Use a group ID from your FinSpace to grant permissions to for the dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "import pandas as pd\n", "import urllib.parse as urlparse\n", "\n", "from aws.finspace.analytics import FinSpaceAnalyticsManager\n", "\n", "from polygon import RESTClient\n", "from urllib.parse import parse_qs\n", "\n", "#\n", "# User Group to grant access to the dataset\n", "# \n", "# FILL IN VALUES BELOW FOR YOUR GROUP ID AND CLIENT KEY\n", "# ----------------------------------------------------------------------\n", "group_id = '' \n", "dataset_id = None\n", "\n", "client_key = ''\n", "\n", "client = RESTClient(client_key)\n", "\n", "finspace_manager = FinSpaceAnalyticsManager(spark)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Get Tickers\n", "\n", "Using the Polygon APIs, create a table of all Tickers." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# function to extract the pagination cursor\n", "def get_cursor(url):\n", " parsed = urlparse.urlparse(url)\n", " cursor = parse_qs(parsed.query)['cursor']\n", " return cursor\n", "\n", "resp = client.reference_tickers_v3(limit=1000)\n", "\n", "all_tickers = []\n", "\n", "if resp.status == 'OK':\n", " all_tickers.extend(resp.results)\n", "\n", "while hasattr(resp, 'next_url'):\n", " print(\"Next, but first sleeping...\")\n", " time.sleep((60/5) + 1) # 5 calls per minute\n", " cursor = get_cursor(resp.next_url)\n", " resp = client.reference_tickers_v3(limit=1000, cursor=cursor)\n", "\n", " all_tickers.extend(resp.results)\n", "\n", "# create pandas dataframe from the responses \n", "tickers_df = pd.DataFrame.from_records(all_tickers)\n", "tickers_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Convert to Spark DataFrame" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.types import *\n", "\n", "# Auxiliar functions\n", "def equivalent_type(f):\n", " if f == 'datetime64[ns]': return TimestampType()\n", " elif f == 'int64': return LongType()\n", " elif f == 'int32': return IntegerType()\n", " elif f == 'float64': return FloatType()\n", " elif f == 'bool': return BooleanType()\n", " else: return StringType()\n", "\n", "def define_structure(string, format_type):\n", " try: typo = equivalent_type(format_type)\n", " except: typo = StringType()\n", " return StructField(string, typo)\n", "\n", "def get_schema(pandas_df):\n", " columns = list(pandas_df.columns)\n", " types = list(pandas_df.dtypes)\n", " struct_list = []\n", " for column, typo in zip(columns, types): \n", " struct_list.append(define_structure(column, typo))\n", " \n", " return StructType(struct_list)\n", "\n", "# Given pandas dataframe, it will return a spark's dataframe.\n", "def pandas_to_spark(pandas_df):\n", " p_schema = get_schema(pandas_df)\n", " \n", " return sqlContext.createDataFrame(pandas_df, p_schema)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tickersDF = pandas_to_spark(tickers_df)\n", "\n", "tickersDF.printSchema()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.functions import *\n", "\n", "# convert the datatime column of string to proper timestamp type\n", "tickersDF = ( tickersDF\n", " .withColumnRenamed('last_updated_utc', 'input_timestamp_str')\n", " .withColumn(\"last_updated_utc\",to_timestamp(\"input_timestamp_str\"))\n", " .drop('input_timestamp_str')\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# sample the table\n", "tickersDF.show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create FinSpace Dataset\n", "\n", "Using the FinSpace APIs will define the Dataset, add the Changeset, create auto-updating view, and associate and populate attributes to the dataset. \n", "\n", "## Definitions\n", "\n", "Here are the various data elements we need for creating the dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Name for the dataset\n", "name = \"Ticker Universe\"\n", "\n", "# description for the dataset\n", "description = \"All ticker symbols which are supported by Polygon.io\"\n", "\n", "# this is the attribute set to use, will search for it in system, this name assumes the Capital Markets Sample Data Bundle was installed\n", "att_name = \"Capital Market Details\"\n", "\n", "# Attributes to associate, based on the definition of the attribute set\n", "att_values = [\n", " { 'field' : 'AssetClass', 'type' : 'TAXONOMY', 'values' : [ 'Equity', 'CommonStocks', 'Currencies', 'FXSpot', 'Crypto'] },\n", " { 'field' : 'EventType', 'type' : 'TAXONOMY', 'values' : [ ] },\n", " { 'field' : 'Exchange', 'type' : 'TAXONOMY', 'values' : [ ] },\n", " { 'field' : 'FinancialContentType', 'type' : 'TAXONOMY', 'values' : [ ] },\n", " { 'field' : 'RegionsAndCountries', 'type' : 'TAXONOMY', 'values' : [ ] }\n", "]\n", "\n", "# Permissions to grant the above group for the created dataset\n", "basicPermissions = [\n", " \"ViewDatasetDetails\",\n", " \"ReadDatasetData\",\n", " \"AddDatasetData\",\n", " \"CreateSnapshot\",\n", " \"EditDatasetMetadata\",\n", " \"ManageDatasetPermissions\",\n", " \"DeleteDataset\"\n", "]\n", "\n", "request_dataset_permissions = [{\"permission\": permissionName} for permissionName in basicPermissions]\n", "\n", "# All datasets have ownership\n", "basicOwnerInfo = {\n", " \"phoneNumber\" : \"12125551000\",\n", " \"email\" : \"jdoe@amazon.com\",\n", " \"name\" : \"Jane Doe\"\n", "}\n", "\n", "# schema of the dataset\n", "schema = {\n", " 'primaryKeyColumns': [],\n", " 'columns' : [\n", " {'dataType': 'STRING', 'name': 'ticker', 'description': 'The exchange symbol that this item is traded under'},\n", " {'dataType': 'STRING', 'name': 'name', 'description': 'The name of the asset. For stocks equities this will be the companies registered name. For crypto/fx this will be the name of the currency or coin pair'},\n", " {'dataType': 'STRING', 'name': 'market', 'description': 'The market type of the asset'},\n", " {'dataType': 'STRING', 'name': 'locale', 'description': 'The locale of the asset'},\n", " {'dataType': 'STRING', 'name': 'primary_exchange', 'description': 'The ISO code of the primary listing exchange for this asset'},\n", " {'dataType': 'STRING', 'name': 'type', 'description': 'The type of the asset'},\n", " {'dataType': 'BOOLEAN', 'name': 'active', 'description': 'Whether or not the asset is actively traded. False means the asset has been delisted'},\n", " {'dataType': 'STRING', 'name': 'currency_name', 'description': 'The name of the currency that this asset is traded with'},\n", " {'dataType': 'STRING', 'name': 'cik', 'description': 'The CIK number for this ticker'},\n", " {'dataType': 'STRING', 'name': 'composite_figi', 'description': 'The composite OpenFIGI number for this ticker'},\n", " {'dataType': 'STRING', 'name': 'share_class_figi', 'description': 'The share Class OpenFIGI number for this ticker'},\n", " {'dataType': 'STRING', 'name': 'currency_symbol', 'description': ''},\n", " {'dataType': 'STRING', 'name': 'base_currency_symbol', 'description': ''},\n", " {'dataType': 'STRING', 'name': 'base_currency_name', 'description': ''},\n", " {'dataType': 'DATETIME', 'name': 'last_updated_utc', 'description': 'The last time this asset record was updated'}\n", " ]\n", "}\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# call FinSpace to create the dataset if no ID was assigned\n", "# if an ID was assigned, will not create a dataset but will simply add data to it\n", "if dataset_id is None:\n", " # Create the dataset if it does not exist yet\n", " resp = finspace_manager.finspace_client.create_dataset(\n", " name = name, \n", " description = description, \n", " permissionGroupId = group_id,\n", " datasetPermissions = request_dataset_permissions,\n", " kind = \"TABULAR\",\n", " ownerInfo = basicOwnerInfo,\n", " schema = schema\n", " )\n", "\n", " dataset_id = resp[\"datasetId\"]\n", " \n", " time.sleep(5)\n", "\n", "print(f'Dataset ID: {dataset_id}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "resp = finspace_manager.finspace_client.instance.get_user_ingestion_info()\n", "\n", "upload_location = resp['ingestionPath']\n", "tickersDF.write.parquet(upload_location)\n", "\n", "resp = finspace_manager.finspace_client.instance.create_changeset(datasetId=dataset_id, changeType='REPLACE', \n", " sourceType='S3', sourceParams={'s3SourcePath': upload_location}, formatType='PARQUET', formatParams={})\n", "\n", "changeset_id = resp['changeset']['id']\n", "\n", "print(f\"changeset_id = {changeset_id}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def wait_for_ingestion(client, dataset_id: str, changeset_id: str, sleep_sec=10):\n", " \"\"\"\n", " function that will continuously poll the changeset creation to ensure it completes or fails before returning\n", " :param dataset_id: GUID of the dataset\n", " :type: str\n", " :param changeset_id: GUID of the changeset\n", " :type: str\n", " :param sleep_sec: seconds to wait between checks\n", " :type: int\n", " \"\"\"\n", " while True:\n", " resp1 = client.describe_changeset(datasetId=dataset_id, id=changeset_id)\n", "\n", " resp2 = resp1.get('changeset', '')\n", " status = resp2.get('status', '')\n", "\n", " if status == 'SUCCESS':\n", " print(f\"Changeset complete\")\n", " break\n", " elif status == 'PENDING' or status == 'RUNNING':\n", " print(f\"Changeset status is still PENDING, waiting {sleep_sec} sec ...\")\n", " time.sleep(sleep_sec)\n", " continue\n", " else:\n", " raise Exception(f\"Bad changeset status: {resp1}{status}, failing now.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wait_for_ingestion(finspace_manager.finspace_client, dataset_id=dataset_id, changeset_id=changeset_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create View of the Dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print( f\"dataset_id: {dataset_id}\") \n", "\n", "resp = finspace_manager.finspace_client.list_data_views(datasetIdEquals = dataset_id, maxResults=100)\n", "\n", "existing_views = resp['dataViews']\n", "\n", "autoupdate_view_id = None\n", "\n", "for ss in existing_views:\n", " if ss['autoUpdate'] == True: \n", " autoupdate_view_id = ss.get('dataViewId', None)\n", " \n", "# create a an auto-update snapshot for this dataset if one does not already exist\n", "if autoupdate_view_id is None:\n", " print(\"creating auto-update view\")\n", "\n", " resp = finspace_manager.finspace_client.create_materialized_snapshot(\n", " destinationProperties={},\n", " autoUpdate=True,\n", " sortColumns=[],\n", " partitionColumns=[],\n", " destinationType = \"GLUE_TABLE\",\n", " datasetId=dataset_id)\n", " autoupdate_view_id = resp['id']\n", "else:\n", " print(f\"Exists: autoupdate_view_id = {autoupdate_view_id}\")\n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Associate Attribute Set" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def list_attribute_sets(client):\n", " resp = client.list_dataset_types(sort='NAME')\n", " results = resp['datasetTypeSummaries']\n", "\n", " while \"nextToken\" in resp:\n", " resp = client.list_dataset_types(sort='NAME', nextToken=resp['nextToken'])\n", " results.extend(resp['datasetTypeSummaries'])\n", "\n", " return results\n", "\n", "def attribute_set(client, name: str):\n", " \"\"\"\n", " Exact name search for a dataset type of the given name\n", " :param name: name of the dataset type to find\n", " :param name: name of the dataset type to find\n", " :type: str\n", " :return\n", " \"\"\"\n", " all_dataset_types = list_attribute_sets(client)\n", " existing_dataset_type = next((c for c in all_dataset_types if c['name'].lower() == name.lower()), None)\n", "\n", " if existing_dataset_type:\n", " return existing_dataset_type\n", "\n", "def describe_attribute_set(client, attribute_set_id: str):\n", " \"\"\"\n", " Calls describe dataset type API function and only returns the dataset type portion of the response\n", " :param attribute_set_id: the GUID of the dataset type to get description of\n", " :type: str\n", " \"\"\"\n", " resp = None\n", " dataset_type_details_resp = client.describe_dataset_type(datasetTypeId=attribute_set_id)\n", "\n", " if 'datasetType' in dataset_type_details_resp:\n", " resp = dataset_type_details_resp['datasetType']\n", "\n", " return resp\n", "\n", "def associate_attribute_set(client, att_name: str, att_values: list, dataset_id: str):\n", " # get the attribute set by name, will need its id\n", " att_set = attribute_set(client, att_name)\n", "\n", " # get the dataset's information, will need the arn\n", " dataset_details_resp = client.describe_dataset_details(datasetId=dataset_id)\n", "\n", " dataset = dataset_details_resp.get(\"dataset\", None)\n", "\n", " if dataset is None:\n", " raise ValueError(f'No dataset found for id: {dataset_id}')\n", "\n", " # disassociate any existing relationship\n", " try:\n", " client.dissociate_dataset_from_attribute_set(datasetArn=dataset['arn'], attributeSetId=att_set['id'], datasetId=dataset_id)\n", " except:\n", " print(\"Nothing to disassociate\")\n", "\n", " arn = dataset['arn']\n", " attribute_set_id = att_set['id']\n", "\n", " foo = client.associate_dataset_with_attribute_set(datasetArn=arn, attributeSetId=attribute_set_id, datasetId=dataset_id)\n", "\n", " resp = client.update_dataset_attribute_set_context(datasetArn=arn, datasetId=dataset_id, attributeSetId=attribute_set_id, values=att_values)\n", "\n", " if resp['ResponseMetadata']['HTTPStatusCode'] != 200:\n", " return resp\n", "\n", " return" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Associate an attribute set and fill its values\n", "print(f\"Associating values to attribute set: {att_name}\")\n", "\n", "associate_attribute_set(finspace_manager.finspace_client, att_name=att_name, att_values=att_values, dataset_id=dataset_id)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f\"\"\"\n", "ticker_dataset_id = '{dataset_id}'\n", "ticker_view_id = '{autoupdate_view_id}'\n", "\"\"\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import datetime\n", "print( f\"Last Run: {datetime.datetime.now()}\" )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "FinSpace PySpark (finspace-sparkmagic-84084/latest)", "language": "python", "name": "pysparkkernel__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:489461498020:image/finspace-sparkmagic-84084" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" }, "widgets": { "application/vnd.jupyter.widget-state+json": { "state": {}, "version_major": 2, "version_minor": 0 } } }, "nbformat": 4, "nbformat_minor": 5 }