# Connect to Amazon Redshift with PySpark using the EMR-Redshift connector from EMR Studio using an IAM role

## Prerequisites
To run this notebook as is, make sure the following prerequisites are met.

- An EMR on EC2 cluster with release 6.9.0 or higher.
- This example connects to an Amazon Redshift cluster, so the EMR cluster attached to this notebook must have network connectivity to it (VPC) and appropriate security group rules.
- The EMR 6.9.0 cluster must be attached to this notebook and must have the Spark, JupyterEnterpriseGateway, and Livy applications installed.
- A source table exists in Redshift with sample data.
- A target table exists in Redshift, with or without data.
- To use the EMR-Redshift connector with Amazon EMR Studio notebooks, you must first copy the connector jar files from the local file system of the EMR cluster's master node to HDFS. Follow the steps in the Setup section.

## Introduction

In this example we use PySpark to connect to a table in Amazon Redshift using the spark-redshift connector.

Starting with EMR release 6.9.0, the Redshift JDBC driver (version 2.1 or later) is packaged into the environment. With this driver version, you can specify the JDBC URL without including the raw username and password. Instead, you can use the `jdbc:redshift:iam://` scheme, which makes the JDBC driver fetch the credentials automatically using the IAM role associated with your EMR cluster.

See [the Amazon Redshift documentation](https://docs.aws.amazon.com/redshift/latest/mgmt/generating-iam-credentials-configure-jdbc-odbc.html) for more information on configuring a JDBC connection to use IAM credentials.
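
As a minimal sketch of the URL format described above, the helper below assembles a `jdbc:redshift:iam://` URL from its parts. The function name `build_redshift_iam_url`, the endpoint, and the database name are hypothetical and used only for illustration; the sketch also assumes the application name contains no characters that need URL encoding.

```python
def build_redshift_iam_url(host, database, port=5439, application_name=None):
    """Return a jdbc:redshift:iam:// URL that carries no username or password."""
    url = f"jdbc:redshift:iam://{host}:{port}/{database}"
    if application_name:
        # Optional label appended as a query parameter, as in this notebook's URL
        url += f"?ApplicationName={application_name}"
    return url


# Hypothetical endpoint and database name, for illustration only
example_url = build_redshift_iam_url(
    "examplecluster.abc123xyz789.us-west-2.redshift.amazonaws.com",
    "dev",
    application_name="EMRRedshiftSparkConnection",
)
print(example_url)
```

The resulting string has the same shape as the `str_jdbc_url` value used in the cells later in this notebook.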


## Setup
- Create an S3 bucket location to be used as a temporary location for the Redshift dataset. For example: `s3://EXAMPLE-BUCKET/temporary-redshift-dataset/`

- Create an AWS IAM role to associate with the Amazon Redshift cluster. Make sure that this IAM role has an IAM policy that allows it to read from and write to the S3 bucket location mentioned above. More details:

 [Create AWS IAM role for Amazon Redshift](https://docs.aws.amazon.com/redshift/latest/dg/c-getting-started-using-spectrum-create-role.html)

 [Associate IAM role with Amazon Redshift cluster](https://docs.aws.amazon.com/redshift/latest/dg/c-getting-started-using-spectrum-add-role.html)

- Connect to the master node of the cluster using SSH, then copy the jar files from the local file system to HDFS as shown in the following commands. The example creates a dedicated directory in HDFS to keep the files organized. You can choose a different HDFS destination if desired.

 `hdfs dfs -mkdir -p /apps/emr_rs_connector/lib`

 `hdfs dfs -copyFromLocal /usr/share/aws/redshift/jdbc/RedshiftJDBC.jar /apps/emr_rs_connector/lib/RedshiftJDBC.jar`

 `hdfs dfs -copyFromLocal /usr/share/aws/redshift/spark-redshift/lib/spark-redshift.jar /apps/emr_rs_connector/lib/spark-redshift.jar`

 `hdfs dfs -copyFromLocal /usr/share/aws/redshift/spark-redshift/lib/spark-avro.jar /apps/emr_rs_connector/lib/spark-avro.jar`

 `hdfs dfs -copyFromLocal /usr/share/aws/redshift/spark-redshift/lib/minimal-json.jar /apps/emr_rs_connector/lib/minimal-json.jar`

 `hdfs dfs -ls /apps/emr_rs_connector/lib`



## Configure the Studio notebook to use the jar files

In [None]:
%%configure -f
{
 "conf" : {
 "spark.jars":"hdfs:///apps/emr_rs_connector/lib/RedshiftJDBC.jar,hdfs:///apps/emr_rs_connector/lib/minimal-json.jar,hdfs:///apps/emr_rs_connector/lib/spark-avro.jar,hdfs:///apps/emr_rs_connector/lib/spark-redshift.jar",
 "spark.pyspark.python" : "python3",
 "spark.pyspark.virtualenv.enable" : "true",
 "spark.pyspark.virtualenv.type" : "native",
 "spark.pyspark.virtualenv.bin.path" : "/usr/bin/virtualenv"

 }
}
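
The long `spark.jars` value above is easy to mistype. As a rough sketch, it could be generated from the HDFS directory and jar names used in the Setup section. The names `HDFS_LIB_DIR`, `JAR_NAMES`, and `build_spark_jars` are hypothetical helpers, and the snippet is meant to be run in any plain Python interpreter, not as part of the `%%configure` cell.

```python
import json

# HDFS directory and jar names taken from the Setup section of this notebook
HDFS_LIB_DIR = "hdfs:///apps/emr_rs_connector/lib"
JAR_NAMES = [
    "RedshiftJDBC.jar",
    "minimal-json.jar",
    "spark-avro.jar",
    "spark-redshift.jar",
]


def build_spark_jars(lib_dir, jar_names):
    """Join the jar locations into the comma-separated form spark.jars expects."""
    base = lib_dir.rstrip("/")
    return ",".join(f"{base}/{name}" for name in jar_names)


spark_jars = build_spark_jars(HDFS_LIB_DIR, JAR_NAMES)

# Print a JSON fragment that can be pasted into the %%configure cell
print(json.dumps({"conf": {"spark.jars": spark_jars}}, indent=2))
```

If you chose a different HDFS destination during setup, only `HDFS_LIB_DIR` would need to change.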

## Connect to Amazon Redshift using pyspark

In [None]:
%%pyspark

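from pyspark import SparkContext
from pyspark.sql import SQLContext

# Declare the variables and replace the values as appropriate

# Example URL format: jdbc:redshift:iam://examplecluster...redshift.amazonaws.com:5439/DB
str_jdbc_url="jdbc:redshift:iam://...redshift.amazonaws.com:5439/replace_DB_name?ApplicationName=EMRRedshiftSparkConnection"
str_src_table=" "  # source table name
str_tgt_table=" "  # target table name
str_s3_path=" "    # S3 temporary location, for example s3://EXAMPLE-BUCKET/temporary-redshift-dataset/
str_iam_role=" "   # ARN of the IAM role associated with the Redshift cluster

# The notebook session already provides `sc`; uncomment only if it is missing
#sc = SparkContext.getOrCreate()

sql_context = SQLContext(sc)

# Read data from source table
jdbcDF = sql_context.read\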
 .format("io.github.spark_redshift_community.spark.redshift")\
 .option("url", str_jdbc_url)\
 .option("dbtable", str_src_table)\
 .option("aws_iam_role",str_iam_role)\
 .option("tempdir", str_s3_path)\
 .load()

# Preview the first five rows
jdbcDF.limit(5).show()

# Write data to target table
jdbcDF.write \
 .format("io.github.spark_redshift_community.spark.redshift") \
 .option("url", str_jdbc_url) \
 .option("dbtable", str_tgt_table) \
 .option("tempdir", str_s3_path) \
 .option("aws_iam_role",str_iam_role) \
 .mode("append")\
 .save()
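
The read and write calls above repeat the same four connector options, and the placeholder values in this notebook are blank strings that are easy to leave unfilled. One possible way to handle both, sketched below, is a small helper that builds the options once and rejects blank values. The helper name `redshift_options`, the table name, and the role ARN are hypothetical; `options(**...)` is the standard PySpark reader and writer method for passing several options at once.

```python
REDSHIFT_FORMAT = "io.github.spark_redshift_community.spark.redshift"


def redshift_options(jdbc_url, table, s3_tempdir, iam_role):
    """Build the connector options shared by reads and writes, rejecting blanks."""
    opts = {
        "url": jdbc_url,
        "dbtable": table,
        "tempdir": s3_tempdir,
        "aws_iam_role": iam_role,
    }
    blank = [key for key, value in opts.items() if not value or not value.strip()]
    if blank:
        raise ValueError("Blank connector options: " + ", ".join(blank))
    return opts


# Hypothetical values for illustration; the elided URL is kept as in the notebook
read_opts = redshift_options(
    "jdbc:redshift:iam://...redshift.amazonaws.com:5439/replace_DB_name",
    "public.source_table",
    "s3://EXAMPLE-BUCKET/temporary-redshift-dataset/",
    "arn:aws:iam::123456789012:role/example-redshift-role",
)

# In the notebook, with a live Spark session, the options could be applied as:
# df = spark.read.format(REDSHIFT_FORMAT).options(**read_opts).load()
# df.write.format(REDSHIFT_FORMAT).options(**write_opts).mode("append").save()
```

Here `write_opts` would be built the same way with the target table name. Calling the helper with the notebook's unmodified `" "` placeholders raises a `ValueError` before any Spark job is started.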




## Connect to Amazon Redshift using scalaspark

In [None]:
%%scalaspark

//Declare the variables and replace the values as appropriate

//jdbc:redshift:iam://examplecluster...redshift.amazonaws.com:5439/DB
val str_jdbc_url="jdbc:redshift:iam://...redshift.amazonaws.com:5439/replace_DB_name?ApplicationName=EMRRedshiftSparkConnection"
val str_src_table=" "
val str_tgt_table=" "
val str_s3_path=" "
val str_iam_role=" "

//Read data from source table
val jdbcDF = (spark.read.format("io.github.spark_redshift_community.spark.redshift")
 .option("url", str_jdbc_url)
 .option("dbtable", str_src_table)
 .option("tempdir", str_s3_path)
 .option("aws_iam_role", str_iam_role)
 .load())

// Preview the first five rows
jdbcDF.limit(5).show()

// Write data to target table
(jdbcDF.write.mode("append")
 .format("io.github.spark_redshift_community.spark.redshift")
 .option("url", str_jdbc_url)
 .option("dbtable", str_tgt_table)
 .option("tempdir", str_s3_path)
 .option("aws_iam_role", str_iam_role)
 .save())


## Connect to Amazon Redshift using SparkR

In [None]:
%%rspark
#Declare the variables and replace the values as appropriate

#jdbc:redshift:iam://examplecluster...redshift.amazonaws.com:5439/DB
str_jdbc_url="jdbc:redshift:iam://...redshift.amazonaws.com:5439/replace_DB_name?ApplicationName=EMRRedshiftSparkConnection"
str_src_table=" "
str_tgt_table=" "
str_s3_path=" "
str_iam_role=" "

# Read data from source table

df <- read.df(
 NULL,
 "io.github.spark_redshift_community.spark.redshift",
 aws_iam_role = str_iam_role,
 tempdir = str_s3_path,
 dbtable = str_src_table,
 url = str_jdbc_url)

showDF(df)