# TEST - Run the preprocessing step on AWS Batch
This notebook runs `preprocess.py` in the AWS Batch container by submitting a job through the AWS Batch API (boto3 `submit_job`).

#### Dataset
We will use the census dataset from `sagemaker-examples` for this demo. To test with another dataset, you will need to modify the logic in `preprocess.py`.



In [1]:
import pandas as pd
import utils
import boto3
import json
import sagemaker
import uuid

In [2]:
# Set `bucket` to the name of the S3 bucket that was created for this demo.
bucket = 'REPLACE ME'
# The region is taken from the default SageMaker session, the account ID from STS.
region = sagemaker.Session().boto_region_name
account = boto3.client('sts').get_caller_identity().get('Account')

# Echo the resolved settings so they can be checked before running the later cells.
print(f'region: {region}')
print(f'bucket: {bucket}')
print(f'account {account}')

region: ap-southeast-1
bucket: sagemaker-to-batch
account 239577782971


# Download dataset locally

In [3]:
# Prepare the local 'data' directory that the download cell writes into.
# NOTE(review): helper lives in the project's utils.py; presumably it creates the
# path only when it is missing — confirm against utils.py.
utils.mkpath_if_not_exist('data')

In [4]:
# Fetch the census sample from the region-specific SageMaker sample-data bucket.
s3 = boto3.client("s3")
s3.download_file(
    Bucket=f"sagemaker-sample-data-{region}",
    Key="processing/census/census-income.csv",
    Filename="data/census-income.csv",
)

# Re-save the file under the name that the upload cell sends to S3.
# NOTE(review): to_csv keeps the DataFrame index as an extra leading column by
# default — confirm that preprocess.py expects that column.
df = pd.read_csv("data/census-income.csv")
df.to_csv("data/dataset.csv")
df.head()

Unnamed: 0,age,class of worker,detailed industry recode,detailed occupation recode,education,wage per hour,enroll in edu inst last wk,marital stat,major industry code,major occupation code,...,country of birth father,country of birth mother,country of birth self,citizenship,own business or self employed,fill inc questionnaire for veteran's admin,veterans benefits,weeks worked in year,year,income
0,73,Not in universe,0,0,High school graduate,0,Not in universe,Widowed,Not in universe or children,Not in universe,...,United-States,United-States,United-States,Native- Born in the United States,0,Not in universe,2,0,95,- 50000.
1,58,Self-employed-not incorporated,4,34,Some college but no degree,0,Not in universe,Divorced,Construction,Precision production craft & repair,...,United-States,United-States,United-States,Native- Born in the United States,0,Not in universe,2,52,94,- 50000.
2,18,Not in universe,0,0,10th grade,0,High school,Never married,Not in universe or children,Not in universe,...,Vietnam,Vietnam,Vietnam,Foreign born- Not a citizen of U S,0,Not in universe,2,0,95,- 50000.
3,9,Not in universe,0,0,Children,0,Not in universe,Never married,Not in universe or children,Not in universe,...,United-States,United-States,United-States,Native- Born in the United States,0,Not in universe,0,0,94,- 50000.
4,10,Not in universe,0,0,Children,0,Not in universe,Never married,Not in universe or children,Not in universe,...,United-States,United-States,United-States,Native- Born in the United States,0,Not in universe,0,0,94,- 50000.


# Upload Input data and code to S3

In [5]:
# Upload the prepared dataset to the input location in the demo bucket.
s3.upload_file(
    Filename='data/dataset.csv',
    Bucket=bucket,
    Key='data/sample/census/dataset.csv',
)

In [6]:
# Upload the processing scripts under a shared code prefix in the demo bucket;
# each file keeps its own name as the last component of the S3 key.
for code_file in ('preprocess.py', 'sagemaker_entry_point.py', 'utils.py'):
    s3.upload_file(
        Filename=code_file,
        Bucket=bucket,
        Key=f'code-repo/sagemaker-process-code/{code_file}',
    )

# Run AWS Batch Job

In [7]:
def create_unique_job_name(base_job_name, max_length=60):
    """Build a job name of the form ``<base_job_name>-<YYYYmmddHHMMSS>``.

    The timestamp is the local time with one-second resolution, so two calls
    made within the same second return the same name.

    Args:
        base_job_name: Prefix of the job name.
        max_length: Maximum length of the returned name. When the combined
            name would be longer, the base name is shortened so that the
            timestamp suffix (the part that makes the name unique) is kept.

    Returns:
        The job name, at most ``max_length`` characters long.
    """
    from datetime import datetime

    # Minutes (%M) are part of the stamp; without them, jobs submitted at the
    # same second of different minutes within one hour get identical names.
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

    # Reserve space for "-<timestamp>" and trim the base name rather than the suffix.
    room_for_base = max(max_length - len(timestamp) - 1, 0)
    return f'{base_job_name[:room_for_base]}-{timestamp}'[:max_length]

def get_account_details():
    """Return ``(account, region, env)`` for the current AWS credentials.

    ``account`` is the 'Account' field of the STS caller identity, ``region``
    is the region name of a fresh boto3 session, and ``env`` is 'dev' when the
    account equals the configured dev account ID (the 'REPLACE ME'
    placeholder) and 'prod' otherwise.
    """
    caller_identity = boto3.client('sts').get_caller_identity()
    account = caller_identity.get('Account')
    region = boto3.session.Session().region_name

    # Replace the placeholder below with the account ID of the dev environment.
    if account == 'REPLACE ME':
        env = 'dev'
    else:
        env = 'prod'

    return account, region, env

def run_process_job(train_test_split,
                    validation_flag,
                    job_queue,
                    job_definition,
                    batch_platform='arm64',
                    input_bucket='REPLACE ME',
                    code_bucket='REPLACE ME',
                    output_bucket='REPLACE ME',
                   ):
    """Submit the preprocessing job to AWS Batch and return its job ID.

    Args:
        train_test_split: Value passed as the 'train-test-split' job parameter.
        validation_flag: Value passed as the 'validataion-flag' job parameter.
        job_queue: Name or ARN of the AWS Batch job queue.
        job_definition: Name or ARN of the AWS Batch job definition.
        batch_platform: Currently unused; kept so existing callers keep working.
        input_bucket: S3 bucket passed as the 'input-bucket' job parameter.
        code_bucket: S3 bucket that holds the uploaded processing code; also
            used to build the 'entry-point' S3 URI.
        output_bucket: S3 bucket passed as the 'output-bucket' job parameter.

    Pass your own bucket names, or change the 'REPLACE ME' defaults.
    AWS Batch job parameters are a string-to-string map, so pass string values.

    Returns:
        The 'jobId' field of the SubmitJob response.

    Raises:
        Exception: If the submit_job call fails; the original error is
            attached as the cause.
    """
    batch = boto3.client('batch')

    # Only the environment tag is used here; account and region are not needed.
    _, _, env = get_account_details()
    entry_point = f"s3://{code_bucket}/code-repo/sagemaker-process-code/sagemaker_entry_point.py"

    # AWS Batch parameters
    job_name = create_unique_job_name(base_job_name='data-process-aws-batch-run', max_length=60)

    parameters = {
        'train-test-split': train_test_split,
        # NOTE(review): the key is spelled 'validataion-flag'; it presumably has to
        # match the placeholder name in the job definition, so it is left as is.
        'validataion-flag': validation_flag,
        'env': env,
        'entry-point': entry_point,
        'input-bucket': input_bucket,
        'output-bucket': output_bucket,
        'code-bucket': code_bucket,
    }

    # Keep the try block limited to the API call so that only a failed
    # submission is reported as a submission error.
    try:
        response = batch.submit_job(jobQueue=job_queue,
                                    jobName=job_name,
                                    jobDefinition=job_definition,
                                    parameters=parameters)
    except Exception as e:
        print(e)
        message = f'Error submitting Batch Job for jobName={job_name} for parameters={parameters}'
        print(message)
        # Chain the original error so its traceback is preserved as the cause.
        raise Exception(message) from e

    # Log the response from AWS Batch; default=str keeps logging from failing on
    # any value that is not natively JSON-serializable.
    print(f"Job submitted for jobName={job_name} for parameters={parameters}, Response: "
          + json.dumps(response, indent=2, default=str))
    return response['jobId']

In [8]:
#ARM 64 JOB QUEUE
job_queue=f'arn:aws:batch:{region}:{account}:job-queue/aws-dev-aws-batch-arm64-job-queue'
job_definition=f'arn:aws:batch:{region}:{account}:job-definition/aws-dev-aws-batch-arm64-job-definition'
    
job_id = run_process_job(train_test_split="0.2",
                         validation_flag="True",
                         job_queue=job_queue,
                         job_definition=job_definition,
                        )
    

Job submitted for jobName=data-process-aws-batch-run-202206281019 for parameters={'train-test-split': '0.2', 'validataion-flag': 'True', 'env': 'dev', 'region': 'ap-southeast-1', 'entry-point': 's3://sagemaker-to-batch/code-repo/sagemaker-process-code/sagemaker_entry_point.py', 'input-bucket': 'sagemaker-to-batch', 'output-bucket': 'sagemaker-to-batch', 'code-bucket': 'sagemaker-to-batch'}, Response: {
  "ResponseMetadata": {
    "RequestId": "cd5cb545-4c32-4671-a6a7-9166ae565f2b",
    "HTTPStatusCode": 200,
    "HTTPHeaders": {
      "date": "Tue, 28 Jun 2022 02:18:20 GMT",
      "content-type": "application/json",
      "content-length": "194",
      "connection": "keep-alive",
      "x-amzn-requestid": "cd5cb545-4c32-4671-a6a7-9166ae565f2b",
      "access-control-allow-origin": "*",
      "x-amz-apigw-id": "UaS06HNiSQ0FaMQ=",
      "access-control-expose-headers": "X-amzn-errortype,X-amzn-requestid,X-amzn-errormessage,X-amzn-trace-id,X-amz-apigw-id,date",
      "x-amzn-trace-id": "R

------