{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Titanic Survival Prediction\n", "\n", "This sample runs a Spark ML pipeline to train a classfication model using random forest on AWS Elastic Map Reduce(EMR).\n", "\n", "\n", "## The dataset\n", "\n", "Check Kaggle [Titanic: Machine Learning from Disaster](https://www.kaggle.com/c/titanic) for more details about this problem. 70% training dataset is used to train model and rest 30% for validation.\n", "\n", "Please upload training dataset [train.csv](https://www.kaggle.com/c/titanic/data) to your s3 bucket. We already download file for you. Please find `titanic/train.csv` in the folder.\n", "\n", "\n", "## Spark ML Job\n", "\n", "Please check [aws-emr-titanic-ml-example](https://github.com/Jeffwan/aws-emr-titanic-ml-example) for example spark project.\n", "\n", "To get jar file, you can clone that project and run following commands. We also prepare jar for you if you don't like to build your own.\n", "\n", "```shell\n", "sbt clean package\n", "\n", "# copy this jar to your s3 bucket. main class is `com.amazonaws.emr.titanic.Titanic`\n", "ls target/scala-2.11/titanic-survivors-prediction_2.11-1.0.jar\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prerequsite" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "1. Prepare training dataset and training library" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create a bucket to store sample data\n", "import random, string\n", "HASH = ''.join([random.choice(string.ascii_lowercase) for n in range(16)] + [random.choice(string.digits) for n in range(16)])\n", "AWS_REGION = 'us-west-2'\n", "S3_BUCKET = '{}-kubeflow-pipeline-data'.format(HASH)\n", "S3_PIPELINE_PATH='s3://{}/emr/titanic/'.format(S3_BUCKET)\n", "\n", "!aws s3 mb s3://$S3_BUCKET --region $AWS_REGION\n", "\n", "# Copy traing.csv and library to your bucket\n", "!aws s3 cp titanic/train.csv $S3_PIPELINE_PATH\n", "!aws s3 cp titanic/titanic-survivors-prediction_2.11-1.0.jar $S3_PIPELINE_PATH" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "2. Grant EMR permission" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This pipeline use aws-secret to get access to EMR services, please make sure you have a aws-secret in kubeflow namespace and attach AmazonElasticMapReduceFullAccess policy.\n", "\n", "\n", "```yaml\n", "apiVersion: v1\n", "kind: Secret\n", "metadata:\n", " name: aws-secret\n", " namespace: kubeflow\n", "type: Opaque\n", "data:\n", " AWS_ACCESS_KEY_ID: YOUR_BASE64_ACCESS_KEY\n", " AWS_SECRET_ACCESS_KEY: YOUR_BASE64_SECRET_ACCESS\n", "\n", "```\n", "\n", "> Note: To get base64 string, try `echo -n $AWS_ACCESS_KEY_ID | base64`\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Build pipeliene" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "1. Run the following command to load Kubeflow Pipelines SDK" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import kfp\n", "from kfp import components\n", "from kfp import dsl\n", "from kfp.aws import use_aws_secret" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "2. Load reusable emr components." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "emr_create_cluster_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/aws/emr/create_cluster/component.yaml')\n", "emr_submit_spark_job_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/aws/emr/submit_spark_job/component.yaml')\n", "emr_delete_cluster_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/aws/emr/delete_cluster/component.yaml')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "3. Create pipeline. We will create an EMR cluster first, run the spark ml workload and then tear down the EMR cluster." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@dsl.pipeline(\n", " name='Titanic Suvival Prediction Pipeline',\n", " description='Predict survival on the Titanic'\n", ")\n", "\n", "def titanic_suvival_prediction(region='us-west-2',\n", " log_s3_uri=S3_PIPELINE_PATH + \"logs\",\n", " cluster_name=\"emr-cluster\",\n", " job_name='spark-ml-trainner',\n", " input=S3_PIPELINE_PATH + 'train.csv',\n", " output=S3_PIPELINE_PATH + 'output',\n", " jar_path=S3_PIPELINE_PATH + 'titanic-survivors-prediction_2.11-1.0.jar',\n", " main_class='com.amazonaws.emr.titanic.Titanic',\n", " instance_type=\"m4.xlarge\",\n", " instance_count=\"3\"\n", " ):\n", "\n", " create_cluster = emr_create_cluster_op(\n", " region=region,\n", " name=cluster_name,\n", " instance_type=instance_type,\n", " instance_count=instance_count,\n", " log_s3_uri=log_s3_uri,\n", " ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))\n", "\n", " training_and_prediction = emr_submit_spark_job_op(\n", " region=region,\n", " jobflow_id=create_cluster.output,\n", " job_name=job_name,\n", " jar_path=jar_path,\n", " main_class=main_class,\n", " input=input,\n", " output=output\n", " ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))\n", "\n", " delete_cluster = emr_delete_cluster_op(\n", " region=region,\n", " jobflow_id=create_cluster.output,\n", " dependent=training_and_prediction.outputs['job_id']\n", " ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "4. Compile your pipline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "kfp.compiler.Compiler().compile(titanic_suvival_prediction,'titanic-survival-prediction.zip')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "5. Deploy your pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client = kfp.Client()\n", "# What if it's already there? \n", "aws_experiment = client.create_experiment(name='aws')\n", "my_run = client.run_pipeline(aws_experiment.id, 'titanic-survival-prediction', \n", " 'titanic-survival-prediction.zip')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once the pipeline done, you can go to the S3 path specified in output to check your prediction results. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "Once the pipeline is done, you can go to the S3 path specified in `output` to check the prediction results. The output has three columns: `PassengerId`, `prediction`, and `Survived` (the ground-truth value)." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.7" } }, "nbformat": 4, "nbformat_minor": 2 }