In [None]:
!pip install -qq sagemaker boto3

Restart the kernel after running the cell above so that the newly installed `sagemaker` and `boto3` versions are the ones actually imported.

In [None]:
import sagemaker
session = sagemaker.Session()

###### CLUSTER CONFIGURATION
# Every answer is stripped: stray whitespace from a copy-paste would otherwise end up
# inside the IAM role ARN and the S3 URI built below.
cluster_id = input("The name of your Redshift cluster:").strip()
cluster_role_name = input("The name of the Role you've associated to your Redshift Cluster (not the ARN, default: myRedshiftRole):").strip() or "myRedshiftRole"
# NOTE(review): assumes the role sits under the IAM path `service-role/`; a role created
# under a different path has a different ARN — confirm against the IAM console.
cluster_role_arn = f'arn:aws:iam::{session.account_id()}:role/service-role/{cluster_role_name}'
# NOTE(review): cluster_id, database and db_user are only printed below; the pipeline
# cells do not pass them to the Lambda — confirm lambda/handler.py obtains them elsewhere.
database = input("The database of your Redshift cluster (default: dev)").strip() or 'dev'
db_user = input("The user of your Redshift cluster (default: awsuser)").strip() or 'awsuser'

###### OUTPUT S3 PATH
bucket = input("Your S3 bucket (leave empty for default):").strip() or session.default_bucket()
# Drop any leading/trailing slashes the user typed, then append exactly one trailing slash
# (the same shape as the default), so the URI never contains `//` and always names a prefix.
key_prefix = input("The path where to save the output of the Redshift query in S3 (default: redshift-demo/redshift2processing/data/)").strip().strip('/') or "redshift-demo/redshift2processing/data"
key_prefix = f'{key_prefix}/'
output_s3_uri = f's3://{bucket}/{key_prefix}'

###### QUERY STRING
query_string = "select * from users" # this will work on the default Free Tier Redshift cluster. Change if needed.

# Output the info
print(f'\n\nCluster ID: {cluster_id}\nRole ARN: {cluster_role_arn}\nOutput S3 URI: {output_s3_uri}')

In [None]:
from sagemaker.workflow.lambda_step import LambdaStep, Lambda, LambdaOutput, LambdaOutputTypeEnum
from sagemaker.workflow.parameters import ParameterString, ParameterInteger
from sagemaker.workflow.steps import ProcessingStep, ProcessingInput, ProcessingOutput
from sagemaker.sklearn import SKLearnProcessor
from sagemaker import get_execution_role

# Pipelines parameters: the defaults come from the configuration cell; each one can be
# overridden per run via `pipeline.start(parameters={...})`.
sql_query = ParameterString(name='SQL_QUERY', default_value=query_string)
s3_path = ParameterString(name='S3_PATH', default_value=output_s3_uri)
# Must be `cluster_role_arn` (the name the configuration cell defines).
role = ParameterString(name='REDSHIFT_ROLE', default_value=cluster_role_arn)
partition_by_column = ParameterString(name='PARTITION_BY_COLUMN', default_value='state')
processing_instance_type = ParameterString(name='PROCESS_INSTANCE_TYPE', default_value='ml.m5.xlarge')
processing_instance_count = ParameterInteger(name='PROCESS_INSTANCE_COUNT', default_value=3)

##################
### Pipeline steps
##################
# Lambda Step - Unload from Redshift with partitions
# The execution role is resolved in the *current* account instead of a hardcoded account ID,
# so the notebook works outside the original author's account. The role must already exist
# (presumably with Redshift and S3 permissions for the unload — verify for your account).
lambda_role_arn = f'arn:aws:iam::{session.account_id()}:role/LambdasCanDoEverything'
unloader_lambda = Lambda(
    function_name='RedshiftPartitionUnloader',
    script='lambda/handler.py',
    handler='handler.lambda_handler',
    timeout=60*5,
    memory_size=256,
    execution_role_arn=lambda_role_arn,
)
lambda_step = LambdaStep(
    name='RedshiftPartitionUnloaderLAMBDA',
    lambda_func=unloader_lambda,
    # These keys are the payload handler.py receives; keep them in sync with it.
    inputs={
        'SQL_QUERY': sql_query,
        'S3_PATH': s3_path,
        'REDSHIFT_ROLE': role,
        'PARTITION_BY_COLUMN': partition_by_column,
    },
    outputs=[LambdaOutput('status', LambdaOutputTypeEnum.String), LambdaOutput('s3_path', LambdaOutputTypeEnum.String)],
)
# Processing Step - Read from S3 partitioned data and transform
sklearn_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=get_execution_role(),
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
)
processing_step = ProcessingStep(
    name='RedshiftPartitionUnloaderPROCESSING',
    processor=sklearn_processor,
    inputs=[ProcessingInput(
        source=s3_path,
        destination='/opt/ml/processing/input/data/',
        # Each instance receives a disjoint subset of the S3 objects under the prefix.
        s3_data_distribution_type='ShardedByS3Key',
    )],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    # The input is read from the same S3 path the Lambda writes to, so order is explicit.
    code='processing.py', depends_on=[lambda_step],
)

In [None]:
from sagemaker import get_execution_role
from sagemaker.workflow.pipeline import Pipeline

# Every Parameter referenced by a step must be declared in `parameters`. The processor uses
# the instance type/count parameters, so they are listed alongside the Lambda inputs.
pipeline = Pipeline(
    name='RedshiftPartitionUnloaderPIPELINE',
    parameters=[
        sql_query,
        s3_path,
        role,
        partition_by_column,
        processing_instance_type,
        processing_instance_count,
    ],
    steps=[lambda_step, processing_step],
)
# Create the pipeline, or update the definition if one with this name already exists.
pipeline.upsert(role_arn=get_execution_role())

In [None]:
from IPython.display import display

# Launch a run with every parameter at its default value and block until it ends.
execution = pipeline.start()
try:
    # wait() polls the execution; it may raise if the run fails, stops, or times out.
    execution.wait()
finally:
    # Show per-step status (including any failure reason) whether or not the run succeeded.
    display(execution.list_steps())