{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "5b2c259d-772d-43e9-8267-388878cc4661",
   "metadata": {},
   "source": [
    "# Lab 3"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9f65537d-341e-4db1-9c23-47683e7de0f2",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>4. Specify connection details of the Amazon Redshift serverless endpoint we created.</font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "204b4bef-6a7c-4bd7-8474-e16a82a066cc",
   "metadata": {},
   "outputs": [],
   "source": [
    "endpoint       = 'default.929963627956.us-east-1.redshift-serverless.amazonaws.com' # CHANGE THIS VALUE\n",
    "admin_username = 'admin'\n",
    "admin_password = 'Password123' # CHANGE THIS VALUE\n",
    "db_name        = 'dev'\n",
    "port           = 5439"
   ]
  },
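  {
   "cell_type": "markdown",
   "id": "a1b32c01",
   "metadata": {},
   "source": [
    "Optionally, avoid keeping the admin password hard-coded in the notebook. The sketch below is one possible approach, assuming the credentials are exposed as environment variables (REDSHIFT_ADMIN_USER and REDSHIFT_ADMIN_PASSWORD are hypothetical names, not something this lab sets up); it falls back to the values above when the variables are not set."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a1b32c02",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sketch: read credentials from environment variables instead of\n",
    "# hard-coding them. REDSHIFT_ADMIN_USER / REDSHIFT_ADMIN_PASSWORD are\n",
    "# hypothetical variable names, not created by this lab.\n",
    "import os\n",
    "\n",
    "admin_username = os.getenv('REDSHIFT_ADMIN_USER', admin_username)\n",
    "admin_password = os.getenv('REDSHIFT_ADMIN_PASSWORD', admin_password)"
   ]
  },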
  {
   "cell_type": "markdown",
   "id": "334156a5-df1a-4742-9f0d-fe7c2d380512",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>5. Build the connection url used by [SQLAlchemy for Redshift](https://aws.amazon.com/blogs/big-data/use-the-amazon-redshift-sqlalchemy-dialect-to-interact-with-amazon-redshift/).</font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fbeebb7d-12aa-4007-a35b-35d353b01fba",
   "metadata": {},
   "outputs": [],
   "source": [
    "from sqlalchemy.engine.url import URL\n",
    "redshift_url = URL.create(\n",
    "    drivername='redshift+redshift_connector',\n",
    "    host=endpoint,\n",
    "    port=port,\n",
    "    database=db_name,\n",
    "    username=admin_username,\n",
    "    password=admin_password\n",
    ")\n",
    "%reload_ext sql\n",
    "%config SqlMagic.displaylimit = 10\n",
    "%config SqlMagic.displaycon = False\n",
    "%sql $redshift_url"
   ]
  },
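  {
   "cell_type": "markdown",
   "id": "a1b62c01",
   "metadata": {},
   "source": [
    "Optionally, you can sanity-check the connection outside of the SQL magic. This is a minimal sketch using a plain SQLAlchemy engine built from the same URL; it assumes the sqlalchemy-redshift dialect and redshift_connector driver referenced above are installed in this kernel."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a1b62c02",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sketch: verify connectivity with a plain SQLAlchemy engine built\n",
    "# from the same URL used by the %sql magic above.\n",
    "from sqlalchemy import create_engine, text\n",
    "\n",
    "engine = create_engine(redshift_url)\n",
    "with engine.connect() as conn:\n",
    "    print(conn.execute(text('SELECT current_user, version()')).fetchone())"
   ]
  },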
  {
   "cell_type": "markdown",
   "id": "6cfb36ce-c917-42c8-a0a7-3745edbf239f",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>6. Query data in Redshift in plain SQL.</font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7eaab270-bcc2-458a-b2b0-2dddbfc1f48d",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT current_user, current_date;"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e789fd65-9d52-433b-a471-dfc9609a5d72",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>7. For Python users, data queried from Redshift can be converted into a Pandas dataframe.</font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b34290e4-7aaa-41dc-947f-521a831ca1cc",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "result_set = %sql SELECT current_user, current_date\n",
    "user_df = result_set.DataFrame()\n",
    "print(type(user_df))\n",
    "user_df"
   ]
  },
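  {
   "cell_type": "markdown",
   "id": "a1c10c01",
   "metadata": {},
   "source": [
    "Alternatively, pandas can query Redshift directly through a SQLAlchemy engine. A minimal sketch, assuming an engine created from the same redshift_url as above:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a1c10c02",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sketch: query Redshift straight into a pandas DataFrame via a\n",
    "# SQLAlchemy engine (assumes redshift_url from the earlier cell).\n",
    "import pandas as pd\n",
    "from sqlalchemy import create_engine\n",
    "\n",
    "engine = create_engine(redshift_url)\n",
    "pd.read_sql('SELECT current_user, current_date', engine)"
   ]
  },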
  {
   "cell_type": "markdown",
   "id": "63460f14-0c0a-46d0-8ba2-a051fc030391",
   "metadata": {},
   "source": [
    "# Lab 5\n",
    "#### <font color='#e28743'>1.1 From Jupyter notebook, start by creating an external schema in Amazon Redshift to reference external metadata.</font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "19dcaecb-4396-4a46-85de-781079857870",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "CREATE EXTERNAL SCHEMA IF NOT EXISTS external_spectrumdb\n",
    "FROM DATA CATALOG\n",
    "DATABASE 'spectrumdb'\n",
    "IAM_ROLE default;"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5231153f-5f27-468b-b8f2-efdedf980024",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>1.2 After external schema is created, you can list available external schemas.</font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "360597a6-b6c8-44d5-8588-daae98fc5546",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT schemaname, databasename FROM svv_external_schemas WHERE schemaname='external_spectrumdb';"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "825625ab-9ab0-4272-be3c-2f415e4e852f",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>1.3 You can also list available external tables in the external schema. Note that customer data resides in Amazon S3 and is not loaded into Amazon Redshift.</font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "938bfb9b-a769-42e0-8049-ab601fd0726e",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT schemaname, tablename, location FROM svv_external_tables WHERE schemaname='external_spectrumdb';"
   ]
  },
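  {
   "cell_type": "markdown",
   "id": "a1c83c01",
   "metadata": {},
   "source": [
    "The same table metadata lives in the AWS Glue Data Catalog, so it can also be cross-checked from Python. A rough sketch with boto3, assuming the notebook role is allowed to call the Glue API and the catalog database is named spectrumdb as in step 1.1:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a1c83c02",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sketch: list the tables registered in the Glue Data Catalog\n",
    "# database that the external schema points at (assumes Glue permissions).\n",
    "import boto3\n",
    "\n",
    "glue = boto3.client('glue')\n",
    "for table in glue.get_tables(DatabaseName='spectrumdb')['TableList']:\n",
    "    print(table['Name'], '->', table['StorageDescriptor']['Location'])"
   ]
  },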
  {
   "cell_type": "markdown",
   "id": "a8551d3b-b6c2-488c-8d01-673ef5e7d9ac",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>2.1 You can query external table customer to use Amazon Redshift Spectrum to query data residing in Amazon S3.</font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1cced0c3-9329-486f-b5f3-81194cfbdda3",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM external_spectrumdb.customer;"
   ]
  },
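  {
   "cell_type": "markdown",
   "id": "a2c06c01",
   "metadata": {},
   "source": [
    "For larger external datasets it can be handy to pull only a sample into pandas for exploration. A minimal sketch reusing the SQL magic's DataFrame conversion from Lab 3:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a2c06c02",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sketch: fetch a small sample of the external table into pandas.\n",
    "customer_sample = %sql SELECT * FROM external_spectrumdb.customer LIMIT 100\n",
    "customer_df = customer_sample.DataFrame()\n",
    "customer_df.head()"
   ]
  },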
  {
   "cell_type": "markdown",
   "id": "aa869ff6-45f3-44db-ba48-3019b4b13711",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>2.2 Amazon Redshift Spectrum allows you to query external data using standard SQL. You can explore querying the data to aggregate and filter to gather population insights.</font>"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5b94fcf1",
   "metadata": {},
   "source": [
    "Query 1: Analyse customer gender"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "10601f6a-c723-481d-a821-b3c538b4e06c",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT sex as gender, count(*) FROM external_spectrumdb.customer GROUP BY sex;"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "59b57bc1",
   "metadata": {},
   "source": [
    "Query 2: Add additional filter on state"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a1899d67-cbba-4321-867d-b290050565df",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT sex as gender, count(*) FROM external_spectrumdb.customer WHERE state='VIC' GROUP BY sex;"
   ]
  },
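  {
   "cell_type": "markdown",
   "id": "a2c60c01",
   "metadata": {},
   "source": [
    "If you prefer a visual summary, the aggregated result can be converted to pandas and plotted. A rough sketch, assuming matplotlib is available in this kernel:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a2c60c02",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sketch: plot the gender breakdown (assumes matplotlib is installed).\n",
    "gender_counts = %sql SELECT sex AS gender, count(*) AS customers FROM external_spectrumdb.customer GROUP BY sex\n",
    "gender_counts.DataFrame().plot(kind='bar', x='gender', y='customers');"
   ]
  },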
  {
   "cell_type": "markdown",
   "id": "6a87ca25-4876-4f92-86fb-c37e834e509f",
   "metadata": {},
   "source": [
    "# Lab 6\n",
    "#### <font color='#e28743'>1. Create an external schema to establish connection between Amazon Redshift and Amazon Kinesis Data Stream.</font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cdecde31-30fe-44b6-a9d4-c77db809162f",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "CREATE EXTERNAL SCHEMA IF NOT EXISTS kinesis_schema\n",
    "FROM KINESIS\n",
    "IAM_ROLE default;"
   ]
  },
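  {
   "cell_type": "markdown",
   "id": "a2c86c01",
   "metadata": {},
   "source": [
    "Optionally, confirm from Python that the Kinesis data stream referenced by the materialized views below exists and is active. A minimal sketch with boto3, assuming the stream is named order_stream and the notebook role can call the Kinesis API:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a2c86c02",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sketch: check that the order_stream Kinesis data stream is active.\n",
    "import boto3\n",
    "\n",
    "kinesis = boto3.client('kinesis')\n",
    "summary = kinesis.describe_stream_summary(StreamName='order_stream')['StreamDescriptionSummary']\n",
    "print(summary['StreamName'], summary['StreamStatus'])"
   ]
  },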
  {
   "cell_type": "markdown",
   "id": "5dce51fd-1ec2-424c-9384-1125de00c7ac",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>2. Create a materialized view to ingest streaming data into Redshift. This uses the new Redshift streaming feature. The data in Kinesis data stream is in JSON format and this can be ingested as-is into Redshift using the SUPER data type.</font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3249f2ba-09b9-4de3-9d93-cff986b90f44",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "CREATE MATERIALIZED VIEW order_stream_option_1 AS\n",
    "SELECT ApproximateArrivalTimestamp, JSON_PARSE(from_varbyte(Data, 'utf-8')) order_json\n",
    "FROM kinesis_schema.order_stream\n",
    "WHERE is_utf8(Data) AND is_valid_json(from_varbyte(Data, 'utf-8'));"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ee37f717-a04d-4a12-831e-aa5e6d590ccd",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>3. Refresh the materialized view. This is where the actual data ingestion happens. Data gets loaded from the Kinesis data stream into Amazon Redshift without having to stage it first in Amazon S3. This achieves faster performance and improved latency.</font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "91ae4d75-563e-400b-ac0a-885bab455952",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "REFRESH MATERIALIZED VIEW order_stream_option_1;"
   ]
  },
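  {
   "cell_type": "markdown",
   "id": "a3c35c01",
   "metadata": {},
   "source": [
    "Because the stream keeps producing records, refreshing again later pulls in more data. The sketch below polls the row count a few times to illustrate this, assuming the %sql connection from Lab 3 is still active:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a3c35c02",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sketch: refresh the materialized view a few times and watch the\n",
    "# row count grow as new records arrive on the stream.\n",
    "import time\n",
    "\n",
    "for _ in range(3):\n",
    "    %sql REFRESH MATERIALIZED VIEW order_stream_option_1;\n",
    "    counts = %sql SELECT count(*) AS record_count FROM order_stream_option_1\n",
    "    print(counts)\n",
    "    time.sleep(5)"
   ]
  },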
  {
   "cell_type": "markdown",
   "id": "5084040e-344d-4cae-ae2d-11b03cb17940",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>4. You can query the streaming data.</font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c16338f7-ab40-4315-9071-6f8ee12f2c35",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM order_stream_option_1 LIMIT 5;"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5ef7f5e9-2aa8-42fe-bde8-08cb7d5ec821",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>5. You can also query the streaming data and unpack individual attributes in the super data type. In this example, you are extracting the delivery state and origin state attributes from JSON. Using this information, you can identify what is the top 5 busiest consignment routes between states.</font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "12db1597-7264-4383-8955-1d6f82557f61",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT order_json.delivery_state::VARCHAR, order_json.origin_state::VARCHAR, count(1) \n",
    "FROM order_stream_option_1\n",
    "GROUP BY order_json.delivery_state, order_json.origin_state\n",
    "ORDER BY count(1) DESC;"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e9cb60e8-54ab-4949-b748-c016d4bd30b0",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>6. You can combine Step 2 and Step 5 to create a materialized view that ingest streaming data into Redshift and unpack individual attributes in the super data type.</font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0240adc4-4ee8-48d7-a1f7-f3a7cae57d57",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "CREATE MATERIALIZED VIEW order_stream_option_2 AS\n",
    "SELECT\n",
    "    ApproximateArrivalTimestamp as order_timestamp, \n",
    "    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'consignmentid',     true)::BIGINT       as consignmentid,\n",
    "    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'timestamp',         true)::VARCHAR(50)  as original_order_date,\n",
    "    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_address',  true)::VARCHAR(100) as delivery_address,\n",
    "    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_state',    true)::VARCHAR(50)  as delivery_state,\n",
    "    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'origin_address',    true)::VARCHAR(100) as origin_address,\n",
    "    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'origin_state',      true)::VARCHAR(50)  as origin_state,\n",
    "    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delay_probability', true)::VARCHAR(10)  as delay_probability,\n",
    "    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'days_to_deliver',   true)::INT          as days_to_deliver,\n",
    "    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_distance', true)::FLOAT        as delivery_distance,\n",
    "    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'userid',            true)::INT          as userid,\n",
    "    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'revenue',           true)::FLOAT        as revenue,\n",
    "    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'cost',              true)::FLOAT        as cost\n",
    "FROM kinesis_schema.order_stream\n",
    "WHERE is_utf8(Data) AND is_valid_json(from_varbyte(Data, 'utf-8'));"
   ]
  },
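  {
   "cell_type": "markdown",
   "id": "a4c23c01",
   "metadata": {},
   "source": [
    "Optionally, you can list the materialized views created so far and check whether they are stale (i.e. pending a refresh) through the SVV_MV_INFO system view:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a4c23c02",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sketch: list materialized views and their staleness via the\n",
    "# SVV_MV_INFO Redshift system view.\n",
    "mv_info = %sql SELECT * FROM svv_mv_info\n",
    "mv_info.DataFrame()"
   ]
  },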
  {
   "cell_type": "markdown",
   "id": "4df048cf-d0f8-44b4-8ca5-3069b4e47bfd",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>7. You can query the most recent transactions that have been ingested into Redshift using this select statement. It compares the current_timestamp with the ApproximateArrivalTimestamp to measure ingestion latency. Wait for a few seconds and rerun the same query in a different cell. Notice that the data has changed due to the near realtime capability of Redshift.</font>"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "11b20d60",
   "metadata": {},
   "source": [
    "Query to get ingestion latency"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b679747c-79cd-428f-8a92-0e6c625860c4",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "REFRESH MATERIALIZED VIEW order_stream_option_2;\n",
    "SELECT current_timestamp, current_timestamp-order_timestamp as time_diff, * \n",
    "FROM order_stream_option_2\n",
    "ORDER BY order_timestamp desc LIMIT 2;"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a1569e25",
   "metadata": {},
   "source": [
    "Rerun same query after 5 seconds to see data streaming changes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f850811f",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "REFRESH MATERIALIZED VIEW order_stream_option_2;\n",
    "SELECT current_timestamp, current_timestamp-order_timestamp as time_diff, * \n",
    "FROM order_stream_option_2\n",
    "ORDER BY order_timestamp desc LIMIT 2;"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e5d52d16",
   "metadata": {},
   "source": [
    "#### <font color='#e28743'>8. Now that we have ingested data from both Amazon S3 and Amazon Kinesis we can analyse both historical and streaming data in the same SQL statement</font>"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d37da4c5",
   "metadata": {},
   "source": [
    "Query 1: Join Data between order data stream and the customer data in S3."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6da15d5e",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM order_stream_option_2 kinesis_order \n",
    "INNER JOIN external_spectrumdb.customer s3_customer\n",
    "ON kinesis_order.userid = s3_customer.userid\n",
    "LIMIT 5"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "98a49a9d",
   "metadata": {},
   "source": [
    "Query 2: Find out which companies has most orders"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "20520dd0",
   "metadata": {
    "vscode": {
     "languageId": "sql"
    }
   },
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT s3_customer.company, count(kinesis_order.consignmentid) FROM order_stream_option_2 kinesis_order \n",
    "INNER JOIN external_spectrumdb.customer s3_customer\n",
    "ON kinesis_order.userid = s3_customer.userid\n",
    "group by s3_customer.company\n",
    "order by count(consignmentid) desc\n",
    "LIMIT 5"
   ]
  },
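  {
   "cell_type": "markdown",
   "id": "a5c45c01",
   "metadata": {},
   "source": [
    "The combined result can also be pulled into pandas for further analysis or charting in Python. A minimal sketch reusing the same join:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a5c45c02",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sketch: load the streaming/historical join into pandas and\n",
    "# summarise revenue and cost per company.\n",
    "joined = %sql SELECT s3_customer.company, kinesis_order.revenue, kinesis_order.cost FROM order_stream_option_2 kinesis_order INNER JOIN external_spectrumdb.customer s3_customer ON kinesis_order.userid = s3_customer.userid\n",
    "orders_df = joined.DataFrame()\n",
    "orders_df.groupby('company')[['revenue', 'cost']].sum().sort_values('revenue', ascending=False).head()"
   ]
  },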
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4388e7de-bb4d-47cb-a19b-4b55384242d7",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_python3",
   "language": "python",
   "name": "conda_python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}