{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install -qq sagemaker boto3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Restart the kernel after having executed the above cell." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "session = sagemaker.Session()\n", "\n", "###### CLUSTER CONFIGURATION\n", "cluster_id = input(\"The name of your Redshift cluster:\")\n", "cluster_role_name = input(\"The name of the Role you've associated to your Redshift Cluster (not the ARN, default: myRedshiftRole):\") or \"myRedshiftRole\"\n", "cluster_role_arn = f'arn:aws:iam::{session.account_id()}:role/service-role/{cluster_role_name}'\n", "database = input(\"The database of your Redshift cluster (default: dev)\") or 'dev'\n", "db_user = input(\"The user of your Redshift cluster (default: awsuser)\") or 'awsuser'\n", "\n", "###### OUTPUT S3 PATH\n", "bucket = input(\"Your S3 bucket (leave empty for default):\") or session.default_bucket()\n", "key_prefix = input(\"The path where to save the output of the Redshift query in S3 (default: redshift-demo/redshift2processing/data/)\") or \"redshift-demo/redshift2processing/data/\"\n", "output_s3_uri = f's3://{bucket}/{key_prefix}'\n", "\n", "###### QUERY STRING\n", "query_string = \"select * from users\" # this will work on the default Free Tier Redshift cluster. Change if needed.\n", "\n", "# Output the info\n", "print(f'\\n\\nCluster ID: {cluster_id}\\nRole ARN: {cluster_role_arn}\\nOutput S3 URI: {output_s3_uri}')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.lambda_step import LambdaStep, Lambda, LambdaOutput, LambdaOutputTypeEnum\n", "from sagemaker.workflow.parameters import ParameterString, ParameterInteger\n", "from sagemaker.workflow.steps import ProcessingStep, ProcessingInput, ProcessingOutput\n", "from sagemaker.sklearn import SKLearnProcessor\n", "from sagemaker import get_execution_role\n", "\n", "# Pipelines parameters\n", "sql_query = ParameterString(name='SQL_QUERY', default_value=query_string)\n", "s3_path = ParameterString(name='S3_PATH', default_value=output_s3_uri)\n", "role = ParameterString(name='REDSHIFT_ROLE', default_value=cluster_arn_role)\n", "partition_by_column = ParameterString(name='PARTITION_BY_COLUMN', default_value='state')\n", "processing_instance_type = ParameterString(name='PROCESS_INSTANCE_TYPE', default_value='ml.m5.xlarge')\n", "processing_instance_count = ParameterInteger(name='PROCESS_INSTANCE_COUNT', default_value=3)\n", "\n", "##################\n", "### Pipeline steps\n", "##################\n", "# Lambda Step - Unload from Redshift with partitions\n", "l = Lambda(\n", " function_name='RedshiftPartitionUnloader',\n", " script='lambda/handler.py',\n", " handler='handler.lambda_handler',\n", " timeout=60*5,\n", " memory_size=256, \n", " execution_role_arn='arn:aws:iam::859755744029:role/LambdasCanDoEverything'\n", ")\n", "lambda_step = LambdaStep(\n", " name='RedshiftPartitionUnloaderLAMBDA',\n", " lambda_func=l,\n", " inputs={\n", " 'SQL_QUERY': sql_query,\n", " 'S3_PATH': s3_path,\n", " 'REDSHIFT_ROLE': role,\n", " 'PARTITION_BY_COLUMN': partition_by_column\n", " },\n", " outputs=[LambdaOutput('status', LambdaOutputTypeEnum.String), LambdaOutput('s3_path', LambdaOutputTypeEnum.String)]\n", ")\n", "# Processing Step - Read from S3 partitioned data and transform\n", "p = SKLearnProcessor(\n", " 
framework_version='0.23-1',\n", " role=get_execution_role(),\n", " instance_type=processing_instance_type,\n", " instance_count=processing_instance_count\n", ")\n", "processing_step = ProcessingStep(\n", " name='RedshiftPartitionUnloaderPROCESSING',\n", " processor=p,\n", " inputs=[ProcessingInput(\n", " source=s3_path, \n", " destination='/opt/ml/processing/input/data/', \n", " s3_data_distribution_type='ShardedByS3Key'\n", " )],\n", " outputs=[\n", " ProcessingOutput(output_name=\"train\", source=\"/opt/ml/processing/train\"),\n", " ProcessingOutput(output_name=\"test\", source=\"/opt/ml/processing/test\"),\n", " ],\n", " code='processing.py', depends_on=[lambda_step]\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.workflow.pipeline import Pipeline\n", "\n", "pipeline = Pipeline(\n", " name='RedshiftPartitionUnloaderPIPELINE', parameters=[sql_query, s3_path, role, partition_by_column], steps=[lambda_step, processing_step]\n", ")\n", "pipeline.upsert(role_arn=get_execution_role())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution = pipeline.start()\n", "execution.wait()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:eu-west-1:470317259841:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }