{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## 3. Read initial & incremental tables from Delta Lake" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"DeltaLakeExtract\",\n", " \"name\": \"read initial load table\",\n", " \"description\": \"read initial load table\",\n", " \"environments\": [\n", " \"dev\",\n", " \"test\"\n", " ],\n", " \"inputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/output/contact_snapshot/\",\n", " \"outputView\": \"current_snapshot\"\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"DeltaLakeExtract\",\n", " \"name\": \"read contact Delta Lake table\",\n", " \"description\": \"read contact table\",\n", " \"environments\": [\n", " \"dev\",\n", " \"test\"\n", " ],\n", " \"inputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/output/delta_load/\",\n", " \"outputView\": \"delta_data\"\n", "}" ] }, { "cell_type": "markdown", "metadata": { "ExecuteTime": { "end_time": "2020-05-31T05:03:33.741024Z", "start_time": "2020-05-31T05:03:33.247Z" } }, "source": [ "## 3.2 Prepare Datasets for SCD Type2 Insert\n", "\n", "- Generate extra rows for changed records.\n", "- The 'null' merge_key means it will be inserted, not update existing records according to the rule in SCD type2" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%sql outputView=\"staged_update\" name=\"generate extra rows for SCD\" environments=dev,test\n", "\n", "SELECT NULL AS mergeKey, new.*\n", "FROM current_snapshot old\n", "INNER JOIN delta_data new\n", "ON old.id = new.id\n", "WHERE old.iscurrent=true\n", "AND old.checksum<>new.checksum\n", "\n", "UNION\n", "\n", "SELECT id AS mergeKey, *\n", "FROM delta_data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3.3 Implement the Type 2 SCD merge operation" ] }, { "cell_type": "code", 
"execution_count": null, "metadata": {}, "outputs": [], "source": [ "%conf logger=true" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"DeltaLakeMergeLoad\",\n", " \"name\": \"merge with existing contacts data\",\n", " \"environments\": [\n", " \"dev\",\n", " \"test\"\n", " ],\n", " \"inputView\": \"staged_update\",\n", " \"outputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/output/contact_snapshot/\"\n", " \"condition\": \"source.mergeKey = target.id\",\n", " \"whenMatchedUpdate\": {\n", " \"condition\": \"target.iscurrent = true AND source.checksum <> target.checksum\",\n", " \"values\": {\n", " \"valid_to\": ${ETL_CONF_CURRENT_TIMESTAMP},\n", " \"iscurrent\": false\n", " }\n", " },\n", " \"whenNotMatchedByTargetInsert\": {},\n", " \"numPartitions\": 1\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3.4 Create a Delta Lake table in Athena\n", "### Build up a Glue Data Catalog via Athena. This step can be done by Glue Crawler. However, it makes sense if we refresh partitions, create/update data catalog at the end of each ETL process, which is provides the data lineage contro at a single place." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"JDBCExecute\",\n", " \"name\": \"Create glue data catalog\",\n", " \"environments\": [\n", " \"dev\",\n", " \"test\"\n", " ],\n", " \"inputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/sql/create_table_contact.sql\",\n", " \"jdbcURL\": \"jdbc:awsathena://AwsRegion=\"${AWS_DEFAULT_REGION}\";S3OutputLocation=s3://\"${ETL_CONF_DATALAKE_LOC}\"/athena-query-result;AwsCredentialsProviderClass=com.amazonaws.auth.WebIdentityTokenCredentialsProvider\",\n", " \"sqlParams\":{\n", " \"datalake_loc\": \"'s3://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/output/contact_snapshot/_symlink_format_manifest/'\",\n", " \"table_name\": \"default.contact_snapshot\"\n", " }\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 4. Query Delta Lake (validation steps)\n", "### To keep the following steps from running in a productionized ETL job, they use a fake environment `uat`\n", "### The same queries can be run in Athena" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"DeltaLakeExtract\",\n", " \"name\": \"read contact Delta Lake table\",\n", " \"description\": \"read contact table\",\n", " \"environments\": [\n", " \"uat\"\n", " ],\n", " \"inputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/output/contact_snapshot\",\n", " \"outputView\": \"contact_snapshot\"\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Confirm 92 records are expired" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%sql outputView=\"expired_count\" name=\"expired_count\" environments=uat\n", "SELECT count(*) FROM contact_snapshot WHERE valid_to is not null" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%metadata \n", "contact_snapshot" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " ## 
Confirm we now have 1192 records" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%sql outputView=\"total_count\" name=\"total_count\" environments=uat\n", "SELECT count(*) FROM contact_snapshot" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## View one of the changed records" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%sql outputView=\"validate_type2\" name=\"validate_type2\" environments=uat\n", "SELECT * FROM contact_snapshot WHERE id=12" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Arc", "language": "javascript", "name": "arc" }, "language_info": { "codemirror_mode": "javascript", "file_extension": ".json", "mimetype": "javascript", "name": "arc", "nbconvert_exporter": "arcexport", "version": "3.8.0" } }, "nbformat": 4, "nbformat_minor": 4 }