{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Yahoo Finance API\n", "\n", "This notebook uses the Yahoo Finance APIs, via the Python library yfinance, to download market data and create a series of datasets in FinSpace.\n", "\n", "## Datasets to Create\n", "1. Daily HLOC for all history\n", "2. Corporate Actions\n", "3. Dividends\n", "4. Information\n", "\n", "## References\n", "- [yfinance GitHub: Reliably download historical market data from Yahoo! Finance with Python](https://github.com/ranaroussi/yfinance)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%local\n", "from aws.finspace.cluster import FinSpaceClusterManager\n", "\n", "# if this was already run, no need to run again\n", "if 'finspace_clusters' not in globals():\n", "    finspace_clusters = FinSpaceClusterManager()\n", "    finspace_clusters.auto_connect()\n", "else:\n", "    print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", "    sc.install_pypi_package('yfinance==0.1.70')\n", "except Exception as e:\n", "    print(f'Package already installed or installation failed: {e}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Variables and Libraries\n", "\n", "**Important:** fill in the identifiers for the ticker dataset and view you will use as the universe of tickers to pull data for. These identifiers are specific to the ticker history you would have created with the polygon_import notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# imports\n", "import time\n", "\n", "import yfinance as yf\n", "import pandas as pd\n", "import numpy as np\n", "import matplotlib.pyplot as plt\n", "\n", "from io import StringIO\n", "from pyspark.sql.types import *\n", "from pyspark.sql import functions as F\n", "from pyspark.sql.functions import udf\n", "from pyspark.sql import Window\n", "from pyspark.sql.functions import pandas_udf, PandasUDFType\n", "\n", "from aws.finspace.analytics import FinSpaceAnalyticsManager\n", "\n", "from datetime import datetime, timedelta\n", "\n", "###\n", "### Populate with values from your FinSpace\n", "###\n", "### Use the dataset and view created from polygon_import.ipynb\n", "###\n", "ticker_dataset_id = ''\n", "ticker_view_id = ''\n", "\n", "# User Group to grant access to the dataset\n", "group_id = ''\n", "\n", "# set this to work with an existing dataset; leave as None to create a new one\n", "dataset_id = None\n", "\n", "finspace_manager = FinSpaceAnalyticsManager(spark)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Read in the Tickers Dataset\n", "\n", "This would have been created from the polygon_import notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tickersDF = finspace_manager.read_data_view(ticker_dataset_id, ticker_view_id)\n", "tickersDF.printSchema()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tickersDF.show(5)\n", "print(f'Tickers: {tickersDF.count()}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Stock Prices\n", "\n", "Using the yfinance API and Spark, construct a DataFrame of the full price history for every ticker in the universe."
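, "\n", "As a quick sanity check before scaling out, the cell below (a minimal sketch; 'AMZN' is only an illustrative symbol, any valid ticker works) fetches a single ticker's history on the driver so you can see the shape of the data yfinance returns. The cell after it applies the same call in parallel across the cluster's executors." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Minimal single-ticker sketch: fetch one symbol's full daily history on the driver.\n", "# 'AMZN' is an illustrative symbol; any valid ticker from your universe works.\n", "sample = yf.Ticker('AMZN').history(period='max', interval='1d')\n", "\n", "# columns of interest, matching the Spark schema defined in the next cell\n", "print(sample[['Open', 'High', 'Low', 'Close', 'Volume', 'Dividends', 'Stock Splits']].tail())"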
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "schema = StructType(\n", "    [\n", "        StructField('ticker', StringType(), True),\n", "        StructField('date', DateType(), True),\n", "        StructField('open', DoubleType(), True),\n", "        StructField('high', DoubleType(), True),\n", "        StructField('low', DoubleType(), True),\n", "        StructField('close', DoubleType(), True),\n", "        StructField('volume', DoubleType(), True),\n", "        StructField('dividends', DoubleType(), True),\n", "        StructField('stock_splits', DoubleType(), True),\n", "    ]\n", ")\n", "\n", "def fetch_tick(group, pdf):\n", "    tick = group[0]\n", "    period = 'max'\n", "    interval = '1d'\n", "    try:\n", "        aTicker = yf.Ticker(tick)\n", "        raw = aTicker.history(period=period, interval=interval)[['Open', 'High', 'Low', 'Close', 'Volume', 'Dividends', 'Stock Splits']]\n", "        # fill in missing business days\n", "        idx = pd.date_range(raw.index.min(), raw.index.max(), freq='B')\n", "        # use last observation carried forward for missing values\n", "        output_df = raw.reindex(idx, method='pad')\n", "        # pandas does not keep the index (date) when converted into a Spark DataFrame\n", "        output_df['date'] = output_df.index\n", "        output_df['ticker'] = tick\n", "        new_cols = [elem.strip().lower().replace(\" \", \"_\") for elem in output_df.columns]\n", "        output_df.columns = new_cols\n", "        return output_df\n", "    except Exception as e:\n", "        print(f'Issue with {tick}: {e}')\n", "        return pd.DataFrame(columns = schema.names)\n", "\n", "# parallel grab of data: the apply is executed across the executors, so data is gathered faster\n", "stockDF = ( tickersDF\n", "    .groupBy(\"ticker\")\n", "    .applyInPandas(fetch_tick, schema=schema)\n", ")\n", "\n", "# show() triggers only a few calls to yfinance because it only needs 5 rows\n", "stockDF.show(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "stockDF.printSchema()" ] },
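{ "cell_type": "markdown", "metadata": {}, "source": [ "Optionally, verify per-ticker coverage before ingesting. This is a minimal sketch; stockDF is evaluated lazily, so this action re-downloads data from yfinance and is worth running mainly on a small universe." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional coverage check: row count and date range per ticker.\n", "# Note: stockDF is lazy, so this action re-fetches the data from yfinance.\n", "coverageDF = stockDF.groupBy('ticker').agg(\n", "    F.count('*').alias('rows'),\n", "    F.min('date').alias('first_date'),\n", "    F.max('date').alias('last_date')\n", ")\n", "\n", "coverageDF.show(5)" ] },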
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Create FinSpace Dataset\n", "\n", "Using the FinSpace APIs, define the dataset, add a changeset of data, create an auto-updating view, and associate and populate an attribute set on the dataset.\n", "\n", "## Definitions\n", "\n", "Here are the various data elements we need for creating the dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Name for the dataset\n", "name = \"EOD Prices\"\n", "\n", "# description for the dataset\n", "description = \"Equity EOD Price History from Yahoo Finance\"\n", "\n", "# the attribute set to use; it is searched for by name, which assumes the Capital Markets Sample Data Bundle was installed\n", "att_name = \"Capital Market Details\"\n", "\n", "# Attributes to associate, based on the definition of the attribute set\n", "att_values = [\n", "    { 'field' : 'AssetClass', 'type' : 'TAXONOMY', 'values' : [ 'Equity', 'CommonStocks', 'Currencies' ] },\n", "    { 'field' : 'EventType', 'type' : 'TAXONOMY', 'values' : [ ] },\n", "    { 'field' : 'Exchange', 'type' : 'TAXONOMY', 'values' : [ ] },\n", "    { 'field' : 'FinancialContentType', 'type' : 'TAXONOMY', 'values' : [ ] },\n", "    { 'field' : 'RegionsAndCountries', 'type' : 'TAXONOMY', 'values' : [ ] }\n", "]\n", "\n", "# Permissions to grant the above group for the created dataset\n", "basicPermissions = [\n", "    \"ViewDatasetDetails\",\n", "    \"ReadDatasetData\",\n", "    \"AddDatasetData\",\n", "    \"CreateSnapshot\",\n", "    \"EditDatasetMetadata\",\n", "    \"ManageDatasetPermissions\",\n", "    \"DeleteDataset\"\n", "]\n", "\n", "request_dataset_permissions = [{\"permission\": permissionName} for permissionName in basicPermissions]\n", "\n", "# All datasets have ownership\n", "basicOwnerInfo = {\n", "    \"phoneNumber\" : \"12125551000\",\n", "    \"email\" : \"jdoe@amazon.com\",\n", "    \"name\" : \"Jane Doe\"\n", "}\n", "\n", "# schema of the dataset\n", "schema = {\n", "    'primaryKeyColumns': [],\n", "    'columns' : [\n", "        {'dataType': 'STRING', 'name': 'ticker', 'description': 'The exchange symbol that this item is traded under'},\n", "        {'dataType': 'DATE', 'name': 'date', 'description': 'Reporting date'},\n", "        {'dataType': 'DOUBLE', 'name': 'open', 'description': 'Open Price'},\n", "        {'dataType': 'DOUBLE', 'name': 'high', 'description': 'High Price'},\n", "        {'dataType': 'DOUBLE', 'name': 'low', 'description': 'Low Price'},\n", "        {'dataType': 'DOUBLE', 'name': 'close', 'description': 'Close Price'},\n", "        {'dataType': 'DOUBLE', 'name': 'volume', 'description': 'Number of Shares Traded'},\n", "        {'dataType': 'DOUBLE', 'name': 'dividends', 'description': 'Any dividends paid'},\n", "        {'dataType': 'DOUBLE', 'name': 'stock_splits', 'description': 'Any stock splits'}\n", "    ]\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# call FinSpace to create the dataset if no ID was assigned;\n", "# if an ID was assigned, no dataset is created and data is simply added to the existing one\n", "if dataset_id is None:\n", "    # Create the dataset if it does not exist yet\n", "    resp = finspace_manager.finspace_client.create_dataset(\n", "        name = name,\n", "        description = description,\n", "        permissionGroupId = group_id,\n", "        datasetPermissions = request_dataset_permissions,\n", "        kind = \"TABULAR\",\n", "        ownerInfo = basicOwnerInfo,\n", "        schema = schema\n", "    )\n", "\n", "    dataset_id = resp[\"datasetId\"]\n", "\n", "print(f'Dataset ID: {dataset_id}')" ] },
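{ "cell_type": "markdown", "metadata": {}, "source": [ "Optionally, confirm the dataset is visible before ingesting data. This is a minimal sketch built on the same describe_dataset_details call the attribute-set helpers below use; only the 'dataset' and 'arn' response fields relied on elsewhere in this notebook are assumed here." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional check: confirm the dataset exists and inspect what FinSpace recorded for it\n", "resp = finspace_manager.finspace_client.describe_dataset_details(datasetId=dataset_id)\n", "\n", "dataset = resp.get('dataset', {})\n", "\n", "# 'arn' is used later when associating the attribute set\n", "print(f\"arn: {dataset.get('arn')}\")\n", "print(f\"fields returned: {sorted(dataset.keys())}\")" ] },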
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "resp = finspace_manager.finspace_client.instance.get_user_ingestion_info()\n", "\n", "upload_location = resp['ingestionPath']\n", "stockDF.write.parquet(upload_location)\n", "\n", "resp = finspace_manager.finspace_client.instance.create_changeset(datasetId=dataset_id, changeType='REPLACE',\n", "    sourceType='S3', sourceParams={'s3SourcePath': upload_location}, formatType='PARQUET', formatParams={})\n", "\n", "changeset_id = resp['changeset']['id']\n", "\n", "print(f\"changeset_id = {changeset_id}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def wait_for_ingestion(client, dataset_id: str, changeset_id: str, sleep_sec=10):\n", "    \"\"\"\n", "    Continuously poll the changeset creation until it completes or fails.\n", "    :param client: FinSpace client\n", "    :param dataset_id: GUID of the dataset\n", "    :type: str\n", "    :param changeset_id: GUID of the changeset\n", "    :type: str\n", "    :param sleep_sec: seconds to wait between checks\n", "    :type: int\n", "    \"\"\"\n", "    while True:\n", "        resp1 = client.describe_changeset(datasetId=dataset_id, id=changeset_id)\n", "\n", "        resp2 = resp1.get('changeset', '')\n", "        status = resp2.get('status', '')\n", "\n", "        if status == 'SUCCESS':\n", "            print(\"Changeset complete\")\n", "            break\n", "        elif status in ('PENDING', 'RUNNING'):\n", "            print(f\"Changeset status is {status}, waiting {sleep_sec} sec ...\")\n", "            time.sleep(sleep_sec)\n", "            continue\n", "        else:\n", "            raise Exception(f\"Bad changeset status: {status}, failing now.\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "wait_for_ingestion(finspace_manager.finspace_client, dataset_id=dataset_id, changeset_id=changeset_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Create View of the Dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(f\"dataset_id: {dataset_id}\")\n", "\n", "resp = finspace_manager.finspace_client.list_data_views(datasetIdEquals = dataset_id, maxResults=100)\n", "\n", "existing_views = resp['dataViews']\n", "\n", "autoupdate_view_id = None\n", "\n", "for ss in existing_views:\n", "    if ss['autoUpdate'] == True:\n", "        autoupdate_view_id = ss.get('dataViewId', None)\n", "\n", "# create an auto-update view for this dataset if one does not already exist\n", "if autoupdate_view_id is None:\n", "    print(\"creating auto-update view\")\n", "\n", "    resp = finspace_manager.finspace_client.create_materialized_snapshot(\n", "        destinationProperties={},\n", "        autoUpdate=True,\n", "        sortColumns=[],\n", "        partitionColumns=[],\n", "        destinationType = \"GLUE_TABLE\",\n", "        datasetId=dataset_id)\n", "\n", "    autoupdate_view_id = resp['id']\n", "else:\n", "    print(f\"Exists: autoupdate_view_id = {autoupdate_view_id}\")" ] },
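{ "cell_type": "markdown", "metadata": {}, "source": [ "Optionally, read the view back to confirm the ingested data, using the same read_data_view call used for the tickers dataset above. This is a minimal sketch; a newly created view can take time to materialize, so if the read fails, wait and retry." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional check: read the auto-update view back and sample it.\n", "# A freshly created view may still be materializing; retry later if this fails.\n", "pricesDF = finspace_manager.read_data_view(dataset_id, autoupdate_view_id)\n", "pricesDF.show(5)" ] },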
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Associate Attribute Set" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def list_attribute_sets(client):\n", "    resp = client.list_dataset_types(sort='NAME')\n", "    results = resp['datasetTypeSummaries']\n", "\n", "    while \"nextToken\" in resp:\n", "        resp = client.list_dataset_types(sort='NAME', nextToken=resp['nextToken'])\n", "        results.extend(resp['datasetTypeSummaries'])\n", "\n", "    return results\n", "\n", "def attribute_set(client, name: str):\n", "    \"\"\"\n", "    Exact name search for a dataset type of the given name\n", "    :param client: FinSpace client\n", "    :param name: name of the dataset type to find\n", "    :type: str\n", "    :return: the matching dataset type, or None if not found\n", "    \"\"\"\n", "    all_dataset_types = list_attribute_sets(client)\n", "    existing_dataset_type = next((c for c in all_dataset_types if c['name'].lower() == name.lower()), None)\n", "\n", "    if existing_dataset_type:\n", "        return existing_dataset_type\n", "\n", "def describe_attribute_set(client, attribute_set_id: str):\n", "    \"\"\"\n", "    Calls the describe dataset type API function and returns only the dataset type portion of the response\n", "    :param attribute_set_id: the GUID of the dataset type to describe\n", "    :type: str\n", "    \"\"\"\n", "    resp = None\n", "    dataset_type_details_resp = client.describe_dataset_type(datasetTypeId=attribute_set_id)\n", "\n", "    if 'datasetType' in dataset_type_details_resp:\n", "        resp = dataset_type_details_resp['datasetType']\n", "\n", "    return resp\n", "\n", "def associate_attribute_set(client, att_name: str, att_values: list, dataset_id: str):\n", "    # get the attribute set by name, will need its id\n", "    att_set = attribute_set(client, att_name)\n", "\n", "    # get the dataset's information, will need the arn\n", "    dataset_details_resp = client.describe_dataset_details(datasetId=dataset_id)\n", "\n", "    dataset = dataset_details_resp.get(\"dataset\", None)\n", "\n", "    if dataset is None:\n", "        raise ValueError(f'No dataset found for id: {dataset_id}')\n", "\n", "    # disassociate any existing relationship\n", "    try:\n", "        client.dissociate_dataset_from_attribute_set(datasetArn=dataset['arn'], attributeSetId=att_set['id'], datasetId=dataset_id)\n", "    except Exception:\n", "        print(\"Nothing to disassociate\")\n", "\n", "    arn = dataset['arn']\n", "    attribute_set_id = att_set['id']\n", "\n", "    client.associate_dataset_with_attribute_set(datasetArn=arn, attributeSetId=attribute_set_id, datasetId=dataset_id)\n", "\n", "    resp = client.update_dataset_attribute_set_context(datasetArn=arn, datasetId=dataset_id, attributeSetId=attribute_set_id, values=att_values)\n", "\n", "    if resp['ResponseMetadata']['HTTPStatusCode'] != 200:\n", "        return resp\n", "\n", "    return" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Associate an attribute set and fill its values\n", "print(f\"Associating values to attribute set: {att_name}\")\n", "\n", "associate_attribute_set(finspace_manager.finspace_client, att_name=att_name, att_values=att_values, dataset_id=dataset_id)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import datetime\n", "print(f\"Last Run: {datetime.datetime.now()}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "FinSpace PySpark (finspace-sparkmagic-84084/latest)", "language": "python", "name": "pysparkkernel__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:489461498020:image/finspace-sparkmagic-84084" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" }, "widgets": { "application/vnd.jupyter.widget-state+json": { "state": {}, "version_major": 2, "version_minor": 0 } } }, "nbformat": 4, "nbformat_minor": 5 }