{ "cells": [ { "cell_type": "markdown", "id": "a28da63f", "metadata": {}, "source": [ "# Connect to Amazon Redshift with Pyspark, Spark Scala, and SparkR\n", "\n", "\n", "## Table of Contents:\n", "\n", "1. [Prerequisites](#Prerequisites)\n", "2. [Introduction](#Introduction)\n", "3. [Setup](#Setup)\n", "4. [Connect to Amazon Redshift using Pyspark](#Connect-to-Amazon-Redshift-using-Pyspark)\n", "5. [Connect to Amazon Redshift using Scala](#Connect-to-Amazon-Redshift-using-Scala)\n", "6. [Connect to Amazon Redshift using SparkR](#Connect-to-Amazon-Redshift-using-SparkR)\n", "\n", "\n", "## Prerequisites\n", "\n", "In order to execute this notebook successfully as is, please ensure the following prerequisites are completed.\n", "* This example we connect to Amazon Redshift cluster, hence the EMR cluster attached to this notebook must have the connectivity (VPC) and appropriate rules (Security Group).\n", "\n", "\n", "## Introduction\n", "In this example we use Pyspark, Spark Scala, and Spark R to connect to a table in Amazon Redshift using spark-redshift connector.\n", "\n", "[spark-redshift](#https://github.com/spark-redshift-community/spark-redshift) is a performant Amazon Redshift data source for Apache Spark\n", "\n", "## Setup\n", "\n", "* Create an S3 bucket location to be used as a temporary location for Redshift dataset. For example: s3://EXAMPLE-BUCKET/temporary-redshift-dataset/\n", "\n", "* Create an AWS IAM role which will be associated to the Amazon Redshift cluster. Make sure that this IAM role has access to read and write to the above mentioned S3 bucket location with the appropriate IAM policy. More details:\n", "\n", " * [Create AWS IAM role for Amazon Redshift](#https://docs.aws.amazon.com/redshift/latest/gsg/rs-gsg-create-an-iam-role.html)\n", " * [Associate IAM role with Amazon Redshift cluster](#https://docs.aws.amazon.com/redshift/latest/dg/c-getting-started-using-spectrum-add-role.html)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "618db6ef", "metadata": {}, "outputs": [], "source": [ "%%configure -f\n", "{ \n", " \"conf\": \n", " {\n", " \"spark.jars.packages\": \"org.apache.spark:spark-avro_2.11:2.4.2,io.github.spark-redshift-community:spark-redshift_2.11:4.0.1\"\n", " }\n", "}" ] }, { "cell_type": "markdown", "id": "c3433ed7", "metadata": {}, "source": [ "## Connect to Amazon Redshift using Pyspark" ] }, { "cell_type": "code", "execution_count": null, "id": "8220a165", "metadata": {}, "outputs": [], "source": [ "%%pyspark\n", "\n", "#Declare the variables and replace the variables values as appropiate\n", "\n", "str_jdbc_url=\"jdbc:redshift://:5439/dev?user=&password=\"\n", "str_dbname=\"\"\n", "str_tgt_table=\"\"\n", "str_s3_path=\"s3://\"\n", "str_iam_role=\"\"\n", "\n", "# Read data from source table\n", "\n", "jdbcDF = spark.read \\\n", " .format(\"io.github.spark_redshift_community.spark.redshift\") \\\n", " .option(\"url\", str_jdbc_url) \\\n", " .option(\"dbtable\", str_dbname) \\\n", " .option(\"tempdir\", str_s3_path) \\\n", " .option(\"aws_iam_role\",str_iam_role) \\\n", " .load()\n", "\n", "jdbcDF.limit(5).show()\n", "\n", "# Write data to target table\n", "\n", "jdbcDF.write \\\n", " .format(\"io.github.spark_redshift_community.spark.redshift\") \\\n", " .option(\"url\", str_jdbc_url) \\\n", " .option(\"dbtable\", str_tgt_table) \\\n", " .option(\"tempdir\", str_s3_path) \\\n", " .option(\"aws_iam_role\",str_iam_role).mode(\"append\").save()" ] }, { "cell_type": "markdown", "id": "bd9f2fad", "metadata": {}, "source": [ "## Connect to Amazon Redshift using Scala" ] }, { "cell_type": "code", "execution_count": null, "id": "c7f17fdd", "metadata": {}, "outputs": [], "source": [ "%%scalaspark\n", "\n", "#Declare the variables and replace the variables values as appropiate\n", "\n", "val str_jdbc_url=\"jdbc:redshift://:5439/dev?user=&password=\"\n", "val str_dbname=\"\"\n", "val str_tgt_table=\"\"\n", "val str_s3_path=\"s3://\"\n", "val str_iam_role=\"\"\n", "val str_username=\"\"\n", "val str_password=\"\"\n", "\n", "# Read data from source table\n", "val jdbcDF = (spark.read.format(\"io.github.spark_redshift_community.spark.redshift\")\n", " .option(\"url\", str_jdbc_url)\n", " .option(\"dbtable\", str_dbname)\n", " .option(\"tempdir\", str_s3_path)\n", " .option(\"aws_iam_role\", str_iam_role)\n", " .load())\n", "\n", "# Write data to target table\n", "\n", "jdbcDF.limit(5).show()\n", "\n", "jdbcDF.write.mode(\"append\").\n", " format(\"io.github.spark_redshift_community.spark.redshift\").option(\"url\", str_jdbc_url).option(\"dbtable\", str_tgt_table).option(\"aws_iam_role\", str_iam_role).option(\"tempdir\", str_s3_path).save()\n", " \n" ] }, { "cell_type": "markdown", "id": "d20e8bcd", "metadata": {}, "source": [ "## Connect to Amazon Redshift using SparkR" ] }, { "cell_type": "code", "execution_count": null, "id": "eecde2e3", "metadata": {}, "outputs": [], "source": [ "%%rspark\n", "\n", "#Declare the variables and replace the variables values as appropiate\n", "\n", "str_jdbc_url=\"jdbc:redshift://:5439/dev?user=&password=\"\n", "str_dbname=\"\"\n", "str_tgt_table=\"\"\n", "str_s3_path=\"s3://\"\n", "str_iam_role=\"\"\n", "\n", "# Read data from source table\n", "\n", "df <- read.df(\n", " NULL,\n", " \"io.github.spark_redshift_community.spark.redshift\",\n", " aws_iam_role = str_iam_role,\n", " tempdir = str_s3_path,\n", " dbtable = str_src_table,\n", " url = str_jdbc_url)\n", "\n", "showDF(df)" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }