--- title: "Parallel Data Processing in R with SageMaker Processing" output: rmarkdown::github_document knit: (function(inputFile, encoding) { rmarkdown::render(inputFile, encoding = encoding, output_dir = "rendered") }) --- Clear the work space ```{r} rm(list = ls()) ``` Import the required packages and Sagemaker ```{r, message=FALSE, results='hide'} suppressWarnings(library(dplyr)) suppressWarnings(library(reticulate)) suppressWarnings(library(readr)) path_to_python <- system("which python", intern = TRUE) use_python(path_to_python) sagemaker <- import('sagemaker') ``` Set up SageMaker execution role and S3 Bucket ```{r} role = sagemaker$get_execution_role() session = sagemaker$Session() bucket = session$default_bucket() account_id <- session$account_id() region <- session$boto_region_name local_path <- dirname(rstudioapi::getSourceEditorContext()$path) ``` Refer to Container_Build.rmd to build the container. Container should be built prior to step execution ```{r} #Your container_uri should look something like container_uri <- paste(account_id, "dkr.ecr", region, "amazonaws.com/sagemaker-r-parallel-processing:1.0", sep=".") print(container_uri) ``` Let's start by uploading our data set into Amazon S3 ```{r} local_dataset_path <- paste0(local_path,"/dataset/") dataset_files <- list.files(path=local_dataset_path, pattern="\\.csv$", full.names=TRUE) for (file in dataset_files){ session$upload_data(file, bucket=bucket, key_prefix="sagemaker-rstudio-example/split") } input_s3_split_location <- paste0("s3://", bucket, "/sagemaker-rstudio-example/split") ``` Setting up the filter processing job. This will take our dataset and process them over multiple instances with a feature engineering script filter.R Our example uses two instances. ```{r} filter_processor <- sagemaker$processing$ScriptProcessor(command=list("Rscript"), image_uri=container_uri, role=role, instance_count=2L, instance_type="ml.m5.large") output_s3_filter_location <- paste0("s3://", bucket, "/sagemaker-rstudio-example/filtered") s3_filter_input <- sagemaker$processing$ProcessingInput(source=input_s3_split_location, destination="/opt/ml/processing/input", s3_data_distribution_type="ShardedByS3Key", s3_data_type="S3Prefix") s3_filter_output <- sagemaker$processing$ProcessingOutput(output_name="bank-additional-full-filtered", destination=output_s3_filter_location, source="/opt/ml/processing/output") filtering_step <- sagemaker$workflow$steps$ProcessingStep(name="FilterProcessingStep", code=paste0(local_path, "/preprocessing/filter.R"), processor=filter_processor, inputs=list(s3_filter_input), outputs=list(s3_filter_output)) ``` Setting up the data processing job. This will take our processed dataset, and split it into a train, test and validation set ready for training. This uses the process.R script. 
Setting up the data processing job. This will take our filtered dataset and split it into train, test, and validation sets ready for training. This uses the process.R script.

```{r}
script_processor <- sagemaker$processing$ScriptProcessor(command=list("Rscript"),
                                                         image_uri=container_uri,
                                                         role=role,
                                                         instance_count=1L,
                                                         instance_type="ml.m5.large")

output_s3_processed_location <- paste0("s3://", bucket, "/sagemaker-rstudio-example/processed")

s3_processed_input <- sagemaker$processing$ProcessingInput(source=output_s3_filter_location,
                                                           destination="/opt/ml/processing/input",
                                                           s3_data_type="S3Prefix")

s3_processed_output <- sagemaker$processing$ProcessingOutput(output_name="bank-additional-full-processed",
                                                             destination=output_s3_processed_location,
                                                             source="/opt/ml/processing/output")

processing_step <- sagemaker$workflow$steps$ProcessingStep(name="ProcessingStep",
                                                           code=paste0(local_path, "/preprocessing/process.R"),
                                                           processor=script_processor,
                                                           inputs=list(s3_processed_input),
                                                           outputs=list(s3_processed_output),
                                                           depends_on=list(filtering_step))
```

This step kicks off the pipeline, which runs each processing job in order.

```{r}
pipeline <- sagemaker$workflow$pipeline$Pipeline(
  name="BankAdditionalPipelineUsingR",
  steps=list(filtering_step, processing_step)
)

upserted <- pipeline$upsert(role_arn=role)
execution <- pipeline$start()

execution$describe()
execution$wait()
```
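Once the pipeline has finished, an optional way to sanity-check the run is to list the executed steps and pull the processed output back from S3. This is a sketch only; the file names you get back depend on what process.R actually writes, and the local `processed` directory used below is just an illustrative choice.

```{r, eval=FALSE}
# Optional: inspect the finished execution and download the processed output.
execution$list_steps()

processed_dir <- paste0(local_path, "/processed")
dir.create(processed_dir, showWarnings = FALSE)

# Fetch everything written under the processed prefix used above.
session$download_data(path=processed_dir,
                      bucket=bucket,
                      key_prefix="sagemaker-rstudio-example/processed")

list.files(processed_dir)
```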