# Yahoo Finance API

This notebook uses the Yahoo Finance APIs, through the Python library yfinance, to download market data and create a series of datasets in FinSpace.

## Datasets to Create
1. Daily OHLC (open, high, low, close) prices for all available history
2. Corporate Actions
3. Dividends
4. Information

## References
- [yfinance GitHub](https://github.com/ranaroussi/yfinance)
- [Reliably download historical market data from Yahoo! Finance with Python](https://aroussi.com/post/python-yahoo-finance)

In [None]:
%local
# Connect this notebook session to a FinSpace Spark cluster.
# NOTE(review): `%local` presumably (sparkmagic) runs this cell on the local
# notebook kernel rather than on the cluster -- confirm.
from aws.finspace.cluster import FinSpaceClusterManager

# if this was already run, no need to run again
if 'finspace_clusters' not in globals():
    # First run: create the manager and let it connect automatically.
    finspace_clusters = FinSpaceClusterManager()
    finspace_clusters.auto_connect()
else:
    # Already connected earlier in this session: just report which cluster.
    print(f'connected to cluster: {finspace_clusters.get_connected_cluster_id()}')

In [None]:
# Install the pinned yfinance release through the Spark context's
# `install_pypi_package` (notebook-scoped library install on the cluster).
try:
    sc.install_pypi_package('yfinance==0.1.70')
except Exception as e:
    # The original treated every error as "already installed"; include the
    # actual error so a genuine install failure is not mistaken for that.
    print(f'Packages already Installed (or install failed): {e}')

# Variables and Libraries

**Important:** fill in the identifiers of the ticker dataset and view that define the universe of tickers to pull data for. These identifiers refer to the ticker history you created with the polygon_import notebook.

In [None]:
# imports
import time

import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib as plt

from io import StringIO
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType

from aws.finspace.analytics import FinSpaceAnalyticsManager

from datetime import datetime, timedelta

###
### Populate with values from your FinSpace
###
### Use the dataset and view created from polygon_import.ipynb
###
# Read by finspace_manager.read_data_view(...) to load the ticker universe.
ticker_dataset_id = ''
ticker_view_id = ''

# User Group to grant access to the dataset
# (passed as permissionGroupId when a new dataset is created)
group_id = ''

# if this is to work with an existing dataset
# None => a new dataset is created further below; otherwise data is added to this id.
dataset_id = None

# `spark` is the notebook-provided Spark session (not defined in this file).
finspace_manager = FinSpaceAnalyticsManager(spark)

## Read in the Tickers Dataset

This dataset should already exist; it is created by the polygon_import notebook.

In [None]:
# Load the ticker universe from the FinSpace view configured above
# (ticker_dataset_id / ticker_view_id) and print its columns.
tickersDF = finspace_manager.read_data_view(ticker_dataset_id, ticker_view_id)
tickersDF.printSchema()

In [None]:
# Preview a few rows and report the row count of the ticker view.
# NOTE(review): the count is rows, presumably one per ticker -- verify, since
# the price download below groups by "ticker".
tickersDF.show(5)
print(f'Tickers: {tickersDF.count()}')

# Stock Prices

Use the yfinance APIs and Spark to construct a DataFrame of all historical prices.

In [None]:
# Spark output schema for fetch_tick: the ticker, the trading date, then the
# numeric price/volume/corporate-action columns, all nullable.
schema = StructType(
    [
        StructField('ticker', StringType(), True),
        StructField('date', DateType(), True),
    ]
    + [
        StructField(column_name, DoubleType(), True)
        for column_name in (
            'open', 'high', 'low', 'close', 'volume', 'dividends', 'stock_splits',
        )
    ]
)

def fetch_tick(group, pdf):
    """
    Download the full daily price history for one ticker from Yahoo Finance.

    Called by Spark's ``applyInPandas`` once per ``ticker`` group.

    :param group: grouping key tuple; ``group[0]`` is the ticker symbol
    :param pdf: the group's rows as a pandas DataFrame (unused, but required
        by the two-argument applyInPandas calling convention)
    :return: pandas DataFrame with one row per business day between the first
        and last date Yahoo returns, with columns open, high, low, close,
        volume, dividends, stock_splits, date and ticker. When no history is
        returned or anything fails, an empty DataFrame with the Spark
        ``schema`` column names.
    """
    tick = group[0]
    period = 'max'
    interval = '1d'
    price_cols = ['Open', 'High', 'Low', 'Close']
    # Per-day events: they belong only to the day Yahoo reported them on.
    event_cols = ['Volume', 'Dividends', 'Stock Splits']
    try:
        history = yf.Ticker(tick).history(period=period, interval=interval)
        if history.empty:
            # Nothing to reindex (presumably an unknown/delisted symbol).
            print(f'Issue with {tick}: no price history returned')
            return pd.DataFrame(columns=schema.names)
        raw = history[price_cols + event_cols]

        # fill in missing business days (weekdays Yahoo has no row for)
        idx = pd.date_range(raw.index.min(), raw.index.max(), freq='B')
        # use last observation carried forward for the missing prices ...
        output_df = raw.reindex(idx, method='pad')
        # ... but not for volume, dividends or splits: padding those would
        # report phantom volume and count a dividend/split a second time on
        # the filled-in day. Zero them on every day Yahoo did not report.
        filled_days = ~idx.isin(raw.index)
        output_df.loc[filled_days, event_cols] = 0.0

        # Pandas does not keep index (date) when converted into spark dataframe
        output_df['date'] = output_df.index
        output_df['ticker'] = tick
        # 'Stock Splits' -> 'stock_splits' etc., matching the Spark schema names
        output_df.columns = [
            col.strip().lower().replace(" ", "_") for col in output_df.columns
        ]
        return output_df
    except Exception as exc:
        # Skip this ticker instead of failing the whole Spark job.
        print(f'Issue with {tick}: {exc}')
        # NOTE(review): `schema` must still be the Spark StructType when this
        # function is serialized; the notebook later rebinds `schema` to the
        # FinSpace dataset definition dict.
        return pd.DataFrame(columns=schema.names)

# Fan the downloads out over Spark: each "ticker" group is handed to
# fetch_tick, so the groups (and their yfinance calls) run across executors.
stockDF = tickersDF.groupBy("ticker").applyInPandas(fetch_tick, schema=schema)

# Spark evaluates lazily; show() only needs 5 rows, so it likely triggers
# just a fraction of the yfinance calls.
stockDF.show(n=5)

In [None]:
# Print the columns/types of the price DataFrame (the output schema given to applyInPandas).
stockDF.printSchema()

# Create FinSpace Dataset

Use the FinSpace APIs to define the dataset, add a changeset, create an auto-updating view, and associate the dataset with an attribute set and populate its attributes.

## Definitions

Here are the various data elements we need for creating the dataset.

In [None]:
# Name for the dataset
name = "EOD Prices"

# description for the dataset
description = "Equity EOD Price History from Yahoo Finance"

# this is the attribute set to use, will search for it in system, this name
# assumes the Capital Markets Sample Data Bundle was installed
att_name = "Capital Market Details"

# Attributes to associate, based on the definition of the attribute set
att_values = [
    {'field': 'AssetClass', 'type': 'TAXONOMY', 'values': ['Equity', 'CommonStocks', 'Currencies']},
    {'field': 'EventType', 'type': 'TAXONOMY', 'values': []},
    {'field': 'Exchange', 'type': 'TAXONOMY', 'values': []},
    {'field': 'FinancialContentType', 'type': 'TAXONOMY', 'values': []},
    {'field': 'RegionsAndCountries', 'type': 'TAXONOMY', 'values': []},
]

# Permissions to grant the above group (group_id) for the created dataset
basicPermissions = [
    "ViewDatasetDetails",
    "ReadDatasetData",
    "AddDatasetData",
    "CreateSnapshot",
    "EditDatasetMetadata",
    "ManageDatasetPermissions",
    "DeleteDataset",
]

# Request payload form: one {"permission": ...} entry per permission name.
request_dataset_permissions = [{"permission": permissionName} for permissionName in basicPermissions]

# All datasets have ownership. These are placeholder contact details --
# replace them with the real dataset owner before running.
basicOwnerInfo = {
    "phoneNumber": "12125551000",
    "email": "jdoe@amazon.com",
    "name": "Jane Doe",
}

# FinSpace schema of the dataset; columns mirror the Spark price schema.
# NOTE: this rebinds `schema`, which earlier held the Spark StructType used by
# applyInPandas -- re-run the Stock Prices cell before rebuilding stockDF.
schema = {
    'primaryKeyColumns': [],
    'columns': [
        {'dataType': 'STRING', 'name': 'ticker', 'description': 'The exchange symbol that this item is traded under'},
        {'dataType': 'DATE', 'name': 'date', 'description': 'Reporting date'},
        {'dataType': 'DOUBLE', 'name': 'open', 'description': 'Open Price'},
        {'dataType': 'DOUBLE', 'name': 'high', 'description': 'High Price'},
        {'dataType': 'DOUBLE', 'name': 'low', 'description': 'Low Price'},
        {'dataType': 'DOUBLE', 'name': 'close', 'description': 'Close Price'},
        {'dataType': 'DOUBLE', 'name': 'volume', 'description': 'Number of Shares Traded'},
        {'dataType': 'DOUBLE', 'name': 'dividends', 'description': 'Any dividends paid'},
        {'dataType': 'DOUBLE', 'name': 'stock_splits', 'description': 'Any stock splits'},
    ],
}

In [None]:
# call FinSpace to create the dataset if no ID was assigned
# if an ID was assigned, will not create a dataset but will simply add data to it
if dataset_id is None:
    # Create the dataset if it does not exist yet.
    # `schema` here is the FinSpace column-definition dict from the definitions
    # cell (it rebinds the Spark StructType of the same name defined earlier).
    resp = finspace_manager.finspace_client.create_dataset(
        name = name,
        description = description,
        permissionGroupId = group_id,
        datasetPermissions = request_dataset_permissions,
        kind = "TABULAR",
        ownerInfo = basicOwnerInfo,
        schema = schema
    )

    dataset_id = resp["datasetId"]

print(f'Dataset ID: {dataset_id}')

In [None]:
# Ask FinSpace for a staging location, write the Spark price data there as
# Parquet, then register that location as a changeset on the dataset.
resp = finspace_manager.finspace_client.instance.get_user_ingestion_info()

upload_location = resp['ingestionPath']
# Writing is a Spark action: it (re)executes fetch_tick for every ticker group.
stockDF.write.parquet(upload_location)

# NOTE(review): changeType='REPLACE' presumably replaces the dataset's existing
# data rather than appending -- confirm against the FinSpace API docs.
resp = finspace_manager.finspace_client.instance.create_changeset(datasetId=dataset_id, changeType='REPLACE',
    sourceType='S3', sourceParams={'s3SourcePath': upload_location}, formatType='PARQUET', formatParams={})

changeset_id = resp['changeset']['id']

print(f"changeset_id = {changeset_id}")

In [None]:
def wait_for_ingestion(client, dataset_id: str, changeset_id: str, sleep_sec=10, timeout_sec=None):
    """
    Poll a changeset until it completes, fails, or (optionally) times out.

    :param client: FinSpace client exposing ``describe_changeset(datasetId=..., id=...)``
    :param dataset_id: GUID of the dataset
    :type: str
    :param changeset_id: GUID of the changeset
    :type: str
    :param sleep_sec: seconds to wait between checks
    :type: int
    :param timeout_sec: give up once this many seconds have passed while the
        changeset is still PENDING/RUNNING; None (default) waits indefinitely
    :type: float or None
    :raises TimeoutError: if ``timeout_sec`` elapses before the changeset finishes
    :raises Exception: if the status is anything other than SUCCESS, PENDING or
        RUNNING (including a response with no changeset/status at all)
    """
    deadline = None if timeout_sec is None else time.monotonic() + timeout_sec
    while True:
        resp = client.describe_changeset(datasetId=dataset_id, id=changeset_id)
        # `or {}` also covers an explicit None, so a malformed response ends in
        # the "Bad changeset status" error below instead of an AttributeError.
        changeset = resp.get('changeset') or {}
        status = changeset.get('status', '')

        if status == 'SUCCESS':
            print("Changeset complete")
            return

        if status not in ('PENDING', 'RUNNING'):
            raise Exception(f"Bad changeset status: {status}, failing now.")

        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Changeset {changeset_id} still {status} after {timeout_sec} sec"
            )

        print(f"Changeset status is still {status}, waiting {sleep_sec} sec ...")
        time.sleep(sleep_sec)

In [None]:
# Block until the changeset created above finishes ingesting (raises if it ends in a bad status).
wait_for_ingestion(finspace_manager.finspace_client, dataset_id=dataset_id, changeset_id=changeset_id)

# Create View of the Dataset

In [None]:
print(f"dataset_id: {dataset_id}")

# Fetch up to 100 existing views of the dataset and look for auto-updating ones.
resp = finspace_manager.finspace_client.list_data_views(datasetIdEquals=dataset_id, maxResults=100)
existing_views = resp['dataViews']

# When several views auto-update, the last one listed wins.
auto_update_views = [view for view in existing_views if view['autoUpdate'] == True]
autoupdate_view_id = auto_update_views[-1].get('dataViewId', None) if auto_update_views else None

if autoupdate_view_id is not None:
    print(f"Exists: autoupdate_view_id = {autoupdate_view_id}")
else:
    # No auto-update view yet: create one that materializes to a Glue table.
    print("creating auto-update view")
    resp = finspace_manager.finspace_client.create_materialized_snapshot(
        destinationProperties={},
        autoUpdate=True,
        sortColumns=[],
        partitionColumns=[],
        destinationType="GLUE_TABLE",
        datasetId=dataset_id,
    )
    autoupdate_view_id = resp['id']
 

## Associate Attribute Set

In [None]:
def list_attribute_sets(client):
    """
    Return every attribute set (dataset type) visible to ``client``, following pagination.

    :param client: FinSpace client exposing ``list_dataset_types(sort=..., nextToken=...)``
    :return: list of dataset type summaries, sorted by name as returned by the service
    """
    resp = client.list_dataset_types(sort='NAME')
    # Copy so the service's first response object is not mutated by extend().
    results = list(resp['datasetTypeSummaries'])

    # Keep paging only while a non-empty continuation token is returned; a
    # present-but-None token would otherwise loop forever.
    while resp.get('nextToken'):
        resp = client.list_dataset_types(sort='NAME', nextToken=resp['nextToken'])
        results.extend(resp['datasetTypeSummaries'])

    return results

def attribute_set(client, name: str):
    """
    Find an attribute set (dataset type) by exact, case-insensitive name.

    :param client: FinSpace client passed through to list_attribute_sets
    :param name: name of the attribute set to find
    :type: str
    :return: the first matching dataset type summary, or None if no name matches
    """
    for candidate in list_attribute_sets(client):
        if candidate['name'].lower() == name.lower():
            return candidate
    return None

def describe_attribute_set(client, attribute_set_id: str):
    """
    Describe one attribute set and return only its ``datasetType`` section.

    :param client: FinSpace client exposing ``describe_dataset_type(datasetTypeId=...)``
    :param attribute_set_id: the GUID of the attribute set to describe
    :type: str
    :return: the ``datasetType`` entry of the response, or None when absent
    """
    details = client.describe_dataset_type(datasetTypeId=attribute_set_id)
    return details['datasetType'] if 'datasetType' in details else None

def associate_attribute_set(client, att_name: str, att_values: list, dataset_id: str):
    """
    (Re)associate a dataset with the named attribute set and set its attribute values.

    Any existing association with that attribute set is removed first, then the
    dataset is associated again and ``att_values`` are written to it.

    :param client: FinSpace client
    :param att_name: name of the attribute set (matched case-insensitively)
    :param att_values: attribute values payload for update_dataset_attribute_set_context
    :param dataset_id: GUID of the dataset
    :raises ValueError: if no attribute set is named ``att_name`` or no dataset
        exists for ``dataset_id``
    :return: the update response when its HTTP status is not 200, otherwise None
    """
    # get the attribute set by name, will need its id
    att_set = attribute_set(client, att_name)
    if att_set is None:
        # Fail clearly instead of a later "'NoneType' object is not subscriptable".
        raise ValueError(f'No attribute set found with name: {att_name}')

    # get the dataset's information, will need the arn
    dataset_details_resp = client.describe_dataset_details(datasetId=dataset_id)
    dataset = dataset_details_resp.get("dataset", None)
    if dataset is None:
        raise ValueError(f'No dataset found for id: {dataset_id}')

    arn = dataset['arn']
    attribute_set_id = att_set['id']

    # disassociate any existing relationship; any failure here is treated as
    # "nothing to disassociate", but the error is shown rather than hidden
    try:
        client.dissociate_dataset_from_attribute_set(datasetArn=arn, attributeSetId=attribute_set_id, datasetId=dataset_id)
    except Exception as exc:
        print(f"Nothing to disassociate ({exc})")

    client.associate_dataset_with_attribute_set(datasetArn=arn, attributeSetId=attribute_set_id, datasetId=dataset_id)

    resp = client.update_dataset_attribute_set_context(datasetArn=arn, datasetId=dataset_id, attributeSetId=attribute_set_id, values=att_values)

    # Hand back the response only when the update did not return HTTP 200.
    if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
        return resp
    return None

In [None]:
# Associate an attribute set and fill its values
print(f"Associating values to attribute set: {att_name}")

associate_attribute_set(finspace_manager.finspace_client, att_name=att_name, att_values=att_values, dataset_id=dataset_id)

In [None]:
# Import under an alias so the `datetime` class imported at the top of the
# notebook (`from datetime import datetime`) is not shadowed by the module.
import datetime as dt
print(f"Last Run: {dt.datetime.now()}")