{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Gain customer insights, Part 1. Connect to Amazon Aurora MySQL database, data loading and extraction" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "----\n", "## Table of contents\n", "\n", "### Section 1. Setup \n", "1. [Prepare the Amazon Aurora MySQL Database](#Prepare-the-Amazon-Aurora-MySQL-Database)\n", "2. [Download the Customer Churn Data](#Download-the-Customer-Churn-Data)\n", "3. [Create Database, Table, Load Data in Amazon Aurora MySQL](#Create-Database,-Table,-Load-Data-in-Amazon-Aurora-MySQL)\n", "4. [Load Customer Messages to Database](#Load-Customer-Messages-to-Database)\n", "\n", "### Section 2. Export data from Amazon Aurora to S3\n", "1. [Export data from Amazon Aurora to S3](#Section-2.-Export-data-from-Amazon-Aurora-to-S3-for-use-in-Machine-Learning)\n", "\n", "----\n", "\n", "Begin by upgrading pip. To connect to the database we will use [mysql.connector](https://dev.mysql.com/doc/connector-python/en/) module. MySQL Connector/Python enables Python programs to access MySQL databases. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "\n", "# upgrade pip\n", "!{sys.executable} -m pip install --upgrade pip \n", "# install mysql.connector\n", "!{sys.executable} -m pip install mysql.connector" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For this use case, we've created the S3 bucket and appropriate IAM roles for you during the launch of the AWS CloudFormation template. The bucket name was saved in a parameter file called \"cloudformation_values.py\" during creation of the notebook instance, along with the DB secret name and ML endpoint name." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# import installed module\n", "import mysql.connector as mysql\n", "import json\n", "import os\n", "import pandas as pd\n", "import numpy as np\n", "import boto3\n", "\n", "# to write data stream to S3\n", "from io import StringIO \n", "\n", "# import variables with values about the secret, region, s3 bucket, sagemaker endpoint\n", "# this file is generated during the creation of the SageMaker notebook instance\n", "import cloudformation_values as cfvalues" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we set up some parameters we'll use in the rest of the notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3 = boto3.resource('s3')\n", "# get the session information\n", "session = boto3.Session()\n", "# get the region\n", "region = cfvalues.REGION\n", "\n", "# S3 bucket was created during the launch of the CloudFormation stack\n", "bucket_name = cfvalues.S3BUCKET\n", "prefix = 'sagemaker/xgboost-churn'\n", "source_data = 'source_churn_data.csv'\n", "source_data_file_name = prefix + '/' + source_data\n", "ml_data = 'aurora/churn_data'\n", "\n", "# AWS Secrets stores our database credentials. \n", "db_secret_name = cfvalues.DBSECRET" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Prepare the Amazon Aurora MySQL Database" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll create some customer data in our Amazon Aurora database, for use during the rest of our scenario. \n", "\n", "To do so, we'll take some publicly available \"customer data\", and load it into our database. We'll get the data from the Internet, write it out to S3, then load it into Aurora from S3. 
That will get us to the starting point of our scenario.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we are using administrative credentials for connecting to the database. The credentials were created during the database creation, and are stored in AWS Secrets Manager. We'll retrieve the secret, extract the credentials and the database endpoint name, and use them to connect to the database. \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Get the secret from AWS Secrets manager. Extract user, password, host.\n", "from utilities import get_secret\n", "get_secret_value_response = get_secret(db_secret_name, region)\n", "creds = json.loads(get_secret_value_response['SecretString'])\n", "db_user = creds['username']\n", "db_password = creds['password']\n", "# Writer endpoint\n", "db_host = creds['host']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# create connection to the database\n", "cnx = mysql.connect(user = db_user, \n", " password = db_password,\n", " host = db_host)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# create cursor (allows traversal over the rows in the result set)\n", "dbcursor = cnx.cursor()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Demonstrate the connection and functionality by showing the existing databases:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# send a query to show all existing databases and loop over the results to print:\n", "dbcursor.execute(\"SHOW DATABASES\")\n", "for x in dbcursor:\n", " print(x)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To disconnect from the database:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cnx.close()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Download the Customer Churn Data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The dataset we use is publicly available and is mentioned in the book [Discovering Knowledge in Data](https://www.amazon.com/dp/0470908742/) by Daniel T. Larose. It is attributed by the author to the University of California Irvine Repository of Machine Learning Datasets. 
The content of each column in the data is described in another notebook [here](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/introduction_to_applying_machine_learning/xgboost_customer_churn/xgboost_customer_churn.ipynb).\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not os.path.exists(\"DKD2e_data_sets.zip\"):\n", " !wget http://dataminingconsultant.com/DKD2e_data_sets.zip\n", " !unzip -o DKD2e_data_sets.zip\n", "else:\n", " print(\"File has already been downloaded\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# read the customer churn data into a pandas DataFrame\n", "churn = pd.read_csv('./Data sets/churn.txt')\n", "# review the top rows\n", "churn.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# get number of rows and columns in the data\n", "churn.shape" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see that the column names in this source data set have mixed case, spaces and special characters - all items that can easily cause grief in databases and when transferring data between formats and systems. To avoid these challenges, we'll simplify the column names before loading the data into Amazon Aurora." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "new_columns = [\"state\", \n", " \"acc_length\", \n", " \"area_code\", \n", " \"phone\",\n", " \"int_plan\",\n", " \"vmail_plan\",\n", " \"vmail_msg\",\n", " \"day_mins\", \n", " \"day_calls\",\n", " \"day_charge\",\n", " \"eve_mins\",\n", " \"eve_calls\",\n", " \"eve_charge\",\n", " \"night_mins\",\n", " \"night_calls\",\n", " \"night_charge\", \n", " \"int_mins\",\n", " \"int_calls\",\n", " \"int_charge\",\n", " \"cust_service_calls\",\n", " \"churn\"]\n", "# create a dictionary where keys are the old column names and the values are the new column names\n", "renaming_dict = dict(list(zip(list(churn.columns), new_columns)))\n", "# rename the columns\n", "churn = churn.rename(columns = renaming_dict)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "churn.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The resulting data frame looks much better!\n", "\n", "Now we'll write our sample data out to S3. We'll then bulk load the data from S3 directly into Amazon Aurora." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "csv_buffer = StringIO()\n", "churn.to_csv(csv_buffer, index = False)\n", "s3.Object(bucket_name, source_data_file_name).put(Body = csv_buffer.getvalue())\n", "print('s3://' + bucket_name + '/' + source_data_file_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Database, Table, Load Data in Amazon Aurora MySQL\n", "\n", "Now, we want to create the target database and table in Amazon Aurora, so we can load the data. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "database_name = \"telecom_customer_churn\"\n", "churn_table = \"customers\"\n", "customer_msgs_table = \"customer_message\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Connect to the database server and create a cursor object to traverse over the fetched results."
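, "\n", "\n", "Before we do, the next cell runs an optional sanity check (a minimal sketch using the `s3` resource, bucket and key defined above) to confirm that the churn CSV actually landed in S3 before we ask Aurora to load it." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# optional sanity check: confirm the uploaded object exists in S3 and inspect its size\n", "churn_object = s3.Object(bucket_name, source_data_file_name)\n", "print(churn_object.content_length, 'bytes, last modified:', churn_object.last_modified)"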
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cnx = mysql.connect(user = db_user, \n", " password = db_password,\n", " host = db_host)\n", "\n", "dbcursor = cnx.cursor(buffered = True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a database:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# send a query to create a database\n", "dbcursor.execute(\"CREATE DATABASE IF NOT EXISTS {}\".format(database_name))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# send a query to show all existing databases and fetch all results:\n", "dbcursor.execute(\"SHOW DATABASES\")\n", "databases = dbcursor.fetchall()\n", "print(databases)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# switch to the database 'telecom_customer_churn'\n", "dbcursor.execute(\"USE {}\".format(database_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we will create a table to hold customer churn data. The column definition was taken from [this blog](https://aws.amazon.com/blogs/aws/new-for-amazon-aurora-use-machine-learning-directly-from-your-databases/)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# here we delete the table 'customers' if it already exists\n", "dbcursor.execute(\"DROP TABLE IF EXISTS {}\".format(churn_table))\n", "# then, we define a new table:\n", "dbcursor.execute(\"\"\"CREATE TABLE {}\n", " (state VARCHAR(2048), \n", " acc_length BIGINT(20),\n", " area_code BIGINT(20),\n", " phone VARCHAR(2048),\n", " int_plan VARCHAR(2048),\n", " vmail_plan VARCHAR(2048),\n", " vmail_msg BIGINT(20),\n", " day_mins DOUBLE, \n", " day_calls BIGINT(20),\n", " day_charge DOUBLE,\n", " eve_mins DOUBLE,\n", " eve_calls BIGINT(20),\n", " eve_charge DOUBLE,\n", " night_mins DOUBLE,\n", " night_calls BIGINT(20),\n", " night_charge DOUBLE, \n", " int_mins DOUBLE,\n", " int_calls BIGINT(20),\n", " int_charge DOUBLE,\n", " cust_service_calls BIGINT(20),\n", " churn VARCHAR(2048))\"\"\".format(churn_table))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# send a query to show all existing tables\n", "dbcursor.execute(\"SHOW TABLES\")\n", "# fetch all results\n", "tables = dbcursor.fetchall()\n", "\n", "# print names of the tables in the database 'telecom_customer_churn'\n", "for table in tables:\n", " print(table)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we will print the list of columns that will be populated when we load the data from the data frame." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# send a query to retrieve the column names from the table 'customers' and fetch the results\n", "dbcursor.execute(\"SHOW COLUMNS FROM {}\".format(churn_table))\n", "columns = dbcursor.fetchall()\n", "cols = \"','\".join([x[0] for x in columns])\n", "\n", "# print the column names as a comma-separated string created in the previous statement.\n", "print(\"'\" + cols + \"'\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Everything looks good so far! Now we're ready to [bulk load our data into Amazon Aurora from S3](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/AuroraMySQL.Integrating.LoadFromS3.html)."
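, "\n", "\n", "Loading from S3 requires that the cluster has an IAM role that can read the bucket (set up here by the CloudFormation stack) and that the connected account holds the `LOAD FROM S3` privilege, which the admin user is normally granted by default. The next cell is an optional check of the current account's grants; the commented `GRANT` statement is only a sketch of what a non-admin user would need, and the exact syntax depends on the Aurora MySQL version." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# optional: list the privileges of the account we are connected as;\n", "# we expect LOAD FROM S3 to appear among them before running the bulk load\n", "dbcursor.execute(\"SHOW GRANTS\")\n", "for grant in dbcursor.fetchall():\n", " print(grant)\n", "\n", "# for reference only: on Aurora MySQL version 2 a non-admin user would need roughly\n", "# dbcursor.execute(\"GRANT LOAD FROM S3 ON *.* TO 'some_user'@'%'\")\n", "# (Aurora MySQL version 3 uses a role instead); 'some_user' is a hypothetical account"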
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(source_data_file_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# send a query to load the data into the table 'customers' from the S3 bucket.\n", "dbcursor.execute(\"\"\"LOAD DATA FROM S3 's3://{bucket}/{filename}' INTO TABLE {tablename} \n", " FIELDS TERMINATED BY ','\n", " LINES TERMINATED BY '\\n' IGNORE 1 LINES\"\"\".format(tablename = database_name + '.' + churn_table, \n", " bucket = bucket_name, \n", " filename = source_data_file_name))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# commit the transaction so the loaded data is visible to other database sessions\n", "cnx.commit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's check for load errors and confirm that the resulting data looks correct. The output shows the source S3 bucket and file from which the data was loaded, and when the load happened." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# run a query to check the history of all data loads from S3 to the database\n", "dbcursor.execute(\"SELECT * from mysql.aurora_s3_load_history WHERE load_prefix = 's3://{bucket}/{filename}'\".format(\n", " bucket = bucket_name,\n", " filename = source_data_file_name))\n", "all_loads = dbcursor.fetchall()\n", "for load in all_loads:\n", " print(load)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# run a query to preview the first 5 rows from the table:\n", "dbcursor.execute(\"SELECT * FROM `{}` LIMIT 5\".format(churn_table))\n", "result = dbcursor.fetchall()\n", "for i in result:\n", " print(i)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see that the customer data is now in Aurora. \n", "\n", "## Load Customer Messages to Database\n", "\n", "Now we'll create a second table, one with some messages from customer service calls. 
We'll use this table later, to test Amazon Comprehend integration with our database.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# here we are reusing the same cursor object created above and removing the table 'customer_message'\n", "# if it already exists.\n", "dbcursor.execute(\"DROP TABLE IF EXISTS `{}`\".format(customer_msgs_table))\n", "# next, we define a table with four columns: area code, phone number, text of a message from a customer and\n", "# the time they called.\n", "sql = \"\"\"CREATE TABLE IF NOT EXISTS `{}` (\n", " area_code BIGINT(20) NOT NULL,\n", " phone VARCHAR(2048) NOT NULL,\n", " message VARCHAR(255) NOT NULL,\n", " calltime TIMESTAMP NOT NULL\n", ");\"\"\".format(customer_msgs_table)\n", "dbcursor.execute(sql)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# verify that the table was successfully created by showing all existing tables \n", "# in the database 'telecom_customer_churn'\n", "dbcursor.execute(\"SHOW TABLES\")\n", "tables = dbcursor.fetchall()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for table in tables:\n", " print(table)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# here we request to see the format of the columns in the table 'customer_message'\n", "dbcursor.execute(\"DESCRIBE `{}`;\".format(customer_msgs_table))\n", "dbcursor.fetchall()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# the following SQL statement loads 6 rows into the table 'customer_message' with the area code,\n", "# phone number, generated messages and the date/time of the call.\n", "sql_inserts = [\"\"\"\n", " INSERT INTO customer_message(area_code, phone, message, calltime)\n", " VALUES (415, \"329-6603\", \"Thank you very much for resolving the issues with my bill!\", '2020-01-01 10:10:10');\"\"\",\n", " \"\"\"INSERT INTO customer_message(area_code, phone, message, calltime)\n", " VALUES (415, \"351-7269\", \"I don't understand how I paid for 100 minutes and got only 90, you are ripping me off!\",'2020-01-01 10:10:10');\"\"\",\n", " \"\"\"INSERT INTO customer_message(area_code, phone, message, calltime)\n", " VALUES (408, \"360-1596\", \"Please fix this issue! I am sick of sitting on a phone every single day with you people!\",'2020-01-01 10:10:10');\"\"\",\n", " \"\"\"INSERT INTO customer_message(area_code, phone, message, calltime)\n", " VALUES (415, \"382-4657\", \"This is a really great feature, thanks for helping me store all my phone numbers.\", '2020-01-01 10:10:10');\"\"\",\n", " \"\"\"INSERT INTO customer_message(area_code, phone, message, calltime)\n", " VALUES (415, \"371-7191\", \"Why am I paying so much for my international minutes?\", '2020-01-01 10:10:10');\"\"\",\n", " \"\"\"INSERT INTO customer_message(area_code, phone, message, calltime)\n", " VALUES (415, \"358-1921\", \"Why do I have to wait for the response from the customer service for so long? 
I don't have time for this.\", '2020-01-01 10:10:10');\"\"\"\n", " ]\n", "\n", "try:\n", " for i in range(len(sql_inserts)):\n", " dbcursor.execute(sql_inserts[i])\n", " # note: any error raised by an INSERT is caught and printed below\n", "except (mysql.Error, mysql.Warning) as e:\n", " print(e)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cnx.commit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Lastly, let's join the two tables and read the result into a pandas DataFrame to check that we can see customer and complaint data as expected." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sql = \"\"\"SELECT cu.state, cu.area_code, cu.phone, cu.int_plan, cu.vmail_plan, cu.churn, \n", " calls.message \n", " FROM {} cu, {} calls\n", " WHERE cu.area_code = calls.area_code AND cu.phone = calls.phone\n", " AND message is not null\"\"\".format(churn_table, customer_msgs_table)\n", "\n", "df = pd.read_sql(sql, con = cnx)\n", "df.head(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# close connection to the database\n", "cnx.close()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Our Amazon Aurora database now contains our \"production\" data. Now we're finally at the starting point of our scenario!\n", "\n", "## Section 2. Export data from Amazon Aurora to S3 for use in Machine Learning\n", "\n", "The DBA has just received the request: \"Please export the customer data to S3, so the data scientist can explore the reasons for customer churn. Thanks!\" \n", "\n", "Luckily, there's a new Amazon Aurora feature that makes it easy: [Saving Data from an Amazon Aurora MySQL DB Cluster into Text Files in an Amazon S3 Bucket](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/AuroraMySQL.Integrating.SaveIntoS3.html). We'll use this feature to export our customer data to S3.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# create connection to the database\n", "cnx = mysql.connect(user = db_user, \n", " password = db_password,\n", " host = db_host)\n", "\n", "# create cursor\n", "dbcursor = cnx.cursor(buffered = True)\n", "dbcursor.execute(\"USE {}\".format(database_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We could also split the data into training, validation and test sets and upload each to S3 separately, directly from our database. But for now, we'll let the data scientists deal with that!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Performing queries against the Amazon SageMaker endpoint (which will be created shortly) requires SQL privileges to invoke Amazon SageMaker and to execute the associated functions, as described in the documentation [here](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/mysql-ml.html#aurora-ml-sql-privileges), section \"Granting SQL Privileges for Invoking Aurora Machine Learning Services\". Exporting to S3 likewise requires the [SELECT INTO S3 privilege](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/AuroraMySQL.Integrating.SaveIntoS3.html#AuroraMySQL.Integrating.SaveIntoS3.Grant). \n", "\n", "Let's check that we have the right privileges; we should see 'SELECT INTO S3' in the output of the `SHOW GRANTS` check below. Since we are connecting as the admin user, no additional grants are needed here. In normal circumstances, however, a SQL user would need these privileges granted explicitly."
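, "\n", "\n", "For reference, the next cell sketches (as comments only) the grants a non-admin user would typically need; the exact syntax depends on the Aurora MySQL version. The cell after it lists the grants held by the account we are connected as." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# for reference only -- not executed here, because we are connected as the admin user.\n", "# On Aurora MySQL version 2, a non-admin user would need grants along these lines before\n", "# unloading to S3 and invoking SageMaker from SQL (a sketch; 'some_user' is hypothetical):\n", "# dbcursor.execute(\"GRANT SELECT INTO S3 ON *.* TO 'some_user'@'%'\")\n", "# dbcursor.execute(\"GRANT INVOKE SAGEMAKER ON *.* TO 'some_user'@'%'\")"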
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# this statement displays the privileges and roles that are assigned to a MySQL user account or role\n", "dbcursor.execute(\"SHOW GRANTS\")\n", "dbcursor.fetchall()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There are several ways we can unload the data. We could choose to unload without using headers; this would be appropriate if we're unloading a large amount of data and are using a metadata catalog (such as [AWS Glue](https://aws.amazon.com/glue/)) to store the column information. \n", "\n", "Here, since it's a small amount of data, we keep the use case simple and avoid introducing errors by unloading the data in CSV format with a header row for use by the ML engineer in Part 2. Alternatively, we could provide the ML engineer with the column list separately." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dbcursor.execute(\"\"\"SELECT * FROM `{tablename}` INTO OUTFILE S3 's3://{bucket}/{prefix}/{mldata}' \n", " FORMAT CSV HEADER\"\"\".format(tablename = churn_table,\n", " bucket = bucket_name,\n", " prefix = prefix,\n", " mldata = ml_data))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dbcursor.execute(\"SHOW COLUMNS FROM {}\".format(churn_table))\n", "columns = dbcursor.fetchall()\n", "cols = \"','\".join([x[0] for x in columns])\n", "\n", "print(\"'\" + cols + \"'\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we have the column names in case we need to pass this information to the ML engineer so they know what the columns are in our unloaded data.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cnx.close()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The DBA is done for now! The customer data has been unloaded into S3, and is ready for model training.\n", "\n", "Move on to the next notebook, \"Part 2\", which covers training the machine learning model on this data." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.1" } }, "nbformat": 4, "nbformat_minor": 4 }