{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "### Input Redshift Cluster Endpoint and User\n", "Please input your redshift cluster endpoint and existing user on that cluster." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "REDSHIFT_WORKGROUP_NAME=\"your redshift serverless workgroup name\"\n", "REDSHIFT_DATABASE_NAME=\"your redshift serverless database name\"\n", "REDSHIFT_END_POINT=\"your redshift end point without port number and database name\"\n", "REDSHIFT_USER=\"your redshift super user\"" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Process to GRANT privilege to IAM Role user\n", "This step grants create model permission to the IAM role user account on redshift that is being used in redshift data api." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import redshift_connector\n", "import boto3\n", "\n", "from sagemaker import get_execution_role\n", "\n", "role = get_execution_role()\n", "\n", "iamr_userid=('IAMR:'+role.split('/')[1])\n", "\n", "rs_client = boto3.client('redshift')\n", "serverless_cluster_id = REDSHIFT_WORKGROUP_NAME\n", "response = rs_client.get_cluster_credentials(\n", " DbUser=REDSHIFT_USER, ClusterIdentifier=serverless_cluster_id, AutoCreate=False,\n", " DurationSeconds=3600\n", " )\n", "db_user = response['DbUser']\n", "db_password = response['DbPassword']\n", "conn = redshift_connector.connect(\n", " host=REDSHIFT_END_POINT,\n", " database=REDSHIFT_DATABASE_NAME,\n", " user=db_user,\n", " password=db_password\n", " )\n", "\n", "cursor = conn.cursor()\n", "conn.autocommit = True\n", "cursor.execute(\"grant create model to \" + '\"'+ iamr_userid +'\";')\n", "conn.close()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Setup Run SQL function using Redshift Data API to get SQL query output directly into pandas dataframe\n", "In this step, we are creating function run_sql, which we will use to get SQL query output directly into pandas dataframe. We will also use this function to run DDL statements" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import time\n", "import pandas as pd\n", "import numpy as np\n", "import matplotlib.pyplot as plt\n", "\n", "session = boto3.session.Session()\n", "region = session.region_name\n", "\n", "\n", "def run_sql(sql_text):\n", " client = boto3.client(\"redshift-data\")\n", " res = client.execute_statement(Database=REDSHIFT_DATABASE_NAME, WorkgroupName=REDSHIFT_WORKGROUP_NAME, Sql=sql_text)\n", " query_id = res[\"Id\"]\n", " done = False\n", " while not done:\n", " time.sleep(1)\n", " status_description = client.describe_statement(Id=query_id)\n", " status = status_description[\"Status\"]\n", " if status == \"FAILED\":\n", " raise Exception('SQL query failed:' + query_id + \": \" + status_description[\"Error\"])\n", " elif status == \"FINISHED\":\n", " if status_description['ResultRows']>0:\n", " results = client.get_statement_result(Id=query_id)\n", " column_labels = []\n", " for i in range(len(results[\"ColumnMetadata\"])): column_labels.append(results[\"ColumnMetadata\"][i]['label'])\n", " records = []\n", " for record in results.get('Records'):\n", " records.append([list(rec.values())[0] for rec in record])\n", " df = pd.DataFrame(np.array(records), columns=column_labels)\n", " return df\n", " else:\n", " return query_id\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data Preparation Script\n", "Data preparation script to be run on Redshift" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "setup_script = \"\"\"\n", "DROP TABLE IF EXISTS ridership CASCADE;\n", "DROP TABLE IF EXISTS weather CASCADE;\n", "DROP TABLE IF EXISTS holiday CASCADE;\n", "DROP TABLE IF EXISTS trip_data CASCADE;\n", "\n", "CREATE TABLE IF NOT EXISTS ridership\n", "( trip_id INT\n", ", trip_duration_seconds INT\n", ", trip_start_time timestamp\n", ", trip_stop_time timestamp\n", ", from_station_name VARCHAR(50)\n", ", to_station_name VARCHAR(50)\n", ", from_station_id SMALLINT\n", ", to_station_id SMALLINT\n", ", user_type VARCHAR(20));\n", "\n", "CREATE TABLE IF NOT EXISTS weather\n", "( longitude_x DECIMAL(5,2)\n", ", latitude_y DECIMAL(5,2)\n", ", station_name VARCHAR(20)\n", ", climate_id BIGINT\n", ", datetime_utc TIMESTAMP\n", ", weather_year SMALLINT\n", ", weather_month SMALLINT\n", ", weather_day SMALLINT\n", ", time_utc VARCHAR(5)\n", ", temp_c DECIMAL(5,2)\n", ", temp_flag VARCHAR(1)\n", ", dew_point_temp_c DECIMAL(5,2)\n", ", dew_point_temp_flag VARCHAR(1)\n", ", rel_hum SMALLINT\n", ", rel_hum_flag VARCHAR(1)\n", ", precip_amount_mm DECIMAL(5,2)\n", ", precip_amount_flag VARCHAR(1)\n", ", wind_dir_10s_deg VARCHAR(10)\n", ", wind_dir_flag VARCHAR(1)\n", ", wind_spd_kmh VARCHAR(10)\n", ", wind_spd_flag VARCHAR(1)\n", ", visibility_km VARCHAR(10)\n", ", visibility_flag VARCHAR(1)\n", ", stn_press_kpa DECIMAL(5,2)\n", ", stn_press_flag VARCHAR(1)\n", ", hmdx SMALLINT\n", ", hmdx_flag VARCHAR(1)\n", ", wind_chill VARCHAR(10)\n", ", wind_chill_flag VARCHAR(1)\n", ", weather VARCHAR(10));\n", "\n", "CREATE TABLE IF NOT EXISTS holiday\n", "( holiday_date DATE\n", ", description VARCHAR(100));\n", "\n", "\n", "COPY ridership FROM \n", "'s3://redshift-ml-bikesharing-data/bike-sharing-data/ridership/'\n", "IAM_ROLE 'arn:aws:iam:::role/RedshiftML'\n", "FORMAT csv IGNOREHEADER 1 DATEFORMAT 'auto' TIMEFORMAT 'auto' REGION 'us-west-2' gzip;\n", "\n", "COPY weather FROM\n", "'s3://redshift-ml-bikesharing-data/bike-sharing-data/weather/'\n", "IAM_ROLE 'arn:aws:iam:::role/RedshiftML'\n", "FORMAT csv IGNOREHEADER 1 DATEFORMAT 'auto' TIMEFORMAT 'auto' REGION 'us-west-2' gzip;\n", "\n", "COPY holiday FROM\n", "'s3://redshift-ml-bikesharing-data/bike-sharing-data/holiday/'\n", "IAM_ROLE 'arn:aws:iam:::role/RedshiftML'\n", "FORMAT csv IGNOREHEADER 1 DATEFORMAT 'auto' TIMEFORMAT 'auto' REGION 'us-west-2' gzip;\n", "\n", "CREATE OR REPLACE VIEW ridership_view AS\n", "SELECT\n", " trip_time\n", " , trip_count\n", " , TO_CHAR(trip_time,'hh24') ::INT trip_hour\n", " , TO_CHAR(trip_time, 'dd') :: INT trip_day\n", " , TO_CHAR(trip_time, 'mm') :: INT trip_month\n", " , TO_CHAR(trip_time, 'yy') :: INT trip_year\n", " , TO_CHAR(trip_time, 'q') :: INT trip_quarter\n", " , TO_CHAR(trip_time, 'w') :: INT trip_month_week\n", " , TO_CHAR(trip_time, 'd') :: INT trip_week_day\n", "FROM \n", " (SELECT \n", " CASE\n", " WHEN TRUNC(r.trip_start_time) < '2017-07-01'::DATE\n", " THEN CONVERT_TIMEZONE('US/Eastern', DATE_TRUNC('hour',r.trip_start_time))\n", " ELSE DATE_TRUNC('hour',r.trip_start_time)\n", " END trip_time\n", " , COUNT(1) trip_count\n", " FROM \n", " ridership r\n", " WHERE r.trip_duration_seconds BETWEEN 60 AND 60 * 60 * 24\n", " GROUP BY\n", " 1);\n", "\n", "CREATE OR REPLACE VIEW weather_view AS\n", "SELECT \n", " CONVERT_TIMEZONE('US/Eastern', \n", " DATE_TRUNC('hour',datetime_utc)) daytime\n", " , ROUND(AVG(temp_c)) temp_c\n", " , ROUND(AVG(precip_amount_mm)) precip_amount_mm\n", "FROM weather\n", "GROUP BY 1;\n", "\n", "DROP TABLE IF EXISTS trip_data;\n", "CREATE TABLE trip_data AS \n", "SELECT \n", " r.trip_time\n", " ,r.trip_count\n", " ,r.trip_hour\n", " ,r.trip_day\n", " ,r.trip_month\n", " ,r.trip_year\n", " ,r.trip_quarter\n", " ,r.trip_month_week\n", " ,r.trip_week_day\n", " ,w.temp_c\n", " ,w.precip_amount_mm\n", " ,CASE\n", " WHEN h.holiday_date IS NOT NULL\n", " THEN 1\n", " WHEN TO_CHAR(r.trip_time,'D')::INT IN (1,7)\n", " THEN 1\n", " ELSE 0\n", " END is_holiday\n", " , ROW_NUMBER() OVER (ORDER BY RANDOM()) serial_number\n", "FROM \n", " ridership_view r\n", "JOIN weather_view w\n", " ON ( r.trip_time = w.daytime )\n", "LEFT OUTER JOIN holiday h\n", " ON ( TRUNC(r.trip_time) = h.holiday_date );\n", "\n", "\"\"\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run data preparation script in Redshift" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for sql_text in setup_script.split(\";\"):\n", " run_sql(sql_text);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read SQL output with Pandas Dataframe" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = run_sql(\"select trip_count, trip_time from trip_data\");\n", "df.head(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Visualize data using matplotlib" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plt.hist(df.trip_count)\n", "plt.xlabel('trip count')\n", "plt.title('trip count histogram')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = run_sql(\"select round(trip_count/100) trip_count, trip_hour from trip_data\");\n", "df.head(10)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plt.scatter(df.trip_hour,df.trip_count)\n", "plt.ylabel('trip count (100x)')\n", "plt.xlabel('trip hour')\n", "plt.title('trip count vs hour of day scatter plot ')\n", "plt.grid(True)\n", "plt.tight_layout()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run Create Model statement to create a new ML model with REdshift ML" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "run_sql(\"\"\"\n", "CREATE MODEL predict_rental_count \n", "FROM (SELECT\n", " trip_count,trip_hour,trip_day,trip_month,trip_year,trip_quarter,\n", " trip_month_week, trip_week_day, temp_c, precip_amount_mm, is_holiday\n", "FROM trip_data)\n", "TARGET trip_count\n", "FUNCTION predict_rental_count\n", "IAM_ROLE 'arn:aws:iam:::role/RedshiftML'\n", "PROBLEM_TYPE regression\n", "OBJECTIVE 'mse'\n", "SETTINGS (s3_bucket 'redshiftml-')\n", "\"\"\");" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### View Model Progress " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = run_sql(\"SHOW MODEL predict_rental_count\")\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run Prediction and compare actual vs predicted\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = run_sql(\"\"\"\n", "SELECT trip_time, actual_count, predicted_count, ( actual_count - predicted_count ) difference\n", "FROM \n", "(SELECT\n", " trip_time\n", ",trip_count AS actual_count\n", ",PREDICT_RENTAL_COUNT (trip_hour, trip_day, trip_month, trip_year, trip_quarter, trip_month_week, trip_week_day, temp_c, precip_amount_mm, is_holiday) predicted_count\n", "FROM trip_data\n", ") LIMIT 5;\n", "\"\"\");\n", "df\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.1" }, "vscode": { "interpreter": { "hash": "aee8b7b246df8f9039afb4144a1f6fd8d2ca17a180786b69acc140d282b71a49" } } }, "nbformat": 4, "nbformat_minor": 4 }