In [None]:
%conf numRows=5 logger=true

# 1. Initial Table Load

In [None]:
{
 "type": "DelimitedExtract",
 "name": "extract initial table",
 "environments": ["dev", "test"],
 "inputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/data/initial_contacts.csv",
 "outputView": "initial_raw", 
 "delimiter": "Comma",
 "header": false,
 "quote": "None",
 "authentication": {
 "method": "AmazonIAM"
 }
}

## Check Original Data Schema

In [None]:
%printschema 
initial_raw

## 1.2 Apply Data Type

In [None]:
{
 "type": "TypingTransform",
 "name": "apply table schema 0",
 "environments": ["dev", "test"],
 "schemaURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/meta/contact_meta_0.json",
 "inputView": "initial_raw", 
 "outputView": "initial_typed",
 "authentication": {
 "method": "AmazonIAM"
 }
}

## Check Typed Data Schema & Stats

In [None]:
%printschema 
initial_typed

## 1.3 Data Quality Control

In [None]:
%sqlvaildate outputView="fail_fast" name="validation" description="fail the job if data transform is failed" environments=dev,test sqlParams=inputView=initial_typed

SELECT SUM(error) = 0 AS valid
 ,TO_JSON(
 NAMED_STRUCT('count', COUNT(error), 'errors', SUM(error))
 ) AS message
FROM 
(
 SELECT CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS error 
 FROM ${inputView}
) base

## 1.4 Add Calculated Fields for SCD Type 2
### CURRENT_TIMESTAMP will be passed in automatically, when the ETL job is triggered

In [None]:
%env 
ETL_CONF_CURRENT_TIMESTAMP=current_timestamp()

In [None]:
%sql outputView="initial_load" name="add calc field for SCD" environments=dev,test sqlParams=table_name=initial_typed,now=${ETL_CONF_CURRENT_TIMESTAMP}

SELECT id,name,email,state, CAST(${now} AS timestamp) AS valid_from, CAST(null AS timestamp) AS valid_to
,1 AS iscurrent, md5(concat(name,email,state)) AS checksum 
FROM ${table_name}

## 1.5 Load to Delta Lake as the initial daily snaptshot table
### Delta Lake is an optimized data lake to support Time Travel, ACID transaction

In [None]:
{
 "type": "DeltaLakeLoad",
 "name": "Initial load to Data Lake",
 "environments": ["dev", "test"],
 "inputView": "initial_load",
 "outputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/contact_snapshot/",
 "numPartitions": 2
 "saveMode": "Overwrite",
 "authentication": {
 "method": "AmazonIAM"
 }
}