{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Ingest data with Redshift\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "This notebook's CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook. \n", "\n", "\n", "\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook demonstrates how to set up a database with Redshift and query data with it.\n", "\n", "Amazon Redshift is a fully managed data warehouse that allows you to run complex analytic queries against petabytes of structured data. Your queries are distributed and parallelized across multiple physical resources, and you can easily scale your Amazon Redshift environment up and down depending on your business needs.\n", "\n", "You can also check the [existing notebook](https://github.com/aws/amazon-sagemaker-examples/blob/master/advanced_functionality/working_with_redshift_data/working_with_redshift_data.ipynb) for more information on how to load data from and save data to Redshift." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## When should you use Redshift?\n", "\n", "While Athena is mostly used to run ad-hoc queries on Amazon S3 data lake, Redshift is usually recommended for large structured data sets, or traditional relational database; it does well with performing aggregations, complex joins, and inner queries. You would need to set up and load the cluster before using it; and you need to load data into created tables. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set up Redshift\n", "First we are going to make sure we have policy attached to our role (The role we will create specifically for the Redshift task) to access Redshift. You can do this through IAM client as below, or through the AWS console.\n", "\n", "**Note: You would need IAMFullAccess to attach policies to the role.**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Attach IAMFullAccess Policy from Console\n", "\n", "**1.** Go to **Sagemaker Console**, choose **notebook instances** in the navigation panel, then select your notebook instance to view the details. Then under **Permissions and Encryption**, click on the **IAM role ARN** link and it will take you to your role summery in the **IAM Console**. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "<div>\n", "<img src=\"image/athena-iam-1.png\" width=\"300\"/>\n", "</div>" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**2.** Click on **Create Policy** under **Permissions**." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "<div>\n", "<img src=\"image/athena-iam-2.PNG\" width=\"300\"/>\n", "</div>" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**3.** In the **Attach Permissions** page, search for **IAMFullAccess**. It will show up in the policies search results if it has not been attached to your role yet. Select the checkbox for the **IAMFullAccess** Policy, then click **Attach Policy**. You now have the policy successfully attached to your role." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "<div>\n", "<img src=\"image/athena-iam-3.PNG\" width=\"500\"/>\n", "</div>" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dataset\n", "\n", "This example uses the California Housing dataset, which was originally published in:\n", "\n", "> Pace, R. Kelley, and Ronald Barry. 
\"Sparse spatial autoregressions.\" Statistics & Probability Letters 33.3 (1997): 291-297." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%pip install -qU 'sagemaker>=2.15.0' 'PyAthena==1.10.7' 'awswrangler==1.2.0' 'SQLAlchemy==1.3.13'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import io\n", "import boto3\n", "import sagemaker\n", "import json\n", "from sagemaker import get_execution_role\n", "import os\n", "from sklearn.datasets import *\n", "import pandas as pd\n", "from botocore.exceptions import ClientError\n", "import awswrangler as wr\n", "from datetime import date\n", "\n", "# Get region\n", "session = boto3.session.Session()\n", "region_name = session.region_name\n", "\n", "# Get SageMaker session & default S3 bucket\n", "sagemaker_session = sagemaker.Session()\n", "bucket = sagemaker_session.default_bucket() # replace with your own bucket name if you have one\n", "role = sagemaker.get_execution_role()\n", "prefix = \"data/tabular/california_housing\"\n", "filename = \"california_housing.csv\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "iam = boto3.client(\"iam\")\n", "sts = boto3.client(\"sts\")\n", "redshift = boto3.client(\"redshift\")\n", "sm = boto3.client(\"sagemaker\")\n", "s3 = sagemaker_session.boto_session.resource(\"s3\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "role_name = role.split(\"/\")[-1]\n", "print(\"Your Role name used to create this notebook is: {}\".format(role_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Download data from online resources and write data to S3" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# helper functions to upload data to s3\n", "def write_to_s3(filename, bucket, prefix):\n", " # put one file in a separate folder. This is helpful if you read and prepare data with Athena\n", " filename_key = filename.split(\".\")[0]\n", " key = \"{}/{}/{}\".format(prefix, filename_key, filename)\n", " return s3.Bucket(bucket).upload_file(filename, key)\n", "\n", "\n", "def upload_to_s3(bucket, prefix, filename):\n", " url = \"s3://{}/{}/{}\".format(bucket, prefix, filename)\n", " print(\"Writing to {}\".format(url))\n", " write_to_s3(filename, bucket, prefix)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tabular_data = fetch_california_housing()\n", "tabular_data_full = pd.DataFrame(tabular_data.data, columns=tabular_data.feature_names)\n", "tabular_data_full[\"target\"] = pd.DataFrame(tabular_data.target)\n", "tabular_data_full.to_csv(\"california_housing.csv\", index=False)\n", "\n", "upload_to_s3(bucket, \"data/tabular\", filename)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Redshift Role\n", "The policy enables Redshift to assume the role. The services can then perform any tasks granted by the permissions policy assigned to the role (which we will attach to it later). 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "assume_role_policy_doc = {\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Principal\": {\"Service\": \"redshift.amazonaws.com\"},\n", " \"Action\": \"sts:AssumeRole\",\n", " }\n", " ],\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create Role\n", "iam_redshift_role_name = \"Tabular_Redshift\"\n", "try:\n", " iam_role_redshift = iam.create_role(\n", " RoleName=iam_redshift_role_name,\n", " AssumeRolePolicyDocument=json.dumps(assume_role_policy_doc),\n", " Description=\"Tabular data Redshift Role\",\n", " )\n", "except ClientError as e:\n", " if e.response[\"Error\"][\"Code\"] == \"EntityAlreadyExists\":\n", " print(\"Role already exists\")\n", " else:\n", " print(\"Unexpected error: %s\" % e)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# get role arn\n", "role_rs = iam.get_role(RoleName=\"Tabular_Redshift\")\n", "iam_role_redshift_arn = role_rs[\"Role\"][\"Arn\"]\n", "print(\"Your Role arn used to create a Redshift Cluster is: {}\".format(iam_role_redshift_arn))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Policy Document\n", "We will create policies we used to access S3 and Athena. The two policies we will create here are: \n", "* S3FullAccess: `arn:aws:iam::aws:policy/AmazonS3FullAccess`\n", "* AthenaFullAccess: `arn:aws:iam::aws:policy/AmazonAthenaFullAccess`\n", "\n", "You can check the policy document in the IAM console and copy the policy file here." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# s3FullAccess\n", "my_redshift_to_s3 = {\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [{\"Effect\": \"Allow\", \"Action\": \"s3:*\", \"Resource\": \"*\"}],\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Athena Full Access\n", "my_redshift_to_athena = {\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\"Effect\": \"Allow\", \"Action\": [\"athena:*\"], \"Resource\": [\"*\"]},\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\n", " \"glue:CreateDatabase\",\n", " \"glue:DeleteDatabase\",\n", " \"glue:GetDatabase\",\n", " \"glue:GetDatabases\",\n", " \"glue:UpdateDatabase\",\n", " \"glue:CreateTable\",\n", " \"glue:DeleteTable\",\n", " \"glue:BatchDeleteTable\",\n", " \"glue:UpdateTable\",\n", " \"glue:GetTable\",\n", " \"glue:GetTables\",\n", " \"glue:BatchCreatePartition\",\n", " \"glue:CreatePartition\",\n", " \"glue:DeletePartition\",\n", " \"glue:BatchDeletePartition\",\n", " \"glue:UpdatePartition\",\n", " \"glue:GetPartition\",\n", " \"glue:GetPartitions\",\n", " \"glue:BatchGetPartition\",\n", " ],\n", " \"Resource\": [\"*\"],\n", " },\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\n", " \"s3:GetBucketLocation\",\n", " \"s3:GetObject\",\n", " \"s3:ListBucket\",\n", " \"s3:ListBucketMultipartUploads\",\n", " \"s3:ListMultipartUploadParts\",\n", " \"s3:AbortMultipartUpload\",\n", " \"s3:CreateBucket\",\n", " \"s3:PutObject\",\n", " ],\n", " \"Resource\": [\"arn:aws:s3:::aws-athena-query-results-*\"],\n", " },\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\"s3:GetObject\", \"s3:ListBucket\"],\n", " \"Resource\": [\"arn:aws:s3:::athena-examples*\"],\n", " },\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\"s3:ListBucket\", 
\"s3:GetBucketLocation\", \"s3:ListAllMyBuckets\"],\n", " \"Resource\": [\"*\"],\n", " },\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\"sns:ListTopics\", \"sns:GetTopicAttributes\"],\n", " \"Resource\": [\"*\"],\n", " },\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\n", " \"cloudwatch:PutMetricAlarm\",\n", " \"cloudwatch:DescribeAlarms\",\n", " \"cloudwatch:DeleteAlarms\",\n", " ],\n", " \"Resource\": [\"*\"],\n", " },\n", " {\"Effect\": \"Allow\", \"Action\": [\"lakeformation:GetDataAccess\"], \"Resource\": [\"*\"]},\n", " ],\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " policy_redshift_s3 = iam.create_policy(\n", " PolicyName=\"Tabular_RedshiftPolicyToS3\", PolicyDocument=json.dumps(my_redshift_to_s3)\n", " )\n", " print(\"Policy created.\")\n", "except ClientError as e:\n", " if e.response[\"Error\"][\"Code\"] == \"EntityAlreadyExists\":\n", " print(\"Policy already exists\")\n", " else:\n", " print(\"Unexpected error: %s\" % e)\n", "\n", "account_id = sts.get_caller_identity()[\"Account\"]\n", "policy_redshift_s3_arn = f\"arn:aws:iam::{account_id}:policy/Tabular_RedshiftPolicyToS3\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " policy_redshift_athena = iam.create_policy(\n", " PolicyName=\"Tabular_RedshiftPolicyToAthena\",\n", " PolicyDocument=json.dumps(my_redshift_to_athena),\n", " )\n", " print(\"Policy created.\")\n", "except ClientError as e:\n", " if e.response[\"Error\"][\"Code\"] == \"EntityAlreadyExists\":\n", " print(\"Policy already exists\")\n", " else:\n", " print(\"Unexpected error: %s\" % e)\n", "\n", "account_id = sts.get_caller_identity()[\"Account\"]\n", "policy_redshift_athena_arn = f\"arn:aws:iam::{account_id}:policy/Tabular_RedshiftPolicyToAthena\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Attach Policy to Role" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Attach RedshiftPolicyToAthena policy\n", "try:\n", " response = iam.attach_role_policy(\n", " PolicyArn=policy_redshift_athena_arn, RoleName=iam_redshift_role_name\n", " )\n", "except ClientError as e:\n", " if e.response[\"Error\"][\"Code\"] == \"EntityAlreadyExists\":\n", " print(\"Policy is already attached. This is ok.\")\n", " else:\n", " print(\"Unexpected error: %s\" % e)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Attach RedshiftPolicyToS3 policy\n", "try:\n", " response = iam.attach_role_policy(\n", " PolicyArn=policy_redshift_s3_arn, RoleName=iam_redshift_role_name\n", " )\n", "except ClientError as e:\n", " if e.response[\"Error\"][\"Code\"] == \"EntityAlreadyExists\":\n", " print(\"Policy is already attached. This is ok.\")\n", " else:\n", " print(\"Unexpected error: %s\" % e)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Making Sure your Role **to run this Notebook** has the following policy attached:\n", "\n", "* `SecretsManagerReadWrite`: we will use this service to store and retrive our Redshift Credentials.\n", "* `AmazonRedshiftFullAccess`: we will use this role to create a Redshift cluster from the notebook." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# making sure you have secret manager policy attached to role\n", "try:\n", " policy = \"SecretsManagerReadWrite\"\n", " response = iam.attach_role_policy(\n", " PolicyArn=\"arn:aws:iam::aws:policy/{}\".format(policy), RoleName=role_name\n", " )\n", " print(\"Policy %s has been succesfully attached to role: %s\" % (policy, role_name))\n", "except ClientError as e:\n", " if e.response[\"Error\"][\"Code\"] == \"EntityAlreadyExists\":\n", " print(\"Policy is already attached.\")\n", " else:\n", " print(\"Unexpected error: %s \" % e)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# making sure you have RedshiftFullAccess policy attached to role\n", "from botocore.exceptions import ClientError\n", "\n", "try:\n", " policy = \"AmazonRedshiftFullAccess\"\n", " response = iam.attach_role_policy(\n", " PolicyArn=\"arn:aws:iam::aws:policy/{}\".format(policy), RoleName=role_name\n", " )\n", " print(\"Policy %s has been succesfully attached to role: %s\" % (policy, role_name))\n", "except ClientError as e:\n", " if e.response[\"Error\"][\"Code\"] == \"EntityAlreadyExists\":\n", " print(\"Policy is already attached. \")\n", " else:\n", " print(\"Unexpected error: %s \" % e)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Optional: Create Redshift Cluster\n", "\n", "Most of the times we have a Redshift cluster already up and running and we want to connect to the cluster in-use, but if you want to create a new cluster, you can follow the steps below to create one.\n", "*Note that only some Instance Types support Redshift Query Editor, so be careful when you specify the Redshift Cluster Nodes.*(https://docs.aws.amazon.com/redshift/latest/mgmt/query-editor.html)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "notebook_instance_name = sm.list_notebook_instances()[\"NotebookInstances\"][0][\n", " \"NotebookInstanceName\"\n", "]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Secret in Secrets Manager\n", "\n", "__Your IAM role will need permission to create a secret and get its value.__ This can be accomplished with the SecretsManagerReadWrite managed policy.\n", "\n", "AWS Secrets Manager is a service that enables you to easily rotate, manage, and retrieve database credentials, API keys, and other secrets throughout their lifecycle. Using Secrets Manager, you can secure and manage secrets used to access resources in the AWS Cloud, on third-party services, and on-premises.\n", "\n", "*note that `MasterUserPassword` must contain at least 1 upper case letter and at least 1 decimal digit.\n", "\n", "Ensure that you change the secret password to be unique and secure." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "secretsmanager = boto3.client(\"secretsmanager\")\n", "\n", "try:\n", " response = secretsmanager.create_secret(\n", " Name=\"tabular_redshift_login\",\n", " Description=\"California Housing data New Cluster Redshift Login\",\n", " SecretString='[{\"username\":\"awsuser\"},{\"password\":\"Californiahousing1\"}]',\n", " Tags=[\n", " {\"Key\": \"name\", \"Value\": \"tabular_redshift_login\"},\n", " ],\n", " )\n", "except ClientError as e:\n", " if e.response[\"Error\"][\"Code\"] == \"ResourceExistsException\":\n", " print(\"Secret already exists. 
This is ok.\")\n", " else:\n", " print(\"Unexpected error: %s\" % e)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# And retrieving the secret again\n", "secretsmanager = boto3.client(\"secretsmanager\")\n", "import json\n", "\n", "secret = secretsmanager.get_secret_value(SecretId=\"tabular_redshift_login\")\n", "cred = json.loads(secret[\"SecretString\"])\n", "\n", "master_user_name = cred[0][\"username\"]\n", "master_user_pw = cred[1][\"password\"]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Set up parameters\n", "# Redshift configuration parameters\n", "redshift_cluster_identifier = \"redshiftdemo\"\n", "database_name = \"california_housing\"\n", "cluster_type = \"multi-node\"\n", "\n", "node_type = \"dc2.large\"\n", "number_nodes = \"2\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When creating a new cluster, you want to make sure that the Redshift VPC is the same one you used to create your notebook in. Your VPC should have the following two VPC attributes set to **yes**: **DNS resolution** and **DNS hostnames**. You can either specify a **security group** or specify a created **cluster subnet group name** (which you will create from the Redshift console).\n", "\n", "If you are not using default VPC and using **security group** returns VPC error, you can try create a subnet group in Redshift Console, by choose **Configurations** -> **subnet groups** -> **create cluster subnet group**, then specify the **VPC** and **subnet** you want to choose and you created this notebook in. Specify the `ClusterSubnetGroupName` in the following command with the subnet group you created." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Optional: Get Security Group ID\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "notebook_instance = sm.describe_notebook_instance(NotebookInstanceName=notebook_instance_name)\n", "security_group_id = notebook_instance[\"SecurityGroups\"][0]\n", "print(security_group_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Redshift Cluster using Subnet Group" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = redshift.create_cluster(\n", " DBName=database_name,\n", " ClusterIdentifier=redshift_cluster_identifier,\n", " ClusterType=cluster_type,\n", " NodeType=node_type,\n", " NumberOfNodes=int(number_nodes),\n", " MasterUsername=master_user_name,\n", " MasterUserPassword=master_user_pw,\n", " # ClusterSubnetGroupName=\"<cluster-subnet-group-1>\", # you can either specify an existing subnet group (change this to your Subnet Group name), or use the security group ID that was retrieved above\n", " IamRoles=[iam_role_redshift_arn],\n", " VpcSecurityGroupIds=[security_group_id],\n", " Port=5439,\n", " PubliclyAccessible=False,\n", ")\n", "\n", "print(response)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Wait until the status of your redshift cluster become **available**." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# check cluster status\n", "response = redshift.describe_clusters(ClusterIdentifier=redshift_cluster_identifier)\n", "cluster_status = response[\"Clusters\"][0][\"ClusterStatus\"]\n", "print(\"Your Redshift Cluster Status is: \" + cluster_status)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Existing Redshift Cluster\n", "### Prerequisites\n", "Your existing Redshift cluster have to be in the **same VPC** as your notebook instance.\n", "\n", "Also, note that this Notebook instance needs to resolve to a private IP when connecting to the Redshift instance. There are two ways to resolve the Redshift DNS name to a private IP:\n", "\n", "The Redshift cluster is not publicly accessible so by default it will resolve to private IP.\n", "The Redshift cluster is publicly accessible and has an EIP associated with it but when accessed from within a VPC, it should resolve to private IP of the Redshift cluster. This is possible by setting following two VPC attributes to yes: **DNS resolution** and **DNS hostnames**. For instructions on setting that up, see Redshift public docs on [Managing Clusters in an Amazon Virtual Private Cloud (VPC)](https://docs.aws.amazon.com/redshift/latest/mgmt/managing-clusters-vpc.html).\n", "\n", "We will use [sqlalchemy](https://pypi.org/project/SQLAlchemy/) to connect to the redshift database engine." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sqlalchemy import create_engine\n", "from sqlalchemy.orm import sessionmaker" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Retrive Redshift credentials from Secret Manager" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "secretsmanager = boto3.client(\"secretsmanager\")\n", "secret = secretsmanager.get_secret_value(SecretId=\"tabular_redshift_login\")\n", "cred = json.loads(secret[\"SecretString\"])\n", "\n", "master_user_name = cred[0][\"username\"]\n", "master_user_pw = cred[1][\"password\"]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Set up parameters for connection: replace with your own parameters\n", "We are going to use the data and schema created in the sequel notebook Ingest_data_with_Athena.ipynb. If you see an error below, please make sure you run through the [02_Ingest_data_with_Athena.ipynb](02_Ingest_data_with_Athena_v1.ipynb) notebook before the next steps." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "redshift_cluster_identifier = \"redshiftdemo\"\n", "\n", "database_name_redshift = \"california_housing\"\n", "database_name_athena = \"tabular_california_housing\"\n", "\n", "redshift_port = \"5439\"\n", "\n", "schema_redshift = \"redshift\"\n", "schema_spectrum = \"spectrum\"\n", "\n", "table_name_csv = \"california_housing_athena\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Check cluster status to see if it is available" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# check cluster status\n", "response = redshift.describe_clusters(ClusterIdentifier=redshift_cluster_identifier)\n", "cluster_status = response[\"Clusters\"][0][\"ClusterStatus\"]\n", "print(\"Cluster status is:\", cluster_status)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Get Redshift Endpoint Address & IAM Role\n", "redshift_endpoint_address = response[\"Clusters\"][0][\"Endpoint\"][\"Address\"]\n", "iam_role = response[\"Clusters\"][0][\"IamRoles\"][0][\"IamRoleArn\"]\n", "\n", "print(\"Redshift endpoint: {}\".format(redshift_endpoint_address))\n", "print(\"IAM Role: {}\".format(iam_role))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Engine\n", "https://docs.sqlalchemy.org/en/13/core/engines.html" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Connect to Redshift Database Engine\n", "engine = create_engine(\n", " \"postgresql://{}:{}@{}:{}/{}\".format(\n", " master_user_name,\n", " master_user_pw,\n", " redshift_endpoint_address,\n", " redshift_port,\n", " database_name_redshift,\n", " )\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Session: we will use this session to run SQL commands" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# config session\n", "session = sessionmaker()\n", "session.configure(bind=engine)\n", "s = session()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Method 1: Access Data without Moving it to Redshift: Amazon Redshift Spectrum\n", "[Redshift Spectrum](https://docs.aws.amazon.com/redshift/latest/dg/c-getting-started-using-spectrum.html) is used to query data directly from files on Amazon S3.You will need to create external tables in an external schema. 
The external schema references a database in the external data catalog and provides the IAM role ARN that authorizes your cluster to access Amazon S3 on your behalf.\n", "#### Get table and schema information from the Glue Data Catalog: retrieve metadata from the data catalog and connect to the Athena database" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "statement = \"\"\"\n", "rollback;\n", "create external schema if not exists {} from data catalog \n", " database '{}' \n", " iam_role '{}'\n", " create external database if not exists\n", "\"\"\".format(\n", " schema_spectrum, database_name_athena, iam_role\n", ")\n", "\n", "s.execute(statement)\n", "s.commit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Run a sample query through Redshift Spectrum" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "statement = \"\"\"\n", "select *\n", " from {}.{} limit 10\n", "\"\"\".format(\n", " schema_spectrum, table_name_csv\n", ")\n", "\n", "df = pd.read_sql_query(statement, engine)\n", "df.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Method 2: Loading Data into Redshift from Athena\n", "To load data into Redshift, you need to use either the `COPY` command or the `INSERT INTO` command to move data into a table from data files. Copied files may reside in an S3 bucket, an EMR cluster, or on a remote host accessed via SSH.\n", "\n", "#### Create and Upload Data into Athena Database" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# PyAthena is used to run the DDL against Athena (imports and S3 locations added so this cell runs on its own)\n", "from pyathena import connect\n", "from pyathena.util import as_pandas\n", "\n", "database_name = \"tabular_california_housing\"\n", "table_name_csv = \"california_housing_athena\"\n", "\n", "# Assumed S3 locations: the folder the CSV was uploaded to earlier, and a staging prefix for Athena query results\n", "data_s3_path = \"s3://{}/{}/\".format(bucket, prefix)\n", "s3_staging_dir = \"s3://{}/athena/staging/\".format(bucket)\n", "\n", "# SQL statement to execute\n", "statement = \"\"\"CREATE EXTERNAL TABLE IF NOT EXISTS {}.{}(\n", " MedInc double,\n", " HouseAge double,\n", " AveRooms double,\n", " AveBedrms double,\n", " Population double,\n", " AveOccup double,\n", " Latitude double,\n", " Longitude double, \n", " MedValue double\n", "\n", ") ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\\\n' LOCATION '{}'\n", "TBLPROPERTIES ('skip.header.line.count'='1')\"\"\".format(\n", " database_name, table_name_csv, data_s3_path\n", ")\n", "\n", "# Execute statement using connection cursor\n", "cursor = connect(region_name=region_name, s3_staging_dir=s3_staging_dir).cursor()\n", "cursor.execute(statement)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# verify the table has been created\n", "statement = \"SHOW TABLES in {}\".format(database_name)\n", "cursor.execute(statement)\n", "\n", "df_show = as_pandas(cursor)\n", "df_show.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Schema in Redshift" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# create schema\n", "statement = \"\"\"create schema if not exists {}\"\"\".format(schema_redshift)\n", "\n", "s = session()\n", "s.execute(statement)\n", "s.commit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Redshift Table" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "table_name_redshift = table_name_csv + \"_\" + \"redshift_insert\"\n", "statement = \"\"\"\n", "rollback;\n", "create table if not exists redshift.{}(\n", " MedInc float,\n", " HouseAge float,\n", " AveRooms float,\n", " AveBedrms float,\n", " Population float,\n", " AveOccup float,\n", " Latitude 
float,\n", " Longitude float, \n", " MedValue float)\"\"\".format(\n", " table_name_redshift\n", ")\n", "\n", "s.execute(statement)\n", "s.commit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### `Insert into` data into the table we created\n", "https://docs.aws.amazon.com/redshift/latest/dg/c_Examples_of_INSERT_30.html" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "table_name_redshift = table_name_csv + \"_\" + \"redshift_insert\"\n", "\n", "statement = \"\"\"\n", " insert into redshift.{}\n", " select * from {}.{} \n", " \"\"\".format(\n", " table_name_redshift, schema_spectrum, table_name_csv\n", ")\n", "s.execute(statement)\n", "s.commit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Query data in Redshift" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "statement = \"\"\"\n", " select * from redshift.{} limit 10\n", "\"\"\".format(\n", " table_name_redshift\n", ")\n", "df = pd.read_sql_query(statement, engine)\n", "df.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Method 3: Copy data directly from S3\n", "You can also `Copy` Data into a new table.\n", "https://docs.aws.amazon.com/redshift/latest/dg/tutorial-loading-run-copy.html\n", "\n", "#### Create a new Schema in Redshift" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# create a new sample table\n", "table_name_redshift = table_name_csv + \"_\" + \"redshift_copy\"\n", "statement = \"\"\"\n", "rollback;\n", "create table if not exists redshift.{}(\n", " MedInc float,\n", " HouseAge float,\n", " AveRooms float,\n", " AveBedrms float,\n", " Population float,\n", " AveOccup float,\n", " Latitude float,\n", " Longitude float, \n", " MedValue float)\"\"\".format(\n", " table_name_redshift\n", ")\n", "\n", "s.execute(statement)\n", "s.commit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Copy data into Redshift table\n", "Redshift assumes your data comes in pipe delimited, so if you are reading in csv or txt, be sure to specify the `delimiter`. To load data that is in `CSV` format, add `csv` to your `COPY` command. Also since we are reading directly from S3, if your data has header, remember to add `ignoreheader` to your command." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "table_name_redshift = table_name_csv + \"_\" + \"redshift_copy\"\n", "data_s3_path = \"s3://{}/data/tabular/california_housing/california_housing.csv\".format(bucket)\n", "statement = \"\"\"\n", "rollback;\n", "copy redshift.{} \n", " from '{}'\n", " iam_role '{}'\n", " csv\n", " ignoreheader 1\n", " \"\"\".format(\n", " table_name_redshift, data_s3_path, iam_role\n", ")\n", "s.execute(statement)\n", "s.commit()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "statement = \"\"\"\n", " select * from redshift.{} limit 10\n", "\"\"\".format(\n", " table_name_redshift\n", ")\n", "df_copy = pd.read_sql_query(statement, engine)\n", "df_copy.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Error Handling\n", "\n", "Sometimes you might see an error stating\" Load into table 'part' failed. Check 'stl_load_errors' system table for details.\", and below is a helpful function to check where the copying process went wrong. 
You can find more information in the [Redshift Load Error documentation](https://docs.aws.amazon.com/redshift/latest/dg/r_STL_LOAD_ERRORS.html)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "statement = \"\"\"\n", "select query, substring(filename,22,25) as filename,line_number as line, \n", "substring(colname,0,12) as column, type, position as pos, substring(raw_line,0,30) as line_text,\n", "substring(raw_field_value,0,15) as field_text, \n", "substring(err_reason,0,45) as reason\n", "from stl_load_errors \n", "order by query desc\n", "limit 10\"\"\"\n", "error = pd.read_sql_query(statement, engine)\n", "error.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Method 4: AWS Data Wrangler\n", "\n", "You can find more information on how AWS Data Wrangler works at [this tutorial](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/008%20-%20Redshift%20-%20Copy%20%26%20Unload.ipynb)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### AWS Data Wrangler Get Engine Function\n", "Run this command within a private subnet. You can find your host address by going to the Redshift Console, then choose **Clusters** -> **Property** -> **Connection details** -> **View all connection details** -> **Node IP address** -> **Private IP address**.\n", "https://aws-data-wrangler.readthedocs.io/en/latest/stubs/awswrangler.db.get_engine.html#awswrangler.db.get_engine" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "private_ip = redshift.describe_clusters(ClusterIdentifier=redshift_cluster_identifier)[\"Clusters\"][\n", " 0\n", "][\"ClusterNodes\"][0][\"PrivateIPAddress\"]\n", "print(\"Private IP address is: \", private_ip)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "engine = wr.db.get_engine(\n", " db_type=\"postgresql\",\n", " host=private_ip, # Private IP address of your Redshift Cluster\n", " port=redshift_port,\n", " database=database_name_redshift,\n", " user=master_user_name,\n", " password=master_user_pw,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = wr.db.read_sql_query(\"SELECT * FROM redshift.{}\".format(table_name_redshift), con=engine)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Citation\n", "\n", "Data Science On AWS workshops, Chris Fregly, Antje Barth, https://www.datascienceonaws.com/" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Notebook CI Test Results\n", "\n", "This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n", "\n" ] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 4 }