# Lab 3

#### <font color='#e28743'>4. Specify the connection details of the Amazon Redshift Serverless endpoint we created.</font>

In [None]:
endpoint       = 'default.929963627956.us-east-1.redshift-serverless.amazonaws.com' # CHANGE THIS VALUE
admin_username = 'admin'
admin_password = 'Password123' # CHANGE THIS VALUE
db_name        = 'dev'
port           = 5439
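
Hardcoding the password in a notebook cell is convenient for a lab, but it is easy to leak. As a minimal sketch, the same five settings could be read from environment variables instead. The variable names below (`REDSHIFT_ENDPOINT`, `REDSHIFT_USER`, `REDSHIFT_PASSWORD`, `REDSHIFT_DB`, `REDSHIFT_PORT`) and the helper `load_connection_settings` are hypothetical and not part of the lab.

```python
import os


def load_connection_settings(env=os.environ):
    """Build the connection settings from a mapping such as os.environ.

    All variable names are hypothetical; adjust them to your environment.
    """
    return {
        "endpoint": env.get("REDSHIFT_ENDPOINT", ""),
        "admin_username": env.get("REDSHIFT_USER", "admin"),
        "admin_password": env.get("REDSHIFT_PASSWORD", ""),
        "db_name": env.get("REDSHIFT_DB", "dev"),
        # Environment variables are strings, so the port is converted to int.
        "port": int(env.get("REDSHIFT_PORT", "5439")),
    }


# Example with an explicit mapping instead of the real environment.
settings = load_connection_settings(
    {"REDSHIFT_ENDPOINT": "example-endpoint", "REDSHIFT_PASSWORD": "example-secret"}
)
print(settings["db_name"], settings["port"])
```

The values in `settings` could then be assigned to `endpoint`, `admin_username`, `admin_password`, `db_name`, and `port` before the next step.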

#### <font color='#e28743'>5. Build the connection URL used by [SQLAlchemy for Redshift](https://aws.amazon.com/blogs/big-data/use-the-amazon-redshift-sqlalchemy-dialect-to-interact-with-amazon-redshift/).</font>

In [None]:
from sqlalchemy.engine.url import URL
redshift_url = URL.create(
    drivername='redshift+redshift_connector',
    host=endpoint,
    port=port,
    database=db_name,
    username=admin_username,
    password=admin_password
)
%reload_ext sql
%config SqlMagic.displaylimit = 10
%config SqlMagic.displaycon = False
%sql $redshift_url
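
To make the shape of the connection URL visible, the sketch below assembles a similar string with only the standard library. It is an approximation for illustration: `build_redshift_url` is a hypothetical helper, the host and password are made-up values, and SQLAlchemy's own escaping rules may differ in detail from `quote_plus`.

```python
from urllib.parse import quote_plus


def build_redshift_url(host, port, database, username, password,
                       drivername="redshift+redshift_connector"):
    """Assemble a dialect+driver://user:password@host:port/database string."""
    # Percent-encode the credentials so characters such as '@' or '/'
    # cannot be mistaken for URL delimiters.
    user = quote_plus(username)
    secret = quote_plus(password)
    return f"{drivername}://{user}:{secret}@{host}:{port}/{database}"


example_url = build_redshift_url("example-host", 5439, "dev", "admin", "p@ss/word")
print(example_url)
```

In the lab itself, `URL.create` remains the safer choice because it handles the escaping for you.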

#### <font color='#e28743'>6. Query data in Redshift in plain SQL.</font>

In [None]:
%%sql
SELECT current_user, current_date;

#### <font color='#e28743'>7. For Python users, data queried from Redshift can be converted into a pandas DataFrame.</font>

In [None]:
result_set = %sql SELECT current_user, current_date
user_df = result_set.DataFrame()
print(type(user_df))
user_df

# Lab 5
#### <font color='#e28743'>1.1 From the Jupyter notebook, start by creating an external schema in Amazon Redshift to reference external metadata.</font>

In [None]:
%%sql
CREATE EXTERNAL SCHEMA IF NOT EXISTS external_spectrumdb
FROM DATA CATALOG
DATABASE 'spectrumdb'
IAM_ROLE default;

#### <font color='#e28743'>1.2 After the external schema is created, you can list the available external schemas.</font>

In [None]:
%%sql
SELECT schemaname, databasename FROM svv_external_schemas WHERE schemaname='external_spectrumdb';

#### <font color='#e28743'>1.3 You can also list available external tables in the external schema. Note that customer data resides in Amazon S3 and is not loaded into Amazon Redshift.</font>

In [None]:
%%sql
SELECT schemaname, tablename, location FROM svv_external_tables WHERE schemaname='external_spectrumdb';

#### <font color='#e28743'>2.1 Query the external table customer. Amazon Redshift Spectrum runs the query against the data residing in Amazon S3.</font>

In [None]:
%%sql
SELECT * FROM external_spectrumdb.customer;

#### <font color='#e28743'>2.2 Amazon Redshift Spectrum allows you to query external data using standard SQL. You can explore querying the data to aggregate and filter to gather population insights.</font>

Query 1: Analyse customer gender

In [None]:
%%sql
SELECT sex as gender, count(*) FROM external_spectrumdb.customer GROUP BY sex;

Query 2: Add a filter on state

In [None]:
%%sql
SELECT sex as gender, count(*) FROM external_spectrumdb.customer WHERE state='VIC' GROUP BY sex;
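
For readers more familiar with Python than SQL, the two queries above can be mirrored on a small in-memory sample. This is only a sketch of the `GROUP BY` and `WHERE` logic: the rows in `customers` are invented for illustration and the helper `count_by_gender` is hypothetical.

```python
from collections import Counter

# Invented sample rows; the real table lives in Amazon S3.
customers = [
    {"sex": "F", "state": "VIC"},
    {"sex": "M", "state": "VIC"},
    {"sex": "F", "state": "NSW"},
    {"sex": "F", "state": "VIC"},
]


def count_by_gender(rows, state=None):
    """Count rows per gender, optionally keeping only one state."""
    # The generator plays the role of the WHERE clause.
    selected = (r for r in rows if state is None or r["state"] == state)
    # Counter plays the role of GROUP BY sex with count(*).
    return Counter(r["sex"] for r in selected)


all_counts = count_by_gender(customers)               # Query 1
vic_counts = count_by_gender(customers, state="VIC")  # Query 2
print(all_counts, vic_counts)
```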

# Lab 6
#### <font color='#e28743'>1. Create an external schema to establish a connection between Amazon Redshift and Amazon Kinesis Data Streams.</font>

In [None]:
%%sql
CREATE EXTERNAL SCHEMA IF NOT EXISTS kinesis_schema
FROM KINESIS
IAM_ROLE default;

#### <font color='#e28743'>2. Create a materialized view to ingest streaming data into Redshift. This uses the Redshift streaming ingestion feature. The data in the Kinesis data stream is in JSON format and can be ingested as-is into Redshift using the SUPER data type.</font>

In [None]:
%%sql
CREATE MATERIALIZED VIEW order_stream_option_1 AS
SELECT ApproximateArrivalTimestamp, JSON_PARSE(from_varbyte(Data, 'utf-8')) order_json
FROM kinesis_schema.order_stream
WHERE is_utf8(Data) AND is_valid_json(from_varbyte(Data, 'utf-8'));
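
The `WHERE` clause keeps only records whose payload is valid UTF-8 and valid JSON before `JSON_PARSE` is applied. A rough Python analogue of that filter is sketched below. The helper `parse_record` and the sample payloads are hypothetical, and Python's `json` module is not guaranteed to accept exactly the same inputs as Redshift's `is_valid_json`.

```python
import json


def parse_record(data: bytes):
    """Return the parsed JSON document, or None if the payload is unusable."""
    try:
        text = data.decode("utf-8")   # analogue of is_utf8 + from_varbyte
    except UnicodeDecodeError:
        return None
    try:
        return json.loads(text)       # analogue of is_valid_json + JSON_PARSE
    except json.JSONDecodeError:
        return None


raw_records = [
    b'{"userid": 1, "delivery_state": "VIC"}',  # valid UTF-8 and valid JSON
    b"\xff\xfe",                                 # not valid UTF-8
    b"not json",                                 # valid UTF-8, invalid JSON
]

# Note: a top-level JSON null would also be dropped by this simple filter.
parsed = [doc for doc in map(parse_record, raw_records) if doc is not None]
print(parsed)
```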

#### <font color='#e28743'>3. Refresh the materialized view. This is where the actual data ingestion happens. Data gets loaded from the Kinesis data stream into Amazon Redshift without having to stage it first in Amazon S3. Skipping the staging step reduces ingestion latency.</font>

In [None]:
%%sql
REFRESH MATERIALIZED VIEW order_stream_option_1;

#### <font color='#e28743'>4. You can query the streaming data.</font>

In [None]:
%%sql
SELECT * FROM order_stream_option_1 LIMIT 5;

#### <font color='#e28743'>5. You can also query the streaming data and unpack individual attributes in the SUPER data type. In this example, you extract the delivery state and origin state attributes from the JSON. Using this information, you can identify the top 5 busiest consignment routes between states.</font>

In [None]:
%%sql
SELECT order_json.delivery_state::VARCHAR, order_json.origin_state::VARCHAR, count(1)
FROM order_stream_option_1
GROUP BY order_json.delivery_state, order_json.origin_state
ORDER BY count(1) DESC
LIMIT 5;
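
The route ranking can be pictured as counting (delivery state, origin state) pairs and keeping the largest counts. The sketch below does this on invented sample orders; `busiest_routes` is a hypothetical helper, not part of the lab.

```python
from collections import Counter

# Invented sample of parsed order documents.
orders = [
    {"delivery_state": "VIC", "origin_state": "NSW"},
    {"delivery_state": "VIC", "origin_state": "NSW"},
    {"delivery_state": "NSW", "origin_state": "QLD"},
    {"delivery_state": "VIC", "origin_state": "NSW"},
    {"delivery_state": "NSW", "origin_state": "QLD"},
    {"delivery_state": "QLD", "origin_state": "VIC"},
]


def busiest_routes(orders, top_n=5):
    """Count orders per (delivery_state, origin_state) pair, largest first."""
    routes = Counter((o["delivery_state"], o["origin_state"]) for o in orders)
    return routes.most_common(top_n)


top_routes = busiest_routes(orders, top_n=2)
print(top_routes)
```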

#### <font color='#e28743'>6. You can combine Step 2 and Step 5 to create a materialized view that ingests streaming data into Redshift and unpacks individual attributes into typed columns.</font>

In [None]:
%%sql
CREATE MATERIALIZED VIEW order_stream_option_2 AS
SELECT
    ApproximateArrivalTimestamp as order_timestamp, 
    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'consignmentid',     true)::BIGINT       as consignmentid,
    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'timestamp',         true)::VARCHAR(50)  as original_order_date,
    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_address',  true)::VARCHAR(100) as delivery_address,
    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_state',    true)::VARCHAR(50)  as delivery_state,
    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'origin_address',    true)::VARCHAR(100) as origin_address,
    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'origin_state',      true)::VARCHAR(50)  as origin_state,
    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delay_probability', true)::VARCHAR(10)  as delay_probability,
    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'days_to_deliver',   true)::INT          as days_to_deliver,
    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'delivery_distance', true)::FLOAT        as delivery_distance,
    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'userid',            true)::INT          as userid,
    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'revenue',           true)::FLOAT        as revenue,
    JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'cost',              true)::FLOAT        as cost
FROM kinesis_schema.order_stream
WHERE is_utf8(Data) AND is_valid_json(from_varbyte(Data, 'utf-8'));
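
Each `JSON_EXTRACT_PATH_TEXT(...)::TYPE` expression pulls one attribute out of the JSON text and casts it to a column type. A simplified Python picture of that per-field extraction follows. The helper `extract_fields` and the shortened `FIELD_TYPES` mapping are hypothetical, and mapping a missing key to `None` is an assumption of this sketch rather than a statement about Redshift's exact behavior.

```python
import json

# Shortened, illustrative subset of the columns in the view.
FIELD_TYPES = {
    "consignmentid": int,
    "delivery_state": str,
    "days_to_deliver": int,
    "revenue": float,
    "cost": float,
}


def extract_fields(text, field_types=FIELD_TYPES):
    """Extract selected attributes from a JSON string and cast each one."""
    doc = json.loads(text)
    row = {}
    for name, cast in field_types.items():
        value = doc.get(name)
        # Missing attributes become None instead of raising an error.
        row[name] = None if value is None else cast(value)
    return row


row = extract_fields(
    '{"consignmentid": "42", "delivery_state": "VIC", '
    '"days_to_deliver": 3, "revenue": "10.5"}'
)
print(row)
```

Compared with the SUPER column in Step 2, typed columns make downstream queries simpler at the cost of fixing the schema when the view is created.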

#### <font color='#e28743'>7. You can query the most recent transactions ingested into Redshift using this SELECT statement. It subtracts the ApproximateArrivalTimestamp (exposed as order_timestamp) from current_timestamp to measure ingestion latency. Wait a few seconds and rerun the same query in a different cell. Notice that the data has changed, reflecting the near real-time capability of Redshift.</font>

Query to get ingestion latency

In [None]:
%%sql
REFRESH MATERIALIZED VIEW order_stream_option_2;
SELECT current_timestamp, current_timestamp-order_timestamp as time_diff, * 
FROM order_stream_option_2
ORDER BY order_timestamp desc LIMIT 2;

Rerun the same query after 5 seconds to see the streaming data change

In [None]:
%%sql
REFRESH MATERIALIZED VIEW order_stream_option_2;
SELECT current_timestamp, current_timestamp-order_timestamp as time_diff, * 
FROM order_stream_option_2
ORDER BY order_timestamp desc LIMIT 2;
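
The `time_diff` column is simply the query time minus the arrival time of each record. The sketch below shows the same arithmetic with fixed, invented timestamps so that the result is reproducible; `ingestion_latency` is a hypothetical helper.

```python
from datetime import datetime, timezone


def ingestion_latency(order_timestamp, now=None):
    """Return the time elapsed between record arrival and query time."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now - order_timestamp


# Invented timestamps: the record arrived 7 seconds before the query ran.
arrived = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
queried = datetime(2024, 1, 1, 12, 0, 7, tzinfo=timezone.utc)

time_diff = ingestion_latency(arrived, now=queried)
print(time_diff.total_seconds())
```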

#### <font color='#e28743'>8. Now that we can access both the customer data in Amazon S3 (through Redshift Spectrum) and the streaming data ingested from Amazon Kinesis, we can analyse historical and streaming data in the same SQL statement.</font>

Query 1: Join the order data stream with the customer data in S3.

In [None]:
%%sql
SELECT * FROM order_stream_option_2 kinesis_order
INNER JOIN external_spectrumdb.customer s3_customer
ON kinesis_order.userid = s3_customer.userid
LIMIT 5;

Query 2: Find out which companies have the most orders

In [None]:
%%sql
SELECT s3_customer.company, count(kinesis_order.consignmentid)
FROM order_stream_option_2 kinesis_order
INNER JOIN external_spectrumdb.customer s3_customer
ON kinesis_order.userid = s3_customer.userid
GROUP BY s3_customer.company
ORDER BY count(kinesis_order.consignmentid) DESC
LIMIT 5;
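
The join and aggregation in Query 2 can be illustrated with plain Python on invented rows. This sketch assumes each `userid` appears at most once in the customer data; `orders_per_company` and the sample company names are hypothetical.

```python
from collections import Counter

# Invented sample rows standing in for the streamed orders and S3 customers.
orders = [
    {"consignmentid": 1, "userid": 10},
    {"consignmentid": 2, "userid": 10},
    {"consignmentid": 3, "userid": 11},
    {"consignmentid": 4, "userid": 99},  # no matching customer
]
customers = [
    {"userid": 10, "company": "Acme"},
    {"userid": 11, "company": "Globex"},
]


def orders_per_company(orders, customers, top_n=5):
    """Inner-join orders to customers on userid and count orders per company."""
    company_by_user = {c["userid"]: c["company"] for c in customers}
    counts = Counter(
        company_by_user[o["userid"]]
        for o in orders
        if o["userid"] in company_by_user  # inner join drops unmatched orders
    )
    return counts.most_common(top_n)


top_companies = orders_per_company(orders, customers)
print(top_companies)
```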