import time
import json
import gzip
import os
import shutil

import boto3
import botocore.exceptions
import pandas as pd
import matplotlib.pyplot as plt
from tqdm.auto import trange

import util.notebook_utils


def wait_till_delete(callback, check_time=5, timeout=None):
    elapsed_time = 0
    while timeout is None or elapsed_time < timeout:
        try:
            out = callback()
        except botocore.exceptions.ClientError as e:
            # When the resource-not-found exception is raised, deletion has occurred
            if e.response['Error']['Code'] == 'ResourceNotFoundException':
                print('Successful delete')
                return
            else:
                raise
        time.sleep(check_time)  # units of seconds
        elapsed_time += check_time

    raise TimeoutError("Forecast resource deletion timed-out.")


def wait(callback, time_interval=10):
    status_indicator = util.notebook_utils.StatusIndicator()

    while True:
        status = callback()['Status']
        status_indicator.update(status)
        if status in ('ACTIVE', 'CREATE_FAILED'):
            break
        time.sleep(time_interval)

    status_indicator.end()
    return status == "ACTIVE"


def load_exact_sol(fname, item_id, is_schema_perm=False):
    exact = pd.read_csv(fname, header=None)
    exact.columns = ['item_id', 'timestamp', 'target']
    if is_schema_perm:
        exact.columns = ['timestamp', 'target', 'item_id']
    return exact.loc[exact['item_id'] == item_id]


def get_or_create_iam_role(role_name):
    iam = boto3.client("iam")

    assume_role_policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": "forecast.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }

    try:
        create_role_response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(assume_role_policy_document)
        )
        role_arn = create_role_response["Role"]["Arn"]

        print("Created", role_arn)
        print("Attaching policies...")

        iam.attach_role_policy(
            RoleName=role_name,
            PolicyArn="arn:aws:iam::aws:policy/AmazonForecastFullAccess"
        )
        iam.attach_role_policy(
            RoleName=role_name,
            PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess",
        )

        print("Waiting for a minute to allow IAM role policy attachment to propagate")
        for i in trange(60):
            time.sleep(1.0)

    except iam.exceptions.EntityAlreadyExistsException:
        print("The role " + role_name + " already exists, skipping creation")
        role_arn = boto3.resource('iam').Role(role_name).arn

    print("Done.")
    return role_arn


def delete_iam_role(role_name):
    iam = boto3.client("iam")
    iam.detach_role_policy(
        PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess",
        RoleName=role_name
    )
    iam.detach_role_policy(
        PolicyArn="arn:aws:iam::aws:policy/AmazonForecastFullAccess",
        RoleName=role_name
    )
    iam.delete_role(RoleName=role_name)


def create_bucket(bucket_name, region=None):
    """Create an S3 bucket in a specified region

    If a region is not specified, the bucket is created in the S3 default
    region (us-east-1).

    :param bucket_name: Bucket to create
    :param region: String region to create bucket in, e.g., 'us-west-2'
    :return: True if bucket created, else False
    """
    try:
        if region is None or region == "us-east-1":
            # us-east-1 is the default and does not accept a LocationConstraint
            s3_client = boto3.client('s3')
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client = boto3.client('s3', region_name=region)
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name,
                                    CreateBucketConfiguration=location)
    except Exception as e:
        print(e)
        return False
    return True


def plot_forecasts(fcsts, exact, freq='1H', forecastHorizon=24, time_back=80):
    p10 = pd.DataFrame(fcsts['Forecast']['Predictions']['p10'])
    p50 = pd.DataFrame(fcsts['Forecast']['Predictions']['p50'])
    p90 = pd.DataFrame(fcsts['Forecast']['Predictions']['p90'])
    pred_int = p50['Timestamp'].apply(lambda x: pd.Timestamp(x))
    fcst_start_date = pred_int.iloc[0]
    fcst_end_date = pred_int.iloc[-1]
    time_int = exact['timestamp'].apply(lambda x: pd.Timestamp(x))

    # Plot the trailing window of actuals, the median forecast, and the p10-p90 band
    plt.plot(time_int[-time_back:], exact['target'].values[-time_back:], color='r')
    plt.plot(pred_int, p50['Value'].values, color='k')
    plt.fill_between(pred_int,
                     p10['Value'].values,
                     p90['Value'].values,
                     color='b', alpha=0.3)
    plt.axvline(x=pd.Timestamp(fcst_start_date), linewidth=3, color='g', ls='dashed')
    plt.axvline(x=pd.Timestamp(fcst_end_date), linewidth=3, color='g', ls='dashed')
    plt.xticks(rotation=30)
    plt.legend(['Target', 'Forecast'], loc='lower left')


def extract_gz(src, dst):
    print(f"Extracting {src} to {dst}")

    with open(dst, 'wb') as fd_dst:
        with gzip.GzipFile(src, 'rb') as fd_src:
            data = fd_src.read()
            fd_dst.write(data)

    print("Done.")


def read_explainability_export(BUCKET_NAME, s3_path):
    """Read explainability export files

    Inputs:
        BUCKET_NAME = S3 bucket name
        s3_path = S3 path to the export files, i.e. everything after
                  "s3://BUCKET_NAME/" in the S3 URI of your files

    Return:
        Pandas dataframe with all files concatenated row-wise
    """
    # set S3 path
    s3 = boto3.resource('s3')
    s3_bucket = s3.Bucket(BUCKET_NAME)
    s3_depth = len(s3_path.split("/")) - 1

    # set local path, starting from a clean directory
    local_write_path = "explainability_exports"
    if os.path.exists(local_write_path) and os.path.isdir(local_write_path):
        shutil.rmtree(local_write_path)
    os.makedirs(local_write_path)

    # download the CSV part files
    part_filename = ""
    part_files = list(s3_bucket.objects.filter(Prefix=s3_path))
    print(f"Number of objects found under prefix: {len(part_files)}")

    for file in part_files:
        # There will be a collection of CSVs; download every non-empty one
        if "csv" in file.key:
            part_filename = file.key.split('/')[s3_depth]
            window_object = s3.Object(BUCKET_NAME, file.key)
            file_size = window_object.content_length
            if file_size > 0:
                s3_bucket.download_file(file.key, local_write_path + "/" + part_filename)

    # Read from the local dir and combine all the part files
    temp_dfs = []
    for entry in os.listdir(local_write_path):
        if os.path.isfile(os.path.join(local_write_path, entry)):
            df = pd.read_csv(os.path.join(local_write_path, entry),
                             index_col=None, header=0)
            temp_dfs.append(df)

    # Return the assembled part files as a single pandas DataFrame
    fcst_df = pd.concat(temp_dfs, axis=0, ignore_index=True, sort=False)
    return fcst_df
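

# ---------------------------------------------------------------------------
# Minimal usage sketch, shown as comments so the module stays import-only.
# Assumptions: these helpers are imported into a notebook with AWS credentials
# already configured; the role name, bucket name, region, and predictor_arn
# below are hypothetical placeholders, not values from the original notebooks.
#
#     role_arn = get_or_create_iam_role("ForecastDemoRole")
#     create_bucket("my-forecast-demo-bucket", region="us-west-2")
#
#     forecast = boto3.client("forecast")
#
#     # Poll a Forecast resource until it reaches ACTIVE (or CREATE_FAILED).
#     wait(lambda: forecast.describe_predictor(PredictorArn=predictor_arn))
#
#     # Block until a Forecast resource is fully deleted.
#     wait_till_delete(lambda: forecast.delete_predictor(PredictorArn=predictor_arn))
# ---------------------------------------------------------------------------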