In [None]:
%conf logger=true numRows=5

## 2.1 Ingest A New Incremental CSV File
### Note: the `state` of record 12 has changed in this file

In [None]:
{
  "type": "DelimitedExtract",
  "name": "extract incremental data",
  "environments": ["dev", "test"],
  "inputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/data/update_contacts.csv",
  "authentication": {
    "method": "AmazonIAM"
  },
  "delimiter": "Comma",
  "header": false,
  "outputView": "delta_raw"
}

## 2.2 Apply Data Types (reused schema file)

In [None]:
{
  "type": "TypingTransform",
  "name": "apply table schema 0 to incremental load",
  "environments": ["dev", "test"],
  "schemaURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/meta/contact_meta_0.json",
  "authentication": {
    "method": "AmazonIAM"
  },
  "inputView": "delta_raw",
  "outputView": "delta_typed"
}

## 2.3 Data Quality Control (reused SQL script)

In [None]:
%sqlvalidate outputView="fail_fast" name="validation" description="fail the job if data transform is failed" environments=dev,test sqlParams=inputView=delta_typed

-- Data-quality gate over the typed view: flag each row whose _errors array is
-- non-empty (presumably the per-row errors produced by the TypingTransform stage),
-- then report one row with a boolean `valid` and a JSON `message`.
-- `valid` is true only when no row carries typing errors; `message` holds the
-- total row count and the number of rows with errors.
-- NOTE(review): for an empty input view SUM(error) is NULL, so `valid` is NULL
-- rather than true -- confirm how the validate stage treats that case.
SELECT SUM(error) = 0 AS valid
      ,TO_JSON(
        NAMED_STRUCT('count', COUNT(error), 'errors', SUM(error))
      ) AS message
FROM 
(
  -- 1 when the row has at least one typing error, otherwise 0
  SELECT CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS error 
  FROM ${inputView}
) base

## 2.4 Add Calculated Fields (reused SQL script)

In [None]:
%env
ETL_CONF_CURRENT_TIMESTAMP=CURRENT_TIMESTAMP()

In [None]:
%sql outputView="update_load" name="add calc field for SCD" environments=dev,test sqlParams=table_name=delta_typed,now=${ETL_CONF_CURRENT_TIMESTAMP}

-- Append slowly-changing-dimension bookkeeping columns to every incoming row:
--   valid_from : the `now` SQL parameter, cast to TIMESTAMP
--   valid_to   : NULL TIMESTAMP (row is open-ended)
--   iscurrent  : constant 1
--   checksum   : MD5 over name, email and state, used to detect changed rows
-- NOTE(review): CONCAT yields NULL when any input is NULL and joins values with no
-- separator (e.g. 'ab'||'c' equals 'a'||'bc'). Any change to this formula must be
-- mirrored in the initial-load script so stored checksums stay comparable.
SELECT
    id,
    name,
    email,
    state,
    CAST(${now} AS TIMESTAMP)        AS valid_from,
    CAST(NULL AS TIMESTAMP)          AS valid_to,
    1                                AS iscurrent,
    MD5(CONCAT(name, email, state))  AS checksum
FROM ${table_name}

## 2.5 Output Incremental Data to Delta Lake
### Delta Lake is a storage layer on top of the data lake that supports ACID transactions and time travel

In [None]:
{
  "type": "DeltaLakeLoad",
  "name": "Incremental load to Data Lake",
  "environments": ["dev", "test"],
  "inputView": "update_load",
  "outputURI": "s3a://"${ETL_CONF_DATALAKE_LOC}"/app_code/output/delta_load/",
  "numPartitions": 2,
  "saveMode": "Overwrite",
  "authentication": {
    "method": "AmazonIAM"
  }
}