{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Using SageMaker/Jupyter to work with Redshift\n", "\n", "## Prerequisites\n", "\n", "You will need:\n", "\n", "* A Redshift cluster in a VPC.\n", "* A SageMaker notebook instance running this Jupyter notebook in the same VPC as the Redshift cluster.\n", "* A security group on the Redshift cluster that allows inbound access from the SageMaker notebook instance (Redshift listens on port 5439 by default)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Install libraries" ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Requirement already satisfied: psycopg2-binary in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (2.8.5)\n", "\u001b[33mYou are using pip version 10.0.1, however version 20.1 is available.\n", "You should consider upgrading via the 'pip install --upgrade pip' command.\u001b[0m\n", "Requirement already satisfied: numpy in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (1.14.3)\n", "\u001b[33mYou are using pip version 10.0.1, however version 20.1 is available.\n", "You should consider upgrading via the 'pip install --upgrade pip' command.\u001b[0m\n", "Requirement already satisfied: matplotlib in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (3.0.3)\n", "Requirement already satisfied: python-dateutil>=2.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from matplotlib) (2.7.3)\n", "Requirement already satisfied: kiwisolver>=1.0.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from matplotlib) (1.0.1)\n", "Requirement already satisfied: pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from matplotlib) (2.2.0)\n", "Requirement already satisfied: cycler>=0.10 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from matplotlib) (0.10.0)\n", 
"Requirement already satisfied: numpy>=1.10.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from matplotlib) (1.14.3)\n", "Requirement already satisfied: six>=1.5 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from python-dateutil>=2.1->matplotlib) (1.11.0)\n", "Requirement already satisfied: setuptools in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from kiwisolver>=1.0.1->matplotlib) (39.1.0)\n", "\u001b[33mYou are using pip version 10.0.1, however version 20.1 is available.\n", "You should consider upgrading via the 'pip install --upgrade pip' command.\u001b[0m\n", "Requirement already satisfied: boto3 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (1.12.39)\n", "Requirement already satisfied: jmespath<1.0.0,>=0.7.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from boto3) (0.9.4)\n", "Requirement already satisfied: s3transfer<0.4.0,>=0.3.0 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from boto3) (0.3.3)\n", "Requirement already satisfied: botocore<1.16.0,>=1.15.39 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from boto3) (1.15.39)\n", "Requirement already satisfied: python-dateutil<3.0.0,>=2.1 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from botocore<1.16.0,>=1.15.39->boto3) (2.7.3)\n", "Requirement already satisfied: urllib3<1.26,>=1.20; python_version != \"3.4\" in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from botocore<1.16.0,>=1.15.39->boto3) (1.23)\n", "Requirement already satisfied: docutils<0.16,>=0.10 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from botocore<1.16.0,>=1.15.39->boto3) (0.14)\n", "Requirement already satisfied: six>=1.5 in /home/ec2-user/anaconda3/envs/python3/lib/python3.6/site-packages (from python-dateutil<3.0.0,>=2.1->botocore<1.16.0,>=1.15.39->boto3) (1.11.0)\n", "\u001b[33mYou are using pip 
version 10.0.1, however version 20.1 is available.\n", "You should consider upgrading via the 'pip install --upgrade pip' command.\u001b[0m\n" ] } ], "source": [ "!pip install psycopg2-binary\n", "!pip install numpy\n", "!pip install matplotlib\n", "!pip install boto3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Configure Redshift connection\n", "\n", "### The secure way\n", "\n", "If you plan to share this notebook or store it in version control, never put explicit credentials in it. Instead, we will use [AWS Secrets Manager](https://console.aws.amazon.com/secretsmanager/) to store our Redshift credentials.\n", "\n", "#### Create a new secret\n", "\n", "Once on the [AWS Secrets Manager](https://console.aws.amazon.com/secretsmanager/) console:\n", "\n", "* Select \"Store a new secret\"\n", "* Select \"Credentials for Redshift cluster\"\n", "* Enter the username and password\n", "* Select a Redshift cluster\n", "* Click \"Next\"\n", "\n", "On the next page:\n", "\n", "* Enter a human-friendly name and description\n", "* Click \"Next\"\n", "\n", "On the next page:\n", "\n", "* Click \"Next\"\n", "\n", "On the next page:\n", "\n", "* Review your selections\n", "* Store the secret\n", "\n", "#### Get the machine-readable name (ARN) of the secret\n", "\n", "On the [AWS Secrets Manager](https://console.aws.amazon.com/secretsmanager/) console, click on your secret and copy its Secret ARN. You will need it both for the IAM policy below and in the code cell that reads the secret.\n", "\n", "#### Granting your SageMaker instance permission to use the secret\n", "\n", "Obviously we don't want just anyone to be able to read the secret, nor do we want the SageMaker notebook instance to be able to read all of your secrets. To achieve both objectives, grant the notebook instance's IAM role permission to access only this secret. 
Replace the example ARN in the `Resource` list of the JSON below with your secret's ARN:\n", "\n", "```json\n", "{\n", "  \"Version\": \"2012-10-17\",\n", "  \"Statement\": [\n", "    {\n", "      \"Effect\": \"Allow\",\n", "      \"Action\": [\n", "        \"secretsmanager:GetResourcePolicy\",\n", "        \"secretsmanager:GetSecretValue\",\n", "        \"secretsmanager:DescribeSecret\",\n", "        \"secretsmanager:ListSecretVersionIds\"\n", "      ],\n", "      \"Resource\": [\n", "        \"arn:aws:secretsmanager:us-west-2:111122223333:secret:aes128-1a2b3c\"\n", "      ]\n", "    }\n", "  ]\n", "}\n", "```\n", "\n", "Now attach this policy as an inline policy to the execution role of your SageMaker notebook instance:\n", "\n", "* Navigate to the [Amazon SageMaker](https://us-west-2.console.aws.amazon.com/sagemaker/) console\n", "* Select Notebook Instances\n", "* Click on your notebook instance (most likely the one running this notebook)\n", "* Under \"Permissions and Encryption\", click on the IAM role link\n", "* You should now be on an IAM console page that offers \"Add inline policy\". Click on that link\n", "* On the \"Create Policy\" page that opens, click the JSON tab and replace its contents with the block above\n", "* Click \"Review policy\"\n", "* On the next page, enter a human-friendly name for the policy and click \"Create policy\"\n", "\n", "Finally, paste the ARN of your secret into the code cell below." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# Put the ARN of your AWS Secrets Manager secret for your Redshift cluster here:\n", "secret_name=\"arn:aws:secretsmanager:us-west-2:111122223333:secret:aes128-1a2b3c\"\n", "\n", "# This will get the secret from AWS Secrets Manager. 
\n", "import boto3\n", "import json\n", "\n", "session = boto3.session.Session()\n", "client = session.client(\n", "    service_name='secretsmanager'\n", ")\n", "\n", "get_secret_value_response = client.get_secret_value(\n", "    SecretId=secret_name\n", ")\n", "\n", "if 'SecretString' in get_secret_value_response:\n", "    connection_info = json.loads(get_secret_value_response['SecretString'])\n", "else:\n", "    # Stop here: the cells below cannot run without connection_info\n", "    raise ValueError(\"ERROR: no secret data found\")\n", "\n", "# Secrets of type \"Credentials for Redshift cluster\" store the login as \"username\"\n", "# and the database (if present) as \"dbname\"; map them to the keys used below.\n", "# If no database is stored, fall back to Redshift's default database name, \"dev\".\n", "if 'user' not in connection_info and 'username' in connection_info:\n", "    connection_info['user'] = connection_info['username']\n", "if 'database' not in connection_info:\n", "    connection_info['database'] = connection_info.get('dbname', 'dev')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### The insecure way\n", "\n", "Don't do this unless you are absolutely sure that nobody will ever see this notebook!\n", "\n", "On the [Redshift console page](https://console.aws.amazon.com/redshiftv2/home), select Clusters, then your cluster, then Properties, and look for a connection string from which to extract the information required below. Presumably you already know your database credentials. " ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# Please do not use this approach as it can expose credentials\n", "\n", "# connection_info = {\n", "# \"database\" : '', \n", "# \"host\" : '', \n", "# \"port\" : '5439', \n", "# \"user\" : '', \n", "# \"password\" : '' \n", "# }" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Establish the connection" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "# Sanity check for credentials\n", "expected_keys = set(['user', 'password', 'host', 'database', 'port'])\n", "if not expected_keys.issubset(connection_info.keys()):\n", "    print(\"Expected values for \",expected_keys)\n", "    print(\"Received values for \",set(connection_info.keys()))\n", "    print(\"Please adjust query or assignment as required!\")" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# The JDBC connection string in the Redshift console has the form jdbc:redshift://HOST:PORT/DBNAME\n", "import time\n", "import psycopg2\n", "\n", "con=psycopg2.connect(\n", " dbname = 
connection_info[\"database\"], \n", " host = connection_info[\"host\"], \n", " port = connection_info[\"port\"], \n", " user = connection_info[\"user\"], \n", " password = connection_info[\"password\"]\n", ")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run basic queries against the database\n", "\n", "These queries use the psycopg2 [cursor class](https://www.psycopg.org/docs/cursor.html). Note that none of the changes made below persist in Redshift unless you also COMMIT them: psycopg2 opens a transaction with the first statement and keeps it open until it is committed or rolled back.\n", "\n", "### Creating tables etc." ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# Set this to True to persist major changes to Redshift.\n", "run_commits = False\n", "\n", "# Set this to False if you set run_commits to True and want the created schema/tables to persist\n", "run_cleanup = True" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This queries the current timestamp from Redshift.\n", "The beauty of this query is that it doesn't require any tables to be set up." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[(datetime.datetime(2020, 5, 4, 22, 52, 10, 286561),)]\n" ] } ], "source": [ "cur = con.cursor()\n", "cur.execute(\"SELECT sysdate\")\n", "res = cur.fetchall()\n", "print(res)\n", "cur.close()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a schema." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "cur = con.cursor()\n", "cur.execute(\"\"\"\n", " CREATE SCHEMA IF NOT EXISTS pytest2\n", " \"\"\")\n", "cur.close()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check whether we successfully created the schema." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Schema pytest2 successfully created? 
True\n", "All schemas:\n", "('aws_oracle_ext', 117417, 'rudpot')\n", "('dummy', 125265, 'rudpot')\n", "('information_schema', 17131, 'rdsdb')\n", "('octank', 117022, 'rudpot')\n", "('pg_catalog', 11, 'rdsdb')\n", "('pg_internal', 100, 'rdsdb')\n", "('pg_temp_1', 16924, 'rdsdb')\n", "('pg_toast', 99, 'rdsdb')\n", "('public', 2200, 'rdsdb')\n", "('pytest1', 125273, 'rudpot')\n", "('pytest2', 125356, 'rudpot')\n", "('random', 158043, 'rudpot')\n", "('wh', 158072, 'rudpot')\n" ] } ], "source": [ "cur = con.cursor()\n", "cur.execute(\"\"\"\n", " select s.nspname as table_schema,\n", " s.oid as schema_id, \n", " u.usename as owner\n", " from pg_catalog.pg_namespace s\n", " join pg_catalog.pg_user u on u.usesysid = s.nspowner\n", " order by table_schema;\"\"\")\n", "res = cur.fetchall()\n", "print(\"Schema pytest2 successfully created? \",'pytest2' in [x[0] for x in res])\n", "print(\"All schemas:\")\n", "for ii in res:\n", "    print(ii)\n", "cur.close()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a table in the new schema." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "cur = con.cursor()\n", "cur.execute(\"\"\"\n", " CREATE TABLE IF NOT EXISTS pytest2.pytest2 ( \n", " id INTEGER primary key, \n", " name CHARACTER VARYING, \n", " created_at TIMESTAMP without time zone DEFAULT (getdate() at time zone 'utc')\n", " )\n", " \"\"\")\n", "cur.close()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Insert some data. Note that the `created_at` timestamp is generated automatically from the column default." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "cur = con.cursor()\n", "cur.execute(\"\"\"\n", " INSERT INTO pytest2.pytest2 VALUES ( \n", " 1, 'User1'\n", " )\n", " \"\"\")\n", "time.sleep(5)\n", "cur.execute(\"\"\"\n", " INSERT INTO pytest2.pytest2 VALUES ( \n", " 2, 'User2'\n", " )\n", " \"\"\")\n", "cur.close()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check 
the results. Note the 5-second difference between the two `created_at` values." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Data retrieved:\n", "(1, 'User1', datetime.datetime(2020, 4, 2, 2, 6, 20))\n", "(2, 'User2', datetime.datetime(2020, 4, 2, 2, 6, 25))\n" ] } ], "source": [ "cur = con.cursor()\n", "cur.execute(\"SELECT * FROM pytest2.pytest2 ORDER BY id LIMIT 2\")\n", "res = cur.fetchall()\n", "print(\"Data retrieved:\")\n", "for ii in res:\n", "    print(ii)\n", "cur.close()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Do some basic plotting\n", "\n", "This shows how to get data out of the cursor and into a plot. There are likely much better Python libraries for this.\n", "\n", "Purely for demonstration purposes, we insert a timestamp to use as the x value and a number to use as the y value." ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "cur = con.cursor()\n", "cur.execute(\"\"\"\n", " CREATE TABLE IF NOT EXISTS pytest2.pytest3 ( \n", " data_time TIMESTAMP without time zone DEFAULT (getdate() at time zone 'utc'),\n", " data_value DOUBLE PRECISION\n", " )\n", " \"\"\")\n", "cur.execute(\"INSERT INTO pytest2.pytest3 VALUES ( 'Jan 1, 2018 10:00:00', 1 )\")\n", "cur.execute(\"INSERT INTO pytest2.pytest3 VALUES ( 'Jan 1, 2018 10:15:00', 4 )\")\n", "cur.execute(\"INSERT INTO pytest2.pytest3 VALUES ( 'Jan 1, 2018 10:30:00', 9 )\")\n", "cur.execute(\"INSERT INTO pytest2.pytest3 VALUES ( 'Jan 1, 2018 10:45:00', 16 )\")\n", "cur.execute(\"INSERT INTO pytest2.pytest3 VALUES ( 'Jan 1, 2018 11:00:00', 25 )\")\n", "cur.close()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then we read the data back and use NumPy and pyplot to plot it." ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import matplotlib.pyplot as plt\n", "%matplotlib inline\n", "\n", "cur = con.cursor()\n", "# ORDER BY keeps the points in time order; without it Redshift may return rows in any order\n", "cur.execute(\"SELECT * FROM pytest2.pytest3 ORDER BY data_time\")\n", "res = cur.fetchall()\n", "cur.close()\n", "data = np.array(res)\n", "\n", "# Column 0 holds data_time (x), column 1 holds data_value (y)\n", "plt.plot(data[:,0], data[:,1])\n", "plt.xlabel(\"data_time\")\n", "plt.ylabel(\"data_value\")\n", "plt.show()" ] }, 
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Persist or delete\n", "\n", "Persist the changes to Redshift if `run_commits` was set to True. Once they are committed, any other tool querying your Redshift database will see them. Otherwise, these changes remain in an open transaction.\n", "\n", "Warning: as long as the kernel keeps this connection open, any other tool trying to change this schema will block until the transaction is either committed explicitly or aborted when the connection is closed. This can become confusing if you keep re-creating connections during testing, because the old connections will not be closed and cleaned up until you restart the kernel." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "if run_commits:\n", "    # Commit the open transaction so that other sessions can see the changes\n", "    con.commit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Clean up. If you didn't set `run_commits`, this is essentially a no-op because all changes were temporary anyway. Note that we take a shortcut for deleting the tables by using the CASCADE keyword on the schema drop." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Schema pytest2 successfully deleted? 
True\n", "All schemas:\n", "('aws_oracle_ext', 117417, 'rudpot')\n", "('dummy', 125265, 'rudpot')\n", "('information_schema', 17131, 'rdsdb')\n", "('octank', 117022, 'rudpot')\n", "('pg_catalog', 11, 'rdsdb')\n", "('pg_internal', 100, 'rdsdb')\n", "('pg_temp_1', 16924, 'rdsdb')\n", "('pg_toast', 99, 'rdsdb')\n", "('public', 2200, 'rdsdb')\n", "('pytest1', 125273, 'rudpot')\n", "('random', 158043, 'rudpot')\n", "('wh', 158072, 'rudpot')\n" ] } ], "source": [ "if run_cleanup:\n", "    cur = con.cursor()\n", "    # CASCADE also drops the tables inside the schema\n", "    cur.execute(\"\"\"\n", " DROP SCHEMA pytest2 CASCADE\n", " \"\"\")\n", "    cur.execute(\"\"\"\n", " select s.nspname as table_schema,\n", " s.oid as schema_id, \n", " u.usename as owner\n", " from pg_catalog.pg_namespace s\n", " join pg_catalog.pg_user u on u.usesysid = s.nspowner\n", " order by table_schema;\"\"\")\n", "    res = cur.fetchall()\n", "    print(\"Schema pytest2 successfully deleted? \", 'pytest2' not in [x[0] for x in res])\n", "    print(\"All schemas:\")\n", "    for ii in res:\n", "        print(ii)\n", "    cur.close()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If both `run_commits` and `run_cleanup` are True, the next cell also commits the schema drop. To keep all changes in Redshift for later review instead, set `run_commits` to True and `run_cleanup` to False. 
" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "if run_commits and run_cleanup:\n", "    # Commit the schema drop so the cleanup is visible to other sessions\n", "    con.commit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cleanup\n", "\n", "Close the connection to release any pending transaction. psycopg2 discards uncommitted changes when a connection is closed, as if a ROLLBACK had been issued." ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "con.close()" ] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 4 }