## 3. Read initial & incremental tables from Delta Lake

In [None]:
{
  # Read the existing contact snapshot table -- the same location the SCD Type 2
  # merge below writes to -- and expose it as the `current_snapshot` view.
  "type": "DeltaLakeExtract",
  "name": "read initial load table",
  "description": "read initial load table",
  "environments": ["dev", "test"],
  "inputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/contact_snapshot/",
  "outputView": "current_snapshot"
}

In [None]:
{
  # Read the incremental (delta) contact records from the delta_load location and
  # expose them as the `delta_data` view, which the SCD staging query joins against
  # the current snapshot. The name/description are specific to the incremental load
  # so this stage is not confused with the uat-only contact_snapshot read in logs.
  "type": "DeltaLakeExtract",
  "name": "read incremental contact Delta Lake table",
  "description": "read incremental contact table",
  "environments": [
    "dev",
    "test"
  ],
  "inputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/delta_load/",
  "outputView": "delta_data"
}

## 3.2 Prepare datasets for the SCD Type 2 insert

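- Generate an extra row for each changed record, so that the merge can both expire the old version and insert the new one.
- A `NULL` `mergeKey` never matches an existing record, so, following the SCD Type 2 rule, that row is inserted as a new version instead of updating an existing record.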

In [None]:
%sql outputView="staged_update" name="generate extra rows for SCD" environments=dev,test

-- Branch 1: for every changed record (its current snapshot version has a different
-- checksum), emit an extra copy with a NULL merge key. NULL never equals target.id,
-- so the merge treats this copy as unmatched and inserts it as the new version.
SELECT NULL AS mergeKey, incoming.*
FROM delta_data AS incoming
INNER JOIN current_snapshot AS snap
  ON snap.id = incoming.id
WHERE snap.iscurrent = true
  AND snap.checksum <> incoming.checksum

UNION

-- Branch 2: every incoming record keyed by its own id, so the merge can match it
-- to the stored versions (expiring the current one when the checksum changed)
-- and insert ids that do not exist in the snapshot yet.
SELECT incoming.id AS mergeKey, incoming.*
FROM delta_data AS incoming

## 3.3 Implement the Type 2 SCD merge operation

In [None]:
%conf logger=true

In [None]:
{
  # SCD Type 2 merge of the `staged_update` view into the contact snapshot table.
  #  - Source rows keyed by id (mergeKey = id) match every stored version of that
  #    contact; only the current version whose checksum differs is expired
  #    (valid_to set, iscurrent = false).
  #  - Source rows with a NULL mergeKey never match, so they fall through to the
  #    insert clause, as do ids not yet present in the snapshot.
  "type": "DeltaLakeMergeLoad",
  "name": "merge with existing contacts data",
  "description": "merge staged contact changes into the SCD Type 2 snapshot",
  "environments": [
    "dev",
    "test"
  ],
  "inputView": "staged_update",
  # Comma added for consistency with every other field (HOCON tolerated the newline).
  "outputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/contact_snapshot/",
  "condition": "source.mergeKey = target.id",
  "whenMatchedUpdate": {
    "condition": "target.iscurrent = true AND source.checksum <> target.checksum",
    "values": {
      # NOTE(review): ETL_CONF_CURRENT_TIMESTAMP is substituted unquoted; presumably it
      # resolves to a SQL timestamp expression or literal -- confirm in the job config.
      "valid_to": ${ETL_CONF_CURRENT_TIMESTAMP},
      "iscurrent": false
    }
  },
  # Empty spec: presumably inserts all matching source columns -- confirm against Arc docs.
  "whenNotMatchedByTargetInsert": {},
  "numPartitions": 1
}

## 3.4 Create a Delta Lake table in Athena
### Build up a Glue Data Catalog via Athena. This step could also be done by a Glue Crawler. However, it makes sense to refresh partitions and create/update the data catalog at the end of each ETL process, which provides data lineage control in a single place.

In [None]:
{
  # Run the create_table_contact.sql script against Athena over JDBC so the contact
  # snapshot is registered in the Glue Data Catalog at the end of the ETL run.
  # Athena query results are written under the athena-query-result prefix.
  "type": "JDBCExecute",
  "name": "Create glue data catalog",
  "environments": ["dev", "test"],
  "inputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/sql/create_table_contact.sql",
  "jdbcURL": "jdbc:awsathena://AwsRegion="${AWS_DEFAULT_REGION}";S3OutputLocation=s3://"${ETL_CONF_DATALAKE_LOC}"/athena-query-result;AwsCredentialsProviderClass=com.amazonaws.auth.WebIdentityTokenCredentialsProvider",
  # Parameters for the SQL script. The location value keeps its own single quotes,
  # presumably because it is spliced into the SQL as a string literal -- confirm.
  "sqlParams": {
    "datalake_loc": "'s3://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/contact_snapshot/_symlink_format_manifest/'",
    "table_name": "default.contact_snapshot"
  }
}

# 4. Query Delta Lake (validation steps)
### To keep the following cells from running in a productionized ETL job, they are tagged with a fake environment, `uat`.
### The same queries can also be run in Athena.

In [None]:
{
  # Validation only (uat): re-read the merged contact snapshot and expose it as the
  # `contact_snapshot` view for the count and spot-check queries that follow.
  "type": "DeltaLakeExtract",
  "name": "read contact Delta Lake table",
  "description": "read contact table",
  "environments": ["uat"],
  "inputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/contact_snapshot",
  "outputView": "contact_snapshot"
}

## Confirm 92 records are expired

In [None]:
%sql outputView="expired_count" name="expired_count" environments=uat
-- Versions expired by the SCD merge are the ones whose valid_to has been set.
SELECT count(*)
FROM contact_snapshot
WHERE valid_to IS NOT NULL

In [None]:
%metadata 
contact_snapshot

## Confirm we now have 1192 records

In [None]:
%sql outputView="total_count" name="total_count" environments=uat
-- Total number of stored versions, current and expired together.
SELECT count(*)
FROM contact_snapshot

## View one of the changed records

In [None]:
%sql outputView="validate_type2" name="validate_type2" environments=uat
-- Every stored version of contact 12, a changed record, so it should show both
-- its expired version and its new current version.
SELECT *
FROM contact_snapshot
WHERE id = 12