{ "metadata": { "version": 1, "disable_limits": false, "instance_type": "ml.m5.4xlarge" }, "parameters": [], "nodes": [ { "node_id": "801ca503-8805-4b20-883e-1d7d52d06cca", "type": "SOURCE", "operator": "sagemaker.s3_source_0.1", "parameters": { "dataset_definition": { "__typename": "S3CreateDatasetDefinitionOutput", "datasetSourceType": "S3", "name": "forecasted-values-dummy.csv", "description": null, "s3ExecutionContext": { "__typename": "S3ExecutionContext", "s3Uri": "s3://sagemaker-example-files-prod-us-east-1/datasets/tabular/timeseries-quantile-selection-dataflow/forecasted-values-dummy.csv", "s3ContentType": "csv", "s3HasHeader": true, "s3FieldDelimiter": ",", "s3DirIncludesNested": false, "s3AddsFilenameColumn": false, "s3RoleArn": null } } }, "inputs": [], "outputs": [ { "name": "default", "sampling": { "sampling_method": "sample_by_limit", "limit_rows": 10000 } } ] }, { "node_id": "cbc3bfbc-fefd-4b94-967d-fac3d5e3904e", "type": "TRANSFORM", "operator": "sagemaker.spark.infer_and_cast_type_0.1", "parameters": {}, "trained_parameters": {}, "inputs": [ { "name": "df", "node_id": "801ca503-8805-4b20-883e-1d7d52d06cca", "output_name": "default" } ], "outputs": [ { "name": "default" } ] }, { "node_id": "b43dac16-1f78-4234-b813-d1d5f4e309f6", "type": "TRANSFORM", "operator": "sagemaker.spark.custom_code_0.1", "parameters": { "operator": "Python (PySpark)", "pyspark_parameters": { "code": "from pyspark.sql.functions import sum,min,max\n\ndf = df.groupBy(\"item_id\",\"region\") \\\n .agg(sum(\"target_value\").alias(\"target_value\"), \\\n sum(\"p50\").alias(\"p50\"), \\\n sum(\"p60\").alias(\"p60\"), \\\n sum(\"p70\").alias(\"p70\"), \\\n sum(\"p80\").alias(\"p80\"), \\\n sum(\"p90\").alias(\"p90\") \\\n )\n\n# fill missing with zero\ndf = df.na.fill(value=0)\n\ndf = df.withColumnRenamed(\"region\",\"location\")\n\n\n\n\n\n" }, "name": "aggregate backtest window" }, "inputs": [ { "name": "df", "node_id": "f35dbd50-607c-4cb6-a6f1-57b618d8fd60", "output_name": "default" } ], "outputs": [ { "name": "default" } ] }, { "node_id": "13ba976a-a1be-41b3-9b49-5db89a12db1a", "type": "SOURCE", "operator": "sagemaker.s3_source_0.1", "parameters": { "dataset_definition": { "__typename": "S3CreateDatasetDefinitionOutput", "datasetSourceType": "S3", "name": "item-financial-metadata-dummy.csv", "description": null, "s3ExecutionContext": { "__typename": "S3ExecutionContext", "s3Uri": "s3://sagemaker-example-files-prod-us-east-1/datasets/tabular/timeseries-quantile-selection-dataflow/item-financial-metadata-dummy.csv", "s3ContentType": "csv", "s3HasHeader": true, "s3FieldDelimiter": ",", "s3DirIncludesNested": false, "s3AddsFilenameColumn": false, "s3RoleArn": null } } }, "inputs": [], "outputs": [ { "name": "default", "sampling": { "sampling_method": "sample_by_limit", "limit_rows": 500 } } ] }, { "node_id": "300533c9-437d-4166-85d7-a94100bcc06a", "type": "TRANSFORM", "operator": "sagemaker.spark.infer_and_cast_type_0.1", "parameters": {}, "trained_parameters": {}, "inputs": [ { "name": "df", "node_id": "13ba976a-a1be-41b3-9b49-5db89a12db1a", "output_name": "default" } ], "outputs": [ { "name": "default" } ] }, { "node_id": "d911306a-f747-454a-bef5-c33a3098d9ff", "type": "TRANSFORM", "operator": "sagemaker.spark.custom_code_0.1", "parameters": { "operator": "Python (PySpark)", "pyspark_parameters": { "code": "retain_cols = (\"item_id\",\"location\")\n\ndf = df[df['quantile']=='p50']\n\ndf = df[df['p50_net']>0]\n\n# select student id and student name\ndf = df.select(*retain_cols)\n" }, "name": "Export p50 keys" }, "inputs": [ { "name": "df", "node_id": "637c8e39-bc23-40ac-83de-6440cf7bdb85", "output_name": "default" } ], "outputs": [ { "name": "default" } ] }, { "node_id": "acd0d273-4c7d-4d22-b1cc-a17539d9a4fd", "type": "TRANSFORM", "operator": "sagemaker.spark.custom_code_0.1", "parameters": { "operator": "Python (PySpark)", "pyspark_parameters": { "code": "retain_cols = (\"item_id\",\"location\")\n\ndf = df[df['quantile']=='p60']\ndf = df[df['p60_net']>0]\n\n# select student id and student name\ndf = df.select(*retain_cols)\n" }, "name": "Export p60 keys" }, "inputs": [ { "name": "df", "node_id": "637c8e39-bc23-40ac-83de-6440cf7bdb85", "output_name": "default" } ], "outputs": [ { "name": "default" } ] }, { "node_id": "9c2fbbb0-c699-43fb-b740-788ef22719e3", "type": "TRANSFORM", "operator": "sagemaker.spark.custom_code_0.1", "parameters": { "operator": "Python (PySpark)", "pyspark_parameters": { "code": "retain_cols = (\"item_id\",\"location\")\n\ndf = df[df['quantile']=='p70']\ndf = df[df['p70_net']>0]\n\n# select student id and student name\ndf = df.select(*retain_cols)\n" }, "name": "Export p70 keys" }, "inputs": [ { "name": "df", "node_id": "637c8e39-bc23-40ac-83de-6440cf7bdb85", "output_name": "default" } ], "outputs": [ { "name": "default" } ] }, { "node_id": "bdb41b20-3fe7-4906-b055-08b69e27543d", "type": "TRANSFORM", "operator": "sagemaker.spark.custom_code_0.1", "parameters": { "operator": "Python (PySpark)", "pyspark_parameters": { "code": "retain_cols = (\"item_id\",\"location\")\n\ndf = df[df['quantile']=='p80']\ndf = df[df['p80_net']>0]\n\n# select student id and student name\ndf = df.select(*retain_cols)\n" }, "name": "Export p80 keys" }, "inputs": [ { "name": "df", "node_id": "637c8e39-bc23-40ac-83de-6440cf7bdb85", "output_name": "default" } ], "outputs": [ { "name": "default" } ] }, { "node_id": "2de643eb-ff05-43b0-8b88-9ee3cbb492f2", "type": "TRANSFORM", "operator": "sagemaker.spark.custom_code_0.1", "parameters": { "operator": "Python (PySpark)", "pyspark_parameters": { "code": "retain_cols = (\"item_id\",\"location\")\n\ndf = df[df['quantile']=='p90']\ndf = df[df['p90_net']>0]\n\n# select student id and student name\ndf = df.select(*retain_cols)\n\n" }, "name": "Export p90 keys" }, "inputs": [ { "name": "df", "node_id": "637c8e39-bc23-40ac-83de-6440cf7bdb85", "output_name": "default" } ], "outputs": [ { "name": "default" } ] }, { "node_id": "85712024-8fc6-40d7-a6ca-cf7d298aa435", "type": "TRANSFORM", "operator": "sagemaker.spark.join_multi_keys_0.1", "parameters": { "join_keys": [ { "left": "item_id", "right": "item_id" }, { "left": "location", "right": "location" } ], "join_type": "leftouter" }, "inputs": [ { "name": "df", "node_id": "b43dac16-1f78-4234-b813-d1d5f4e309f6", "output_name": "default" }, { "name": "df", "node_id": "300533c9-437d-4166-85d7-a94100bcc06a", "output_name": "default" } ], "outputs": [ { "name": "default" } ] }, { "node_id": "637c8e39-bc23-40ac-83de-6440cf7bdb85", "type": "TRANSFORM", "operator": "sagemaker.spark.custom_code_0.1", "parameters": { "operator": "Python (PySpark)", "pyspark_parameters": { "code": "from pyspark.sql.functions import greatest, least, coalesce, round\nfrom pyspark.sql import functions as F\n\nquantile_list=['p50','p60','p70','p80','p90']\n\n\n# compute cost of overstock (co) and cost of understock (cu)\ndf = df.withColumn('unit_co', (df['item_cost_of_goods'] + df['item_holding_cost']) - df['item_salvage_cost'])\ndf = df.withColumn('unit_cu', (df['item_value'] - df['item_cost_of_goods']))\n\n\n# for each quantile compute loss function\nfor c in quantile_list:\n\n # replace negative values with zero, then round\n df.withColumn(c, F.when(df[c] < 0, 0))\n df = df.withColumn(c,round(c,2))\n \n \n # compute quantile metrics\n df = df.withColumn('co',(greatest(df[c], df.target_value) - df.target_value) * df.unit_co)\n df = df.withColumn('revenue', least(df[c], df.target_value) * df.unit_cu)\n df = df.withColumn(c+'_net', round(df.revenue + df.co,2) ) \n\n# initialize with fixed-quantile override\ndf = df.withColumn(\"quantile\",df.fixed_quantile)\n\n# determine greatest net revenue per quantile\ndf = df.withColumn(\"optimized_net\", greatest(df.p50_net, df.p60_net, df.p70_net, df.p80_net, df.p90_net) )\n\n# set winning quantile\nfor c in quantile_list:\n str = c+'_net'\n df = df.withColumn(\"quantile\", coalesce(df.quantile, F.when( df[str] == df.optimized_net,c)))\n\n\n#round target value\ndf = df.withColumn(\"target_value\",round(\"target_value\",2))\n\n# remove undesired columns\ndrop_cols = (\"unit_co\",\"unit_cu\",\"cu\",\"co\", \"optimized_net\", \"revenue\")\ndf = df.drop(*drop_cols) \n" }, "name": "compute-quantile-financial" }, "inputs": [ { "name": "df", "node_id": "85712024-8fc6-40d7-a6ca-cf7d298aa435", "output_name": "default" } ], "outputs": [ { "name": "default" } ] }, { "node_id": "f35dbd50-607c-4cb6-a6f1-57b618d8fd60", "type": "TRANSFORM", "operator": "sagemaker.spark.custom_code_0.1", "parameters": { "operator": "Python (PySpark)", "pyspark_parameters": { "code": "# this step can be used to filter backtest data to only a recent period when desired\n# this allows the quantile selection to be more responsive\n# CAUTION: this is defaulting to 36 months ago; change this setting accordingly\n\nfrom pyspark.sql.functions import current_date, substring\nfrom pyspark.sql import functions as F\n\n# set working storage column\nmonths_to_add = -36 #note negative value -N months ago\ndf = df.withColumn(\"timestamp\", substring(\"timestamp\",1,10))\ndf = df.withColumn(\"filter_date\", F.add_months(\"current_date\", months_to_add))\n\n# apply filter\ndf = df[df['timestamp']>=df['filter_date']]\n\n# drop working storage column\ndf = df.drop('filter_date')" }, "name": "filter to last N-periods" }, "inputs": [ { "name": "df", "node_id": "cbc3bfbc-fefd-4b94-967d-fac3d5e3904e", "output_name": "default" } ], "outputs": [ { "name": "default" } ] } ] }