{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%conf numRows=5 logger=true"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2020-03-18T22:38:05.895407Z",
     "start_time": "2020-03-18T22:37:48.160Z"
    }
   },
   "source": [
    "## 2. Ingest A New Incremental CSV File\n",
    "### Look at record 12: its `state` has changed in the file"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "{\n",
    " \"type\": \"DelimitedExtract\",\n",
    " \"name\": \"extract incremental data\",\n",
    " \"environments\": [\"dev\", \"test\"],\n",
    " \"inputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/data/update_contacts.csv\",\n",
    " \"outputView\": \"delta_raw\", \n",
    " \"delimiter\": \"Comma\",\n",
    " \"header\": false,\n",
    " \"authentication\": {\n",
    " \"method\": \"AmazonIAM\"\n",
    " }\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2.2 Apply Data Type (reused schema file)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "{\n",
    " \"type\": \"TypingTransform\",\n",
    " \"name\": \"apply table schema 0 to incremental load\",\n",
    " \"environments\": [\"dev\", \"test\"],\n",
    " \"schemaURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/meta/contact_meta_0.json\",\n",
    " \"inputView\": \"delta_raw\", \n",
    " \"outputView\": \"delta_typed\",\n",
    " \"authentication\": {\n",
    " \"method\": \"AmazonIAM\"\n",
    " }\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2020-06-07T15:02:50.155313Z",
     "start_time": "2020-06-07T15:02:50.125Z"
    }
   },
   "source": [
    "## 2.3 Data Quality Control (reused sql script)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%sqlvalidate outputView=\"fail_fast\" name=\"validation\" description=\"fail the job if data transform is failed\" environments=dev,test sqlParams=inputView=delta_typed\n",
    "\n",
    "SELECT SUM(error) = 0 AS valid\n",
    " ,TO_JSON(\n",
    " NAMED_STRUCT('count', COUNT(error), 'errors', SUM(error))\n",
    " ) AS message\n",
    "FROM \n",
    "(\n",
    " SELECT CASE WHEN SIZE(_errors) > 0 THEN 1 ELSE 0 END AS error \n",
    " FROM ${inputView}\n",
    ") base"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "ExecuteTime": {
     "end_time": "2020-05-31T05:01:13.796275Z",
     "start_time": "2020-05-31T05:01:13.734Z"
    }
   },
   "source": [
    "## 2.4 Add Calculated Fields (reused sql script)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%env \n",
    "ETL_CONF_CURRENT_TIMESTAMP=CURRENT_TIMESTAMP()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%sql outputView=\"update_load\" name=\"add calc field for SCD\" environments=dev,test sqlParams=table_name=delta_typed,now=${ETL_CONF_CURRENT_TIMESTAMP}\n",
    "\n",
    "SELECT id,name,email,state, CAST(${now} AS timestamp) AS valid_from, CAST(null AS timestamp) AS valid_to\n",
    ",1 AS iscurrent, md5(concat(name,email,state)) AS checksum \n",
    "FROM ${table_name}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2.5 Output Incremental data to Delta Lake\n",
    "### Delta Lake is an optimized data lake that supports Time Travel and ACID transactions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "{\n",
    " \"type\": \"DeltaLakeLoad\",\n",
    " \"name\": \"Incremental load to Data Lake\",\n",
    " \"environments\": [\"dev\", \"test\"],\n",
    " \"inputView\": \"update_load\",\n",
    " \"outputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/output/delta_load/\",\n",
    " \"numPartitions\": 2,\n",
    " \"saveMode\": \"Overwrite\",\n",
    " \"authentication\": {\n",
    " \"method\": \"AmazonIAM\"\n",
    " }\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Arc",
   "language": "javascript",
   "name": "arc"
  },
  "language_info": {
   "codemirror_mode": "javascript",
   "file_extension": ".json",
   "mimetype": "javascript",
   "name": "arc",
   "nbconvert_exporter": "arcexport",
   "version": "3.8.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}