{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Running Spark History Server\n",
    "#### (Note: this feature only works in a local development environment with Docker installed, or on a SageMaker Notebook Instance. It does not currently work in SageMaker Studio.)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# import packages\n",
    "import json\n",
    "\n",
    "import sagemaker\n",
    "from IPython.display import HTML, display\n",
    "from sagemaker.network import NetworkConfig\n",
    "from sagemaker.spark.processing import PySparkProcessor\n",
    "\n",
    "sagemaker_session = sagemaker.Session()\n",
    "\n",
    "# load the pipeline parameters (job names, role, KMS keys, network settings, S3 locations)\n",
    "with open(\"../ml_pipeline/params/pipeline_params.json\", \"r\") as f:\n",
    "    pipeline_params = json.load(f)\n",
    "\n",
    "# getting pre-process spark ui log s3 output location\n",
    "# (the template stored in the params file is completed with the trial name)\n",
    "process_spark_ui_log_output = pipeline_params[\"process_spark_ui_log_output\"].format(pipeline_params[\"trial\"])\n",
    "\n",
    "# setting up processing arguments\n",
    "process_args = [\n",
    "    \"--input_table\", pipeline_params[\"pyspark_process_data_input\"],\n",
    "    \"--output_table\", pipeline_params[\"pyspark_process_data_output\"],\n",
    "]\n",
    "\n",
    "# import spark config used in pipeline run\n",
    "# json.load already returns native Python objects, so the result is used as-is.\n",
    "# (Round-tripping it through json.dumps + ast.literal_eval raises ValueError\n",
    "# as soon as the file contains true/false/null.)\n",
    "with open(\"../src/spark_configuration/configuration.json\", \"r\") as f:\n",
    "    spark_conf = json.load(f)\n",
    "\n",
    "# get network configuration\n",
    "network_config = NetworkConfig(\n",
    "    encrypt_inter_container_traffic=True,\n",
    "    security_group_ids=pipeline_params[\"network_security_group_ids\"],\n",
    "    subnets=pipeline_params[\"network_subnet_ids\"],\n",
    ")\n",
    "\n",
    "# Create Spark Processor\n",
    "spark_processor = PySparkProcessor(\n",
    "    base_job_name=pipeline_params[\"pyspark_process_name\"],\n",
    "    framework_version=pipeline_params[\"pyspark_framework_version\"],\n",
    "    role=pipeline_params[\"pipeline_role\"],\n",
    "    instance_count=pipeline_params[\"pyspark_process_instance_count\"],\n",
    "    instance_type=pipeline_params[\"pyspark_process_instance_type\"],\n",
    "    sagemaker_session=sagemaker_session,\n",
    "    volume_kms_key=pipeline_params[\"pyspark_process_volume_kms\"],\n",
    "    output_kms_key=pipeline_params[\"pyspark_process_output_kms\"],\n",
    "    network_config=network_config,\n",
    ")\n",
    "\n",
    "# Run the processing job; Spark event logs are written to the S3 location above\n",
    "# so that the history server can read them in the next section.\n",
    "spark_processor.run(\n",
    "    submit_app=pipeline_params[\"pyspark_process_code\"],\n",
    "    submit_py_files=[pipeline_params[\"pyspark_helper_code\"]],\n",
    "    arguments=process_args,\n",
    "    spark_event_logs_s3_uri=process_spark_ui_log_output,\n",
    "    logs=False,\n",
    "    kms_key=pipeline_params[\"pyspark_process_volume_kms\"],\n",
    "    configuration=spark_conf,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Starting Spark UI\n",
    "Let's start a history server to visualize your logs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Run spark history server to show Spark UI\n",
    "spark_processor.start_history_server(spark_event_logs_s3_uri=process_spark_ui_log_output)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### The Spark UI output can be visualized at:\n",
    "`https://<notebook_name>.notebook.<region>.sagemaker.aws/proxy/15050`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# TODO: replace values with correct info\n",
    "notebook_name = \"test-spark-ui\"\n",
    "region = \"us-east-1\"\n",
    "\n",
    "# Proxy URL of the notebook instance under which the Spark UI is served (port 15050)\n",
    "spark_ui_url = f\"https://{notebook_name}.notebook.{region}.sagemaker.aws/proxy/15050\"\n",
    "\n",
    "display(HTML(f'<a href=\"{spark_ui_url}\">Review Spark UI</a>'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Terminating Spark History Server\n",
    "Remember to terminate your server once you are done with your analysis."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Terminate spark history server\n",
    "spark_processor.terminate_history_server()"
   ]
  }
 ],
 "metadata": {
  "instance_type": "ml.t3.medium",
  "kernelspec": {
   "display_name": "Python 3 (Data Science)",
   "language": "python",
   "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}