# Titanic Survival Prediction

This sample runs a Spark ML pipeline on Amazon EMR (Elastic MapReduce) to train a random forest classification model.


## The dataset

See the Kaggle competition [Titanic: Machine Learning from Disaster](https://www.kaggle.com/c/titanic) for more details about this problem. 70% of the training dataset is used to train the model and the remaining 30% is used for validation.
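
The Spark job performs the 70/30 split itself. As an illustration only, the idea can be sketched in plain Python as follows. The helper name `split_rows`, the seed, and the integer stand-in rows are assumptions made for this example and are not taken from the Spark project.

```python
import random


def split_rows(rows, train_fraction=0.7, seed=42):
    """Shuffle rows and split them into (training, validation) lists."""
    shuffled = list(rows)  # copy so the caller's list is left untouched
    random.Random(seed).shuffle(shuffled)  # seeded for a repeatable split
    cut = int(len(shuffled) * train_fraction)
    return shuffled[:cut], shuffled[cut:]


# Hypothetical stand-in for the 891 passenger rows in train.csv
rows = list(range(891))
train_rows, validation_rows = split_rows(rows)
print(len(train_rows), len(validation_rows))
```

A fixed seed keeps the split reproducible between runs, which makes validation results easier to compare.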

Upload the training dataset [train.csv](https://www.kaggle.com/c/titanic/data) to your S3 bucket. The file has already been downloaded for you; you can find it at `titanic/train.csv` in this folder.


## Spark ML Job

See [aws-emr-titanic-ml-example](https://github.com/Jeffwan/aws-emr-titanic-ml-example) for the example Spark project.

To build the jar file, clone that project and run the following commands. A prebuilt jar is also provided if you prefer not to build your own.

```shell
sbt clean package

# Copy this jar to your S3 bucket. The main class is `com.amazonaws.emr.titanic.Titanic`.
ls target/scala-2.11/titanic-survivors-prediction_2.11-1.0.jar
```

## Prerequisites

1. Prepare the training dataset and training library

In [None]:
# Create a bucket to store sample data
import random, string
HASH = ''.join([random.choice(string.ascii_lowercase) for n in range(16)] + [random.choice(string.digits) for n in range(16)])
AWS_REGION = 'us-west-2'
S3_BUCKET = '{}-kubeflow-pipeline-data'.format(HASH)
S3_PIPELINE_PATH='s3://{}/emr/titanic/'.format(S3_BUCKET)

!aws s3 mb s3://$S3_BUCKET --region $AWS_REGION

# Copy train.csv and the library jar to your bucket
!aws s3 cp titanic/train.csv $S3_PIPELINE_PATH
!aws s3 cp titanic/titanic-survivors-prediction_2.11-1.0.jar $S3_PIPELINE_PATH

2. Grant EMR permissions

This pipeline uses `aws-secret` to access EMR services. Make sure you have an `aws-secret` in the `kubeflow` namespace and that the AWS credentials it holds have the `AmazonElasticMapReduceFullAccess` policy attached.


```yaml
apiVersion: v1
kind: Secret
metadata:
 name: aws-secret
 namespace: kubeflow
type: Opaque
data:
 AWS_ACCESS_KEY_ID: YOUR_BASE64_ACCESS_KEY
 AWS_SECRET_ACCESS_KEY: YOUR_BASE64_SECRET_ACCESS

```

> Note: To get base64 string, try `echo -n $AWS_ACCESS_KEY_ID | base64`
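
If you would rather produce the base64 values from Python than from the shell, the following minimal sketch does the same encoding with the standard library. The helper name `to_secret_value` is made up for this example, and the key shown is a documentation-style placeholder, not a real credential.

```python
import base64


def to_secret_value(plain_text):
    """Return the base64 form of a string, as expected in a Secret's `data` field."""
    return base64.b64encode(plain_text.encode("utf-8")).decode("ascii")


# Placeholder value for illustration only; substitute your own access key ID.
example_key_id = "AKIAIOSFODNN7EXAMPLE"
encoded = to_secret_value(example_key_id)
print(encoded)
```

Like `echo -n`, this encodes the value without a trailing newline, so the decoded secret matches the original key exactly.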


## Build pipeline

1. Run the following command to load the Kubeflow Pipelines SDK

In [None]:
import kfp
from kfp import components
from kfp import dsl
from kfp.aws import use_aws_secret

2. Load the reusable EMR components.

In [None]:
emr_create_cluster_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/aws/emr/create_cluster/component.yaml')
emr_submit_spark_job_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/aws/emr/submit_spark_job/component.yaml')
emr_delete_cluster_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/aws/emr/delete_cluster/component.yaml')

3. Create the pipeline. It creates an EMR cluster first, runs the Spark ML workload, and then tears down the EMR cluster.

In [None]:
@dsl.pipeline(
    name='Titanic Survival Prediction Pipeline',
    description='Predict survival on the Titanic'
)
def titanic_suvival_prediction(region='us-west-2',
                               log_s3_uri=S3_PIPELINE_PATH + 'logs',
                               cluster_name='emr-cluster',
                               job_name='spark-ml-trainer',
                               input=S3_PIPELINE_PATH + 'train.csv',
                               output=S3_PIPELINE_PATH + 'output',
                               jar_path=S3_PIPELINE_PATH + 'titanic-survivors-prediction_2.11-1.0.jar',
                               main_class='com.amazonaws.emr.titanic.Titanic',
                               instance_type='m4.xlarge',
                               instance_count='3'
                               ):

    # Step 1: create the EMR cluster; its output is the job flow (cluster) ID.
    create_cluster = emr_create_cluster_op(
        region=region,
        name=cluster_name,
        instance_type=instance_type,
        instance_count=instance_count,
        log_s3_uri=log_s3_uri,
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

    # Step 2: submit the Spark job to the cluster created above.
    training_and_prediction = emr_submit_spark_job_op(
        region=region,
        jobflow_id=create_cluster.output,
        job_name=job_name,
        jar_path=jar_path,
        main_class=main_class,
        input=input,
        output=output
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

    # Step 3: delete the cluster; passing the job ID as `dependent` makes this step wait for the Spark job.
    delete_cluster = emr_delete_cluster_op(
        region=region,
        jobflow_id=create_cluster.output,
        dependent=training_and_prediction.outputs['job_id']
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))



4. Compile your pipeline

In [None]:
kfp.compiler.Compiler().compile(titanic_suvival_prediction,'titanic-survival-prediction.zip')

5. Deploy your pipeline

In [None]:
client = kfp.Client()
# What if it's already there? 
aws_experiment = client.create_experiment(name='aws')
my_run = client.run_pipeline(aws_experiment.id, 'titanic-survival-prediction', 
 'titanic-survival-prediction.zip')

Once the pipeline has finished, go to the S3 path specified in `output` to check your prediction results. There are three columns: `PassengerId`, `prediction`, and `Survived` (the ground truth value).
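
To get a quick sense of how well the model did, you can compare the `prediction` and `Survived` columns after downloading the results. The sketch below assumes the output is CSV text with a header row using those column names. That layout, the helper name `accuracy_from_csv`, and the sample rows are all assumptions made for illustration and are not real pipeline output.

```python
import csv
import io


def accuracy_from_csv(csv_text):
    """Return the fraction of rows where `prediction` equals `Survived`."""
    reader = csv.DictReader(io.StringIO(csv_text))
    total = 0
    correct = 0
    for row in reader:
        total += 1
        # Compare as floats so that "1.0" and "1" count as the same label.
        if float(row["prediction"]) == float(row["Survived"]):
            correct += 1
    return correct / total if total else 0.0


# Made-up sample rows, not actual results from the pipeline.
sample = """PassengerId,prediction,Survived
1,0.0,0
2,1.0,1
3,1.0,0
4,0.0,0
"""
print(accuracy_from_csv(sample))
```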