In [None]:
%conf
showLog=true
numRows=12

# 1. Initial Table Load

## 1.1 Extract the Initial CSV File

In [None]:
{
  # Read the initial contacts CSV (no header row) into the `initial_raw` view.
  # Quote handling is switched off here ("None"). The incremental extract does not
  # set `quote`, so the two files may treat quote characters differently.
  # NOTE(review): confirm this difference is intentional.
  "type": "DelimitedExtract",
  "name": "extract initial table",
  "environments": ["dev", "test"],
  "inputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/data/initial_contacts.csv",
  "delimiter": "Comma",
  "header": false,
  "quote": "None",
  "outputView": "initial_raw",
  "authentication": {
    "method": "AmazonIAM"
  }
}

## 1.2 Check Original Data Schema

In [None]:
%printschema 
initial_raw

## 1.3 Apply Data Types

In [None]:
{
  # Apply the column types defined in contact_meta_0.json to `initial_raw`,
  # producing `initial_typed`. The validation cell that follows reads an
  # `_errors` column from this typed view.
  "type": "TypingTransform",
  "name": "apply table schema 0",
  "environments": ["dev", "test"],
  "inputView": "initial_raw",
  "schemaURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/meta/contact_meta_0.json",
  "outputView": "initial_typed",
  "authentication": {
    "method": "AmazonIAM"
  }
}

## 1.4 Check Typed Data Schema

In [None]:
%printschema 
initial_typed

## 1.5 Data Quality Control

In [None]:
%sqlvalidate outputView="fail_fast" name="validation" description="fail the job if data transform is failed" environments=dev,test sqlParams=inputView=initial_typed

-- Data-quality gate: the job should stop if any typed row carries errors.
-- `valid` is TRUE only when no row of the input view has an entry in _errors;
-- `message` reports, as JSON, the row count and the number of rows with errors.
-- NOTE(review): on an empty input SUM(error) is NULL, so `valid` is NULL as well;
-- confirm how SQLValidate treats that case.
SELECT SUM(error) = 0 AS valid
      ,TO_JSON(
        NAMED_STRUCT('count', COUNT(error), 'errors', SUM(error))
      ) AS message
FROM
(
  -- 1 for a row with at least one entry in _errors, otherwise 0
  SELECT CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS error
  FROM ${inputView}
) base

## 1.6 Add Calculated Fields for SCD Type 2
### `ETL_CONF_CURRENT_TIMESTAMP` is passed in automatically when the ETL job is triggered; the `%env` cell below sets it for interactive notebook runs

In [None]:
%env
ETL_CONF_CURRENT_TIMESTAMP=current_timestamp()

In [None]:
%sql outputView="initial_load" name="add calc field for SCD" environments=dev,test sqlParams=table_name=initial_typed,now=${ETL_CONF_CURRENT_TIMESTAMP}

-- Add the SCD Type 2 bookkeeping columns to every incoming row:
--   valid_from : the `now` parameter (current_timestamp() when run from this notebook)
--   valid_to   : NULL, i.e. the row is open-ended
--   iscurrent  : 1 for every freshly loaded row
--   checksum   : md5 over name, email and state, later compared to detect changes
-- NOTE(review): concat() yields NULL when any input is NULL, so such rows get a NULL
-- checksum and their changes are not detected; values are also joined without a
-- separator. Any fix must be applied to the incremental load as well, or every
-- stored checksum will stop matching.
SELECT id
      ,name
      ,email
      ,state
      ,${now} AS valid_from
      ,CAST(NULL AS TIMESTAMP) AS valid_to
      ,1 AS iscurrent
      ,MD5(CONCAT(name, email, state)) AS checksum
FROM ${table_name}

## 1.7 Initial load to Delta Lake
### Delta Lake is a storage layer on top of the data lake that supports ACID transactions and time travel

In [None]:
{
  # Write `initial_load` to the contact Delta Lake location. saveMode Overwrite
  # replaces whatever is already stored there; numPartitions presumably controls
  # how many partitions are written (TODO confirm in the Arc docs).
  "type": "DeltaLakeLoad",
  "name": "Initial load to Data Lake",
  "environments": ["dev", "test"],
  "inputView": "initial_load",
  "outputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/contact/",
  "saveMode": "Overwrite",
  "numPartitions": 2,
  "authentication": {
    "method": "AmazonIAM"
  }
}

# SCD Type 2 Implementation

## 2. Ingest a New Incremental CSV File
### Note record 12: its `state` value has changed in this file

In [None]:
{
  # Read the incremental contacts CSV (no header row) into the `delta_raw` view.
  # NOTE(review): unlike the initial extract, `quote` is not set here, so Arc's
  # default quote handling applies. Confirm both files should be parsed the same way.
  "type": "DelimitedExtract",
  "name": "extract incremental data",
  "environments": ["dev", "test"],
  "inputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/data/update_contacts.csv",
  "delimiter": "Comma",
  "header": false,
  "outputView": "delta_raw",
  "authentication": {
    "method": "AmazonIAM"
  }
}

## 2.1 Apply Data Types (reusing the same schema file)

In [None]:
{
  # Type the incremental data with the same schema file as the initial load,
  # producing `delta_typed` (which carries the `_errors` column the next
  # validation cell inspects).
  "type": "TypingTransform",
  "name": "apply table schema 0 to incremental load",
  "environments": ["dev", "test"],
  "inputView": "delta_raw",
  "schemaURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/meta/contact_meta_0.json",
  "outputView": "delta_typed",
  "authentication": {
    "method": "AmazonIAM"
  }
}

## 2.2 Data Quality Control (reusing the same SQL script)

In [None]:
%sqlvalidate outputView="fail_fast" name="validation" description="fail the job if data transform is failed" environments=dev,test sqlParams=inputView=delta_typed

-- Data-quality gate: the job should stop if any typed row carries errors.
-- `valid` is TRUE only when no row of the input view has an entry in _errors;
-- `message` reports, as JSON, the row count and the number of rows with errors.
-- NOTE(review): on an empty input SUM(error) is NULL, so `valid` is NULL as well;
-- confirm how SQLValidate treats that case.
SELECT SUM(error) = 0 AS valid
      ,TO_JSON(
        NAMED_STRUCT('count', COUNT(error), 'errors', SUM(error))
      ) AS message
FROM
(
  -- 1 for a row with at least one entry in _errors, otherwise 0
  SELECT CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS error
  FROM ${inputView}
) base

## 2.3 Add Calculated Fields (reusing the same SQL script)

In [None]:
%env
ETL_CONF_CURRENT_TIMESTAMP=current_timestamp()

In [None]:
%sql outputView="update_load" name="add calc field for SCD" environments=dev,test sqlParams=table_name=delta_typed,now=${ETL_CONF_CURRENT_TIMESTAMP}

-- Same bookkeeping columns as the initial load, applied to the incremental rows:
--   valid_from : the `now` parameter (current_timestamp() when run from this notebook)
--   valid_to   : NULL, i.e. the row is open-ended
--   iscurrent  : 1 for every incoming row
--   checksum   : md5 over name, email and state; it must be computed exactly as in
--                the initial load, because the two are compared to detect changes
-- NOTE(review): concat() yields NULL when any input is NULL, so changes to such
-- rows are not detected.
SELECT id
      ,name
      ,email
      ,state
      ,${now} AS valid_from
      ,CAST(NULL AS TIMESTAMP) AS valid_to
      ,1 AS iscurrent
      ,MD5(CONCAT(name, email, state)) AS checksum
FROM ${table_name}

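## 2.4 Prepare Datasets for the SCD Type 2 Merge

- Generate an extra row for each changed record.
- A `NULL` `mergeKey` never satisfies the merge condition (`source.mergeKey = target.id`), so that row is inserted as the new current version instead of updating an existing record, as SCD Type 2 requires.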

In [None]:
%sql outputView="staged_update" name="generate extra rows for SCD" environments=dev,test

-- Build the source rows for the SCD Type 2 merge.
-- Part 1: for each id whose current row in initial_load has a different checksum
--         in update_load, emit the new version with a NULL mergeKey. A NULL key
--         cannot satisfy the merge condition, so these rows get inserted.
-- Part 2: every incoming row keyed by its own id; in the merge these match the
--         existing rows, letting a changed current row be expired.
-- NOTE(review): changes are detected against the initial_load view, not the Delta
-- table itself; confirm this is intended when running outside this notebook.
SELECT NULL AS mergeKey, src.*
FROM initial_load AS tgt
INNER JOIN update_load AS src
  ON tgt.id = src.id
WHERE tgt.iscurrent = true
  AND tgt.checksum <> src.checksum

-- UNION (rather than UNION ALL) also drops exact duplicate rows.
UNION

SELECT id AS mergeKey, *
FROM update_load

## 2.5 Perform the Type 2 SCD

In [None]:
{
  # SCD Type 2 merge of `staged_update` into the contact Delta Lake table.
  # Source rows are matched to target rows on mergeKey = id:
  #   - matched, target row current and checksum changed: the target row is
  #     expired (valid_to stamped, iscurrent set to false);
  #   - not matched (new ids, plus the NULL-mergeKey rows staged for changed
  #     records): the source row is inserted. The empty insert block presumably
  #     means "insert all columns"; TODO confirm in the Arc docs.
  # NOTE(review): iscurrent is written as the integer 1 by the load SQL but is
  # compared with and set to a boolean here, which relies on Spark's implicit casts.
  "type": "DeltaLakeMergeLoad",
  "name": "merge with existing contacts data",
  "environments": ["dev", "test"],
  "inputView": "staged_update",
  "outputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/contact/",
  "numPartitions": 2,
  "condition": "source.mergeKey = target.id",
  "whenMatchedUpdate": {
    "condition": "target.iscurrent = true AND source.checksum <> target.checksum",
    "values": {
      # valid_to receives the expression held in ETL_CONF_CURRENT_TIMESTAMP
      # (current_timestamp() when run from this notebook).
      "valid_to": ${ETL_CONF_CURRENT_TIMESTAMP},
      "iscurrent": false
    }
  },
  "whenNotMatchedByTargetInsert": {}
}

# 3. Create a Delta Lake table in Athena
### Register the table in the Glue Data Catalog through Athena. Athena is accessed with web-identity token authentication, so no long-lived credentials from Secrets Manager are required.

In [None]:
{
  # Run create_table_contact.sql against Athena to register the contact table.
  # The JDBC URL points Athena's query results at the athena-query-result prefix
  # and authenticates with WebIdentityTokenCredentialsProvider.
  # sqlParams presumably fill placeholders in that SQL file (TODO confirm):
  #   datalake_loc : the table's _symlink_format_manifest location, already quoted
  #   table_name   : the catalog table to create
  "type": "JDBCExecute",
  "name": "Create glue data catalog",
  "environments": ["dev", "test"],
  "inputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/sql/create_table_contact.sql",
  "jdbcURL": "jdbc:awsathena://AwsRegion="${AWS_DEFAULT_REGION}";S3OutputLocation=s3://"${ETL_CONF_DATALAKE_LOC}"/athena-query-result;AwsCredentialsProviderClass=com.amazonaws.auth.WebIdentityTokenCredentialsProvider",
  "sqlParams": {
    "datalake_loc": "'s3://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/contact/_symlink_format_manifest/'",
    "table_name": "default.deltalake_contact_jhub"
  }
}

## 4. Query Delta Lake (optional)
### To skip these cells in a productionized ETL job, they are assigned to a placeholder environment, `uat`

In [None]:
{
  # Optional check: read the contact Delta Lake table back into the `contact`
  # view. Restricted to the placeholder `uat` environment.
  "type": "DeltaLakeExtract",
  "name": "read contact Delta Lake table",
  "description": "read contact table",
  "environments": ["uat"],
  "inputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/contact/",
  "outputView": "contact"
}

## Confirm that 92 records have expired

In [None]:
%sql outputView="expired_count" name="expired_count" environments=uat
-- Rows expired by the SCD Type 2 merge are the ones with a non-NULL valid_to.
SELECT COUNT(*) AS expired_count FROM contact WHERE valid_to IS NOT NULL

In [None]:
%metadata 
contact

## Confirm that we now have 1192 records

In [None]:
%sql outputView="total_count" name="total_count" environments=uat
-- Total rows in the table: current and expired versions together.
SELECT COUNT(*) AS total_count FROM contact

## View one of the changed records

In [None]:
%sql outputView="validate_type2" name="validate_type2" environments=uat
-- Show every stored version of id 12, the record whose state changed in the incremental file.
SELECT *
FROM contact
WHERE id = 12