{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%conf numRows=5 logger=true" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 1. Initial Table Load" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"DelimitedExtract\",\n", " \"name\": \"extract initial table\",\n", " \"environments\": [\"dev\", \"test\"],\n", " \"inputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/data/initial_contacts.csv\",\n", " \"outputView\": \"initial_raw\", \n", " \"delimiter\": \"Comma\",\n", " \"header\": false,\n", " \"quote\": \"None\",\n", " \"authentication\": {\n", " \"method\": \"AmazonIAM\"\n", " }\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Check Original Data Schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%printschema \n", "initial_raw" ] }, { "cell_type": "markdown", "metadata": { "ExecuteTime": { "start_time": "2020-03-03T08:30:30.028Z" } }, "source": [ "## 1.2 Apply Data Type" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"TypingTransform\",\n", " \"name\": \"apply table schema 0\",\n", " \"environments\": [\"dev\", \"test\"],\n", " \"schemaURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/meta/contact_meta_0.json\",\n", " \"inputView\": \"initial_raw\", \n", " \"outputView\": \"initial_typed\",\n", " \"authentication\": {\n", " \"method\": \"AmazonIAM\"\n", " }\n", "}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Check Typed Data Schema & Stats" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%printschema \n", "initial_typed" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.3 Data Quality Control" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%sqlvaildate outputView=\"fail_fast\" name=\"validation\" description=\"fail the job if data transform is failed\" environments=dev,test sqlParams=inputView=initial_typed\n", "\n", "SELECT SUM(error) = 0 AS valid\n", " ,TO_JSON(\n", " NAMED_STRUCT('count', COUNT(error), 'errors', SUM(error))\n", " ) AS message\n", "FROM \n", "(\n", " SELECT CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS error \n", " FROM ${inputView}\n", ") base" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.4 Add Calculated Fields for SCD Type 2\n", "### CURRENT_TIMESTAMP will be passed in automatically, when the ETL job is triggered" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%env \n", "ETL_CONF_CURRENT_TIMESTAMP=current_timestamp()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%sql outputView=\"initial_load\" name=\"add calc field for SCD\" environments=dev,test sqlParams=table_name=initial_typed,now=${ETL_CONF_CURRENT_TIMESTAMP}\n", "\n", "SELECT id,name,email,state, CAST(${now} AS timestamp) AS valid_from, CAST(null AS timestamp) AS valid_to\n", ",1 AS iscurrent, md5(concat(name,email,state)) AS checksum \n", "FROM ${table_name}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1.5 Load to Delta Lake as the initial daily snaptshot table\n", "### Delta Lake is an optimized data lake to support Time Travel, ACID transaction" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "{\n", " \"type\": \"DeltaLakeLoad\",\n", " \"name\": \"Initial load to Data Lake\",\n", " \"environments\": [\"dev\", \"test\"],\n", " \"inputView\": \"initial_load\",\n", " \"outputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/output/contact_snapshot/\",\n", " \"numPartitions\": 2\n", " \"saveMode\": \"Overwrite\",\n", " \"authentication\": {\n", " \"method\": \"AmazonIAM\"\n", " }\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Arc", "language": "javascript", "name": "arc" }, "language_info": { "codemirror_mode": "javascript", "file_extension": ".json", "mimetype": "javascript", "name": "arc", "nbconvert_exporter": "arcexport", "version": "3.8.0" } }, "nbformat": 4, "nbformat_minor": 4 }