{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Database Ingestion\n", "\n", "In this workshop we will create a SQL Server database, crawl the metadata with a Glue Crawler, and tarnsform the data into Parquet landing in our data lake for use with multiple analytics engines. The diagram below shows the workflow we will build in this notebook. This is a typical scenario where a customer wants to grab data from a relational database and make it available to their analytics groups.\n", "\n", "**If connecting to an on-premises database please review this [blog](https://aws.amazon.com/blogs/big-data/how-to-access-and-analyze-on-premises-data-stores-using-aws-glue/) for requirements.** \n", "\n", "![SQL DB Crawl](../../docs/assets/images/sql-db-crawl.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import botocore\n", "import json\n", "import time\n", "import os\n", "import getpass\n", "\n", "import project_path # path to helper methods\n", "from lib import workshop\n", "from pandas import read_sql\n", "\n", "glue = boto3.client('glue')\n", "s3 = boto3.resource('s3')\n", "s3_client = boto3.client('s3')\n", "cfn = boto3.client('cloudformation')\n", "redshift_client = boto3.client('redshift')\n", "ec2_client = boto3.client('ec2')\n", "rds = boto3.client('rds')\n", "\n", "session = boto3.session.Session()\n", "region = session.region_name\n", "account_id = boto3.client('sts').get_caller_identity().get('Account')\n", "\n", "workshop_user = 'db'\n", "database_name = 'sales' # AWS Glue Data Catalog Database Name\n", "environment_name = 'workshop{0}'.format(workshop_user)\n", "\n", "use_existing = True" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create S3 Bucket](https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html)\n", "\n", "We will create an S3 bucket that will be used throughout the workshop for storing our data.\n", "\n", "[s3.create_bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket) boto3 documentation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bucket = workshop.create_bucket(region, session, 'db-')\n", "print(bucket)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create VPC](https://docs.aws.amazon.com/vpc/index.html) \n", "\n", "We need a VPC for some of the resources in this workshop. You have the option to create a brand new VPC or use the VPC flaged as the default." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if use_existing:\n", " vpc_filter = [{'Name':'isDefault', 'Values':['true']}]\n", " default_vpc = ec2_client.describe_vpcs(Filters=vpc_filter)\n", " vpc_id = default_vpc['Vpcs'][0]['VpcId']\n", "\n", " subnet_filter = [{'Name':'vpc-id', 'Values':[vpc_id]}]\n", " subnets = ec2_client.describe_subnets(Filters=subnet_filter)\n", " subnet1_id = subnets['Subnets'][0]['SubnetId']\n", " subnet2_id = subnets['Subnets'][1]['SubnetId']\n", "else: \n", " vpc, subnet1, subnet2 = workshop.create_and_configure_vpc()\n", " vpc_id = vpc.id\n", " subnet1_id = subnet1.id\n", " subnet2_id = subnet2.id" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(vpc_id)\n", "print(subnet1_id)\n", "print(subnet2_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload [CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/GettingStarted.html) template\n", "\n", "In the interest of time we will leverage CloudFormation to launch our source SQL Server Database in RDS. The steps below would apply as well for databases supported with Glue that are not running in RDS." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rds_file = 'sqlserver-rds.yaml'\n", "file_path = os.path.join('cfn', rds_file)\n", "session.resource('s3').Bucket(bucket).Object(file_path).upload_file(file_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Enter the user name used for the SQL Server Database\n", "\n", "**Must contain only alphanumeric characters.**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "admin_user = getpass.getpass()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Enter password used for the SQL Server Database\n", "\n", "**Must contain only alphanumeric characters with at least one capital letter and one number.**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "admin_password = getpass.getpass()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import re\n", "\n", "pattern = re.compile(r\"^(?=.*[a-z])(?=.*[A-Z])(?=.*\\d)[a-zA-Z\\d]{8,}$\")\n", "result = pattern.match(admin_password)\n", "if result:\n", " print('Valid')\n", "else:\n", " print('Invalid, Password must be 8 characters long alphanumeric only 1 Upper, 1 Lower')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Execute CloudFormation Stack to create SQL Server RDS instance" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cfn_template = 'https://s3.amazonaws.com/{0}/cfn/{1}'.format(bucket, rds_file)\n", "print(cfn_template)\n", "\n", "rds_stack_name = 'SQLServerRDSStack'\n", "response = cfn.create_stack(\n", " StackName=rds_stack_name,\n", " TemplateURL=cfn_template,\n", " Capabilities = [\"CAPABILITY_NAMED_IAM\"],\n", " Parameters=[\n", " {\n", " 'ParameterKey': 'SqlServerInstanceName',\n", " 'ParameterValue': environment_name\n", " },\n", " {\n", " 'ParameterKey': 'DatabaseUsername',\n", " 'ParameterValue': admin_user\n", " },\n", " {\n", " 'ParameterKey': 'DatabasePassword',\n", " 'ParameterValue': admin_password\n", " }\n", " ]\n", ")\n", "\n", "print(response)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Check status of SQL Server RDS CloudFormaton stack" ] }, { 
"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = cfn.describe_stacks(\n", " StackName=rds_stack_name\n", ")\n", "\n", "while response['Stacks'][0]['StackStatus'] != 'CREATE_COMPLETE':\n", " print('Not yet complete.')\n", " time.sleep(30)\n", " response = cfn.describe_stacks(\n", " StackName=rds_stack_name\n", " )\n", "\n", "for output in response['Stacks'][0]['Outputs']:\n", " if (output['OutputKey'] == 'SQLDatabaseEndpoint'):\n", " rds_endpoint = output['OutputValue']\n", " print('RDS Endpoint: {0}'.format(rds_endpoint))\n", " if (output['OutputKey'] == 'SQLServerSecurityGroup'):\n", " rds_sg_id = output['OutputValue']\n", " print('RDS Security Group: {0}'.format(rds_sg_id))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Sales DB\n", "\n", "We will create a sample representation of a sales database containing customers, products, and relationships between them. To do so in this notebook we will be using the python library [`pymssql`](http://pymssql.org/en/stable/). In the first step we will create the `sales` database and then execute scripts located in this folder for the table structure and data populated in each table. Data for db from [www.dofactory.com](http://www.dofactory.com/sql/sample-database)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install pymssql" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pymssql\n", "\n", "conn = pymssql.connect(rds_endpoint, admin_user, admin_password)\n", "conn.autocommit(True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cr=conn.cursor()\n", "cr.execute('create database sales;')\n", "conn.commit()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def run_sql_file(filename, connection):\n", " '''\n", " The function takes a filename and a connection as input\n", " and will run the SQL query on the given connection \n", " '''\n", " file = open(filename, 'r')\n", " sql = s = \" \".join(file.readlines())\n", " cursor = connection.cursor()\n", " cursor.execute(sql) \n", " connection.commit()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "run_sql_file('db-scripts/sample-model.sql', conn)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "run_sql_file('db-scripts/sample-data.sql', conn)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "run_sql_file('db-scripts/sample-view.sql', conn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Verify sample data loaded correctly\n", "\n", "We will use pandas to visualize the data in the `Customer` table of the `sales` database." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "df = read_sql('SELECT top 10 * FROM sales.dbo.Customer ', con=conn)\n", "df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "df = read_sql('SELECT top 10 * FROM sales.dbo.v_top_10_customers_by_order order by OrderCount desc', con=conn)\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Discover the data in your Data Lake\n", "\n", "In this next section we will be using [AWS Glue](https://aws.amazon.com/glue/) to discover, catalog, and transform your data. Glue currently only supports `Python 2.7`, hence we'll write the script in `Python 2.7`.\n", "\n", "### Permission setup for invoking AWS Glue from this Notebook\n", "In order to enable this Notebook to run AWS Glue jobs, we need to add one additional permission to the default execution role of this notebook. We will be using SageMaker Python SDK to retrieve the default execution role and then you have to go to [IAM Dashboard](https://console.aws.amazon.com/iam/home) to edit the Role to add AWS Glue specific permission. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Finding out the current execution role of the Notebook\n", "We are using SageMaker Python SDK to retrieve the current role for this Notebook which needs to be enhanced to support the functionality in AWS Glue." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import SageMaker Python SDK to get the Session and execution_role\n", "import sagemaker\n", "from sagemaker import get_execution_role\n", "sess = sagemaker.Session()\n", "role = get_execution_role()\n", "role_name = role[role.rfind('/') + 1:]\n", "print(role_name)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Adding AWS Glue as an additional trusted entity to this role\n", "This step is needed if you want to pass the execution role of this Notebook while calling Glue APIs as well without creating an additional **Role**. If you have not used AWS Glue before, then this step is mandatory. \n", "\n", "If you have used AWS Glue previously, then you should have an already existing role that can be used to invoke Glue APIs. In that case, you can pass that role while calling Glue (later in this notebook) and skip this next step." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "On the IAM dashboard, please click on **Roles** on the left sidenav and search for this Role. Once the Role appears, click on the Role to go to its **Summary** page. Click on the **Trust relationships** tab on the **Summary** page to add AWS Glue as an additional trusted entity. 
\n", "\n", "Click on **Edit trust relationship** and replace the JSON with this JSON.\n", "```\n", "{\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Principal\": {\n", " \"Service\": [\n", " \"sagemaker.amazonaws.com\",\n", " \"glue.amazonaws.com\"\n", " ]\n", " },\n", " \"Action\": \"sts:AssumeRole\"\n", " }\n", " ]\n", "}\n", "```\n", "Once this is complete, click on **Update Trust Policy** and you are done.\n", "\n", "![IAM Roles](../../docs/assets/images/iam_roles_hl.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"https://console.aws.amazon.com/iam/home?region={0}#/roles/{1}\".format(region, role_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get RDS instance attributes\n", "\n", "We need to get the `SecurityGroup`, `AvailabilityZone`, and `Subnet` the RDS instance is running in." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = rds.describe_db_instances(DBInstanceIdentifier=environment_name)\n", "\n", "rds_sg_id = response['DBInstances'][0]['VpcSecurityGroups'][0]['VpcSecurityGroupId']\n", "print(rds_sg_id)\n", "\n", "rds_az = response['DBInstances'][0]['AvailabilityZone']\n", "print(rds_az)\n", "\n", "for sub in response['DBInstances'][0]['DBSubnetGroup']['Subnets']:\n", " if sub['SubnetAvailabilityZone']['Name'] == rds_az:\n", " rds_subnet = sub['SubnetIdentifier']\n", " print(rds_subnet)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the [AWS Glue Catalog Database](https://docs.aws.amazon.com/glue/latest/dg/define-database.html)\n", "\n", "When you define a table in the AWS Glue Data Catalog, you add it to a database. A database is used to organize tables in AWS Glue. You can organize your tables using a crawler or using the AWS Glue console. A table can be in only one database at a time.\n", "\n", "There is a central Glue Catalog for each AWS account. When creating the database you will use your account id declared above as `account_id`\n", "\n", "[glue.create_database](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_database)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "workshop.create_db(glue, account_id, database_name, 'Sales data in MS SQL Server database')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create Glue SQL Connection](https://docs.aws.amazon.com/glue/latest/dg/populate-add-connection.html)\n", "\n", "Connections are used by crawlers and jobs in AWS Glue to access certain types of data stores. 
We will create a connection to the \`sales\` MS SQL Server database we created above.\n", "\n", "[glue.create_connection](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_connection)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "db_connection_name = 'MSSQLSalesConnection{0}'.format(workshop_user)\n", "\n", "response = glue.create_connection(\n", " CatalogId=account_id,\n", " ConnectionInput={\n", " 'Name': db_connection_name,\n", " 'Description': 'MS SQL Server Sales Connection',\n", " 'ConnectionType': 'JDBC',\n", " 'MatchCriteria': [\n", " 'string',\n", " ],\n", " 'ConnectionProperties': {\n", " 'JDBC_CONNECTION_URL': 'jdbc:sqlserver://{0};databaseName={1}'.format(rds_endpoint, database_name),\n", " 'JDBC_ENFORCE_SSL': 'false',\n", " 'PASSWORD': admin_password,\n", " 'USERNAME': admin_user\n", " },\n", " 'PhysicalConnectionRequirements': {\n", " 'SubnetId': rds_subnet,\n", " 'SecurityGroupIdList': [\n", " rds_sg_id,\n", " ],\n", " 'AvailabilityZone': rds_az\n", " }\n", " }\n", ")\n", "\n", "print(response)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Use a [Glue Crawler](https://docs.aws.amazon.com/glue/latest/dg/add-crawler.html) to Discover the source data\n", "\n", "You can use a crawler to populate the AWS Glue Data Catalog with tables. This is the primary method used by most AWS Glue users. You add a crawler within your Data Catalog to traverse your data stores. The output of the crawler consists of one or more metadata tables that are defined in your Data Catalog. Extract, transform, and load (ETL) jobs that you define in AWS Glue use these metadata tables as sources and targets.\n", "\n", "A crawler can crawl both file-based and table-based data stores. Crawlers can crawl the following data stores:\n", "\n", "* Amazon Simple Storage Service (Amazon S3)\n", " * [Built-in Classifiers](https://docs.aws.amazon.com/glue/latest/dg/add-classifier.html#classifier-built-in)\n", " * [Custom Classifiers](https://docs.aws.amazon.com/glue/latest/dg/custom-classifier.html)\n", "* Amazon Redshift\n", "* Amazon Relational Database Service (Amazon RDS)\n", " * Amazon Aurora\n", " * MariaDB\n", " * Microsoft SQL Server\n", " * MySQL\n", " * Oracle\n", " * PostgreSQL\n", "* Amazon DynamoDB\n", "* Publicly accessible databases [Blog](https://aws.amazon.com/blogs/big-data/how-to-access-and-analyze-on-premises-data-stores-using-aws-glue/)\n", " * Aurora\n", " * MariaDB\n", " * SQL Server\n", " * MySQL\n", " * Oracle\n", " * PostgreSQL\n", "\n", "[glue.create_crawler](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_crawler)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "crawler_name = 'MSSQL-Sales-Crawler'\n", "\n", "response = glue.create_crawler(\n", " Name=crawler_name,\n", " Role=role,\n", " DatabaseName=database_name,\n", " Description='Crawler for SQL Server Sales Database',\n", " Targets={\n", " 'JdbcTargets': [\n", " {\n", " 'ConnectionName': db_connection_name,\n", " 'Path': database_name\n", " },\n", " ]\n", " },\n", " TablePrefix='R_',\n", " SchemaChangePolicy={\n", " 'UpdateBehavior': 'UPDATE_IN_DATABASE',\n", " 'DeleteBehavior': 'DEPRECATE_IN_DATABASE'\n", " }\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start the Glue Crawler\n", "\n", "Now we start the crawler defined above. It connects to the source database over the JDBC connection and records the table definitions it finds in the \`sales\` database of the Data Catalog.\n", "\n", "[glue.start_crawler](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.start_crawler)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.start_crawler(\n", " Name=crawler_name\n", ")\n", "\n", "print(\"Crawler: https://{0}.console.aws.amazon.com/glue/home?region={0}#crawler:name={1}\".format(region, crawler_name))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Checking Glue crawler status\n", "\n", "We will now monitor the crawler status, waiting for it to return to the \`READY\` state, meaning the crawler has completed its crawl. You can also look at the [CloudWatch logs](https://docs.aws.amazon.com/glue/latest/dg/console-crawlers.html#console-crawlers-details) for the crawler for more details." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "crawler_status = glue.get_crawler(Name=crawler_name)['Crawler']['State']\n", "\n", "while crawler_status != 'READY':\n", " crawler_status = glue.get_crawler(Name=crawler_name)['Crawler']['State']\n", " print(crawler_status)\n", " time.sleep(30)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### View Crawler Results\n", "\n", "Now that we have crawled the raw data, we can look at the results of the crawl to see the tables that were created. Click on the link \`Tables in sales\` to view the tables the crawler found. We prefixed the tables with \`R_\` to signify they are raw tables from the source database. It will look like the image below:\n", "\n", "![Sales Tables](../../docs/assets/images/sales-tables.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('https://{0}.console.aws.amazon.com/glue/home?region={0}#database:name={1}'.format(region, database_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Parquet version and denormalize customer orders\n", "\n", "From [Wikipedia](https://en.wikipedia.org/wiki/Apache_Parquet), \"Apache Parquet is a free and open-source column-oriented data storage format of the Apache Hadoop ecosystem. It is similar to the other columnar-storage file formats available in Hadoop namely RCFile and ORC. It is compatible with most of the data processing frameworks in the Hadoop environment. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.\" \n", "\n", "The key point of this code is how easy it is to access the AWS Glue Data Catalog by leveraging the [Glue libraries](https://github.com/awslabs/aws-glue-libs). 
Some of the key concepts are below:\n", "\n", "* [\`glueContext.create_dynamic_frame.from_catalog\`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_catalog) - Read table metadata from the Glue Data Catalog using the Glue libs to load tables into the PySpark job.\n", "* Writing back to S3 with [\`glueContext.write_dynamic_frame.from_options\`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_catalog) and its options:\n", " * Convert the data to a different format with \`format=\"parquet\"\`. This format is [columnar](https://docs.aws.amazon.com/athena/latest/ug/columnar-storage.html) and provides [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression by default.\n", " * Partition the data. The script is using \`\"partitionKeys\": [\"customerid\"]\` to partition the data under the S3 prefix to look like the following \`s3://datalake..../sales_parquet/customerid=[n]\`.\n", "\n", "**This script sacrifices best practices for Parquet file sizing in order to demonstrate features of the Glue libraries.**\n", "\n", "You can find more best practices for Glue and Athena [here](https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile sales_parquet_etl.py\n", "import sys\n", "import os\n", "from awsglue.transforms import *\n", "from awsglue.utils import getResolvedOptions\n", "from pyspark.context import SparkContext\n", "from awsglue.context import GlueContext\n", "from awsglue.job import Job\n", "\n", "## @params: [JOB_NAME]\n", "args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_OUTPUT_BUCKET', 'S3_OUTPUT_KEY_PREFIX', 'DATABASE_NAME', 'REGION'])\n", "\n", "sc = SparkContext()\n", "glueContext = GlueContext(sc)\n", "spark = glueContext.spark_session\n", "job = Job(glueContext)\n", "job.init(args['JOB_NAME'], args)\n", "\n", "datasource0 = glueContext.create_dynamic_frame.from_catalog(database=args['DATABASE_NAME'], table_name='r_sales_dbo_customer', transformation_ctx = \"datasource0\")\n", "\n", "datasource1 = glueContext.create_dynamic_frame.from_catalog(database=args['DATABASE_NAME'], table_name='r_sales_dbo_order', transformation_ctx = \"datasource1\")\n", "\n", "datasource2 = RenameField.apply(datasource1, \"id\", \"orderid\")\n", "\n", "datasource3 = datasource2.join( [\"customer_id\"],[\"id\"], datasource0, transformation_ctx = \"join\")\n", "\n", "applymapping1 = ApplyMapping.apply(frame = datasource3, mappings = [(\"firstname\", \"string\", \"firstname\", \"string\"), \\\n", " (\"phone\", \"string\", \"phone\", \"string\"), (\"country\", \"string\", \"country\", \"string\"), \\\n", " (\"id\", \"int\", \"id\", \"int\"), (\"lastname\", \"string\", \"lastname\", \"string\"), \\\n", " (\"city\", \"string\", \"city\", \"string\"), (\"ordernumber\", \"string\", \"ordernumber\", \"string\"), \\\n", " (\"orderid\", \"int\", \"orderid\", \"int\"), (\"customerid\", \"int\", \"customerid\", \"int\"), \\\n", " (\"totalamount\", \"decimal(12,2)\", \"totalamount\", \"decimal(12,2)\"), (\"orderdate\", \"timestamp\", \"orderdate\", \"timestamp\")], transformation_ctx = \"applymapping1\")\n", "\n", "selectfields1 = SelectFields.apply(frame = applymapping1, paths = [\"ordernumber\", \"city\", \"firstname\", \"lastname\", \"totalamount\", \"phone\", 
\"customerid\", \"orderdate\", \"country\"], transformation_ctx = \"selectfields1\")\n", "\n", "parquet_output_path = 's3://' + os.path.join(args['S3_OUTPUT_BUCKET'], args['S3_OUTPUT_KEY_PREFIX'])\n", "\n", "datasink3 = glueContext.write_dynamic_frame.from_options(frame = selectfields1, connection_type = \"s3\", connection_options = {\"path\": parquet_output_path, \"partitionKeys\": [\"customerid\"]}, format = \"parquet\", transformation_ctx = \"datasink4\")\n", "job.commit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload the ETL script to S3\n", "We will be uploading the `sales_parquet_etl` script to S3 so Glue can use it to run the PySpark job. You can replace it with your own script if needed. If your code has multiple files, you need to zip those files and upload to S3 instead of uploading a single file like it's being done here." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "script_location = sess.upload_data(path='sales_parquet_etl.py', bucket=bucket, key_prefix='codes')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Output location of the data.\n", "s3_output_key_prefix = 'datalake/sales_parquet/'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Authoring jobs with AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/author-job.html)\n", "\n", "Next we'll be creating Glue client via Boto so that we can invoke the `create_job` API of Glue. `create_job` API will create a job definition which can be used to execute your jobs in Glue. The job definition created here is mutable. While creating the job, we are also passing the code location as well as the dependencies location to Glue.\n", "\n", "`AllocatedCapacity` parameter controls the hardware resources that Glue will use to execute this job. It is measures in units of `DPU`. For more information on `DPU`, please see [here](https://docs.aws.amazon.com/glue/latest/dg/add-job.html).\n", "\n", "[glue.create_job](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_job)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from time import gmtime, strftime\n", "import time\n", "\n", "timestamp_prefix = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "\n", "job_name = 'sales-parquet-' + timestamp_prefix\n", "response = glue.create_job(\n", " Name=job_name,\n", " Description='PySpark job to convert sales SQL Server tables data to parquet',\n", " Role=role, # you can pass your existing AWS Glue role here if you have used Glue before\n", " ExecutionProperty={\n", " 'MaxConcurrentRuns': 1\n", " },\n", " Command={\n", " 'Name': 'glueetl',\n", " 'ScriptLocation': script_location\n", " },\n", " DefaultArguments={\n", " '--job-language': 'python',\n", " '--job-bookmark-option': 'job-bookmark-disable'\n", " },\n", " AllocatedCapacity=5,\n", " Timeout=60,\n", ")\n", "glue_job_name = response['Name']\n", "print(glue_job_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The aforementioned job will be executed now by calling `start_job_run` API. This API creates an immutable run/execution corresponding to the job definition created above. We will require the `job_run_id` for the particular job execution to check for status. 
We'll pass the data locations as part of the job execution parameters.\n", "\n", "[glue.start_job_run](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.start_job_run)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "job_run_id = glue.start_job_run(JobName=job_name,\n", " Arguments = {\n", " '--S3_OUTPUT_BUCKET': bucket,\n", " '--S3_OUTPUT_KEY_PREFIX': s3_output_key_prefix,\n", " '--DATABASE_NAME': database_name,\n", " '--REGION': region\n", " })['JobRunId']\n", "print(job_run_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Checking Glue Job Status\n", "\n", "Now we will check the job status to see if it is \`SUCCEEDED\`, \`FAILED\` or \`STOPPED\`. Once the job has succeeded, we have the transformed data in S3 in Parquet format, which we will use to query with Athena and visualize with QuickSight. If the job fails, you can go to the AWS Glue console, click on the **Jobs** tab on the left, click on this particular job, and you will be able to find the CloudWatch logs (the link under **Logs**) which can help you see what exactly went wrong in the job execution.\n", "\n", "[glue.get_job_run](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_job_run)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "job_run_status = glue.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']\n", "while job_run_status not in ('FAILED', 'SUCCEEDED', 'STOPPED'):\n", " job_run_status = glue.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']\n", " print(job_run_status)\n", " time.sleep(60)\n", "print(job_run_status)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Crawler to populate the Parquet formatted table in the Glue Data Catalog\n", "\n", "We will create another crawler for the curated dataset we created by converting the source tables into Parquet formatted data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "parq_crawler_name = 'Sales-Curated-Crawler'\n", "parq_crawler_path = 's3://{0}/{1}'.format(bucket, s3_output_key_prefix)\n", "\n", "response = glue.create_crawler(\n", " Name=parq_crawler_name,\n", " Role=role,\n", " DatabaseName=database_name,\n", " Description='Crawler for the Parquet transformed sales data',\n", " Targets={\n", " 'S3Targets': [\n", " {\n", " 'Path': parq_crawler_path\n", " }\n", " ]\n", " },\n", " SchemaChangePolicy={\n", " 'UpdateBehavior': 'UPDATE_IN_DATABASE',\n", " 'DeleteBehavior': 'DEPRECATE_IN_DATABASE'\n", " }\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start Crawler \n", "\n", "Much like we did with the raw data crawler, we will start the curated crawler pointing at the new data set created by the Glue job."
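, "\n", "\n", "Before starting it, you can optionally confirm that the Glue job actually wrote Parquet objects under the output prefix; the cell below (an optional check, not part of the original flow) lists a few of the keys." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional: confirm the Glue job wrote Parquet objects before crawling.\n", "resp = s3_client.list_objects_v2(Bucket=bucket, Prefix=s3_output_key_prefix, MaxKeys=10)\n", "for obj in resp.get('Contents', []):\n", "    print(obj['Key'])"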
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.start_crawler(\n", " Name=parq_crawler_name\n", ")\n", "\n", "print (\"Crawler: https://{0}.console.aws.amazon.com/glue/home?region={0}#crawler:name={1}\".format(region, parq_crawler_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Monitor the status of the Parquet crawler" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "crawler_status = glue.get_crawler(Name=parq_crawler_name)['Crawler']['State']\n", "\n", "while crawler_status not in ('READY'):\n", " crawler_status = glue.get_crawler(Name=parq_crawler_name)['Crawler']['State']\n", " print(crawler_status)\n", " time.sleep(30)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('https://{0}.console.aws.amazon.com/glue/home?region={0}#database:name={1}'.format(region, database_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Query the Data Lake with Athena](https://aws.amazon.com/athena/)\n", "\n", "For the self-serve end users that need the ability to create ad-hoc queries against the data Athena is a great choice the utilizes Presto and ANSI SQL to query a number of file formats on S3.\n", "\n", "To query the tables created by the crawler we will be installing a python library for querying the data in the Glue Data Catalog with Athena. For more information jump to [PyAthena](https://pypi.org/project/PyAthena/). You can also use the AWS console by browsing to the Athena service and run queries through the browser. Alternatively, you can also use the [JDBC/ODBC](https://docs.aws.amazon.com/athena/latest/ug/athena-bi-tools-jdbc-odbc.html) drivers available." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install PyAthena" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Simple Select Query\n", "\n", "In this first query we will create a simple query to show the ability of Athena to query the raw CSV data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "from pyathena import connect\n", "from pyathena.util import as_pandas\n", "\n", "cursor = connect(region_name=region, s3_staging_dir='s3://'+bucket+'/athena/temp').cursor()\n", "cursor.execute('select customerid, count(ordernumber) as order_cnt from ' + database_name + '.sales_parquet group by customerid')\n", "\n", "df = as_pandas(cursor)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Congratulations, you have completed loading data into your data lake from a SQL Server database. This completes the workshop you can now run the cleanup scripts to remove all resources created in this account." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Clean Up" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = cfn.delete_stack(StackName=rds_stack_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.delete_crawler(Name=crawler_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.delete_crawler(Name=parq_crawler_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.delete_job(JobName=glue_job_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.delete_database(\n", " CatalogId = account_id,\n", " Name = database_name\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.delete_connection(\n", " CatalogId=account_id,\n", " ConnectionName=db_connection_name\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "waiter = cfn.get_waiter('stack_delete_complete')\n", "waiter.wait(\n", " StackName=rds_stack_name\n", ")\n", "\n", "print('The wait is over for {0}'.format(rds_stack_name))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 rb s3://$bucket --force " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not use_existing:\n", " workshop.vpc_cleanup(vpc_id)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 2 }