{
  "metadata": {
    "version": 1,
    "disable_limits": false,
    "instance_type": "ml.m5.4xlarge"
  },
  "nodes": [
    {
      "node_id": "99b95b91-c14d-4de6-bfb6-f9fe514e54e7",
      "type": "SOURCE",
      "operator": "sagemaker.s3_source_0.1",
      "parameters": {
        "dataset_definition": {
          "__typename": "S3CreateDatasetDefinitionOutput",
          "datasetSourceType": "S3",
          "name": "sample_data.csv",
          "description": null,
          "s3ExecutionContext": {
            "__typename": "S3ExecutionContext",
            "s3Uri": "s3://sagemaker-us-east-2-738335684114/churn/input/sample_data.csv",
            "s3ContentType": "csv",
            "s3HasHeader": true,
            "s3FieldDelimiter": ",",
            "s3DirIncludesNested": false,
            "s3AddsFilenameColumn": false
          }
        }
      },
      "inputs": [],
      "outputs": [
        {
          "name": "default"
        }
      ]
    },
    {
      "node_id": "d04eac2a-92a9-4539-b22f-f0f30aa29877",
      "type": "TRANSFORM",
      "operator": "sagemaker.spark.infer_and_cast_type_0.1",
      "parameters": {},
      "trained_parameters": {
        "schema": {
          "ts": "long",
          "userId": "float",
          "sessionId": "long",
          "page": "string",
          "auth": "string",
          "method": "string",
          "status": "long",
          "level": "string",
          "itemInSession": "long",
          "location": "string",
          "userAgent": "string",
          "lastName": "string",
          "firstName": "string",
          "registration": "float",
          "gender": "string",
          "artist": "string",
          "song": "string",
          "length": "string"
        }
      },
      "inputs": [
        {
          "name": "df",
          "node_id": "99b95b91-c14d-4de6-bfb6-f9fe514e54e7",
          "output_name": "default"
        }
      ],
      "outputs": [
        {
          "name": "default"
        }
      ]
    },
    {
      "node_id": "3ec1a26b-9329-4424-8a61-689f918728be",
      "type": "TRANSFORM",
      "operator": "sagemaker.spark.manage_columns_0.1",
      "parameters": {
        "operator": "Drop column",
        "drop_column_parameters": {
          "column_to_drop": [
            "method",
            "status",
            "location",
            "userAgent",
            "lastName",
            "firstName"
          ]
        }
      },
      "inputs": [
        {
          "name": "df",
          "node_id": "d04eac2a-92a9-4539-b22f-f0f30aa29877",
          "output_name": "default"
        }
      ],
      "outputs": [
        {
          "name": "default"
        }
      ]
    },
    {
      "node_id": "bfa252ed-9fc0-4898-be8f-2f336104ea02",
      "type": "TRANSFORM",
      "operator": "sagemaker.spark.handle_missing_0.1",
      "parameters": {
        "operator": "Drop missing",
        "drop_missing_parameters": {
          "input_column": [
            "userId",
            "registration"
          ]
        },
        "impute_parameters": {
          "column_type": "Numeric",
          "numeric_parameters": {
            "strategy": "Approximate Median"
          }
        }
      },
      "inputs": [
        {
          "name": "df",
          "node_id": "3ec1a26b-9329-4424-8a61-689f918728be",
          "output_name": "default"
        }
      ],
      "outputs": [
        {
          "name": "default"
        }
      ]
    },
    {
      "node_id": "57816c99-ee32-4a1b-9439-77391b14a487",
      "type": "TRANSFORM",
      "operator": "sagemaker.spark.custom_code_0.1",
      "parameters": {
        "operator": "Python (PySpark)",
        "pyspark_parameters": {
          "code": "from pyspark.sql import functions as f\nfrom pyspark.sql import Window\n\n# Session duration and events.\n# Partition by userId as well as sessionId so rows from different users that share a\n# sessionId value are never merged into a single session.\nw_session = Window.partitionBy([f.col(\"userId\"), f.col(\"sessionId\")])\ndf = df.withColumn(\"session_start\", f.min(\"ts\").over(w_session))\ndf = df.withColumn(\"session_end\", f.max(\"ts\").over(w_session))\ndf = df.withColumn(\"session_duration\", df[\"session_end\"] - df[\"session_start\"])\ndf = df.withColumn(\"session_num_event\", f.count(\"page\").over(w_session))\n\n# Mark exactly one row per (non-null) session so per-session values are counted once\n# per session instead of once per event; a plain mean over all rows would weight\n# every session by its number of events.\ndf = df.withColumn(\n    \"session_first_row\",\n    f.col(\"sessionId\").isNotNull() & (f.row_number().over(w_session.orderBy(\"ts\")) == 1))\n\n# Aggregate by user\nw_user = Window.partitionBy([f.col(\"userId\")])\nfirst_row = f.col(\"session_first_row\")\ndf = df.withColumn(\"num_sessions\", f.sum(first_row.cast(\"long\")).over(w_user))\ndf = df.withColumn(\"avg_time_per_session\", f.mean(f.when(first_row, f.col(\"session_duration\"))).over(w_user))\ndf = df.withColumn(\"avg_events_per_session\", f.mean(f.when(first_row, f.col(\"session_num_event\"))).over(w_user))\n\n# Cleanup\ndf = df.drop(\"session_start\", \"session_end\", \"session_duration\", \"session_num_event\", \"session_first_row\")\n"
        },
        "name": "Session Aggregation"
      },
      "inputs": [
        {
          "name": "df",
          "node_id": "3c33556d-29be-4505-99ca-891970b65bc7",
          "output_name": "default"
        }
      ],
      "outputs": [
        {
          "name": "default"
        }
      ]
    },
    {
      "node_id": "ed12d92c-6a2b-45bb-be2d-e6cb9d115b90",
      "type": "TRANSFORM",
      "operator": "sagemaker.spark.custom_code_0.1",
      "parameters": {
        "operator": "Python (PySpark)",
        "pyspark_parameters": {
          "code": "# Table is available as variable `df`\nfrom pyspark.sql import functions as f\nfrom pyspark.sql import Window\n\nw = Window.partitionBy([f.col(\"userId\")])\n\n# User Features\n# approx_count_distinct has a default maximum relative error of 5%; Spark does not\n# support exact distinct aggregates (countDistinct) over a window.\ndf = df.withColumn(\"num_artists\", f.approx_count_distinct(\"artist\").over(w))\ndf = df.withColumn(\"num_songs\", f.approx_count_distinct(\"song\").over(w))\n\ndf = df.withColumn(\"num_pages\", f.count(\"page\").over(w))\n\n# Per-user event counts as (output column, page value), added in this order.\npage_counts = [\n    (\"num_nextsong\", \"NextSong\"),\n    (\"num_ads\", \"Roll Advert\"),\n    (\"num_thumbsup\", \"Thumbs Up\"),\n    (\"num_thumbsdown\", \"Thumbs Down\"),\n    (\"num_playlist\", \"Add to Playlist\"),\n    (\"num_addfriend\", \"Add Friend\"),\n    (\"num_error\", \"Error\"),\n    (\"user_downgrade\", \"Downgrade\"),\n    (\"user_upgrade\", \"Upgrade\"),\n]\nfor column_name, page_value in page_counts:\n    df = df.withColumn(column_name, f.sum(f.when(df[\"page\"] == page_value, 1).otherwise(0)).over(w))\n\ndf = df.withColumn(\"percentage_song\", df[\"num_nextsong\"] / df[\"num_pages\"])\ndf = df.withColumn(\"percentage_ad\", df[\"num_ads\"] / df[\"num_pages\"])\n"
        },
        "name": "User Aggregation"
      },
      "inputs": [
        {
          "name": "df",
          "node_id": "57816c99-ee32-4a1b-9439-77391b14a487",
          "output_name": "default"
        }
      ],
      "outputs": [
        {
          "name": "default"
        }
      ]
    },
    {
      "node_id": "facaee66-be58-4db4-92f9-67139bdbbb1c",
      "type": "TRANSFORM",
      "operator": "sagemaker.spark.manage_columns_0.1",
      "parameters": {
        "operator": "Drop column",
        "drop_column_parameters": {
          "column_to_drop": [
            "sessionId",
            "page",
            "auth",
            "level",
            "itemInSession",
            "registration",
            "gender",
            "artist",
            "song",
            "length",
            "ts"
          ]
        }
      },
      "inputs": [
        {
          "name": "df",
          "node_id": "ed12d92c-6a2b-45bb-be2d-e6cb9d115b90",
          "output_name": "default"
        }
      ],
      "outputs": [
        {
          "name": "default"
        }
      ]
    },
    {
      "node_id": "594f94f6-04d0-49ad-8e56-4e5e2bdb4ce5",
      "type": "TRANSFORM",
      "operator": "sagemaker.spark.manage_rows_0.1",
      "parameters": {
        "operator": "Drop duplicates",
        "drop_duplicates_parameters": {},
        "sort_parameters": {
          "order": "Ascending"
        }
      },
      "inputs": [
        {
          "name": "df",
          "node_id": "facaee66-be58-4db4-92f9-67139bdbbb1c",
          "output_name": "default"
        }
      ],
      "outputs": [
        {
          "name": "default"
        }
      ]
    },
    {
      "node_id": "3c33556d-29be-4505-99ca-891970b65bc7",
      "type": "TRANSFORM",
      "operator": "sagemaker.spark.custom_code_0.1",
      "parameters": {
        "operator": "Python (PySpark)",
        "pyspark_parameters": {
          "code": "from pyspark.sql import functions as f\nfrom pyspark.sql import Window\nimport datetime\n\n# Cutoff date (day/month/year), pinned to UTC midnight so the cutoff does not depend on\n# the local timezone of the processing container.\ndate = \"01/07/2020\"\ncutoff = datetime.datetime.strptime(date, \"%d/%m/%Y\").replace(tzinfo=datetime.timezone.utc)\n# Epoch milliseconds, the unit `ts` is compared in below.\ntimestamp = cutoff.timestamp() * 1000\n\nw = Window.partitionBy([f.col(\"userId\")])\n\n# Set user as churn if user churned at or after the cutoff date.\n# f.when() without otherwise() yields null for earlier rows and collect_set skips nulls,\n# so only pages visited at or after the cutoff are considered.\ndf = df.withColumn(\n    'churn',\n    f.array_contains(\n        f.collect_set(f.when(df['ts'] >= timestamp, df['page'])).over(w),\n        \"Cancellation Confirmation\"))\n\n# Filter data out after cutoff date\ndf = df.filter(df['ts'] < timestamp)\n\n# Drop users who cancelled before the cutoff (every remaining row is before it)\ndf = df.withColumn(\n    'churn_before',\n    f.array_contains(\n        f.collect_set(df['page']).over(w),\n        \"Cancellation Confirmation\"))\ndf = df.filter(df['churn_before'] == False)\ndf = df.drop(\"churn_before\")\n"
        },
        "name": "Target Aggregation"
      },
      "inputs": [
        {
          "name": "df",
          "node_id": "bfa252ed-9fc0-4898-be8f-2f336104ea02",
          "output_name": "default"
        }
      ],
      "outputs": [
        {
          "name": "default"
        }
      ]
    },
    {
      "node_id": "0381585c-e54f-41d6-90f2-3ab2a5bafc31",
      "type": "VISUALIZATION",
      "operator": "sagemaker.visualizations.histogram_0.1",
      "parameters": {
        "name": "Churn",
        "x_axis": "churn"
      },
      "inputs": [
        {
          "name": "df",
          "node_id": "594f94f6-04d0-49ad-8e56-4e5e2bdb4ce5",
          "output_name": "default"
        }
      ],
      "outputs": [
        {
          "name": "default"
        }
      ]
    }
  ]
}