--- title: "RStudio and EMR Connection" output: html_notebook --- We connect to EMR Spark by clicking the **Connections** tab, **New Connection** and running the `Connect to Amazon EMR Cluster` connect code snippet. Click **Ok** to run the code snippet. This will create a Livy Spark connection called `sc`. The `default` database is loaded by default which has no tables. You will see **(No tables)** in the **Connections** window. We need to change the database to `credit_card` to view and start working with the tables. ```{r} tbl_change_db(sc, 'credit_card') ``` Click **Refresh Connection Data** in the **Connections** pane and you will see the tables: ``` > cards > transactions > users ``` Clicking on the arrow icon will display the metadata for the table while the table icon will display the first 1000 rows. We can use the `DBI` package as an interface to Spark and execute Spark SQL queries. Lets use the `dbListTables()` to view existing tables. ```{r} library(DBI) dbListTables(sc) ``` Use `dbGetQuery()` to pass a SQL query to Spark. ```{r} dbGetQuery(sc, "select * from users limit 100") dbGetQuery(sc, "select * from cards limit 100") dbGetQuery(sc, "select * from transactions limit 100") ``` We can also use the `dplyr` package to prepare data in Spark. Lets list the tables using `src_tbls()`. ```{r} src_tbls(sc) ``` Let's `count()` how many transactions are in the transactions table. But first we need to reference the table using the `tbl()` function. ```{r} users_tbl <- tbl(sc, "users") cards_tbl <- tbl(sc, "cards") transactions_tbl <- tbl(sc, "transactions") ``` Let's run a count of the number of rows for each table. ```{r} count(users_tbl) count(cards_tbl) count(transactions_tbl) ``` So we have 2,001 users, 6,147 cards, and 24,386,901 transactions. We can also view the tables in the console. ```{r} transactions_tbl ``` We can also view how dplyr verbs are translated to Spark SQL. ```{r} show_query(transactions_tbl) ``` Now let's register our tables as Spark Data Frames and pull them into the cluster-wide in memory cache for better performance. We will also filter the header that gets placed in the first row for each table. ```{r} users_tbl <- tbl(sc, 'users') %>% filter(gender != 'Gender') sdf_register(users_tbl, "users_spark") tbl_cache(sc, 'users_spark') users_sdf <- tbl(sc, 'users_spark') cards_tbl <- tbl(sc, 'cards') %>% filter(expire_date != 'Expires') sdf_register(cards_tbl, "cards_spark") tbl_cache(sc, 'cards_spark') cards_sdf <- tbl(sc, 'cards_spark') transactions_tbl <- tbl(sc, 'transactions') %>% filter(amount != 'Amount') sdf_register(transactions_tbl, "transactions_spark") tbl_cache(sc, 'transactions_spark') transactions_sdf <- tbl(sc, 'transactions_spark') ``` Let's visually explore the number of transactions by year. ```{r} transactions_by_year <- transactions_sdf %>% count(year) %>% arrange(year) %>% collect() transactions_by_year ``` ```{r} library(ggplot2) ggplot(transactions_by_year) + geom_col(aes(year, as.integer(n))) + ylab('transactions') ``` We can also summarize data in the database as follows: ```{r} transactions_sdf %>% group_by(is_fraud) %>% count() ``` The `is_fraud` column appears to have fraud coded incorrectly. Lets remove the values that are not either `yes` or `no`. ```{r} transactions_sdf <- transactions_sdf %>% filter(is_fraud == 'Yes' | is_fraud == 'No') transactions_sdf %>% group_by(is_fraud) %>% count() ``` Lets view fraud by merchant category code. 
The `default` database is loaded at connection time, and it has no tables, so you will see **(No tables)** in the **Connections** pane. We need to change the database to `credit_card` to view and start working with the tables.

```{r}
tbl_change_db(sc, 'credit_card')
```

Click **Refresh Connection Data** in the **Connections** pane and you will see the tables:

```
> cards
> transactions
> users
```

Clicking a table's arrow icon displays its metadata, while the table icon displays its first 1,000 rows.

We can use the `DBI` package as an interface to Spark and execute Spark SQL queries. Let's use `dbListTables()` to view the existing tables.

```{r}
library(DBI)
dbListTables(sc)
```

Use `dbGetQuery()` to pass a SQL query to Spark.

```{r}
dbGetQuery(sc, "select * from users limit 100")
dbGetQuery(sc, "select * from cards limit 100")
dbGetQuery(sc, "select * from transactions limit 100")
```

We can also use the `dplyr` package to prepare data in Spark. Let's list the tables using `src_tbls()`.

```{r}
library(dplyr)
src_tbls(sc)
```

Let's `count()` how many transactions are in the transactions table. But first we need to reference each table using the `tbl()` function.

```{r}
users_tbl <- tbl(sc, "users")
cards_tbl <- tbl(sc, "cards")
transactions_tbl <- tbl(sc, "transactions")
```

Let's run a count of the number of rows in each table.

```{r}
count(users_tbl)
count(cards_tbl)
count(transactions_tbl)
```

So we have 2,001 users, 6,147 cards, and 24,386,901 transactions. We can also view a table in the console.

```{r}
transactions_tbl
```

We can also view how dplyr verbs are translated into Spark SQL.

```{r}
show_query(transactions_tbl)
```

Now let's register our tables as Spark DataFrames and pull them into the cluster-wide in-memory cache for better performance. We also filter out the header row that ends up as the first row of each table.

```{r}
# Drop the stray header row, register each table, and cache it.
users_tbl <- tbl(sc, 'users') %>%
  filter(gender != 'Gender')
sdf_register(users_tbl, "users_spark")
tbl_cache(sc, 'users_spark')
users_sdf <- tbl(sc, 'users_spark')

cards_tbl <- tbl(sc, 'cards') %>%
  filter(expire_date != 'Expires')
sdf_register(cards_tbl, "cards_spark")
tbl_cache(sc, 'cards_spark')
cards_sdf <- tbl(sc, 'cards_spark')

transactions_tbl <- tbl(sc, 'transactions') %>%
  filter(amount != 'Amount')
sdf_register(transactions_tbl, "transactions_spark")
tbl_cache(sc, 'transactions_spark')
transactions_sdf <- tbl(sc, 'transactions_spark')
```

Let's visually explore the number of transactions by year.

```{r}
transactions_by_year <- transactions_sdf %>%
  count(year) %>%
  arrange(year) %>%
  collect()

transactions_by_year
```

```{r}
library(ggplot2)
ggplot(transactions_by_year) +
  geom_col(aes(year, as.integer(n))) +
  ylab('transactions')
```

We can also summarize data in the database as follows:

```{r}
transactions_sdf %>%
  group_by(is_fraud) %>%
  count()
```

The `is_fraud` column appears to have some incorrectly coded values. Let's remove the rows whose value is not either `Yes` or `No`.

```{r}
transactions_sdf <- transactions_sdf %>%
  filter(is_fraud == 'Yes' | is_fraud == 'No')

transactions_sdf %>%
  group_by(is_fraud) %>%
  count()
```

Let's view fraud by merchant category code.

```{r}
transactions_sdf %>%
  group_by(merchant_category_code, is_fraud) %>%
  count() %>%
  arrange(merchant_category_code)
```

Suppose we want to view fraud using card information. We just need to join the tables and then group by the attributes of interest.

```{r}
cards_sdf %>%
  left_join(transactions_sdf, by = c("user_id", "card_id")) %>%
  group_by(card_brand, card_type, is_fraud) %>%
  count() %>%
  arrange(card_brand)
```

Now let's prepare a dataset that could be used for machine learning. Let's filter the transaction data to just Discover credit cards, keeping only a subset of columns.

```{r}
discover_sdf <- cards_sdf %>%
  filter(card_brand == 'Discover', card_type == 'Credit') %>%
  left_join(transactions_sdf, by = c("user_id", "card_id")) %>%
  select(user_id, is_fraud, merchant_category_code, use_chip,
         year, month, day, time_stamp, amount)
```

We will clean the dataset using the following transformations:

- Convert `is_fraud` to a binary (0/1) attribute.
- Remove the " Transaction" suffix from `use_chip` and rename the column to `type`.
- Combine `year`, `month`, and `day` into a date object.
- Remove the `$` from `amount` and convert it to a numeric data type.

```{r}
# regexp_replace() is not translated by dplyr; it is passed through
# to Spark SQL, which supplies the function.
discover_sdf <- discover_sdf %>%
  mutate(is_fraud = ifelse(is_fraud == 'Yes', 1, 0),
         type = regexp_replace(use_chip, ' Transaction', ''),
         type = tolower(type),
         date = paste(year, month, day, sep = '-'),
         date = as.Date(date),
         amount = regexp_replace(amount, '[$]', ''),
         amount = as.numeric(amount)) %>%
  select(-use_chip, -year, -month, -day)
```

Finally, register the cleaned dataset as a Spark table, then disconnect.

```{r}
sdf_register(discover_sdf, 'discover')
```

```{r}
spark_disconnect(sc)
```
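The registered `discover` table is now ready for model training. As an illustration of where this could go next, here is a minimal sketch of fitting a baseline fraud classifier with sparklyr's formula interface; it would need to run while the connection is still open (that is, before the `spark_disconnect()` call above), and the choice of predictors is purely illustrative.

```{r eval=FALSE}
# Illustrative sketch only: requires an open Spark connection.
# Spark's RFormula one-hot encodes the string column `type` automatically.
fraud_model <- discover_sdf %>%
  ml_logistic_regression(is_fraud ~ amount + type)

summary(fraud_model)
```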