# vim: set fdm=indent: """ ___ / | ____ ___ ____ _____ ____ ____ / /| | / __ `__ \/ __ `/_ / / __ \/ __ \ / ___ |/ / / / / / /_/ / / /_/ /_/ / / / / /_/ |_/_/ /_/ /_/\__,_/ /___/\____/_/ /_/ ______ __ / ____/___ ________ _________ ______/ /_ / /_ / __ \/ ___/ _ \/ ___/ __ `/ ___/ __/ / __/ / /_/ / / / __/ /__/ /_/ (__ ) /_ /_/ \____/_/ \___/\___/\__,_/____/\__/ ___ __ __ / | _____________ / /__ _________ _/ /_____ _____ / /| |/ ___/ ___/ _ \/ / _ \/ ___/ __ `/ __/ __ \/ ___/ / ___ / /__/ /__/ __/ / __/ / / /_/ / /_/ /_/ / / /_/ |_\___/\___/\___/_/\___/_/ \__,_/\__/\____/_/ GITHUB: https://github.com/aws-samples/simple-forecast-solution/ USAGE: streamlit run -- ./app.py --local-dir LOCAL_DIR [--landing-page-url URL] OPTIONS: --local-dir LOCAL_DIR /path/to/ a local directory from which the UI will look for files. --landing-page-url URL URL of the AFA landing page """ import argparse import datetime import gc import glob import gzip import json import logging import os import pathlib import re import tempfile import textwrap import time from collections import OrderedDict from concurrent import futures from textwrap import dedent from urllib.parse import urlparse import awswrangler as wr import boto3 import cloudpickle import numpy as np import pandas as pd import plotly.express as pex import plotly.graph_objects as go import streamlit as st from awswrangler.exceptions import NoFilesFound from botocore.exceptions import ClientError from humanfriendly import format_timespan from joblib import Parallel, delayed from lambdamap.lambdamap import LambdaExecutor, LambdaFunction from sspipe import px from st_aggrid import AgGrid, GridOptionsBuilder, JsCode from stqdm import stqdm from streamlit import session_state as state from toolz.itertoolz import partition_all from afa import ( EXP_COLS, GROUP_COLS, calc_smape, calc_wape, make_demand_classification, make_health_summary, process_forecasts, run_cv_select, run_pipeline, ) ST_STATIC_PATH = pathlib.Path(st.__path__[0]).joinpath("static") ST_DOWNLOADS_PATH = ST_STATIC_PATH.joinpath("downloads") LAMBDAMAP_FUNC = "AfaLambdaMapFunction" LOCAL_DIR = "/home/ec2-user/SageMaker" if not os.path.exists(ST_DOWNLOADS_PATH): ST_DOWNLOADS_PATH.mkdir() FREQ_MAP = OrderedDict(Daily="D", Weekly="W-MON", Monthly="MS") FREQ_MAP_AFC = OrderedDict(Daily="D", Weekly="W", Monthly="M") FREQ_MAP_LONG = { "D": "Daily", "W-MON": "Weekly", "W": "Weekly", "M": "Monthly", "MS": "Monthly", } FREQ_MAP_PD = { "D": "D", "W": "W-MON", "W-SUN": "W-MON", "W-MON": "W-MON", "M": "MS", "MS": "MS", } METRIC = "smape" MAX_LAMBDAS = 1000 def validate(df): """Validate a dataset.""" err_msgs = [] warn_msgs = [] # check column names for col in EXP_COLS: if col not in df: err_msgs.append(f"missing **{col}** column") msgs = {"errors": err_msgs, "warnings": warn_msgs} is_valid_file = len(err_msgs) == 0 return df, msgs, is_valid_file @st.cache def load_file(path): """ """ if path.endswith(".csv.gz"): compression = "gzip" elif path.endswith(".csv"): compression = None else: raise NotImplementedError return pd.read_csv(path, dtype={"timestamp": str}, compression=compression) def _sum(y): if np.all(pd.isnull(y)): return np.nan return np.nansum(y) def _resample(df2, freq): df2 = ( df2.groupby(["channel", "family", "item_id"]) .resample(freq) .demand.sum(min_count=1) ) return df2 def process_data(df, freq, chunksize=None): """ """ df["timestamp"] = pd.DatetimeIndex(df["timestamp"]) df.set_index("timestamp", inplace=True) groups = df.groupby(["channel", "family", "item_id"], sort=False) if chunksize is 
None: chunksize = min(groups.ngroups, 1000) total = int(np.ceil(groups.ngroups / chunksize)) all_results = [] for chunk in stqdm(partition_all(chunksize, groups), total=total, desc="Progress"): results = Parallel(n_jobs=-1)(delayed(_resample)(dd, freq) for _, dd in chunk) all_results.extend(results) df = pd.concat(all_results).reset_index(["channel", "family", "item_id"]) df.index.name = None return df class StreamlitExecutor(LambdaExecutor): """Custom LambdaExecutor to display a progress bar in the app.""" def map(self, func, payloads, local_mode=False): """ """ if local_mode: f = func else: f = LambdaFunction(func, self._client, self._lambda_arn) ex = self._executor wait_for = [ex.submit(f, *p["args"], **p["kwargs"]) for p in payloads] return wait_for def display_progress(wait_for, desc=None): """ """ # display progress of the futures pbar = stqdm(desc=desc, total=len(wait_for)) prev_n_done = 0 n_done = sum(f.done() for f in wait_for) while n_done != len(wait_for): diff = n_done - prev_n_done pbar.update(diff) prev_n_done = n_done n_done = sum(f.done() for f in wait_for) time.sleep(0.25) diff = n_done - prev_n_done pbar.update(diff) return def run_lambdamap(df, horiz, freq): """ """ payloads = [] freq = FREQ_MAP_PD[freq] if freq[0] == "W": cv_periods = None cv_stride = 2 elif freq[0] == "M": cv_periods = None cv_stride = 1 else: raise NotImplementedError # with st.spinner(f":rocket: Launching forecasts via AWS Lambda (Îģ)..."): # resample the dataset to the forecast frequency before running # lambdamap start = time.time() df2 = get_df_resampled(df, freq) print(f"completed in {format_timespan(time.time()-start)}") groups = df2.groupby(GROUP_COLS, as_index=False, sort=False) # generate payload for _, dd in groups: payloads.append( { "args": (dd, horiz, freq), "kwargs": { "metric": "smape", "cv_periods": cv_periods, "cv_stride": cv_stride, }, } ) # launch jobs in chunks of 1000 executor = StreamlitExecutor( max_workers=min(MAX_LAMBDAS, len(payloads)), lambda_arn=LAMBDAMAP_FUNC ) wait_for = executor.map(run_cv_select, payloads) display_progress(wait_for, "đŸ”Ĩ Generating forecasts") return wait_for def get_df_resampled(df, freq): groups = df.groupby(["channel", "family", "item_id"], sort=False) chunksize = min(1000, groups.ngroups) total = int(np.ceil(float(groups.ngroups) / chunksize)) all_results = [] for chunk in stqdm( partition_all(chunksize, groups), total=total, desc="Batch Preparation Progress" ): results = Parallel(n_jobs=-1)(delayed(_resample)(dd, freq) for _, dd in chunk) all_results.extend(results) df2 = pd.concat(all_results).reset_index(["channel", "family", "item_id"]) df2 = _resample(df, freq).reset_index(["channel", "family", "item_id"]) df2.index.name = None state["report"]["data"]["df2"] = df2 return df2 def display_ag_grid( df, auto_height=False, paginate=False, comma_cols=None, selection_mode=None, use_checkbox=False, ): """ Parameters ---------- df : pd.DataFrame auto_height : bool pagination : bool comma_cols : tuple or list Numeric columns to apply comma thousands separator. 
""" gb = GridOptionsBuilder.from_dataframe(df) # gb.configure_selection("single") gb.configure_auto_height(auto_height) gb.configure_pagination(enabled=paginate) if selection_mode is not None: gb.configure_selection(selection_mode=selection_mode, use_checkbox=use_checkbox) comma_renderer = JsCode( textwrap.dedent( """ function(params) { return params.value .toString() .split( /(?=(?:\d{3})+(?:\.|$))/g ).join( "," ) } """ ) ) for col in comma_cols: gb.configure_column(col, cellRenderer=comma_renderer) response = AgGrid(df, gridOptions=gb.build(), allow_unsafe_jscode=True) return response def valid_launch_freqs(): data_freq = state.report["data"]["freq"] valid_freqs = ["D", "W", "M"] if data_freq in ("D",): # don't allow daily forecasting yet valid_freqs = valid_freqs[1:] elif data_freq in ( "W", "W-MON", ): valid_freqs = valid_freqs[1:] elif data_freq in ( "M", "MS", ): valid_freqs = valid_freqs[2:] else: raise NotImplementedError return valid_freqs def create_presigned_url(s3_path, expiration=3600): """Generate a presigned URL to share an S3 object :param bucket_name: string :param object_name: string :param expiration: Time in seconds for the presigned URL to remain valid :return: Presigned URL as string. If error, returns None. """ parsed_url = urlparse(s3_path, allow_fragments=False) bucket_name = parsed_url.netloc object_name = parsed_url.path.strip("/") # Generate a presigned URL for the S3 object s3_client = boto3.client("s3") try: response = s3_client.generate_presigned_url( "get_object", Params={"Bucket": bucket_name, "Key": object_name}, ExpiresIn=expiration, ) except ClientError as e: logging.error(e) return None # The response contains the presigned URL return response def make_df_backtests(df_results, parallel=False): """Expand df_results to a "long" dataframe with the columns: channel, family, item_id, timestamp, actual, backtest. """ def _expand(dd): ts = np.hstack(dd["ts_cv"].apply(np.hstack)) ys = np.hstack(dd["y_cv"].apply(np.hstack)) yp = np.hstack(dd["yp_cv"].apply(np.hstack)) df = pd.DataFrame({"timestamp": ts, "demand": ys, "backtest": yp}) return df groups = df_results.query("rank == 1").groupby( ["channel", "family", "item_id"], as_index=True, sort=False ) if parallel: df_backtests = groups.parallel_apply(_expand) else: df_backtests = groups.apply(_expand) df_backtests["timestamp"] = pd.DatetimeIndex(df_backtests["timestamp"]) return df_backtests.reset_index(["channel", "family", "item_id"]) def save_report(report_fn): """ """ if "report" not in state or "name" not in state["report"]: return if "path" not in state["report"]["data"]: st.warning( textwrap.dedent( """ Warning: unable to save report, no input data was loaded. """ ) ) return start = time.time() with st.spinner(":hourglass_flowing_sand: Saving Report ..."): tmp = tempfile.TemporaryDirectory() local_path = os.path.join(tmp.name, report_fn) # save the report locally cloudpickle.dump(state["report"], gzip.open(local_path, "wb")) # upload the report to s3 s3_path = f'{state["report"]["afa"]["s3_afa_reports_path"]}/{report_fn}' parsed_url = urlparse(s3_path, allow_fragments=False) bucket = parsed_url.netloc key = parsed_url.path.strip("/") s3_client = boto3.client("s3") try: s3_client.upload_file(local_path, bucket, key) signed_url = create_presigned_url(s3_path) st.info( textwrap.dedent( f""" The report can be downloaded [here]({signed_url}). 
""" ) ) except ClientError as e: logging.error(e) st.text(f"(completed in {format_timespan(time.time() - start)})") return def make_df_reports(bucket, prefix): s3 = boto3.client("s3") df = pd.DataFrame() df["filename"] = [ e["Key"] for p in s3.get_paginator("list_objects_v2").paginate( Bucket=bucket, Prefix=prefix ) for e in p["Contents"] ] # df["s3_path"] = "s3://" + bucket + "/" + df["filename"] df["filename"] = df["filename"].apply(os.path.basename) return df # # Panels # def make_mask(df, channel, family, item_id): mask = np.ones(len(df)).astype(bool) # only mask when all three keys are non-empty if channel == "" or family == "" or item_id == "": return ~mask mask &= df["channel"].str.upper() == channel.upper() mask &= df["family"].str.upper() == family.upper() mask &= df["item_id"].str.upper() == item_id.upper() return mask @st.cache def make_downloads(df_pred, df_results): """ """ pred_fn = os.path.join( ST_DOWNLOADS_PATH, f"{state.uploaded_file.name}_fcast.csv.gz" ) results_fn = os.path.join( ST_DOWNLOADS_PATH, f"{state.uploaded_file.name}_results.csv.gz" ) state.df_pred.to_csv(pred_fn, index=False, compression="gzip") state.df_results.to_csv(results_fn, index=False, compression="gzip") return pred_fn, results_fn def _info(s): st.info(textwrap.dedent(s)) def _success(s): st.success(textwrap.dedent(s)) def _write(s): st.write(textwrap.dedent(s)) def panel_create_report(expanded=True): """Display the 'Load Data' panel.""" def _load_data(path): if path.endswith(".csv"): compression = None elif path.endswith(".csv.gz"): compression = "gzip" else: raise NotImplementedError df = pd.read_csv( path, dtype={"timestamp": str, "channel": str, "family": str, "item_id": str}, compression=compression, ) return df freq = state["report"]["data"].get("freq", None) st.markdown("## Create Report") with st.expander("âŦ†ī¸ Load + Validate Data", expanded=expanded): st.write( f"""Step 1 – Create a new forecast report by selecting an uploaded file containing the demand history for your use-case. You must also specify the frequency of the demand (e.g. _Daily_, _Weekly_, or _Monthly_). Demand history files are uploaded using the [SageMaker Notebook interface]({state["landing_page_url"]})""" ) now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S") st.button( "Refresh Files", help="Refresh the _File_ selector with recently uploaded files.", ) with st.form("create_report_form"): report_name = st.text_input( "Report Name (optional)", help="You may optionally give this report a name, otherwise one " "will be automatically generated.", ) _cols = st.columns([3, 1]) with _cols[0]: fn = file_selectbox( "File (.csv or .csv.gz files)", args.local_dir, help="This file contains the demand history as either a " "`.csv` or `.csv.gz` file.", ) with _cols[1]: freq = st.selectbox( "Frequency", list(s for s in FREQ_MAP.values() if s != "D"), format_func=lambda s: FREQ_MAP_LONG[s], help="This input file must contain demand history at a " "_daily_, _weekly_, or _monthly_ frequency.", ) btn_validate = st.form_submit_button("Load & Validate") if btn_validate: start = time.time() if fn is None: st.error( textwrap.dedent( """ **Error** No files were selected. 1. Upload your file(s). 2. Click the **Refresh Files** button. 3. Select the file from the dropdown box. 4. Select the **Frequency**. 5. Click the **Validate** button. 
#### """ ) ) st.stop() if report_name == "": now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S") report_name = f"AfaReport_{now_str}" if report_name != "" and re.match(r"^[A-Za-z0-9-_]*$", report_name) is None: st.error( dedent( """ The report name may only contain: - uppercase letters - lowercase letters - numbers - dashes ('-') - underscores ('_') #### """ ) ) else: # temporarily load the file for validation and store it in state # iff the data is valid with st.spinner(":hourglass_flowing_sand: Validating file ..."): df, msgs, is_valid_file = validate( _load_data(fn) ) # .drop(["timestamp", "channel"], axis=1)) if is_valid_file: with st.spinner(":hourglass_flowing_sand: Processing file ..."): state.report["name"] = report_name state.report["data"]["path"] = fn state.report["data"]["sz_bytes"] = os.path.getsize(fn) state.report["data"]["freq"] = freq # impute missing dates from the validated dataframe, this # will fill in the missing timestamps with null demand values state.report["data"]["df"] = process_data( df, state.report["data"]["freq"] ) state.report["data"]["is_valid"] = True # clear any existing data health check results, this forces # a rechecking of data health state.report["data"]["df_health"] = None st.text( f"(completed in {format_timespan(time.time() - start)})" ) else: err_bullets = "\n".join("- " + s for s in msgs["errors"]) st.error(f"**Validation failed**\n\n{err_bullets}") if state.report["data"].get("is_valid", False): _success( f""" `{os.path.basename(state.report["data"]["path"])}` is **valid** """ ) return def panel_load_report(expanded=True): """ """ def format_func(s): if s == "local": return "Local Filesystem" elif s == "s3": return "â˜ī¸ S3" st.markdown("## Load Report") with st.expander("📂 Load Report", expanded=expanded): st.write( f"""Optional – Alternatively, you can load a previously-generated report. Report files must have the `.pkl.gz` file extension and can be uploaded using the [SageMaker Notebook interface]({state["landing_page_url"]}).""" ) report_source = st.radio("Source", ["local"], format_func=format_func) _cols = st.columns([3, 1]) with _cols[0]: if report_source == "local": fn = file_selectbox( "File", os.path.join(args.local_dir), globs=("*.pkl.gz",) ) elif report_source == "s3": pass else: raise NotImplementedError load_report_btn = st.button("Load", key="load_report_btn") with _cols[1]: st.write("##") st.button("Refresh Files", key="refresh_report_files_btn") if load_report_btn: start = time.time() with st.spinner(":hourglass_flowing_sand: Loading Report ..."): state["report"] = cloudpickle.load(gzip.open(fn, "rb")) st.text(f"(completed in {format_timespan(time.time() - start)})") state["prev_state"] = "report_loaded" return def panel_data_health(): """ """ df = state.report["data"].get("df", None) df_health = state.report["data"].get("df_health", None) freq = state.report["data"].get("freq", None) if df is None: return st.header("Data Health") with st.expander("â¤ī¸ Data Health", expanded=True): st.write( """Step 2 – Inspect the characteristics of the dataset for irregularities prior to generating any forecasts. For example, missing channels, families, item IDs; or unusually short/long timeseries lengths.""" ) with st.spinner("Performing data health check ..."): start = time.time() # check iff required if df_health is None: df_health = make_health_summary(df, state.report["data"]["freq"]) # save the health check results state.report["data"]["df_health"] = df_health # calc. 
ranked series by demand state.report["data"]["df_ranks"] = ( df.groupby(["channel", "family", "item_id"]) .agg({"demand": sum}) .sort_values(by="demand", ascending=False) ) num_series = df_health.shape[0] num_channels = df_health["channel"].nunique() num_families = df_health["family"].nunique() num_item_ids = df_health["item_id"].nunique() first_date = df_health["timestamp_min"].dt.strftime("%Y-%m-%d").min() last_date = df_health["timestamp_max"].dt.strftime("%Y-%m-%d").max() if freq == "D": duration_unit = "D" duration_str = "days" elif freq in ( "W", "W-MON", ): duration_unit = "W" duration_str = "weeks" elif freq in ( "M", "MS", ): duration_unit = "M" duration_str = "months" else: raise NotImplementedError duration = pd.Timestamp(last_date).to_period(duration_unit) - pd.Timestamp( first_date ).to_period(duration_unit) with st.container(): _cols = st.columns(3) with _cols[0]: st.markdown("#### Summary") st.text( textwrap.dedent( f""" No. series:\t{num_series} No. channels:\t{num_channels} No. families:\t{num_families} No. item IDs:\t{num_item_ids} """ ) ) with _cols[1]: st.markdown("#### Timespan") st.text( f"Frequency:\t{FREQ_MAP_LONG[freq]}\n" f"Duration:\t{duration.n} {duration_str}\n" f"First date:\t{first_date}\n" f"Last date:\t{last_date}\n" ) # f"% missing:\t{int(np.round(pc_missing*100,0))}") with _cols[2]: st.markdown("#### Timeseries Lengths") fig = pex.box(df_health, x="demand_nonnull_count", height=160) fig.update_layout( margin={"t": 5, "b": 0, "r": 0, "l": 0}, xaxis_title=duration_str, height=100, ) st.plotly_chart(fig, use_container_width=True) st.text(f"(completed in {format_timespan(time.time() - start)})") return def panel_launch(): """ """ def _format_func(short): if short == "local": s = " Local" if short == "lambdamap": s = "AWS Lambda" return s df = state.report["data"].get("df", None) df_health = state.report["data"].get("df_health", None) horiz = state.report["afa"].get("horiz", None) freq = state.report["afa"].get("freq", None) if df is None or df_health is None: return st.header("Statistical Forecasts") with st.expander("🚀 Launch", expanded=True): st.write( """Step 3 – Generate forecasts by training and evaluating 75+ configurations of [statistical forecasting models](https://otexts.com/fpp3/) for each timeseries in parallel using AWS Lambda. A forecast at the desired _horizon length_ and _frequency_ is then generated using the each individual timeseries' best model. This process typically completes at a rate of 500–1,000 timeseries/min. 
""" ) with st.form("afa_form"): with st.container(): _cols = st.columns(3) with _cols[0]: horiz = st.number_input("Horizon Length", value=1, min_value=1) with _cols[1]: freq = st.selectbox( "Forecast Frequency", valid_launch_freqs(), 0, format_func=lambda s: FREQ_MAP_LONG[s], ) with _cols[2]: backend = st.selectbox( "Compute Backend", ["lambdamap"], 0, _format_func ) btn_launch = st.form_submit_button("Launch") if btn_launch: start = time.time() # save form data state.report["afa"]["freq"] = freq state.report["afa"]["horiz"] = horiz state.report["afa"]["backend"] = backend df = state.report["data"]["df"] freq_in = state.report["data"]["freq"] freq_out = state.report["afa"]["freq"] if backend == "local": wait_for = run_pipeline( df, freq_in, freq_out, metric=METRIC, cv_stride=2, backend="futures", horiz=horiz, ) display_progress(wait_for, "đŸ”Ĩ Generating forecasts") raw_results = [f.result() for f in futures.as_completed(wait_for)] elif backend == "lambdamap": with st.spinner(":rocket: Launching forecasts via AWS Lambda (Îģ)..."): all_raw_results = [] groups = df.groupby(["channel", "family", "item_id"], sort=False) chunksize = min(5000, groups.ngroups) # divide the dataset into chunks df["grp"] = groups.ngroup() % int( np.ceil(groups.ngroups / chunksize) ) groups = df.groupby("grp", sort=False) total = df["grp"].nunique() for _, dd in stqdm(groups, total=total, desc="Overall Progress"): wait_for = run_lambdamap(dd, horiz, freq_out) raw_results = [ f.result() for f in futures.as_completed(wait_for) ] all_raw_results.extend(raw_results) raw_results = all_raw_results else: raise NotImplementedError with st.spinner("âŗ Calculating results ..."): # generate the results and predictions as dataframes ( df_results, df_preds, df_model_dist, best_err, naive_err, ) = process_forecasts(wait_for, METRIC) # generate the demand classifcation info df_demand_cln = make_demand_classification(df, freq_in) # save results and forecast data state.report["afa"]["df_results"] = df_results state.report["afa"]["df_preds"] = df_preds state.report["afa"]["df_demand_cln"] = df_demand_cln state.report["afa"]["df_model_dist"] = df_model_dist state.report["afa"]["best_err"] = best_err state.report["afa"]["naive_err"] = naive_err state.report["afa"]["job_duration"] = time.time() - start job_duration = state.report["afa"].get("job_duration", None) if job_duration: st.text(f"(completed in {format_timespan(job_duration)})") return def panel_accuracy(): """ """ df = state.report["data"].get("df", None) df_demand_cln = state.report["afa"].get("df_demand_cln", None) df_results = state.report["afa"].get("df_results", None) df_model_dist = state["report"]["afa"].get("df_model_dist", None) naive_err = state["report"]["afa"].get("naive_err", None) horiz = state.report["afa"].get("horiz", None) freq_out = state.report["afa"].get("freq", None) if df is None or df_results is None or df_model_dist is None: return def _calc_metrics(dd, metric="smape"): if metric == "smape": metric_func = calc_smape elif metric == "wape": metric_func = calc_wape else: raise NotImplementedError ys = np.hstack(dd["y_cv"].apply(np.hstack)) yp = np.hstack(dd["yp_cv"].apply(np.hstack)) return metric_func(ys, yp) df_acc = ( df_results.groupby(["channel", "family", "item_id"], as_index=False, sort=True) .apply(lambda dd: _calc_metrics(dd, METRIC)) .rename({None: METRIC}, axis=1) ) with st.expander("đŸŽ¯ Forecast Summary", expanded=True): _write( """ Step 4 – The forecast error is calculated as the [symmetric mean absolute percentage error 
(SMAPE)](https://en.wikipedia.org/wiki/Symmetric_mean_absolute_percentage_error) via sliding window backtesting. Forecast _accuracy_ is calculated as `100-SMAPE` and is averaged across all timeseries to give the _overall accuracy_. The overall accuracy of the best naive models is used as a baseline. The _classification_ distribution indicates the percentage timeseries that have a _short_, _medium_, or _continuous_ lifecycle. The _Best Models_ chart shows the distribution of each model type that were selected as the best model across the dataset. """ ) df_cln = pd.DataFrame({"category": ["short", "medium", "continuous"]}) df_cln = df_cln.merge( df_demand_cln["category"] .value_counts(normalize=True) .reset_index() .rename({"index": "category", "category": "frac"}, axis=1), on="category", how="left", ) df_cln = df_cln.fillna(0.0) df_cln["frac"] *= 100 df_cln["frac"] = df_cln["frac"].astype(int) _cols = st.columns(3) with _cols[0]: st.markdown("#### Parameters") st.text( f"Horiz. Length:\t{horiz}\n" f"Frequency:\t{FREQ_MAP_LONG[freq_out]}" ) st.markdown("#### Classification") st.text( f"Short:\t\t{df_cln.iloc[0]['frac']} %\n" f"Medium:\t\t{df_cln.iloc[1]['frac']} %\n" f"Continuous:\t{df_cln.iloc[2]['frac']} %" ) with _cols[1]: st.markdown("#### Best Models") df_model_dist = df_model_dist.query("perc > 0") labels = df_model_dist["model_type"].values values = df_model_dist["perc"].values fig = go.Figure(data=[go.Pie(labels=labels, values=values, hole=0.40)]) fig.update(layout_showlegend=False) fig.update_layout( margin={"t": 0, "b": 0, "r": 20, "l": 20}, width=200, height=150, ) fig.update_traces(textinfo="percent+label") st.plotly_chart(fig) acc_val = (1 - np.nanmean(df_acc[METRIC])) * 100.0 acc_naive = (1 - naive_err.err_mean) * 100.0 with _cols[2]: st.markdown("#### Overall Accuracy") st.markdown( f"
<div style='font-size:36pt; font-weight:bold'>{acc_val:.0f}%</div>
" f"({np.clip(acc_val - acc_naive, 0, None):.0f}% increase vs. naive)", unsafe_allow_html=True, ) return @st.cache() def make_df_top( df, df_results, groupby_cols, dt_start, dt_stop, cperc_thresh, metric="smape" ): """ """ def calc_period_metrics(dd, dt_start, dt_stop): """ """ dt_start = pd.Timestamp(dt_start) dt_stop = pd.Timestamp(dt_stop) ts = np.hstack(dd["ts_cv"].apply(np.hstack)) ix = (ts >= dt_start) & (ts <= dt_stop) ys = np.hstack(dd["y_cv"].apply(np.hstack))[ix] yp = np.hstack(dd["yp_cv"].apply(np.hstack))[ix] if metric == "smape": error = calc_smape(ys, yp) elif metric == "wape": error = calc_wape(ys, yp) else: raise NotImplementedError return error metric_name = f"{metric}_mean" df.index.name = "timestamp" dt_start = pd.Timestamp(dt_start).strftime("%Y-%m-%d") dt_stop = pd.Timestamp(dt_stop).strftime("%Y-%m-%d") df2 = df.query(f"timestamp >= '{dt_start}' and timestamp <= '{dt_stop}'") total_demand = df2["demand"].sum() # calculate per-group demand % df_grp_demand = df2.groupby(groupby_cols, as_index=False, sort=False).agg( {"demand": sum} ) df_grp_demand["perc"] = df_grp_demand["demand"] / total_demand * 100 # get the best models for each group df_grp_metrics = ( df_results.query("rank == 1") .groupby(groupby_cols, as_index=False, sort=False) .apply(lambda dd: calc_period_metrics(dd, dt_start, dt_stop)) .pipe(pd.DataFrame) .rename({None: metric_name}, axis=1) .reset_index() ) df_grp_metrics["accuracy"] = 100 * (1 - df_grp_metrics[metric_name]) df_grp_metrics.drop(["index", metric_name], axis=1, inplace=True) # combine, sort, and display df_grp = df_grp_demand.merge( df_grp_metrics, on=groupby_cols, how="left" ).sort_values(by="demand", ascending=False) df_grp["cperc"] = df_grp["perc"].cumsum() df_grp = df_grp.query(f"cperc <= {cperc_thresh}") df_grp.rename( {"perc": "% total demand", "accuracy": "% accuracy"}, axis=1, inplace=True ) df_grp.drop("cperc", axis=1, inplace=True) # calc. 
summary row df_grp_summary = df_grp.agg({"demand": sum, "% accuracy": np.nanmean}) df_grp_summary["% total demand"] = np.round( 100 * df_grp_summary["demand"] / total_demand, 1 ) df_grp_summary = pd.DataFrame(df_grp_summary).T[ ["demand", "% total demand", "% accuracy"] ] df_grp_summary.insert(0, "group by", ", ".join(groupby_cols)) df_grp_summary["% accuracy"] = df_grp_summary["% accuracy"].round(0) df_grp["demand"] = df_grp["demand"].round(0) df_grp["% total demand"] = df_grp["% total demand"].round(1) df_grp["% accuracy"] = df_grp["% accuracy"].round(0) df_grp.insert(0, "rank", np.arange(df_grp.shape[0]) + 1) df_grp_summary["demand"] = df_grp_summary["demand"].round(0) df_grp_summary["% total demand"] = df_grp_summary["% total demand"].round(1) return df_grp, df_grp_summary @st.cache() def make_ml_df_top( df, df_backtests, groupby_cols, dt_start, dt_stop, cperc_thresh, metric ): """ """ def calc_period_metrics(dd, dt_start, dt_stop): """ """ dt_start = pd.Timestamp(dt_start) dt_stop = pd.Timestamp(dt_stop) ts = dd["timestamp"] ix = (ts >= dt_start) & (ts <= dt_stop) ys = dd["target_value"][ix] yp = dd["demand"][ix] if metric == "smape": error = calc_smape(ys, yp) elif metric == "wape": error = calc_wape(ys, yp) else: raise NotImplementedError return error df.index.name = "timestamp" dt_start = pd.Timestamp(dt_start).strftime("%Y-%m-%d") dt_stop = pd.Timestamp(dt_stop).strftime("%Y-%m-%d") df2 = df.query(f"timestamp >= '{dt_start}' and timestamp <= '{dt_stop}'") total_demand = df2["demand"].sum() # calculate per-group demand % df_grp_demand = df2.groupby(groupby_cols, as_index=False, sort=False).agg( {"demand": sum} ) df_grp_demand["perc"] = df_grp_demand["demand"] / total_demand * 100 # get the best models for each group df_grp_metrics = ( df_backtests.groupby(groupby_cols, as_index=False, sort=False) .apply(lambda dd: calc_period_metrics(dd, dt_start, dt_stop)) .rename({None: metric}, axis=1) ) df_grp_metrics["accuracy"] = 100 * (1 - df_grp_metrics[metric]) df_grp_metrics.drop(metric, axis=1, inplace=True) # combine, sort, and display df_grp = df_grp_demand.merge( df_grp_metrics, on=groupby_cols, how="left" ).sort_values(by="demand", ascending=False) df_grp["cperc"] = df_grp["perc"].cumsum() df_grp = df_grp.query(f"cperc <= {cperc_thresh}") df_grp.rename( {"perc": "% total demand", "accuracy": "% accuracy"}, axis=1, inplace=True ) df_grp.drop("cperc", axis=1, inplace=True) # calc. 
summary row df_grp_summary = df_grp.agg({"demand": sum, "% accuracy": np.nanmean}) df_grp_summary["% total demand"] = np.round( 100 * df_grp_summary["demand"] / total_demand, 1 ) df_grp_summary = pd.DataFrame(df_grp_summary).T[ ["demand", "% total demand", "% accuracy"] ] df_grp_summary.insert(0, "group by", ", ".join(groupby_cols)) df_grp_summary["% accuracy"] = df_grp_summary["% accuracy"].round(0) df_grp["demand"] = df_grp["demand"].round(0) df_grp["% total demand"] = df_grp["% total demand"].round(1) df_grp["% accuracy"] = df_grp["% accuracy"].round(0) df_grp.insert(0, "rank", np.arange(df_grp.shape[0]) + 1) df_grp_summary["demand"] = df_grp_summary["demand"].round(0) df_grp_summary["% total demand"] = df_grp_summary["% total demand"].round(1) return df_grp, df_grp_summary def panel_top_performers(): """ """ df = state.report["data"].get("df", None) df_results = state.report["afa"].get("df_results", None) if df is None or df_results is None: return with st.expander("🏆 Top Performers", expanded=True): _write( """ Step 5 – Inspect the forecast accuracy of individual channels, families, and item IDs (and each subset combination therein) for specific time periods and for groups of items that cover a given percentage of total demand. For example, you can inspect the accuracy for the smaller subset of items that cover 80% of demand in the most recent six-month period. """ ) st.write("#### Filters") _cols = st.columns([2, 1, 1]) dt_min = df.index.min() dt_max = df.index.max() with _cols[0]: groupby_cols = st.multiselect( "Group By", ["channel", "family", "item_id"], ["channel", "family", "item_id"], ) with _cols[1]: dt_start = st.date_input( "Start", value=dt_min, min_value=dt_min, max_value=dt_max ) with _cols[2]: dt_stop = st.date_input( "Stop", value=dt_max, min_value=dt_min, max_value=dt_max ) cperc_thresh = st.slider( "Percentage of total demand", step=5, value=80, format="%d%%" ) dt_start = dt_start.strftime("%Y-%m-%d") dt_stop = dt_stop.strftime("%Y-%m-%d") start = time.time() with st.spinner("Processing top performers ..."): df_grp, df_grp_summary = make_df_top( df, df_results, groupby_cols, dt_start, dt_stop, cperc_thresh, METRIC ) st.write("#### Group Summary") with st.spinner("Loading **Summary** table"): display_ag_grid(df_grp_summary, auto_height=True, comma_cols=("demand",)) st.write("#### Groups") with st.spinner("Loading **Groups** table ..."): display_ag_grid(df_grp, paginate=True, comma_cols=("demand",)) st.text(f"(completed in {format_timespan(time.time() - start)})") if st.button("Export"): with st.spinner( ":hourglass_flowing_sand: Exporting **Top Performers** ..." 
): start = time.time() # write the dataframe to s3 now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S") basename = os.path.basename(state["report"]["data"]["path"]) s3_afa_export_path = state["report"]["afa"]["s3_afa_export_path"] s3_path = ( f"{s3_afa_export_path}/{basename}_{now_str}" "_afa-top-performers.csv.gz" ) wr.s3.to_csv(df_grp, s3_path, compression="gzip", index=False) # generate presigned s3 url for user to download signed_url = create_presigned_url(s3_path) st.info( textwrap.dedent( f""" Download the top performers file [here]({signed_url}) `(completed in {format_timespan(time.time() - start)})` """ ) ) return def panel_visualization(): """ """ df = state.report["data"].get("df", None) df_results = state.report["afa"].get("df_results", None) df_preds = state.report["afa"].get("df_preds", None) if df is None or df_results is None or df_preds is None: return freq = state.report["afa"]["freq"] horiz = state.report["afa"]["horiz"] start = time.time() df_top = ( df.groupby(["channel", "family", "item_id"], as_index=False) .agg({"demand": sum}) .sort_values(by="demand", ascending=False) ) channel_vals = [""] + sorted(df_results["channel"].unique()) family_vals = [""] + sorted(df_results["family"].unique()) item_id_vals = [""] + sorted(df_results["item_id"].unique()) channel_index = channel_vals.index(df_top["channel"].iloc[0]) family_index = family_vals.index(df_top["family"].iloc[0]) item_id_index = item_id_vals.index(df_top["item_id"].iloc[0]) with st.expander("đŸ‘ī¸ Visualization", expanded=True): _write( """ Step 6 – Plot the historic, backtest, and forecasted demand for each timeseries. """ ) with st.form("viz_form"): st.markdown("#### Filter By") _cols = st.columns(3) with _cols[0]: channel_choice = st.selectbox( "Channel", channel_vals, index=channel_index ) with _cols[1]: family_choice = st.selectbox("Family", family_vals, index=family_index) with _cols[2]: item_id_choice = st.selectbox( "Item ID", item_id_vals, index=item_id_index ) viz_form_button = st.form_submit_button("Apply") if viz_form_button: pass results_mask = make_mask( df_results, channel_choice, family_choice, item_id_choice ) pred_mask = make_mask(df_preds, channel_choice, family_choice, item_id_choice) df_plot = df_preds[pred_mask] if len(df_plot) > 0: # display the line chart y = df_plot.query("type == 'actual'")["demand"] y_ts = df_plot.query("type == 'actual'")["timestamp"] yp = df_plot.query("type == 'fcast'")["demand"] yp_ts = df_plot.query("type == 'fcast'")["timestamp"] fig = go.Figure() fig.add_trace( go.Scatter( x=y_ts, y=y, mode="lines", name="actual", fill="tozeroy", line={"width": 3}, ) ) fig.add_trace( go.Scatter( x=yp_ts, y=yp, mode="lines", name="forecast", fill="tozeroy", line={"width": 3}, ) ) # plot dd = df_results[results_mask].query("rank == 1").iloc[0] df_backtest = ( pd.DataFrame( {"yp": np.hstack(dd["yp_cv"])}, index=pd.DatetimeIndex(np.hstack(dd["ts_cv"])), ) .sort_index() .resample(FREQ_MAP_PD[freq]) .apply(np.nanmean) ) fig.add_trace( go.Scatter( x=df_backtest.index, y=df_backtest.yp, mode="lines", name="backtest (mean)", line_dash="dot", line_color="black", ) ) # fig.update_layout( # xaxis={ # "showgrid": True, # "gridcolor": "lightgrey", # }, # yaxis={ # "showgrid": True, # "gridcolor": "lightgrey", # } # ) fig.update_layout( margin={"t": 0, "b": 0, "r": 0, "l": 0}, height=250, legend={ "orientation": "h", "yanchor": "bottom", "y": 1.0, "xanchor": "left", "x": 0.0, }, ) fig.update_xaxes( rangeslider_visible=True, ) initial_range = pd.date_range(end=yp_ts.max(), periods=horiz 
* 8, freq=freq) initial_range = [max(initial_range[0], y_ts.min()), initial_range[-1]] fig["layout"]["xaxis"].update(range=initial_range) st.plotly_chart(fig, use_container_width=True) plot_duration = time.time() - start st.text(f"(completed in {format_timespan(plot_duration)})") return def download_afc_files(): """ """ df = state["report"]["data"]["df"] status_dict = parse_s3_json(state.report["afc"]["status_json_s3_path"]) s3_export_path = status_dict["s3_export_path"] prefix = status_dict["prefix"] horiz = state["report"]["afc"]["horiz"] freq = state["report"]["afc"]["freq"] preds_s3_prefix = f"{s3_export_path}/{prefix}/{prefix}_processed.csv" results_s3_prefix = ( f"{s3_export_path}/{prefix}/accuracy-metrics-values/Accuracy_{prefix}_*.csv" ) backtests_s3_prefix = ( f"{s3_export_path}/{prefix}/" f"forecasted-values/Forecasts_{prefix}_BacktestExportJob_*.csv" ) _df_preds = wr.s3.read_csv( preds_s3_prefix, dtype={"channel": str, "family": str, "item_id": str} ) _preds = [] for _, dd in _df_preds.groupby( ["channel", "family", "item_id"], as_index=False, sort=False ): dd.sort_values(by="timestamp", ascending=True, inplace=True) if dd.shape[0] > horiz: dd = dd.iloc[1:, :] _preds.append(dd) df_preds = pd.concat(_preds) df_preds["type"] = "fcast" df_preds["timestamp"] = pd.DatetimeIndex(df_preds["timestamp"]) df_actual = state["report"]["data"].get("df2", None) if df_actual is None: df_actual = get_df_resampled(df, freq) df_preds = df_preds.append( df_actual.reset_index() .rename({"index": "timestamp"}, axis=1) .assign(type="actual") ) df_preds["channel"] = df_preds["channel"].str.upper() df_preds["family"] = df_preds["family"].str.upper() df_preds["item_id"] = df_preds["item_id"].str.upper() freq = FREQ_MAP_PD[state.report["afc"]["freq"]] df_results = wr.s3.read_csv( results_s3_prefix, dtype={"channel": str, "family": str, "item_id": str} ) df_results[["channel", "family", "item_id"]] = df_results["item_id"].str.split( "@@", expand=True ) df_backtests = wr.s3.read_csv(backtests_s3_prefix) df_backtests[["channel", "family", "item_id"]] = df_backtests["item_id"].str.split( "@@", expand=True ) df_backtests["timestamp"] = pd.DatetimeIndex( df_backtests["backtestwindow_end_time"] ) df_backtests["p10"] = np.clip(df_backtests["p10"], 0, None) df_backtests["demand"] = np.round(np.clip(df_backtests["p50"], 0, None), 0) df_backtests["target_value"] = df_backtests["target_value"].round(0) df_backtests = df_backtests[ [ "timestamp", "channel", "family", "item_id", "demand", "p10", "p90", "target_value", ] ] df_backtests.sort_values( by=["channel", "family", "item_id", "timestamp"], inplace=True ) return df_preds, df_results, df_backtests def panel_downloads(): """ """ df = state.report["data"].get("df", None) df_results = state.report["afa"].get("df_results", None) df_preds = state.report["afa"].get("df_preds", None) df_afc_results = state.report["afc"].get("df_results", None) df_afc_preds = state.report["afc"].get("df_preds", None) if df is None or df_results is None or (df_preds is None and df_afc_preds is None): return with st.expander("âŦ‡ī¸ Export Forecasts", expanded=True): _write( """ Export the forecasts and backtests as `.csv.gz` files. 
""" ) # use cached forecast files if previously generated afa_forecasts_s3_path = state.report["afa"].get("forecasts_s3_path", None) afa_backtests_s3_path = state.report["afa"].get("backtests_s3_path", None) afc_forecasts_s3_path = state.report["afc"].get("forecasts_s3_path", None) afc_backtests_s3_path = state.report["afc"].get("backtests_s3_path", None) export_forecasts_btn = st.button( "Export Statistical Forecasts", key="afa_export_forecast_btn" ) if export_forecasts_btn: start = time.time() s3_afa_export_path = state["report"]["afa"]["s3_afa_export_path"] with st.spinner(":hourglass_flowing_sand: Exporting Forecasts ..."): # export the forecast file to s3 if it doesnt exist if afa_forecasts_s3_path is None: now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S") basename = os.path.basename(state["report"]["data"]["path"]) afa_forecasts_s3_path = ( f"{s3_afa_export_path}/{basename}_{now_str}" "_afa-forecasts.csv.gz" ) wr.s3.to_csv( df_preds, afa_forecasts_s3_path, compression="gzip", index=False ) state["report"]["afa"]["forecasts_s3_path"] = afa_forecasts_s3_path else: pass forecasts_signed_url = create_presigned_url(afa_forecasts_s3_path) st.markdown("#### Statistical Forecasts") st.markdown("####") st.info( textwrap.dedent( f""" Download the forecasts file [here]({forecasts_signed_url}) `(completed in {format_timespan(time.time()-start)})`. """ ) ) with st.spinner(":hourglass_flowing_sand: Exporting Backtests ..."): # export the forecast file to s3 if it doesnt exist if afa_backtests_s3_path is None: now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S") basename = os.path.basename(state["report"]["data"]["path"]) afa_backtests_s3_path = ( f"{s3_afa_export_path}/{basename}_{now_str}" "_afa-backtests.csv.gz" ) df_backtests = df_results[GROUP_COLS + ["y_cv", "yp_cv"]].copy() # convert df_results to csv-friendly backtests df_backtests["y_cv"] = df_backtests["y_cv"].apply( lambda xs: xs.tolist() ) df_backtests["yp_cv"] = df_backtests["yp_cv"].apply( lambda xs: xs.tolist() ) df_backtests.rename( {"y_cv": "bt_actuals", "yp_cv": "bt_forecast"}, axis=1, inplace=True, ) wr.s3.to_csv( df_backtests, afa_backtests_s3_path, compression="gzip", index=False, ) state["report"]["afa"]["backtests_s3_path"] = afa_backtests_s3_path else: pass backtests_signed_url = create_presigned_url(afa_backtests_s3_path) st.info( textwrap.dedent( f""" Download the forecast backtests file [here]({backtests_signed_url}) `(completed in {format_timespan(time.time()-start)})`. 
""" ) ) df_afc_preds = state["report"]["afc"].get("df_preds", None) df_afc_results = state["report"]["afc"].get("df_results", None) export_afc_forecasts_btn = st.button( "Export Machine Learning Forecasts", key="afc_export_forecast_btn" ) if export_afc_forecasts_btn: if df_afc_preds is None or df_afc_results is None: st.info("Machine learning forecasts are not yet ready.") s3_afc_export_path = state["report"]["afc"]["s3_afc_export_path"] if state.report["afc"].get("status_json_s3_path", None): st.markdown("#### Machine Learning Forecasts") st.markdown("####") # read the raw amazon forecast files from here start = time.time() if afc_forecasts_s3_path is None: try: df_preds = state["report"]["afc"]["df_preds"] df_preds["demand"] = np.ceil( df_preds["demand"].clip(0).fillna(0.0) ) now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S") basename = os.path.basename(state["report"]["data"]["path"]) afc_forecasts_path = ( f"{s3_afc_export_path}/{basename}_{now_str}" "_afc-forecasts.csv.gz" ) wr.s3.to_csv( df_preds[ [ "timestamp", "channel", "family", "item_id", "demand", "type", ] ], afc_forecasts_path, compression="gzip", index=False, ) state["report"]["afc"]["forecasts_s3_path"] = afc_forecasts_path afc_forecasts_s3_path = afc_forecasts_path except NoFilesFound: pass else: pass forecasts_signed_url = create_presigned_url(afc_forecasts_s3_path) st.info( textwrap.dedent( f""" Download the forecasts file [here]({forecasts_signed_url}) `(completed in {format_timespan(time.time()-start)})`. """ ) ) start = time.time() if afc_backtests_s3_path is None: try: df_backtests = state["report"]["afc"]["df_backtests"] now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S") basename = os.path.basename(state["report"]["data"]["path"]) afc_backtests_s3_path = ( f"{s3_afc_export_path}/{basename}_{now_str}" "_afc-backtests.csv.gz" ) wr.s3.to_csv( df_backtests.rename( { "demand": "bt_forecasts", "target_value": "bt_actuals", }, axis=1, ).drop(["p10", "p90"], axis=1), afc_backtests_s3_path, compression="gzip", index=False, ) state["report"]["afc"][ "backtests_s3_path" ] = afc_backtests_s3_path except NoFilesFound: pass else: pass backtests_signed_url = create_presigned_url(afc_backtests_s3_path) st.info( textwrap.dedent( f""" Download the forecast backtests file [here]({backtests_signed_url}) `(completed in {format_timespan(time.time()-start)})`. """ ) ) with st.expander("â„šī¸ Export File Formats", expanded=True): st.write( dedent( """ #### Common columns - `timestamp` – String, date of the demand, in the format `YYYY-mm-dd` (e.g. "2020-12-25") - `channel` – String, the originating store or platform of the demand (e.g. Website, Store-22) - `family` – String, the category of the item (e.g. Shirts) - `item_id` – String, the unique item identifier/SKU code (e.g. SKU29292) - `demand` – Numeric, the demand amount of the item, which must be >= 0 (e.g. 413) - `type` – String - "actual" when `demand` is the historic demand - "fcast" when `demand` is the forecasted demand #### Statistical Forecasts - Forecasts file columns - `timestamp`, `channel`, `family`, `item_id`, `demand`, `type` - Backtests file columns - `channel`, `family`, `item_id` - `bt_actuals` – list of sliding window actuals, each window is the length of the forecast horizon. - `bt_forecast` – list of sliding window forecasts, each window is the length of the forecast horizon and has a 1:1 correspondence with the windows in `bt_actuals`. 
- The `timestamp` column is omitted to reduce the file size, however, the first and last sliding windows correspond to the first and last timestamps of the historic demand, respectively. #### Machine Learning Forecasts - Forecasts file columns - `timestamp`, `channel`, `family`, `item_id`, `demand`, `type` - Backtests file columns - `timestamp`, `channel`, `family`, `item_id` - `bt_actuals` – the actual demand for the backtest period - `bt_forecast` – the forecasted demand for the backtest period #### """ ), unsafe_allow_html=True, ) return # # ML Forecasting Panels # def parse_s3_json(path): """ """ parsed_url = urlparse(path, allow_fragments=False) bucket = parsed_url.netloc key = parsed_url.path.strip("/") s3 = boto3.resource("s3") s3_obj = s3.Object(bucket_name=bucket, key=key) status_dict = json.loads(s3_obj.get()["Body"].read()) return status_dict def panel_ml_launch(): """ """ df = state.report["data"].get("df", None) if df is None: return st.header("Machine Learning Forecasts") with st.expander("🚀 Launch", expanded=True): st.write( "_Optional_ – Launch machine learning forecasts using the " "[Amazon Forecast](https://aws.amazon.com/forecast/) managed service." ) with st.form("ml_form"): _cols = st.columns(3) with _cols[0]: horiz = st.number_input( "Horizon Length", key="ml_horiz_input", value=1, min_value=1 ) with _cols[1]: freq = st.selectbox( "Forecast Frequency", valid_launch_freqs(), 0, format_func=lambda s: FREQ_MAP_LONG[s], key="ml_freq_input", ) with _cols[2]: st.selectbox("Algorithm", ["AutoML"], 0, key="ml_algo_input") ml_form_button = st.form_submit_button("Launch") # Launch Amazon Forecast job if ml_form_button: with st.spinner("🚀 Launching ML forecasting job ..."): state.report["afc"]["horiz"] = horiz state.report["afc"]["freq"] = freq execution_arn, prefix, status_json_s3_path = run_ml_state_machine() state.report["afc"]["execution_arn"] = execution_arn state.report["afc"]["status_json_s3_path"] = status_json_s3_path state.report["afc"]["prefix"] = prefix st.info( dedent( f""" Job submitted, the ARN is: - {execution_arn} #### """ ) ) execution_arn = state.report["afc"].get("execution_arn", None) check_job_status_btn = st.button("🔄 Check Job Status") if check_job_status_btn and execution_arn is not None: with st.spinner("âŗ Checking job status ..."): sfn_status, status_dict = refresh_ml_state_machine_status() sfn_state = status_dict["PROGRESS"]["state"] aws_console_url = ( "https://console.aws.amazon.com/states/home#/executions/" f"details/{execution_arn})" ) st.info( textwrap.dedent( f""" **Status:** {sfn_status} **Stage:** {sfn_state} **Execution ARN:** `{execution_arn}` **AWS Console:** [view]({aws_console_url}) """ ) ) if sfn_status == "SUCCEEDED": # download the results with st.spinner("âŗ Loading ML forecasts and results..."): df_preds, df_results, df_backtests = download_afc_files() state["report"]["afc"]["df_preds"] = df_preds state["report"]["afc"]["df_results"] = df_results state["report"]["afc"]["df_backtests"] = df_backtests _cols = st.columns([2, 0.485]) with _cols[1]: ml_stop_button = st.button("🛑 Stop Job") if ml_stop_button: sfn_client = boto3.client("stepfunctions") resp = sfn_client.stop_execution(executionArn=execution_arn) st.write(resp) return def calc_afc_ml_accuracies(metric="smape"): """ """ if metric == "smape": metric_func = calc_smape elif metric == "wape": metric_func = calc_wape else: raise NotImplementedError df_backtests = state.report["afc"]["df_backtests"] df_accuracies = ( df_backtests | px.groupby(["channel", "family", "item_id"], 
sort=False) | px.apply( lambda dd: metric_func( dd["target_value"].clip(0, None), dd["demand"].clip(0, None) ) ) | px.reset_index() | px.rename({0: metric}, axis=1) | px.assign(smape=px[metric].clip(0, 1)) | px.assign(acc=(1 - px[metric]) * 100) ) return df_accuracies def panel_ml_forecast_summary(): """ """ df = state.report["data"].get("df", None) df_preds = state.report["afc"].get("df_preds", None) df_results = state.report["afc"].get("df_results", None) df_backtests = state.report["afc"].get("df_backtests", None) smape_url = ( "https://en.wikipedia.org/wiki/Symmetric_mean_absolute_percentage_error)" ) backtesting_url = ( "https://docs.aws.amazon.com/forecast/latest/dg/metrics.html#backtesting)." ) if df is None or df_results is None or df_backtests is None or df_preds is None: return with st.expander("đŸŽ¯ Forecast Summary", expanded=True): df_accuracies = calc_afc_ml_accuracies(METRIC) ml_acc = df_accuracies["acc"].mean() _cols = st.columns([3, 1]) with _cols[0]: st.write( dedent( f""" The forecast error is calculated as the [symmetric mean absolute percentage error (SMAPE)]({smape_url}) via sliding window backtesting. Forecast _accuracy_ is calculated as `100-SMAPE` and is averaged across all timeseries to give the _overall accuracy_. Note: Due to the limitations of the ML forecasting approach, backtests were only generated over [the five most recent windows before the horizon]({backtesting_url}). """ ) ) with _cols[1]: st.markdown("#### Overall Accuracy") st.markdown( f"
<div style='font-size:36pt; font-weight:bold'>{ml_acc:.0f}%</div>
", unsafe_allow_html=True, ) return def panel_ml_top_performers(): """ """ df = state.report["data"].get("df", None) df_results = state.report["afc"].get("df_results", None) if df is None or df_results is None: return df_backtests = state.report["afc"]["df_backtests"] with st.expander("🏆 Top Performers", expanded=True): _write( """ Inspect the forecast accuracy of individual channels, families, and item IDs (and each subset combination therein) for specific groups of items during a given backtest period. """ ) st.write("#### Filters") # dt_min and dt_max are the time boundaries of the backtesting # for amazon forecast, this is relatively short dt_min = df_backtests["timestamp"].min() dt_max = df_backtests["timestamp"].max() _cols = st.columns([2, 1, 1]) with _cols[0]: groupby_cols = st.multiselect( "Group By", ["channel", "family", "item_id"], ["channel", "family", "item_id"], key="ml_top_perf_groupby", ) with _cols[1]: dt_start = st.date_input( "Start", value=dt_min, min_value=dt_min, max_value=dt_max, key="ml_dt_start", ) with _cols[2]: dt_stop = st.date_input( "Stop", value=dt_max, min_value=dt_min, max_value=dt_max, key="ml_dt_stop", ) cperc_thresh = st.slider( "Percentage of total demand", step=5, value=80, format="%d%%", key="ml_perc_demand", ) dt_start = dt_start.strftime("%Y-%m-%d") dt_stop = dt_stop.strftime("%Y-%m-%d") start = time.time() with st.spinner("Processing top performers ..."): df_grp, df_grp_summary = make_ml_df_top( df, df_backtests, groupby_cols, dt_start, dt_stop, cperc_thresh, METRIC ) st.write("#### Group Summary") with st.spinner("Loading **Summary** table"): display_ag_grid(df_grp_summary, auto_height=True, comma_cols=("demand",)) st.write("#### Groups") with st.spinner("Loading **Groups** table ..."): display_ag_grid(df_grp, paginate=True, comma_cols=("demand",)) st.text(f"(completed in {format_timespan(time.time() - start)})") if st.button("Export", key="ml_top_perf_export_btn"): with st.spinner( ":hourglass_flowing_sand: Exporting **Top Performers** ..." 
): start = time.time() # write the dataframe to s3 now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S") basename = os.path.basename(state["report"]["data"]["path"]) s3_afc_export_path = state["report"]["afc"]["s3_afc_export_path"] s3_path = ( f"{s3_afc_export_path}/{basename}_{now_str}" "_afc-top-performers.csv.gz" ) wr.s3.to_csv(df_grp, s3_path, compression="gzip", index=False) # generate presigned s3 url for user to download signed_url = create_presigned_url(s3_path) st.info( textwrap.dedent( f""" Download the top performers file [here]({signed_url}) `(completed in {format_timespan(time.time() - start)})` """ ) ) return def panel_ml_visualization(): """ """ df = state.report["data"].get("df", None) df_ml_results = state.report["afc"].get("df_results", None) df_ml_preds = state.report["afc"].get("df_preds", None) df_ml_backtests = state.report["afc"].get("df_backtests", None) if df is None or df_ml_results is None or df_ml_preds is None: return freq = state.report["afc"]["freq"] horiz = state.report["afc"]["horiz"] start = time.time() df_top = ( df.groupby(["channel", "family", "item_id"], as_index=False) .agg({"demand": sum}) .sort_values(by="demand", ascending=False) ) channel_vals = [""] + sorted(df_ml_results["channel"].unique()) family_vals = [""] + sorted(df_ml_results["family"].unique()) item_id_vals = [""] + sorted(df_ml_results["item_id"].unique()) channel_index = channel_vals.index(df_top["channel"].iloc[0]) family_index = family_vals.index(df_top["family"].iloc[0]) item_id_index = item_id_vals.index(df_top["item_id"].iloc[0]) with st.expander("đŸ‘ī¸ Visualization", expanded=True): with st.form("ml_viz_form"): st.markdown("#### Filter By") _cols = st.columns(3) with _cols[0]: channel_choice = st.selectbox( "Channel", channel_vals, index=channel_index, key="ml_results_channel", ) with _cols[1]: family_choice = st.selectbox( "Family", family_vals, index=family_index, key="ml_results_family" ) with _cols[2]: item_id_choice = st.selectbox( "Item ID", item_id_vals, index=item_id_index, key="ml_results_item" ) viz_form_button = st.form_submit_button("Apply") if viz_form_button: pass pred_mask = make_mask( df_ml_preds, channel_choice, family_choice, item_id_choice ) backtest_mask = make_mask( df_ml_backtests, channel_choice, family_choice, item_id_choice ) df_plot = df_ml_preds[pred_mask] _df_backtests = df_ml_backtests[backtest_mask] if len(df_plot) > 0: # display the line chart # fig = pex.line(df_plot, x="timestamp", y="demand", color="type") y = df_plot.query("type == 'actual'")["demand"] y_ts = df_plot.query("type == 'actual'")["timestamp"] yp = df_plot.query("type == 'fcast'")["demand"] yp_ts = df_plot.query("type == 'fcast'")["timestamp"] fig = go.Figure() fig.add_trace( go.Scatter( x=y_ts, y=y, mode="lines+markers", name="actual", fill="tozeroy", line={"width": 3}, marker=dict(size=4), ) ) fig.add_trace( go.Scatter( x=yp_ts, y=np.round(yp, 0), mode="lines+markers", name="forecast", fill="tozeroy", marker=dict(size=4), ) ) fig.add_trace( go.Scatter( x=_df_backtests["timestamp"], y=np.round(_df_backtests.demand, 0), mode="lines", name="backtest", line_dash="dot", line_color="black", ) ) fig.update_layout( margin={"t": 0, "b": 0, "r": 0, "l": 0}, height=250, legend={ "orientation": "h", "yanchor": "bottom", "y": 1.0, "xanchor": "left", "x": 0.0, }, ) fig.update_xaxes(rangeslider_visible=True) initial_range = pd.date_range(end=yp_ts.max(), periods=horiz * 8, freq=freq) initial_range = [max(initial_range[0], y_ts.min()), initial_range[-1]] 
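# default the visible x-axis window to the most recent `horiz * 8` periods
# (ending at the last forecast timestamp), clamped so it never starts before
# the first actual observation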
fig["layout"]["xaxis"].update(range=initial_range) st.plotly_chart(fig, use_container_width=True) plot_duration = time.time() - start st.text(f"(completed in {format_timespan(plot_duration)})") return def run_ml_state_machine(): """Execute the Amazon Forecast state machine.""" AFC_FORECAST_HORIZON = state.report["afc"]["horiz"] AFC_FORECAST_FREQUENCY = state.report["afc"]["freq"] df = state.report["data"].get("df", None) fn = state.report["data"]["path"] if df is None: raise Exception("No dataframe was loaded") data_freq = state.report["data"]["freq"] if data_freq in ("D",): pass elif data_freq in ( "W", "W-MON", ): data_freq = "W" elif data_freq in ( "M", "MS", ): data_freq = "M" else: raise NotImplementedError # state.df is already resampled to same frequency as the forecast freq. state_machine_arn = None # generate a unique prefix for the Amazon Forecast resources now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S") prefix = f"AfaAfc{now_str}" # get the state machine arn and s3 paths ssm_client = boto3.client("ssm") state_machine_arn = ssm_client.get_parameter(Name="AfaAfcStateMachineArn")[ "Parameter" ]["Value"] s3_input_path = ssm_client.get_parameter(Name="AfaS3InputPath")["Parameter"][ "Value" ].rstrip("/") s3_output_path = ssm_client.get_parameter(Name="AfaS3OutputPath")["Parameter"][ "Value" ].rstrip("/") # generate amazon forecast compatible data with st.spinner("Launching Amazon Forecast job ..."): df_afc = ( df | px.reset_index() | px.rename({"index": "timestamp"}, axis=1) | px.assign( item_id=px["channel"] + "@@" + px["family"] + "@@" + px["item_id"] ) | px[["timestamp", "demand", "item_id"]] | px.sort_values(by=["item_id", "timestamp"]) ) df_afc["timestamp"] = pd.DatetimeIndex(df_afc["timestamp"]).strftime("%Y-%m-%d") afc_input_fn = re.sub("(.csv.gz)", ".csv", os.path.basename(fn)) s3_input_path = f"{s3_input_path}/{afc_input_fn}" # upload the input csv to s3 wr.s3.to_csv(df_afc, s3_input_path, index=False) # upload local re-sampled csv file to s3 input path client = boto3.client("stepfunctions") resp = client.start_execution( stateMachineArn=state_machine_arn, input=json.dumps( { "prefix": prefix, "data_freq": data_freq, "horiz": AFC_FORECAST_HORIZON, "freq": AFC_FORECAST_FREQUENCY, "s3_path": s3_input_path, "s3_export_path": s3_output_path, } ), ) status_json_s3_path = os.path.join(s3_output_path, f"{prefix}_status.json") return resp["executionArn"], prefix, status_json_s3_path def refresh_ml_state_machine_status(): """ """ sfn_client = boto3.client("stepfunctions") resp = sfn_client.describe_execution( executionArn=state.report["afc"]["execution_arn"] ) sfn_status = resp["status"] status_dict = parse_s3_json(state.report["afc"]["status_json_s3_path"]) return sfn_status, status_dict def file_selectbox(label, folder, globs=("*.csv", "*.csv.gz"), **kwargs): """ """ if folder.startswith("s3://"): raise NotImplementedError else: fns = [] for pat in globs: fns.extend(glob.glob(os.path.join(folder, pat))) fn = st.selectbox(label, fns, format_func=lambda s: os.path.basename(s), **kwargs) return fn def nav_radio_format_func(s): if s == "create_report": return "📄 Create Report" elif s == "load_report": return "âŦ†ī¸ Load Report" return if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "--local-dir", type=str, help="/path/to local folder to store input/output files.", default=os.path.expanduser("~/SageMaker/"), ) parser.add_argument( "--lambdamap-function", type=str, help="ARN/name of the lambdamap function", default="AfaLambdaMapFunction", ) 
parser.add_argument( "--landing-page-url", type=str, help="URL of the AFA landing page", default="#" ) parser.add_argument( "--max-lambdas", type=int, help="URL of the AFA landing page", default=MAX_LAMBDAS, ) args = parser.parse_args() MAX_LAMBDAS = args.max_lambdas if not os.path.exists(os.path.expanduser(args.local_dir)): raise Exception(f"{args.local_dir} does not exist") landing_page_url = "https://" + re.sub(r"^(https*://)", "", args.landing_page_url) state["landing_page_url"] = landing_page_url # # Sidebar # st.sidebar.title("Amazon Forecast Accelerator") st.sidebar.markdown( textwrap.dedent( """ - [source code @ github](https://github.com/aws-samples/simple-forecast-solution) """ ) ) clear_report_btn = st.sidebar.button("❌ Clear Report") if clear_report_btn: state.pop("report") gc.collect() if "report" not in state: state["report"] = {"data": {}, "afa": {}, "afc": {}} # populate state global variables from ssm ssm_client = boto3.client("ssm") if "s3_afc_export_path" not in state["report"]["afc"]: state["report"]["afc"]["s3_afc_export_path"] = ssm_client.get_parameter( Name="AfaS3OutputPath" )["Parameter"]["Value"].rstrip("/") if "s3_bucket" not in state["report"]: state["report"]["s3_bucket"] = ssm_client.get_parameter(Name="AfaS3Bucket")[ "Parameter" ]["Value"].strip("/") if "s3_afa_export_path" not in state["report"]: state["report"]["afa"][ "s3_afa_export_path" ] = f's3://{state["report"]["s3_bucket"]}/afa-exports' if "s3_afa_reports_path" not in state["report"]: state["report"]["afa"][ "s3_afa_reports_path" ] = f's3://{state["report"]["s3_bucket"]}/afa-reports' if "s3_afc_export_path" not in state["report"]: state["report"]["afc"][ "s3_afc_export_path" ] = f's3://{state["report"]["s3_bucket"]}/afc-exports' if "s3_afc_reports_path" not in state["report"]: state["report"]["afc"][ "s3_afc_reports_path" ] = f's3://{state["report"]["s3_bucket"]}/afc-reports' # st.write(state["report"]) # # Main page # panel_create_report(expanded=True) panel_load_report(expanded=False) panel_data_health() panel_launch() panel_accuracy() panel_top_performers() panel_visualization() panel_ml_launch() panel_ml_forecast_summary() panel_ml_top_performers() panel_ml_visualization() if "df" in state["report"]["data"]: st.markdown("## Export") panel_downloads() def panel_save_report(): if ( "data" not in state["report"] or "path" not in state["report"]["data"] or "df" not in state["report"]["data"] ): return with st.expander("💾 Save Report", expanded=True): _write( """ Save this report for future use, note that the filename must have the `.pkl.gz` file extension. You can then re-load the report using the [Load Report](#load-report) form. """ ) default_name = f'{state.report["name"]}.report.pkl.gz' report_fn = st.text_input( "File name", value=default_name, help="Please note that the report file name needs to have a " "`.pkl.gz` file extension.", ) save_btn = st.button("Save") if save_btn: save_report(report_fn) panel_save_report()