{ "cells": [ { "cell_type": "markdown", "id": "5cc7fcb2-41bb-4c4f-9b9a-b01798ff622f", "metadata": { "execution": { "iopub.execute_input": "2022-11-20T06:27:55.989263Z", "iopub.status.busy": "2022-11-20T06:27:55.988861Z", "iopub.status.idle": "2022-11-20T06:27:56.010302Z", "shell.execute_reply": "2022-11-20T06:27:56.009491Z", "shell.execute_reply.started": "2022-11-20T06:27:55.989217Z" }, "tags": [] }, "source": [ "# Connect to Amazon Redshift with Pyspark using EMR-RedShift connector from EMR Studio using Role " ] }, { "cell_type": "markdown", "id": "6ae9b1b8-14d5-4164-baee-ca2f4d2ff0fc", "metadata": {}, "source": [ "## Prerequisites\n", "In order to execute this notebook successfully as is, please ensure the following prerequisites are completed.\n", "\n", "- EMR EC2 cluster with release 6.9.0 on higher\n", "- This example we connect to Amazon Redshift cluster, hence the EMR cluster attached to this notebook must have the connectivity (VPC) and appropriate rules (Security Group).\n", " - EMR 6.9.0 cluster should be attached to this notebook and should have the Spark, JupyterEnterpriseGateway, and Livy applications installed. \n", "- Source table exists in RedShift with sample data\n", "- Target table exists in RedShift with or without data\n", "- To use EMR-RedShift connector with Amazon EMR Studio Notebooks, you must first copy the jar files from the local file system to HDFS, present on the master node of the EMR cluster, follow setup steps." ] }, { "cell_type": "markdown", "id": "637de15c-b1d6-4c43-beb3-79ef8a1dad36", "metadata": {}, "source": [ "## Introduction\n", "\n", "In this example we use Pyspark to connect to a table in Amazon Redshift using spark-redshift connector.\n", "\n", "Starting from EMR release 6.9.0, Redshift JDBC driver >= 2.1 is packaged into the environment. With the new version of JDBC driver, you can specify the JDBC URL without including the raw username and password. Instead, you can specify jdbc:redshift:iam:// scheme, which will make JDBC driver to use your EMR Serverless job execution role to fetch the credentials automatically. \n", "\n", "See [Here](https://docs.aws.amazon.com/redshift/latest/mgmt/generating-iam-credentials-configure-jdbc-odbc.html) for more information on configuring JDBC connection to use IAM credentials.\n" ] }, { "cell_type": "markdown", "id": "fcd9ed3f-a2a1-47d8-96ed-30ac570310d6", "metadata": {}, "source": [ "## Setup\n", "Create an S3 bucket location to be used as a temporary location for Redshift dataset. For example: s3://EXAMPLE-BUCKET/temporary-redshift-dataset/\n", "\n", "- Create an AWS IAM role which will be associated to the Amazon Redshift cluster. Make sure that this IAM role has access to read and write to the above mentioned S3 bucket location with the appropriate IAM policy. More details:\n", "\n", " [Create AWS IAM role for Amazon Redshift](https://docs.aws.amazon.com/redshift/latest/dg/c-getting-started-using-spectrum-create-role.html)\n", "\n", " [Associate IAM role with Amazon Redshift cluster](https://docs.aws.amazon.com/redshift/latest/dg/c-getting-started-using-spectrum-add-role.html)\n", "\n", "- Connect to the master node of the cluster using SSH and then copy the jar files from the local filesystem to HDFS as shown in the following examples. In the example, we create a directory in HDFS for clarity of file management. You can choose your own destination in HDFS, if desired.\n", "\n", " `hdfs dfs -mkdir -p /apps/emr_rs_connector/lib`\n", "\n", " `hdfs dfs -copyFromLocal /usr/share/aws/redshift/jdbc/RedshiftJDBC.jar /apps/emr_rs_connector/lib/RedshiftJDBC.jar`\n", "\n", " `hdfs dfs -copyFromLocal /usr/share/aws/redshift/spark-redshift/lib/spark-redshift.jar /apps/emr_rs_connector/lib/spark-redshift.jar`\n", "\n", " `hdfs dfs -copyFromLocal /usr/share/aws/redshift/spark-redshift/lib/spark-avro.jar /apps/emr_rs_connector/lib/spark-avro.jar`\n", "\n", " `hdfs dfs -copyFromLocal /usr/share/aws/redshift/spark-redshift/lib/minimal-json.jar /apps/emr_rs_connector/lib/minimal-json.jar`\n", "\n", " `hdfs dfs -ls /apps/emr_rs_connector/lib`\n", "\n" ] }, { "cell_type": "markdown", "id": "f09b3ad6-eb00-401b-8834-87546fd02b90", "metadata": {}, "source": [ "## Configure to use jar file in studio notebook" ] }, { "cell_type": "code", "execution_count": null, "id": "2cf647d9-831d-4276-b8bb-d7b2a4cd7d01", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%configure -f\n", "{\n", " \"conf\" : {\n", " \"spark.jars\":\"hdfs:///apps/emr_rs_connector/lib/RedshiftJDBC.jar,hdfs:///apps/emr_rs_connector/lib/minimal-json.jar,hdfs:///apps/emr_rs_connector/lib/spark-avro.jar,hdfs:///apps/emr_rs_connector/lib/spark-redshift.jar\",\n", " \"spark.pyspark.python\" : \"python3\",\n", " \"spark.pyspark.virtualenv.enable\" : \"true\",\n", " \"spark.pyspark.virtualenv.type\" : \"native\",\n", " \"spark.pyspark.virtualenv.bin.path\" : \"/usr/bin/virtualenv\"\n", "\n", " }\n", "}" ] }, { "cell_type": "markdown", "id": "f3fb38b0-e7e8-4503-906b-96d6730c8867", "metadata": {}, "source": [ "## Connect to Amazon Redshift using pyspark" ] }, { "cell_type": "code", "execution_count": null, "id": "de481081-525c-4749-bcfd-97dcf6e69e4e", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%pyspark\n", "\n", "import pyspark\n", "from pyspark.sql import SparkSession\n", "from pyspark import SparkContext\n", "from pyspark.sql import SQLContext\n", "\n", "\n", "#jdbc:redshift:iam://examplecluster...redshift.amazonaws.com:5439/DB\n", "str_jdbc_url=\"jdbc:redshift:iam://...redshift.amazonaws.com:5439/replace_DB_name?ApplicationName=EMRRedshiftSparkConnection\"\n", "str_src_table=\" \"\n", "str_tgt_table=\" \"\n", "str_s3_path=\" \"\n", "str_iam_role=\" \"\n", "\n", "#sc = SparkContext().getOrCreate() # Existing SC\n", "\n", "sql_context = SQLContext(sc)\n", "\n", "\n", "jdbcDF = sql_context.read\\\n", " .format(\"io.github.spark_redshift_community.spark.redshift\")\\\n", " .option(\"url\", str_jdbc_url)\\\n", " .option(\"dbtable\", str_src_table)\\\n", " .option(\"aws_iam_role\",str_iam_role)\\\n", " .option(\"tempdir\", str_s3_path)\\\n", " .load()\n", "\n", "jdbcDF.limit(5).show()\n", "\n", "\n", "jdbcDF.write \\\n", " .format(\"io.github.spark_redshift_community.spark.redshift\") \\\n", " .option(\"url\", str_jdbc_url) \\\n", " .option(\"dbtable\", str_tgt_table) \\\n", " .option(\"tempdir\", str_s3_path) \\\n", " .option(\"aws_iam_role\",str_iam_role) \\\n", " .mode(\"append\")\\\n", " .save()\n", "\n", "\n" ] }, { "cell_type": "markdown", "id": "0ae2ea22-2709-45f5-bfeb-793d19a10500", "metadata": {}, "source": [ "## Connect to Amazon Redshift using scalaspark" ] }, { "cell_type": "code", "execution_count": null, "id": "e23f7b64-b555-4bf4-a715-acc4586593e4", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%scalaspark\n", "\n", "//Declare the variables and replace the variables values as appropiate\n", "\n", "//jdbc:redshift:iam://examplecluster...redshift.amazonaws.com:5439/DB\n", "val str_jdbc_url=str_jdbc_url=\"jdbc:redshift:iam://...redshift.amazonaws.com:5439/replace_DB_name?ApplicationName=EMRRedshiftSparkConnection\"\n", "val str_src_table=\" \"\n", "val str_tgt_table=\" \"\n", "val str_s3_path=\" \"\n", "val str_iam_role=\" \"\n", "\n", "//Read data from source table\n", "val jdbcDF = (spark.read.format(\"io.github.spark_redshift_community.spark.redshift\")\n", " .option(\"url\", str_jdbc_url)\n", " .option(\"dbtable\", str_src_table)\n", " .option(\"tempdir\", str_s3_path)\n", " .option(\"aws_iam_role\", str_iam_role)\n", " .load())\n", "\n", "// Write data to target table\n", "\n", "jdbcDF.limit(5).show()\n", "\n", "\n", "jdbcDF.write.mode(\"append\").\n", " format(\"io.github.spark_redshift_community.spark.redshift\").option(\"url\", str_jdbc_url).option(\"dbtable\", str_tgt_table).option(\"aws_iam_role\", str_iam_role).option(\"tempdir\", str_s3_path).save()\n" ] }, { "cell_type": "markdown", "id": "cf56be3c-38ab-4013-9e22-c739f04e6ca9", "metadata": {}, "source": [ "## Connect to Amazon Redshift using SparkR" ] }, { "cell_type": "code", "execution_count": null, "id": "e309bfb9-b914-4b5f-a39b-1371122643b3", "metadata": {}, "outputs": [], "source": [ "%%rspark\n", "#Declare the variables and replace the variables values as appropiate\n", "\n", "#jdbc:redshift:iam://examplecluster...redshift.amazonaws.com:5439/DB\n", "str_jdbc_url=\"jdbc:redshift:iam://...redshift.amazonaws.com:5439/replace_DB_name?ApplicationName=EMRRedshiftSparkConnection\"\n", "str_src_table=\" \"\n", "str_tgt_table=\" \"\n", "str_s3_path=\" \"\n", "str_iam_role=\" \"\n", "\n", "# Read data from source table\n", "\n", "df <- read.df(\n", " NULL,\n", " \"io.github.spark_redshift_community.spark.redshift\",\n", " aws_iam_role = str_iam_role,\n", " tempdir = str_s3_path,\n", " dbtable = str_src_table,\n", " url = str_jdbc_url)\n", "\n", "showDF(df)" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }