{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%conf \n", "numRows=12\n", "showLog=true" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 1. Initial Table Load" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"DelimitedExtract\",\n", " \"name\": \"extract initial table\",\n", " \"environments\": [\"dev\", \"test\"],\n", " \"inputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/data/initial_contacts.csv\",\n", " \"outputView\": \"initial_raw\", \n", " \"delimiter\": \"Comma\",\n", " \"header\": false,\n", " \"quote\": \"None\",\n", " \"authentication\": {\n", " \"method\": \"AmazonIAM\"\n", " }\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.2 Check Original Data Schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%printschema \n", "initial_raw" ] }, { "cell_type": "markdown", "metadata": { "ExecuteTime": { "start_time": "2020-03-03T08:30:30.028Z" } }, "source": [ "## 1.3 Apply Data Type" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"TypingTransform\",\n", " \"name\": \"apply table schema 0\",\n", " \"environments\": [\"dev\", \"test\"],\n", " \"schemaURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/meta/contact_meta_0.json\",\n", " \"inputView\": \"initial_raw\", \n", " \"outputView\": \"initial_typed\",\n", " \"authentication\": {\n", " \"method\": \"AmazonIAM\"\n", " }\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.4 Check Typed Data Schema & Stats" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%printschema \n", "initial_typed" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.5 Data Quality Control" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": 
[], "source": [ "%sqlvaildate outputView=\"fail_fast\" name=\"validation\" description=\"fail the job if data transform is failed\" environments=dev,test sqlParams=inputView=initial_typed\n", "\n", "SELECT SUM(error) = 0 AS valid\n", " ,TO_JSON(\n", " NAMED_STRUCT('count', COUNT(error), 'errors', SUM(error))\n", " ) AS message\n", "FROM \n", "(\n", " SELECT CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS error \n", " FROM ${inputView}\n", ") base" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.6 Add Calculated Fields for SCD Type 2\n", "### CURRENT_TIMESTAMP will be passed in automatically, when the ETL job is triggered" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%env\n", "ETL_CONF_CURRENT_TIMESTAMP=current_timestamp()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%sql outputView=\"initial_load\" name=\"add calc field for SCD\" environments=dev,test sqlParams=table_name=initial_typed,now=${ETL_CONF_CURRENT_TIMESTAMP}\n", "\n", "SELECT id,name,email,state, ${now} AS valid_from, CAST(null AS timestamp) AS valid_to\n", ",1 AS iscurrent, md5(concat(name,email,state)) AS checksum \n", "FROM ${table_name}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.7 Initial load to Delta Lake\n", "### Delta Lake is an optimized data lake to support Time Travel, ACID transaction" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"DeltaLakeLoad\",\n", " \"name\": \"Initial load to Data Lake\",\n", " \"environments\": [\"dev\", \"test\"],\n", " \"inputView\": \"initial_load\",\n", " \"outputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/output/contact/\",\n", " \"numPartitions\": 2,\n", " \"saveMode\": \"Overwrite\",\n", " \"authentication\": {\n", " \"method\": \"AmazonIAM\"\n", " }\n", "}" ] }, { "cell_type": "markdown", "metadata": { "ExecuteTime": { "end_time": 
"2020-05-31T04:55:34.761654Z", "start_time": "2020-05-31T04:55:34.738Z" } }, "source": [ "# SCD Type2 Implementation" ] }, { "cell_type": "markdown", "metadata": { "ExecuteTime": { "end_time": "2020-03-18T22:38:05.895407Z", "start_time": "2020-03-18T22:37:48.160Z" } }, "source": [ "## 2. Ingest A New Incremental CSV File\n", "### Look at record 12, the `state` is changed in the file" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"DelimitedExtract\",\n", " \"name\": \"extract incremental data\",\n", " \"environments\": [\"dev\", \"test\"],\n", " \"inputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/data/update_contacts.csv\",\n", " \"outputView\": \"delta_raw\", \n", " \"delimiter\": \"Comma\",\n", " \"header\": false,\n", " \"authentication\": {\n", " \"method\": \"AmazonIAM\"\n", " }\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2.1 Apply Data Type (reused schema file)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"TypingTransform\",\n", " \"name\": \"apply table schema 0 to incremental load\",\n", " \"environments\": [\"dev\", \"test\"],\n", " \"schemaURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/meta/contact_meta_0.json\",\n", " \"inputView\": \"delta_raw\", \n", " \"outputView\": \"delta_typed\",\n", " \"authentication\": {\n", " \"method\": \"AmazonIAM\"\n", " }\n", "}" ] }, { "cell_type": "markdown", "metadata": { "ExecuteTime": { "end_time": "2020-06-07T15:02:50.155313Z", "start_time": "2020-06-07T15:02:50.125Z" } }, "source": [ "## 2.2 Data Quality Control (reused sql script)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%sqlvaildate outputView=\"fail_fast\" name=\"validation\" description=\"fail the job if data transform is failed\" environments=dev,test sqlParams=inputView=delta_typed\n", "\n", "SELECT SUM(error) = 0 AS 
valid\n", " ,TO_JSON(\n", " NAMED_STRUCT('count', COUNT(error), 'errors', SUM(error))\n", " ) AS message\n", "FROM \n", "(\n", " SELECT CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS error \n", " FROM ${inputView}\n", ") base" ] }, { "cell_type": "markdown", "metadata": { "ExecuteTime": { "end_time": "2020-05-31T05:01:13.796275Z", "start_time": "2020-05-31T05:01:13.734Z" } }, "source": [ "## 2.3 Add Calculated Fields (reused sql script)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%env\n", "ETL_CONF_CURRENT_TIMESTAMP=current_timestamp()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%sql outputView=\"update_load\" name=\"add calc field for SCD\" environments=dev,test sqlParams=table_name=delta_typed,now=${ETL_CONF_CURRENT_TIMESTAMP}\n", "\n", "SELECT id,name,email,state, ${now} AS valid_from, CAST(null AS timestamp) AS valid_to\n", ",1 AS iscurrent, md5(concat(name,email,state)) AS checksum \n", "FROM ${table_name}" ] }, { "cell_type": "markdown", "metadata": { "ExecuteTime": { "end_time": "2020-05-31T05:03:33.741024Z", "start_time": "2020-05-31T05:03:33.247Z" } }, "source": [ "## 2.4 Prepare Datasets for SCD Type2 Insert\n", "\n", "- Generate extra rows for changed records.\n", "- A NULL `mergeKey` marks a row to be inserted as a new record instead of updating an existing one, as the SCD Type 2 rule requires." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%sql outputView=\"staged_update\" name=\"generate extra rows for SCD\" environments=dev,test\n", "\n", "SELECT NULL AS mergeKey, new.*\n", "FROM initial_load old\n", "INNER JOIN update_load new\n", "ON old.id = new.id\n", "WHERE old.iscurrent=true\n", "AND old.checksum<>new.checksum\n", "\n", "UNION\n", "\n", "SELECT id AS mergeKey, *\n", "FROM update_load" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2.5 Perform the Type 2 SCD" ] }, { "cell_type": "code", 
"execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"DeltaLakeMergeLoad\",\n", " \"name\": \"merge with existing contacts data\",\n", " \"environments\": [\n", " \"dev\",\n", " \"test\"\n", " ],\n", " \"inputView\": \"staged_update\",\n", " \"outputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/output/contact/\"\n", " \"numPartitions\": 2,\n", " \"condition\": \"source.mergeKey = target.id\",\n", " \"whenMatchedUpdate\": {\n", " \"condition\": \"target.iscurrent = true AND source.checksum <> target.checksum\",\n", " \"values\": {\n", " \"valid_to\": ${ETL_CONF_CURRENT_TIMESTAMP},\n", " \"iscurrent\": false\n", " }\n", " },\n", " \"whenNotMatchedByTargetInsert\": {}\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 3. Create a Delta Lake table in Athena\n", "### Build up a Glue data catalog from Athena. Token-based authentication is used to access Athena, so long-lived credentials from Secrets Manager are no longer required." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"JDBCExecute\",\n", " \"name\": \"Create glue data catalog\",\n", " \"environments\": [\n", " \"dev\",\n", " \"test\"\n", " ],\n", " \"inputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/sql/create_table_contact.sql\",\n", " \"jdbcURL\": \"jdbc:awsathena://AwsRegion=\"${AWS_DEFAULT_REGION}\";S3OutputLocation=s3://\"${ETL_CONF_DATALAKE_LOC}\"/athena-query-result;AwsCredentialsProviderClass=com.amazonaws.auth.WebIdentityTokenCredentialsProvider\",\n", " \"sqlParams\":{\n", " \"datalake_loc\": \"'s3://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/output/contact/_symlink_format_manifest/'\",\n", " \"table_name\": \"default.deltalake_contact_jhub\"\n", " }\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 4. 
Query Delta Lake (optional)\n", "### to skip in a productionized ETL job, use a fake environment `uat`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"DeltaLakeExtract\",\n", " \"name\": \"read contact Delta Lake table\",\n", " \"description\": \"read contact table\",\n", " \"environments\": [\n", " \"uat\"\n", " ],\n", " \"inputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/output/contact/\",\n", " \"outputView\": \"contact\"\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Confirm 92 records are expired" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%sql outputView=\"expired_count\" name=\"expired_count\" environments=uat\n", "SELECT count(*) FROM contact WHERE valid_to is not null" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%metadata \n", "contact" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " ## Confirm we now have 1192 records" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%sql outputView=\"total_count\" name=\"total_count\" environments=uat\n", "SELECT count(*) FROM contact" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## View one of the changed records" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%sql outputView=\"validate_type2\" name=\"validate_type2\" environments=uat\n", "SELECT * FROM contact WHERE id=12" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Arc", "language": "javascript", "name": "arc" }, "language_info": { "codemirror_mode": "javascript", "file_extension": ".json", "mimetype": "javascript", "name": "arc", "nbconvert_exporter": "arcexport", "version": "3.13.1" } }, "nbformat": 4, "nbformat_minor": 4 }