{ "cells": [ { "cell_type": "markdown", "id": "4c7d5e9f", "metadata": {}, "source": [ "## Connect DocumentDb using spark connector from EMR Studio Notebook using Pyspark, Spark Scala, and SparkR\n", "\n", "#### Topics covered in this example\n", "\n", "* Configuring mongodb spark connector\n", "* Configuring mongodb input database URI\n", "* Configuring mongodb output database URI\n", "* Connecting to AWS DocumentDB using mongodb spark connector to read data into Spark DF\n", "* Connecting to AWS DocumentDB using mongodb spark connector to write data from Spark DF to DocumentDB" ] }, { "cell_type": "markdown", "id": "1d779439", "metadata": {}, "source": [ "## Table of Contents:\n", "\n", "1. [Prerequisites](#Prerequisites)\n", "2. [Introduction](#Introduction)\n", "3. [Load the configuration in memory](#Load-the-configuration-in-memory)\n", "4. [Read data using Pyspark](#Read-data-using-Pyspark)\n", "5. [Write data using Pyspark](#Write-data-using-Pyspark)\n", "6. [Read data using Scala](#Read-data-using-Scala)\n", "7. [Write data using Scala](#Write-data-using-Scala)\n", "8. [Read data using SparkR](#Read-data-using-SparkR)\n", "9. [Write data using SparkR](#Write-data-using-SparkR)" ] }, { "cell_type": "markdown", "id": "99a57308", "metadata": {}, "source": [ "## Prerequisites" ] }, { "cell_type": "markdown", "id": "2f8bec46", "metadata": {}, "source": [ " 1. This notebook support Multi-language support for Spark kernels\n", " 2. Mongo Spark Connector Version - mongo-spark-connector_2.12:3.0.1\n", " 3. EMR Version - emr-6.4.0\n", " 4. DocumentDB Engine Version - docdb 4.0.0" ] }, { "cell_type": "markdown", "id": "40d6cbd2", "metadata": {}, "source": [ "## Introduction" ] }, { "cell_type": "markdown", "id": "71f3992d", "metadata": {}, "source": [ "This notebooks shows how to connect to DocumentDB using mongo spark connector(mongo-spark-connector_2.12:3.0.1) from Amazon EMR Studio Notebook using Pyspark, Scala, SparkR" ] }, { "cell_type": "markdown", "id": "832f5897", "metadata": {}, "source": [ "## Load the configuration in memory" ] }, { "cell_type": "code", "execution_count": null, "id": "3a453ec7", "metadata": {}, "outputs": [], "source": [ "%%configure -f\n", "{\n", " \"conf\": {\n", " \"spark.mongodb.input.uri\": \"mongodb://:@:/.?readPreference=secondaryPreferred\",\n", " \"spark.mongodb.output.uri\": \"mongodb://:@:/.\",\n", " \"spark.jars.packages\": \"org.mongodb.spark:mongo-spark-connector_2.12:3.0.1\"\n", " }\n", "}" ] }, { "cell_type": "markdown", "id": "eb52648d", "metadata": {}, "source": [ "## Read data using Pyspark" ] }, { "cell_type": "code", "execution_count": null, "id": "bbc8b684", "metadata": {}, "outputs": [], "source": [ "%%pyspark\n", "df = spark.read.format(\"mongo\").option(\"database\", \"\").option(\"collection\", \"\").load()\n", "df.show()" ] }, { "cell_type": "markdown", "id": "7eccd39c", "metadata": {}, "source": [ "## Write data using Pyspark" ] }, { "cell_type": "code", "execution_count": null, "id": "fef2a682", "metadata": {}, "outputs": [], "source": [ "%%pyspark\n", "people = spark.createDataFrame([(\"Bilbo Baggins\", 50), (\"Gandalf\", 1000), (\"Thorin\", 195), (\"Balin\", 178), (\"Kili\", 77),\n", " (\"Dwalin\", 169), (\"Oin\", 167), (\"Gloin\", 158), (\"Fili\", 82), (\"Bombur\", None)], [\"name\", \"age\"])\n", "people.show()\n", "people.write.format(\"mongo\").mode(\"append\").option(\"database\",\n", "\"\").option(\"collection\", \"\").save()\n", "df_people = spark.read.format(\"mongo\").option(\"database\", \"\").option(\"collection\", \"\").load()\n", "df_people.show()" ] }, { "cell_type": "markdown", "id": "9b059706", "metadata": {}, "source": [ "## Read data using Scala" ] }, { "cell_type": "code", "execution_count": null, "id": "8b2563e1", "metadata": {}, "outputs": [], "source": [ "%%scalaspark\n", "val df = spark.read.format(\"mongo\").option(\"database\", \"\").option(\"collection\", \"\").load()\n", "df.show()" ] }, { "cell_type": "markdown", "id": "75d2a02b", "metadata": {}, "source": [ "## Write data using Scala" ] }, { "cell_type": "code", "execution_count": null, "id": "63966e6d", "metadata": {}, "outputs": [], "source": [ "%%scalaspark\n", "import com.mongodb.spark._\n", "import com.mongodb.spark.config._\n", "val writeConfig = WriteConfig(Map(\"collection\" -> \"\", \"writeConcern.w\" -> \"majority\"), Some(WriteConfig(sc)))\n", "val sparkDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s\"{spark: $i}\")))\n", "MongoSpark.save(sparkDocuments, writeConfig)\n", "val numbers_df = spark.read.format(\"mongo\").option(\"database\", \"\").option(\"collection\", \"\").load()\n", "numbers_df.show()" ] }, { "cell_type": "markdown", "id": "8b2a8692", "metadata": {}, "source": [ "## Read data using SparkR" ] }, { "cell_type": "code", "execution_count": null, "id": "11d58b43", "metadata": {}, "outputs": [], "source": [ "%%rspark\n", "df <- read.df(\"\", source = \"com.mongodb.spark.sql.DefaultSource\", database = \"\", collection = \"\")\n", "showDF(df)" ] }, { "cell_type": "markdown", "id": "3609e158", "metadata": {}, "source": [ "## Write data using SparkR" ] }, { "cell_type": "code", "execution_count": null, "id": "ffddd791", "metadata": {}, "outputs": [], "source": [ "%%rspark\n", "charactersRdf <- data.frame(list(name=c(\"Bilbo Baggins\", \"Gandalf\", \"Thorin\",\n", " \"Balin\", \"Kili\", \"Dwalin\", \"Oin\", \"Gloin\", \"Fili\", \"Bombur\"),\n", " age=c(50, 1000, 195, 178, 77, 169, 167, 158, 82, NA)))\n", "charactersSparkdf <- createDataFrame(charactersRdf)\n", "write.df(charactersSparkdf, \"\", source = \"com.mongodb.spark.sql.DefaultSource\",\n", " mode = \"overwrite\", database = \"\", collection = \"\")\n", "characters_df <- read.df(\"\", source = \"com.mongodb.spark.sql.DefaultSource\",\n", " database = \"\", collection = \"\")\n", "showDF(characters_df)" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }