{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%conf \n",
    "numRows=5\n",
    "streaming=false"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 1. Extract static data\n",
    "Load the initial contact snapshot and the updated contact file as batch views, then apply the `contact_meta_0.json` schema to both."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "{\n",
    " \"type\": \"DelimitedExtract\",\n",
    " \"name\": \"extract initial table\",\n",
    " \"environments\": [\"dev\", \"test\"],\n",
    " \"inputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/data/initial_contacts.csv\",\n",
    " \"schemaURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/meta/contact_meta_0.json\",\n",
    " \"outputView\": \"initial_raw\",\n",
    " \"delimiter\": \"Comma\",\n",
    " \"header\": false,\n",
    " \"quote\": \"None\"\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "{\n",
    " \"type\": \"TypingTransform\",\n",
    " \"name\": \"apply table schema 0\",\n",
    " \"environments\": [\"dev\", \"test\"],\n",
    " \"schemaURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/meta/contact_meta_0.json\",\n",
    " \"inputView\": \"initial_raw\",\n",
    " \"outputView\": \"initial_typed\",\n",
    " \"numPartitions\": 1,\n",
    " \"persist\": true\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "{\n",
    " \"type\": \"DelimitedExtract\",\n",
    " \"name\": \"extract updated data\",\n",
    " \"environments\": [\"dev\", \"test\"],\n",
    " \"inputURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/data/update_contacts.csv\",\n",
    " \"schemaURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/meta/contact_meta_0.json\",\n",
    " \"outputView\": \"delta_raw\",\n",
    " \"delimiter\": \"Comma\",\n",
    " \"header\": false,\n",
    " \"quote\": \"None\"\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "{\n",
    " \"type\": \"TypingTransform\",\n",
    " \"name\": \"apply table schema 0 to delta\",\n",
    " \"environments\": [\"dev\", \"test\"],\n",
    " \"schemaURI\": \"s3a://\"${ETL_CONF_DATALAKE_LOC}\"/app_code/meta/contact_meta_0.json\",\n",
    " \"inputView\": \"delta_raw\",\n",
    " \"outputView\": \"delta_typed\",\n",
    " \"numPartitions\": 1,\n",
    " \"persist\": true\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 2. Turn on Spark Streaming\n",
    "Switch the kernel into streaming mode so the remaining stages run as streaming queries."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%conf \n",
    "streaming=true\n",
    "streamingDuration=30"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2.1 Convert static data to streams\n",
    "Each `RateExtract` source emits an increasing `value` counter at 5 rows per second. Joining that counter to a static table's `_index` column replays the table as a stream.\n",
    "- `stream_a`: initial dataset\n",
    "- `stream_b`: incremental (delta) dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "{\n",
    " \"type\": \"RateExtract\",\n",
    " \"name\": \"create initial streaming source\",\n",
    " \"environments\": [\"dev\", \"test\"],\n",
    " \"outputView\": \"initial_stream\",\n",
    " \"numPartitions\": 1,\n",
    " \"rowsPerSecond\": 5\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%sql outputView=\"stream_a\" name=\"simulate a stream\" sqlParams=input_table=initial_typed,stream_table=initial_stream numPartitions=1\n",
    "\n",
    "SELECT *\n",
    "FROM ${stream_table}\n",
    "INNER JOIN ${input_table}\n",
    "ON ${input_table}._index = ${stream_table}.value"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "{\n",
    " \"type\": \"RateExtract\",\n",
    " \"name\": \"create delta streaming source\",\n",
    " \"environments\": [\"dev\", \"test\"],\n",
    " \"outputView\": \"delta_stream\",\n",
    " \"numPartitions\": 1,\n",
    " \"rowsPerSecond\": 5\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%sql outputView=\"stream_b\" name=\"simulate b stream\" sqlParams=input_table=delta_typed,stream_table=delta_stream numPartitions=1\n",
    "\n",
    "SELECT *\n",
    "FROM ${stream_table}\n",
    "INNER JOIN ${input_table}\n",
    "ON ${input_table}._index = ${stream_table}.value"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 3. Detect changed contacts\n",
    "Join the two streams on `id` and keep contacts whose `email` or `state` differs between the initial and updated data. The null-safe `<=>` operator makes a change to or from NULL count as a difference; a plain `<>` would evaluate to NULL and silently drop such rows."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%sql outputView=\"join_streams\" name=\"join two streams\"\n",
    "\n",
    "SELECT\n",
    " initial.id as initial_id,\n",
    " initial.name as initial_name,\n",
    " initial.email as initial_email,\n",
    " initial.state as initial_state,\n",
    " delta.email as delta_email,\n",
    " delta.state as delta_state\n",
    "FROM stream_a initial\n",
    "INNER JOIN stream_b delta\n",
    "ON initial.id = delta.id\n",
    "WHERE NOT (initial.email <=> delta.email)\n",
    "   OR NOT (initial.state <=> delta.state)\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Arc",
   "language": "javascript",
   "name": "arc"
  },
  "language_info": {
   "file_extension": "arc",
   "mimetype": "text/arc",
   "name": "arc",
   "nbconvert_exporter": "text",
   "version": "2.4.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}