# Distributed Data Processing using Apache Spark and SageMaker Processing with Magic

In [4]:
%pyspark?

Docstring:
::

 %pyspark [--base_job_name BASE_JOB_NAME] [--submit_app SUBMIT_APP]
 [--framework_version FRAMEWORK_VERSION]
 [--instance_type INSTANCE_TYPE]
 [--instance_count INSTANCE_COUNT]
 [--max_runtime_in_seconds MAX_RUNTIME_IN_SECONDS]
 [--submit_py_files [SUBMIT_PY_FILES [SUBMIT_PY_FILES ...]]]
 [--submit_jars [SUBMIT_JARS [SUBMIT_JARS ...]]]
 [--submit_files [SUBMIT_FILES [SUBMIT_FILES ...]]]
 [--arguments '--foo bar --baz 123']
 [--spark_event_logs_s3_uri SPARK_EVENT_LOGS_S3_URI]
 [--logs [LOGS]] [--name_contains NAME_CONTAINS]
 [--max_result MAX_RESULT]
 {submit,list,status,delete}

Pyspark processor magic command

positional arguments:
 {submit,list,status,delete}

processor:
 --base_job_name BASE_JOB_NAME
 Prefix for processing name. If not specified, the
 processor generates a default job name, based on the
 training image name and current timestamp.
 --submit_app SUBMIT_APP
 Path (local or S3) to Python file to submit to Spark
 as the primary application
 --fr

### Set up S3 bucket locations

First, set up some locations in the default SageMaker bucket to store the raw input datasets and the Spark job output.

In [13]:
import logging
import sagemaker
from time import gmtime, strftime

sagemaker_logger = logging.getLogger("sagemaker")
sagemaker_logger.setLevel(logging.INFO)
sagemaker_logger.addHandler(logging.StreamHandler())

sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()

Couldn't call 'get_role' to get Role ARN from role name workshop-sagemaker to get Role path.


Next, you'll download the example dataset from a SageMaker staging bucket.

In [4]:
# Fetch the dataset from the SageMaker bucket
!wget https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv -O abalone.csv

--2020-12-17 16:38:38-- https://s3-us-west-2.amazonaws.com/sparkml-mleap/data/abalone/abalone.csv
Resolving s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)... 52.218.252.144
Connecting to s3-us-west-2.amazonaws.com (s3-us-west-2.amazonaws.com)|52.218.252.144|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 191873 (187K) [binary/octet-stream]
Saving to: ‘abalone.csv’


2020-12-17 16:38:39 (506 KB/s) - ‘abalone.csv’ saved [191873/191873]



In [4]:
# Upload the raw input dataset to a unique S3 location
timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())
prefix = "sagemaker/spark-preprocess-demo/{}".format(timestamp_prefix)
input_prefix_abalone = "{}/input/raw/abalone".format(prefix)
input_preprocessed_prefix_abalone = "{}/input/preprocessed/abalone".format(prefix)
sagemaker_session.upload_data(path='abalone.csv', bucket=bucket, key_prefix=input_prefix_abalone)
print(bucket, input_prefix_abalone)

sagemaker-eu-west-1-245582572290 sagemaker/spark-preprocess-demo/2020-12-17-17-19-06/input/raw/abalone
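The submit cell later in this notebook passes the bucket and key prefixes as a literal `--arguments` string. As a minimal sketch, the helper below rebuilds the same prefixes and prints a string that can be pasted after `--arguments`. The function name `build_job_arguments` and the bucket `example-bucket` are hypothetical, and the sketch assumes the magic splits the string shell-style, which the notebook does not confirm.

```python
import shlex
from time import gmtime, strftime


def build_job_arguments(bucket, timestamp_prefix):
    """Return (input_prefix, output_prefix, arguments_string) for the Spark job."""
    prefix = "sagemaker/spark-preprocess-demo/{}".format(timestamp_prefix)
    input_prefix = "{}/input/raw/abalone".format(prefix)
    output_prefix = "{}/input/preprocessed/abalone".format(prefix)
    tokens = [
        "--s3_input_bucket", bucket,
        "--s3_input_key_prefix", input_prefix,
        "--s3_output_bucket", bucket,
        "--s3_output_key_prefix", output_prefix,
    ]
    # Quote each token so the string survives shell-style splitting.
    arguments = " ".join(shlex.quote(token) for token in tokens)
    return input_prefix, output_prefix, arguments


# "example-bucket" is a placeholder, not a real bucket name.
_, _, arguments = build_job_arguments(
    "example-bucket", strftime("%Y-%m-%d-%H-%M-%S", gmtime())
)
print(arguments)
```

Building the string from the same variables used for the upload keeps the input prefix in the job arguments in step with the location the dataset was actually uploaded to.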


### Write the PySpark script

The source for the preprocessing script is in the cell below. The cell uses the `%%pyspark submit` directive to submit the Python application in the cell body to a PySparkProcessor job. The script does some basic feature engineering on a raw input dataset, here the [Abalone Data Set](https://archive.ics.uci.edu/ml/datasets/abalone): it string-indexes and one-hot encodes the categorical `sex` column, assembles the encoded column and the numeric columns into a single feature vector, and chains these steps into a pipeline so they run in order. The script then does a random 80-20 split to produce training and validation datasets as output.
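To make the string-indexing and one-hot-encoding steps concrete, here is a plain-Python sketch that needs no Spark installation. It mimics the default behaviour of `StringIndexer` (most frequent label gets index 0) and of `OneHotEncoder` (the last category is dropped and encoded as all zeros). The helper names and the toy values are hypothetical, and breaking frequency ties alphabetically is an assumption made only to keep the sketch deterministic.

```python
from collections import Counter


def string_index(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically (assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot_drop_last(index, num_categories):
    """Encode an index as a vector of length num_categories - 1.

    The last category has no slot of its own and is encoded as all zeros.
    """
    vec = [0.0] * (num_categories - 1)
    if int(index) < num_categories - 1:
        vec[int(index)] = 1.0
    return vec


sexes = ["M", "M", "M", "I", "I", "F"]  # toy values, not taken from the dataset
mapping = string_index(sexes)
for sex in ["M", "I", "F"]:
    print(sex, mapping[sex], one_hot_drop_last(mapping[sex], len(mapping)))
```

With three categories the encoded vector has two elements, so the assembled feature vector holds two `sex` slots followed by the seven numeric columns.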

In [1]:
%%pyspark submit --logs --base_job_name sm-spark --arguments '--s3_input_bucket sagemaker-eu-west-1-245582572290 --s3_input_key_prefix sagemaker/spark-preprocess-demo/2020-12-17-17-19-06/input/raw/abalone --s3_output_bucket sagemaker-eu-west-1-245582572290 --s3_output_key_prefix sagemaker/spark-preprocess-demo/2020-12-17-17-19-06/input/preprocessed/abalone'
from __future__ import print_function
from __future__ import unicode_literals

import argparse
import os

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
)
from pyspark.sql.types import (
    DoubleType,
    StringType,
    StructField,
    StructType,
)


def csv_line(data):
    # data is a (label, feature_vector) pair; emit "label,f1,f2,..." as one CSV line
    r = ','.join(str(d) for d in data[1])
    return str(data[0]) + "," + r


def main():
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--s3_input_bucket", type=str, help="s3 input bucket")
    parser.add_argument("--s3_input_key_prefix", type=str, help="s3 input key prefix")
    parser.add_argument("--s3_output_bucket", type=str, help="s3 output bucket")
    parser.add_argument("--s3_output_key_prefix", type=str, help="s3 output key prefix")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("PySparkApp").getOrCreate()

    # This is needed to save RDDs, which is the only way to write nested DataFrames into CSV format
    spark.sparkContext._jsc.hadoopConfiguration().set("mapred.output.committer.class",
                                                      "org.apache.hadoop.mapred.FileOutputCommitter")

    # Define the schema of the input data. The input data does not contain a header row
    schema = StructType([StructField("sex", StringType(), True),
                         StructField("length", DoubleType(), True),
                         StructField("diameter", DoubleType(), True),
                         StructField("height", DoubleType(), True),
                         StructField("whole_weight", DoubleType(), True),
                         StructField("shucked_weight", DoubleType(), True),
                         StructField("viscera_weight", DoubleType(), True),
                         StructField("shell_weight", DoubleType(), True),
                         StructField("rings", DoubleType(), True)])

    # Read the data from S3 into a DataFrame
    total_df = spark.read.csv(('s3://' + os.path.join(args.s3_input_bucket, args.s3_input_key_prefix,
                                                      'abalone.csv')), header=False, schema=schema)

    # StringIndexer on the sex column, which holds categorical values
    sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")

    # One-hot encoding is performed on the string-indexed sex column (indexed_sex)
    sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")

    # VectorAssembler brings all the features into a 1D vector that is easy to save in CSV format
    assembler = VectorAssembler(inputCols=["sex_vec",
                                           "length",
                                           "diameter",
                                           "height",
                                           "whole_weight",
                                           "shucked_weight",
                                           "viscera_weight",
                                           "shell_weight"],
                                outputCol="features")

    # The pipeline comprises the steps added above
    pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])

    # This step trains the feature transformers
    model = pipeline.fit(total_df)

    # This step transforms the dataset with information obtained from the previous fit
    transformed_total_df = model.transform(total_df)

    # Split the overall dataset into 80-20 training and validation
    (train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])

    # Convert the train DataFrame to an RDD to save in CSV format and upload to S3
    train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
    train_lines = train_rdd.map(csv_line)
    train_lines.saveAsTextFile('s3://' + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, 'train'))

    # Convert the validation DataFrame to an RDD to save in CSV format and upload to S3
    validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
    validation_lines = validation_rdd.map(csv_line)
    validation_lines.saveAsTextFile('s3://' + os.path.join(args.s3_output_bucket, args.s3_output_key_prefix, 'validation'))


if __name__ == "__main__":
    main()

Couldn't call 'get_role' to get Role ARN from role name workshop-sagemaker to get Role path.


submit:
 {
 "arguments": [
 "--s3_input_bucket",
 "sagemaker-eu-west-1-245582572290",
 "--s3_input_key_prefix",
 "sagemaker/spark-preprocess-demo/2020-12-17-17-19-06/input/raw/abalone",
 "--s3_output_bucket",
 "sagemaker-eu-west-1-245582572290",
 "--s3_output_key_prefix",
 "sagemaker/spark-preprocess-demo/2020-12-17-17-19-06/input/preprocessed/abalone"
 ],
 "base_job_name": "sm-spark",
 "framework_version": "2.4",
 "instance_count": 1,
 "instance_type": "ml.c4.xlarge",
 "logs": true,
 "max_result": 10,
 "max_runtime_in_seconds": 1200,
 "name_contains": "spark",
 "role": "arn:aws:iam::245582572290:role/workshop-sagemaker",
 "submit_app": "/tmp/tmp-02acb435-c907-4d1c-9e8f-bb0bd5bc5582.py",
 "wait": true
}

Job Name: sm-spark-2020-12-18-19-04-49-672
Inputs: [{'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-west-1-245582572290/sm-spark-2020-12-18-19-04-49-672/input/code/tmp-02acb435-c907-4d1c-9e8f-bb0bd5bc5582.py', 'LocalPath': '/opt/ml/processing/input/co
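For readers who want to run the same job without the magic, the sketch below uses `PySparkProcessor` from the SageMaker Python SDK with the settings echoed in the submit configuration above. How the magic itself is implemented is not shown in this notebook, so treat this as an approximate equivalent. The helper names `processor_settings` and `submit_spark_job` are hypothetical, and running it requires the SDK, AWS credentials, and your own role ARN.

```python
def processor_settings(role_arn):
    """Constructor keyword arguments mirroring the submit configuration above."""
    return {
        "base_job_name": "sm-spark",
        "framework_version": "2.4",
        "role": role_arn,
        "instance_count": 1,
        "instance_type": "ml.c4.xlarge",
        "max_runtime_in_seconds": 1200,
    }


def submit_spark_job(role_arn, submit_app, arguments):
    """Start the processing job and return its name.

    submit_app is a local or S3 path to the PySpark script;
    arguments is a list of strings passed to that script.
    """
    # Imported lazily so processor_settings works without the SDK installed.
    from sagemaker.spark.processing import PySparkProcessor

    processor = PySparkProcessor(**processor_settings(role_arn))
    # logs=True streams the job logs while waiting, like the --logs flag.
    processor.run(submit_app=submit_app, arguments=arguments, logs=True)
    return processor.latest_job.job_name
```

The `arguments` parameter takes the already-split list form, the same shape as the `"arguments"` array in the configuration echo.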

## Stop latest processing Job

In [5]:
%pyspark delete

{
 "AppSpecification": {
 "ContainerArguments": [
 "'--s3_input_bucket'",
 "'sagemaker-eu-west-1-245582572290'",
 "'--s3_input_key_prefix'",
 "'sagemaker/spark-preprocess-demo/2020-12-17-17-19-06/input/raw/abalone'",
 "'--s3_output_bucket'",
 "'sagemaker-eu-west-1-245582572290'",
 "'sagemaker/spark-preprocess-demo/2020-12-17-17-19-06/input/preprocessed/abalone'"
 ],
 "ContainerEntrypoint": [
 "smspark-submit",
 "/opt/ml/processing/input/code/tmp-2bb9dee7-1e13-4010-ab48-2deba9da0094.py"
 ],
 "ImageUri": "571004829621.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-spark-processing:2.4-cpu"
 },
 "CreationTime": "2020-12-17 18:57:03.608000+00:00",
 "Environment": {},
 "LastModifiedTime": "2020-12-17 18:57:22.834000+00:00",
 "ProcessingInputs": [
 {
 "AppManaged": false,
 "InputName": "code",
 "S3Input": {
 "LocalPath": "/opt/ml/processing/input/code",
 "S3CompressionType": "None",
 "S3DataDistributionType": "FullyReplicated",
 "S3DataType": "S3Prefix",
 "S3InputMode": "File",
 "S3Uri": "s3://sa
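The heading speaks of stopping a job while the subcommand is named `delete`; the notebook does not show which SageMaker API the magic calls. As a sketch under that uncertainty, the helper below uses the boto3 SageMaker client's `describe_processing_job` and `stop_processing_job` calls to request a stop only when the job is still running. The function name `stop_if_running` is hypothetical.

```python
def stop_if_running(sm_client, job_name):
    """Request a stop only when the job is in progress; return the status seen."""
    description = sm_client.describe_processing_job(ProcessingJobName=job_name)
    status = description["ProcessingJobStatus"]
    if status == "InProgress":
        sm_client.stop_processing_job(ProcessingJobName=job_name)
    return status


# Example usage (requires boto3 and AWS credentials):
#   import boto3
#   stop_if_running(boto3.client("sagemaker"), "sm-spark-2020-12-18-19-04-49-672")
```

Checking the status first avoids sending a stop request for a job that has already finished.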

## Describe latest processing Job

In [5]:
%pyspark status

{
 "AppSpecification": {
 "ContainerArguments": [
 "--s3_input_bucket",
 "sagemaker-eu-west-1-245582572290",
 "--s3_input_key_prefix",
 "sagemaker/spark-preprocess-demo/2020-12-17-17-19-06/input/raw/abalone",
 "--s3_output_bucket",
 "sagemaker-eu-west-1-245582572290",
 "--s3_output_key_prefix",
 "sagemaker/spark-preprocess-demo/2020-12-17-17-19-06/input/preprocessed/abalone"
 ],
 "ContainerEntrypoint": [
 "smspark-submit",
 "/opt/ml/processing/input/code/tmp-02acb435-c907-4d1c-9e8f-bb0bd5bc5582.py"
 ],
 "ImageUri": "571004829621.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-spark-processing:2.4-cpu"
 },
 "CreationTime": "2020-12-18 19:04:50.120000+00:00",
 "Environment": {},
 "LastModifiedTime": "2020-12-18 19:10:38.739000+00:00",
 "ProcessingEndTime": "2020-12-18 19:10:38.737000+00:00",
 "ProcessingInputs": [
 {
 "AppManaged": false,
 "InputName": "code",
 "S3Input": {
 "LocalPath": "/opt/ml/processing/input/code",
 "S3CompressionType": "None",
 "S3DataDistributionType": "FullyReplicated"
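The JSON above has the shape of a `DescribeProcessingJob` response. A minimal sketch of polling that call until the job reaches a terminal state follows; it takes a boto3 SageMaker client as a parameter. The function name `wait_for_processing_job` and the polling defaults are hypothetical choices, not part of the magic.

```python
import time

# Statuses after which a processing job no longer changes.
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_processing_job(sm_client, job_name, poll_seconds=30, max_polls=120):
    """Poll the job description until a terminal status is reached."""
    for _ in range(max_polls):
        description = sm_client.describe_processing_job(ProcessingJobName=job_name)
        status = description["ProcessingJobStatus"]
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(poll_seconds)
    raise TimeoutError("job {} did not finish after {} polls".format(job_name, max_polls))


# Example usage (requires boto3 and AWS credentials):
#   import boto3
#   wait_for_processing_job(boto3.client("sagemaker"), "sm-spark-2020-12-18-19-04-49-672")
```

Bounding the number of polls keeps a notebook cell from hanging indefinitely if the job never leaves the `InProgress` or `Stopping` state.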

## List processing jobs

In [3]:
%pyspark list

{
 "NextToken": "cIws2QhTXUIa8bi8X9aU7gCAR0Xdc3x9L/Ofg4vsVMTtcNqRqLcpBqE42+cDc29TFQi5WMns8YJ3nEv3nZkSmUmnBY0xgo1e/hoJ+sIFCZ7+RpBLRYj09ElNRWC8SrP8c41w+DHUtr+4lKQAoT9hd9TTBxl9pmgioXWtSwHWKuCo/f5Vfe8UUgbAAtr41ZVH4c5TrGHpjouaACJ8UDkh0MDULBPv/d83dxOOTMWDVCVj/0ytPuelGYpiLJKgc83zhlXqYUKIBv9W0YA732xprs5J3FK0yllAjjSkE3by9UW66XYmSpom5qL6CNGJK6e2Ffc77HZSXQz8E2J4cQTGtZgtcajeqk6XZcZHRbQSmKd43UMM5+AcjtfvZTBJSNrk9a9ysrPf5csKJOH6R5SXsDOTlVX73YemxbbU9bpmaAk8KkeuYBrBC1+85Q0krRrdyAuB/kZxy0AyCyNuW1Wi6WZGZadSrAVJFIiKXTuyocxjNXWmQHgFhWAOdLRMx/2BFmtZzk8xTj9K6q72UOnvayvZ4FIRAt8sLQN50Vr46SFSM5B/fY+T4j/CPgWhXLY=",
 "ProcessingJobSummaries": [
 {
 "CreationTime": "2020-12-18 19:04:50.120000+00:00",
 "LastModifiedTime": "2020-12-18 19:10:38.739000+00:00",
 "ProcessingEndTime": "2020-12-18 19:10:38.737000+00:00",
 "ProcessingJobArn": "arn:aws:sagemaker:eu-west-1:245582572290:processing-job/sm-spark-2020-12-18-19-04-49-672",
 "ProcessingJobName": "sm-spark-2020-12-18-19-04-49-672",
 "ProcessingJobStatus": "Complete
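The response above carries a `NextToken`, which means more results are available than the first page returned. As a sketch, the helper below follows that token with the boto3 SageMaker client's `list_processing_jobs` call until every matching summary has been collected. The function name `list_all_processing_jobs` is hypothetical; the `NameContains` and `MaxResults` parameters mirror the magic's `--name_contains` and `--max_result` options, though whether the magic maps them this way is an assumption.

```python
def list_all_processing_jobs(sm_client, name_contains, page_size=10):
    """Collect job summaries across all pages by following NextToken."""
    summaries = []
    kwargs = {"NameContains": name_contains, "MaxResults": page_size}
    while True:
        page = sm_client.list_processing_jobs(**kwargs)
        summaries.extend(page["ProcessingJobSummaries"])
        token = page.get("NextToken")
        if not token:
            return summaries
        # Ask for the page that follows the one just received.
        kwargs["NextToken"] = token


# Example usage (requires boto3 and AWS credentials):
#   import boto3
#   jobs = list_all_processing_jobs(boto3.client("sagemaker"), "spark")
```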

### Validate Data Processing Results

Next, validate the output of our data preprocessing job by looking at the first 5 rows of the output dataset.

In [7]:
!aws s3 cp --quiet s3://sagemaker-eu-west-1-245582572290/sagemaker/spark-preprocess-demo/2020-12-17-17-19-06/input/preprocessed/abalone/train/part-00000 - | head -n5

6.0,0.0,0.0,0.29,0.21,0.075,0.275,0.113,0.0675,0.035
5.0,0.0,0.0,0.29,0.225,0.075,0.14,0.0515,0.0235,0.04
7.0,0.0,0.0,0.305,0.225,0.07,0.1485,0.0585,0.0335,0.045
7.0,0.0,0.0,0.305,0.23,0.08,0.156,0.0675,0.0345,0.048
9.0,0.0,0.0,0.33,0.26,0.08,0.2,0.0625,0.05,0.07
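Each output line follows the layout written by `csv_line` in the script: the `rings` label first, then the assembled feature vector, which starts with the one-hot `sex` slots and continues with the seven numeric columns. A small sketch for reading one line back into named fields is shown below. The helper name `parse_row` is hypothetical, and the width of two for the `sex` slots is inferred from the ten values per line.

```python
NUMERIC_COLUMNS = [
    "length", "diameter", "height", "whole_weight",
    "shucked_weight", "viscera_weight", "shell_weight",
]


def parse_row(line):
    """Split one output line into the label, the one-hot sex slots, and named features."""
    values = [float(v) for v in line.strip().split(",")]
    numeric = values[3:]
    if len(numeric) != len(NUMERIC_COLUMNS):
        raise ValueError("expected 10 values per line, got {}".format(len(values)))
    return {
        "rings": values[0],
        "sex_vec": values[1:3],
        "features": dict(zip(NUMERIC_COLUMNS, numeric)),
    }


row = parse_row("6.0,0.0,0.0,0.29,0.21,0.075,0.275,0.113,0.0675,0.035")
print(row["rings"], row["sex_vec"], row["features"]["length"])
```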
