{ "cells": [ { "cell_type": "markdown", "id": "c0c8c8f9", "metadata": {}, "source": [ "## Connect DynamoDB using spark connector from EMR Studio Notebook using Spark Scala\n", "\n", "#### Topics covered in this example\n", "\n", "* Configuring the EMR-DynamoDB Connector\n", "* Connecting to AWS DynamoDB using EMR-DynamoDB Connector to read data into Spark DF and get the count" ] }, { "cell_type": "markdown", "id": "66cbe6a8", "metadata": {}, "source": [ "## Table of Contents:\n", "\n", "1. [Prerequisites](#Prerequisites)\n", "2. [Introduction](#Introduction)\n", "3. [Load the configuration in memory](#Load-the-configuration-in-memory)\n", "4. [Read data using Scala](#Read-data-using-Scala)" ] }, { "cell_type": "markdown", "id": "f74c4358", "metadata": {}, "source": [ "## Prerequisites" ] }, { "cell_type": "markdown", "id": "d35bce95", "metadata": {}, "source": [ " 1. EMR Version - emr-6.4.0\n", " 2. The notebook is using Amazon EMR-DynamoDB Connector(emr-ddb-hadoop.jar) which is used to access data in Amazon DynamoDB using Apache Hadoop, Apache Hive, and Apache Spark in Amazon EMR. (https://docs.aws.amazon.com/emr/latest/ReleaseGuide/EMRforDynamoDB.html)\n", " 3. The Amazon EMR-DynamoDB Connector is locally available within EMR at location below: \n", " \n", " `/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar`\n", " 4. To locally use the spark.jars in the configuration using the `file://` protocol, Livy does not allow local files to be uploaded to the user session by default. So we need to update the livy configuration to allow the local directory and restart livy server to have these take effects.\n", " \n", " ##### The following commands should be run from the EMR cluster attached to the notebook\n", " \n", " a. Open the Livy configuration file\n", " \n", " `vim /etc/livy/conf/livy.conf`\n", " \n", " b. Add the ddb jars:\n", " \n", " `livy.file.local-dir-whitelist /usr/share/aws/emr/ddb/lib/`\n", " \n", " c. 
  {
   "cell_type": "markdown",
   "id": "9e0bfc9d",
   "metadata": {},
   "source": [
    "## Introduction"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fc8fb425",
   "metadata": {},
   "source": [
    "This notebook shows how to connect to Amazon DynamoDB from an Amazon EMR Studio notebook with Scala, using the EMR-DynamoDB Spark connector (`emr-ddb-hadoop`)."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e705dd8e",
   "metadata": {},
   "source": [
    "## Load the configuration in memory"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d2b1831e",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%configure -f\n",
    "{\n",
    "    \"conf\": {\n",
    "        \"spark.jars\": \"file:///usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar\"\n",
    "    }\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "664d194a",
   "metadata": {},
   "source": [
    "## Read data using Scala"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3b102e29",
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.hadoop.io.Text\n",
    "import org.apache.hadoop.dynamodb.DynamoDBItemWritable\n",
    "import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat\n",
    "import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat\n",
    "import org.apache.hadoop.mapred.JobConf\n",
    "import org.apache.hadoop.io.LongWritable\n",
    "\n",
    "var jobConf = new JobConf(sc.hadoopConfiguration)\n",
    "jobConf.set(\"dynamodb.servicename\", \"dynamodb\")\n",
    "jobConf.set(\"dynamodb.input.tableName\", \"\") // Name of the DynamoDB table to read\n",
    "jobConf.set(\"dynamodb.endpoint\", \"\") // e.g. dynamodb.us-east-1.amazonaws.com\n",
    "jobConf.set(\"dynamodb.regionid\", \"\") // e.g. us-east-1\n",
    "jobConf.set(\"dynamodb.throughput.read\", \"1\")\n",
    "jobConf.set(\"dynamodb.throughput.read.percent\", \"1\")\n",
    "jobConf.set(\"dynamodb.version\", \"2011-12-05\")\n",
    "\n",
    "jobConf.set(\"mapred.output.format.class\", \"org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat\")\n",
    "jobConf.set(\"mapred.input.format.class\", \"org.apache.hadoop.dynamodb.read.DynamoDBInputFormat\")\n",
    "\n",
    "// Each record comes back as a (Text, DynamoDBItemWritable) pair\n",
    "var orders = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])\n",
    "\n",
    "// Count the items in the table\n",
    "orders.count()"
   ]
  },
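  {
   "cell_type": "markdown",
   "id": "a1f2c3d4",
   "metadata": {},
   "source": [
    "The RDD above holds `(Text, DynamoDBItemWritable)` pairs, and calling `getItem()` on each value returns a `java.util.Map[String, AttributeValue]` of the item's attributes. The next cell is a minimal sketch of turning those items into a Spark DataFrame. The attribute names `orderId` (a String attribute) and `total` (a Number attribute) are hypothetical placeholders; replace them with attributes from your own table."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b2e3d4f5",
   "metadata": {},
   "outputs": [],
   "source": [
    "import com.amazonaws.services.dynamodbv2.model.AttributeValue\n",
    "\n",
    "// Hypothetical schema: adjust attribute names and types to match your table\n",
    "case class Order(orderId: String, total: Double)\n",
    "\n",
    "val orderRows = orders.map { case (_, item) =>\n",
    "    val attrs = item.getItem() // java.util.Map[String, AttributeValue]\n",
    "    Order(\n",
    "        Option(attrs.get(\"orderId\")).map(_.getS).getOrElse(\"\"),\n",
    "        Option(attrs.get(\"total\")).map(_.getN.toDouble).getOrElse(0.0)\n",
    "    )\n",
    "}\n",
    "\n",
    "val ordersDF = spark.createDataFrame(orderRows)\n",
    "ordersDF.printSchema()\n",
    "ordersDF.count()"
   ]
  },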
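  {
   "cell_type": "markdown",
   "id": "c3f4e5a6",
   "metadata": {},
   "source": [
    "The `jobConf` above already registers `DynamoDBOutputFormat`, so the same configuration can be reused to write results back to DynamoDB. The cell below is a sketch only, assuming the target table name is filled into `dynamodb.output.tableName`; the `orderId` attribute is again a hypothetical placeholder."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d4a5b6c7",
   "metadata": {},
   "outputs": [],
   "source": [
    "import java.util.HashMap\n",
    "import com.amazonaws.services.dynamodbv2.model.AttributeValue\n",
    "\n",
    "jobConf.set(\"dynamodb.output.tableName\", \"\") // Name of the DynamoDB table to write to\n",
    "\n",
    "// Build (Text, DynamoDBItemWritable) pairs and save them through the output format\n",
    "val ddbWriteRDD = orderRows.map { order =>\n",
    "    val attrs = new HashMap[String, AttributeValue]()\n",
    "    attrs.put(\"orderId\", new AttributeValue().withS(order.orderId)) // hypothetical attribute\n",
    "    val item = new DynamoDBItemWritable()\n",
    "    item.setItem(attrs)\n",
    "    (new Text(\"\"), item)\n",
    "}\n",
    "ddbWriteRDD.saveAsHadoopDataset(jobConf)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Spark",
   "language": "",
   "name": "sparkkernel"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}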