{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Keep Calm and Parquet\n", "\n", "In this workshop we will use a number of analytics tools to show the diversity of the AWS platform. We will walk through querying unoptimized CSV files and converting them to Parquet to improve performance. We will also show how you can access data in your data lake with Redshift, Athena, and EMR, giving you the freedom to choose the right tool for the job while keeping a single source of truth for your data in S3.\n", "\n", "![Modern Data Lake](../../docs/assets/images/modern-datalake.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import botocore\n", "import json\n", "import time\n", "import os\n", "import getpass\n", "\n", "import project_path # path to helper methods\n", "from lib import workshop\n", "from pandas import read_sql\n", "\n", "glue = boto3.client('glue')\n", "s3 = boto3.resource('s3')\n", "s3_client = boto3.client('s3')\n", "cfn = boto3.client('cloudformation')\n", "redshift_client = boto3.client('redshift')\n", "ec2_client = boto3.client('ec2')\n", "\n", "session = boto3.session.Session()\n", "region = session.region_name\n", "account_id = boto3.client('sts').get_caller_identity().get('Account')\n", "\n", "database_name = 'taxi' # AWS Glue Data Catalog Database Name\n", "redshift_database_name = 'taxidb'\n", "environment_name = 'taxi-workshop'\n", "table_name = 'yellow'\n", "redshift_node_type = 'ds2.xlarge'\n", "redshift_port=5439\n", "\n", "use_existing = True" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create S3 Bucket](https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html)\n", "\n", "We will create an S3 bucket that will be used throughout the workshop for storing our data.\n", "\n", "[s3.create_bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket) boto3 
documentation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bucket = workshop.create_bucket(region, session, 'taxi-')\n", "print(bucket)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Copy Sample Data to S3 bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-example-download-file.html) \n", "\n", "We will copy some files from the New York City Taxi and Limousine Commission (TLC) Trip Record Data dataset, available on the [AWS Open Data Registry](https://registry.opendata.aws/nyc-tlc-trip-records-pds/), into our bucket.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 cp s3://nyc-tlc/trip\\ data/yellow_tripdata_2017-01.csv s3://$bucket/datalake/yellow/\n", "!aws s3 cp s3://nyc-tlc/trip\\ data/yellow_tripdata_2017-02.csv s3://$bucket/datalake/yellow/\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Upload to S3](https://docs.aws.amazon.com/AmazonS3/latest/dev/Welcome.html)\n", "\n", "Next, we will upload the payment type, rate code, and taxi zone lookup CSV files to S3 to be used later in the workshop.\n", "\n", "[s3.upload_file](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.upload_file) boto3 documentation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "file_name = 'paymenttype.csv'\n", "session.resource('s3').Bucket(bucket).Object(os.path.join('datalake', 'paymenttype', file_name)).upload_file(file_name)\n", "\n", "file_name = 'ratecode.csv'\n", "session.resource('s3').Bucket(bucket).Object(os.path.join('datalake', 'ratecode', file_name)).upload_file(file_name)\n", "\n", "file_name = 'taxi_zone_lookup.csv'\n", "session.resource('s3').Bucket(bucket).Object(os.path.join('datalake', 'taxi_zone_lookup', file_name)).upload_file(file_name)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create 
VPC](https://docs.aws.amazon.com/vpc/index.html) \n", "\n", "We need a VPC for some of the resources in this workshop. You have the option to create a brand new VPC or use the VPC flagged as the default." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if use_existing:\n", " vpc_filter = [{'Name':'isDefault', 'Values':['true']}]\n", " default_vpc = ec2_client.describe_vpcs(Filters=vpc_filter)\n", " vpc_id = default_vpc['Vpcs'][0]['VpcId']\n", "\n", " subnet_filter = [{'Name':'vpc-id', 'Values':[vpc_id]}]\n", " subnets = ec2_client.describe_subnets(Filters=subnet_filter)\n", " subnet1_id = subnets['Subnets'][0]['SubnetId']\n", " subnet2_id = subnets['Subnets'][1]['SubnetId']\n", "else: \n", " vpc, subnet1, subnet2 = workshop.create_and_configure_vpc()\n", " vpc_id = vpc.id\n", " subnet1_id = subnet1.id\n", " subnet2_id = subnet2.id" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(vpc_id)\n", "print(subnet1_id)\n", "print(subnet2_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload [CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/GettingStarted.html) template\n", "\n", "In the interest of time, we will use CloudFormation to launch the EMR and Redshift clusters that we will use on the analytics side after we have cataloged and transformed the data." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "redshift_file = 'cfn/redshift.yaml'\n", "session.resource('s3').Bucket(bucket).Object(redshift_file).upload_file(redshift_file)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "emr_file = 'cfn/emr.yaml'\n", "session.resource('s3').Bucket(bucket).Object(emr_file).upload_file(emr_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Enter the user name used for the Redshift Cluster" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "admin_user = getpass.getpass(prompt='Redshift admin user name: ')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Enter the password used in creating the Redshift Cluster" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Password must be at least 8 alphanumeric characters with at least 1 uppercase letter, 1 lowercase letter, and 1 digit\n", "admin_password = getpass.getpass(prompt='Redshift admin password: ')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import re\n", "\n", "pattern = re.compile(r\"^(?=.*[a-z])(?=.*[A-Z])(?=.*\\d)[a-zA-Z\\d]{8,}$\")\n", "result = pattern.match(admin_password)\n", "if result:\n", " print('Valid')\n", "else:\n", " print('Invalid: password must be at least 8 alphanumeric characters with at least 1 uppercase letter, 1 lowercase letter, and 1 digit')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Execute CloudFormation Stack to generate Redshift Data Warehouse\n", "\n", "Later in the workshop we will be using this [Redshift](https://aws.amazon.com/redshift/) cluster to run queries over data populated in our data lake with [Redshift Spectrum](https://aws.amazon.com/blogs/big-data/amazon-redshift-spectrum-extends-data-warehousing-out-to-exabytes-no-loading-required/)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cfn_template = 'https://s3-{0}.amazonaws.com/{1}/{2}'.format(region, bucket, redshift_file)\n", "print(cfn_template)\n", "\n", "redshift_stack_name = 'RedshiftTaxiStack'\n", "response = cfn.create_stack(\n", " StackName=redshift_stack_name,\n", " TemplateURL=cfn_template,\n", " Capabilities = [\"CAPABILITY_NAMED_IAM\"],\n", " Parameters=[\n", " {\n", " 'ParameterKey': 'EnvironmentName',\n", " 'ParameterValue': environment_name\n", " },\n", " {\n", " 'ParameterKey': 'AdministratorUser',\n", " 'ParameterValue': admin_user\n", " },\n", " {\n", " 'ParameterKey': 'AdministratorPassword',\n", " 'ParameterValue': admin_password\n", " },\n", " {\n", " 'ParameterKey': 'DatabaseName',\n", " 'ParameterValue': redshift_database_name\n", " },\n", " {\n", " 'ParameterKey': 'NodeType',\n", " 'ParameterValue': redshift_node_type\n", " },\n", " {\n", " 'ParameterKey': 'S3Bucket',\n", " 'ParameterValue': bucket\n", " }\n", " ]\n", ")\n", "\n", "print(response)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Execute CloudFormation Stack to generate EMR Cluster\n", "\n", "We will also query data in the Data Lake from [EMR](https://aws.amazon.com/emr/) through an [EMR Notebook](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-managed-notebooks.html)." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cfn_template = 'https://s3-{0}.amazonaws.com/{1}/{2}'.format(region, bucket, emr_file)\n", "print(cfn_template)\n", "emr_stack_name = 'EMRTaxiStack'\n", "\n", "response = cfn.create_stack(\n", " StackName=emr_stack_name,\n", " TemplateURL=cfn_template,\n", " Capabilities = [\"CAPABILITY_NAMED_IAM\"],\n", " Parameters=[\n", " {\n", " 'ParameterKey': 'EnvironmentName',\n", " 'ParameterValue': environment_name\n", " },\n", " {\n", " 'ParameterKey': 'VPC',\n", " 'ParameterValue': vpc_id\n", " },\n", " {\n", " 'ParameterKey': 'PublicSubnet',\n", " 'ParameterValue': subnet1_id\n", " },\n", " {\n", " 'ParameterKey': 'OutputS3Bucket',\n", " 'ParameterValue': bucket\n", " }\n", " ]\n", ")\n", "\n", "print(response)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Discover the data in your Data Lake\n", "\n", "In this next section we will use [AWS Glue](https://aws.amazon.com/glue/) to discover, catalog, and transform your data. When this workshop was written, Glue only supported `Python 2.7`, so the script is written in `Python 2.7`.\n", "\n", "### Permission setup for invoking AWS Glue from this Notebook\n", "To enable this notebook to run AWS Glue jobs, we need to add one additional permission to the notebook's default execution role. We will use the SageMaker Python SDK to retrieve the default execution role, and then you will go to the [IAM Dashboard](https://console.aws.amazon.com/iam/home) to edit the role and add the AWS Glue specific permission. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Finding out the current execution role of the Notebook\n", "We use the SageMaker Python SDK to retrieve the current role for this notebook, which needs to be extended to support the functionality in AWS Glue." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import SageMaker Python SDK to get the Session and execution_role\n", "import sagemaker\n", "from sagemaker import get_execution_role\n", "sess = sagemaker.Session()\n", "role = get_execution_role()\n", "role_name = role[role.rfind('/') + 1:]\n", "print(role_name)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Adding AWS Glue as an additional trusted entity to this role\n", "This step is needed if you want to pass the execution role of this notebook when calling Glue APIs without creating an additional **Role**. If you have not used AWS Glue before, then this step is mandatory. \n", "\n", "If you have used AWS Glue previously, then you should already have an existing role that can be used to invoke Glue APIs. In that case, you can pass that role while calling Glue (later in this notebook) and skip this next step." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "On the IAM dashboard, please click on **Roles** on the left sidenav and search for this Role. Once the Role appears, click on the Role to go to its **Summary** page. Click on the **Trust relationships** tab on the **Summary** page to add AWS Glue as an additional trusted entity. 
\n", "\n", "Click on **Edit trust relationship** and replace the JSON with this JSON.\n", "```\n", "{\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Principal\": {\n", " \"Service\": [\n", " \"sagemaker.amazonaws.com\",\n", " \"glue.amazonaws.com\"\n", " ]\n", " },\n", " \"Action\": \"sts:AssumeRole\"\n", " }\n", " ]\n", "}\n", "```\n", "Once this is complete, click on **Update Trust Policy** and you are done.\n", "\n", "![IAM Roles](../../docs/assets/images/iam_roles_hl.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"https://console.aws.amazon.com/iam/home?region={0}#/roles/{1}\".format(region, role_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the [AWS Glue Catalog Database](https://docs.aws.amazon.com/glue/latest/dg/define-database.html)\n", "\n", "When you define a table in the AWS Glue Data Catalog, you add it to a database. A database is used to organize tables in AWS Glue. You can organize your tables using a crawler or using the AWS Glue console. A table can be in only one database at a time.\n", "\n", "There is a central Glue Catalog for each AWS account. When creating the database you will use your account id, declared above as `account_id`.\n", "\n", "[glue.create_database](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_database)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "workshop.create_db(glue, account_id, database_name, 'New York City Taxi and Limousine Commission (TLC) Trip Record Data')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Use a [Glue Crawler](https://docs.aws.amazon.com/glue/latest/dg/add-crawler.html) to Discover the raw data\n", "\n", "You can use a crawler to populate the AWS Glue Data Catalog with tables. 
This is the primary method used by most AWS Glue users. You add a crawler within your Data Catalog to traverse your data stores. The output of the crawler consists of one or more metadata tables that are defined in your Data Catalog. Extract, transform, and load (ETL) jobs that you define in AWS Glue use these metadata tables as sources and targets.\n", "\n", "A crawler can crawl both file-based and table-based data stores. Crawlers can crawl the following data stores:\n", "\n", "* Amazon Simple Storage Service (Amazon S3)\n", " * [Built-in Classifiers](https://docs.aws.amazon.com/glue/latest/dg/add-classifier.html#classifier-built-in)\n", " * [Custom Classifiers](https://docs.aws.amazon.com/glue/latest/dg/custom-classifier.html)\n", "* Amazon Redshift\n", "* Amazon Relational Database Service (Amazon RDS)\n", " * Amazon Aurora\n", " * MariaDB\n", " * Microsoft SQL Server\n", " * MySQL\n", " * Oracle\n", " * PostgreSQL\n", "* Amazon DynamoDB\n", "* Publicly accessible databases [Blog](https://aws.amazon.com/blogs/big-data/how-to-access-and-analyze-on-premises-data-stores-using-aws-glue/)\n", " * Aurora\n", " * MariaDB\n", " * SQL Server\n", " * MySQL\n", " * Oracle\n", " * PostgreSQL\n", "\n", "[glue.create_crawler](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_crawler)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "crawler_name = 'NY-Taxi-Crawler'\n", "crawler_path = 's3://{0}/datalake/'.format(bucket)\n", "\n", "response = glue.create_crawler(\n", " Name=crawler_name,\n", " Role=role,\n", " DatabaseName=database_name,\n", " Description='Crawler for NY Taxi Data',\n", " Targets={\n", " 'S3Targets': [\n", " {\n", " 'Path': crawler_path\n", " }\n", " ]\n", " },\n", " SchemaChangePolicy={\n", " 'UpdateBehavior': 'UPDATE_IN_DATABASE',\n", " 'DeleteBehavior': 'DEPRECATE_IN_DATABASE'\n", " }\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 
Start the Glue Crawler\n", "\n", "Now that the crawler is defined, we will start it so that it traverses the data in S3 and populates the AWS Glue Data Catalog with tables.\n", "\n", "[glue.start_crawler](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.start_crawler)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.start_crawler(\n", " Name=crawler_name\n", ")\n", "\n", "print (\"Crawler: https://{0}.console.aws.amazon.com/glue/home?region={0}#crawler:name={1}\".format(region, crawler_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Checking Glue crawler status\n", "\n", "We will now monitor the crawler status, waiting for it to return to the `READY` state, which means the crawler has completed its crawl. You can also look at the [CloudWatch logs](https://docs.aws.amazon.com/glue/latest/dg/console-crawlers.html#console-crawlers-details) for the crawler for more details." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "crawler_status = glue.get_crawler(Name=crawler_name)['Crawler']['State']\n", "\n", "# Poll every 30 seconds until the crawler returns to the READY state\n", "while crawler_status != 'READY':\n", " print(crawler_status)\n", " time.sleep(30)\n", " crawler_status = glue.get_crawler(Name=crawler_name)['Crawler']['State']\n", "print(crawler_status)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### View Crawler Results\n", "\n", "Now that we have crawled the raw data available, we want to look at the results of the crawl to see the tables that were created. Click on the link `Tables in taxi` to view the tables the crawler found. 
It will look like the image below:\n", "\n", "![Taxi Tables](../../docs/assets/images/glue-taxi-tables.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('https://{0}.console.aws.amazon.com/glue/home?region={0}#database:name={1}'.format(region, database_name))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Parquet version of the yellow CSV table\n", "\n", "From [Wikipedia](https://en.wikipedia.org/wiki/Apache_Parquet), \"Apache Parquet is a free and open-source column-oriented data storage format of the Apache Hadoop ecosystem. It is similar to the other columnar-storage file formats available in Hadoop namely RCFile and ORC. It is compatible with most of the data processing frameworks in the Hadoop environment. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.\"\n", "\n", "The key point of this code is how easy it is to access the AWS Glue Data Catalog using the [Glue libraries](https://github.com/awslabs/aws-glue-libs). Some of the key concepts are below:\n", "\n", "* [`glueContext.create_dynamic_frame.from_catalog`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_catalog) - Read table metadata from the Glue Data Catalog using the Glue libraries to load tables into the PySpark job.\n", "* Writing back to S3 with [`glueContext.write_dynamic_frame.from_options`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_catalog) with options:\n", " * Convert data to a different format `format=\"parquet\"`. 
This format is [columnar](https://docs.aws.amazon.com/athena/latest/ug/columnar-storage.html) and provides [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression by default.\n", " \n", "You can find more best practices for Glue and Athena [here](https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile yellow_parquet_etl.py\n", "\n", "import sys\n", "import os\n", "from awsglue.transforms import *\n", "from awsglue.utils import getResolvedOptions\n", "from pyspark.context import SparkContext\n", "from awsglue.context import GlueContext\n", "from awsglue.job import Job\n", "\n", "## @params: [JOB_NAME]\n", "args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_OUTPUT_BUCKET', 'S3_OUTPUT_KEY_PREFIX', 'DATABASE_NAME', 'TABLE_NAME', 'REGION'])\n", "\n", "sc = SparkContext()\n", "glueContext = GlueContext(sc)\n", "spark = glueContext.spark_session\n", "job = Job(glueContext)\n", "job.init(args['JOB_NAME'], args)\n", "## @type: DataSource\n", "## @args: [database = \"taxi\", table_name = \"yellow\", transformation_ctx = \"datasource0\"]\n", "## @return: datasource0\n", "## @inputs: []\n", "datasource0 = glueContext.create_dynamic_frame.from_catalog(database=args['DATABASE_NAME'], table_name=args['TABLE_NAME'], transformation_ctx = \"datasource0\")\n", "## @type: ResolveChoice\n", "## @args: [choice = \"make_struct\", transformation_ctx = \"resolvechoice1\"]\n", "## @return: resolvechoice1\n", "## @inputs: [frame = datasource0]\n", "resolvechoice1 = ResolveChoice.apply(frame = datasource0, choice = \"make_struct\", transformation_ctx = \"resolvechoice1\")\n", "## @type: DropNullFields\n", "## @args: [transformation_ctx = \"dropnullfields2\"]\n", "## @return: dropnullfields2\n", "## @inputs: [frame = resolvechoice1]\n", "dropnullfields2 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = \"dropnullfields2\")\n", "\n", 
"parquet_output_path = 's3://' + os.path.join(args['S3_OUTPUT_BUCKET'], args['S3_OUTPUT_KEY_PREFIX'])\n", "print(parquet_output_path)\n", "\n", "## @type: DataSink\n", "## @args: [connection_type = \"s3\", connection_options = {\"path\": \"\"}, format = \"parquet\", transformation_ctx = \"datasink3\"]\n", "## @return: datasink3\n", "## @inputs: [frame = dropnullfields2]\n", "datasink3 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields2, connection_type = \"s3\", connection_options = {\"path\": parquet_output_path}, format = \"parquet\", transformation_ctx = \"datasink3\")\n", "job.commit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload the ETL script to S3\n", "We will upload the `yellow_parquet_etl` script to S3 so Glue can use it to run the PySpark job. You can replace it with your own script if needed. If your code has multiple files, you need to zip those files and upload the archive to S3 instead of uploading a single file as is done here." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "script_location = sess.upload_data(path='yellow_parquet_etl.py', bucket=bucket, key_prefix='codes')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Output location of the data.\n", "s3_output_key_prefix = 'datalake/yellow_parquet/'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Authoring jobs with AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/author-job.html)\n", "\n", "Next we'll use the Glue client created earlier with Boto to invoke the `create_job` API. The `create_job` API creates a job definition that can be used to execute your jobs in Glue. The job definition created here is mutable. 
While creating the job, we also pass the script location to Glue.\n", "\n", "The `AllocatedCapacity` parameter controls the hardware resources that Glue will use to execute this job. It is measured in units of `DPU`. For more information on `DPU`, please see [here](https://docs.aws.amazon.com/glue/latest/dg/add-job.html).\n", "\n", "[glue.create_job](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_job)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from time import gmtime, strftime\n", "import time\n", "\n", "timestamp_prefix = strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "\n", "job_name = 'ny-yellow-parquet-' + timestamp_prefix\n", "response = glue.create_job(\n", " Name=job_name,\n", " Description='PySpark job to convert yellow taxi csv data to parquet',\n", " Role=role, # you can pass your existing AWS Glue role here if you have used Glue before\n", " ExecutionProperty={\n", " 'MaxConcurrentRuns': 1\n", " },\n", " Command={\n", " 'Name': 'glueetl',\n", " 'ScriptLocation': script_location\n", " },\n", " DefaultArguments={\n", " '--job-language': 'python',\n", " '--job-bookmark-option': 'job-bookmark-disable'\n", " },\n", " AllocatedCapacity=5,\n", " Timeout=60,\n", ")\n", "glue_job_name = response['Name']\n", "print(glue_job_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The job will now be executed by calling the `start_job_run` API. This API creates an immutable run/execution corresponding to the job definition created above. We will need the `job_run_id` of this particular job execution to check its status. 
We'll pass the output location, database name, and table name as part of the job execution parameters.\n", "\n", "[glue.start_job_run](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.start_job_run)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "job_run_id = glue.start_job_run(JobName=job_name,\n", " Arguments = {\n", " '--S3_OUTPUT_BUCKET': bucket,\n", " '--S3_OUTPUT_KEY_PREFIX': s3_output_key_prefix,\n", " '--DATABASE_NAME': database_name,\n", " '--TABLE_NAME': table_name,\n", " '--REGION': region\n", " })['JobRunId']\n", "print(job_run_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Checking Glue Job Status\n", "\n", "Now we will check the job status to see if it has `SUCCEEDED`, `FAILED` or `STOPPED`. Once the job has succeeded, the transformed data will be in S3 in Parquet format, which we will query with Athena and visualize with QuickSight. If the job fails, go to the AWS Glue console, click the **Jobs** tab on the left, and select this particular job. The link under **Logs** opens the CloudWatch logs for the job, which can help you see exactly what went wrong in the job execution.\n", "\n", "[glue.get_job_run](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_job_run)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "job_run_status = glue.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']\n", "while job_run_status not in ('FAILED', 'SUCCEEDED', 'STOPPED'):\n", " job_run_status = glue.get_job_run(JobName=job_name,RunId=job_run_id)['JobRun']['JobRunState']\n", " print (job_run_status)\n", " time.sleep(60)\n", "print(job_run_status)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Crawler to populate Parquet formatted table in Glue Data Catalog\n", "\n", "We 
will create another crawler for the curated dataset we created by converting the CSV files into Parquet-formatted data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "parq_crawler_name = 'NY-Curated-Crawler'\n", "parq_crawler_path = 's3://'+bucket+'/datalake/yellow_parquet/'\n", "\n", "response = glue.create_crawler(\n", " Name=parq_crawler_name,\n", " Role=role,\n", " DatabaseName=database_name,\n", " Description='Crawler for the Parquet transformed yellow taxi data',\n", " Targets={\n", " 'S3Targets': [\n", " {\n", " 'Path': parq_crawler_path\n", " }\n", " ]\n", " },\n", " SchemaChangePolicy={\n", " 'UpdateBehavior': 'UPDATE_IN_DATABASE',\n", " 'DeleteBehavior': 'DEPRECATE_IN_DATABASE'\n", " }\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start Crawler \n", "\n", "Much like we did with the raw data crawler, we will start the curated crawler pointing to the new data set created by the Glue job." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.start_crawler(\n", " Name=parq_crawler_name\n", ")\n", "\n", "print (\"Crawler: https://{0}.console.aws.amazon.com/glue/home?region={0}#crawler:name={1}\".format(region, parq_crawler_name))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Monitor the status of the Parquet crawler" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "crawler_status = glue.get_crawler(Name=parq_crawler_name)['Crawler']['State']\n", "\n", "# Poll every 30 seconds until the crawler returns to the READY state\n", "while crawler_status != 'READY':\n", " print(crawler_status)\n", " time.sleep(30)\n", " crawler_status = glue.get_crawler(Name=parq_crawler_name)['Crawler']['State']\n", "print(crawler_status)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('https://{0}.console.aws.amazon.com/glue/home?region={0}#database:name={1}'.format(region, database_name))" ] }, { "cell_type": 
"markdown", "metadata": {}, "source": [ "### [Query the Data Lake with Athena](https://aws.amazon.com/athena/)\n", "\n", "For self-serve end users who need to run ad hoc queries against the data, Athena is a great choice that uses Presto and ANSI SQL to query a number of file formats on S3.\n", "\n", "To query the tables created by the crawler, we will install a Python library for querying the data in the Glue Data Catalog with Athena. For more information, see [PyAthena](https://pypi.org/project/PyAthena/). You can also use the AWS console by browsing to the Athena service and running queries through the browser. Alternatively, you can use the [JDBC/ODBC](https://docs.aws.amazon.com/athena/latest/ug/athena-bi-tools-jdbc-odbc.html) drivers available." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install PyAthena" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Simple Select Query\n", "\n", "In this first query we will run a simple select to show the ability of Athena to query the raw CSV data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "from pyathena import connect\n", "from pyathena.util import as_pandas\n", "\n", "cursor = connect(region_name=region, s3_staging_dir='s3://'+bucket+'/athena/temp').cursor()\n", "cursor.execute('select * from ' + database_name + '.yellow limit 10')\n", "\n", "df = as_pandas(cursor)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Complex Join Query\n", "\n", "Now we will get more complex and create a query that utilizes multiple joins using Athena." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "cursor.execute('''SELECT * FROM ''' + database_name + '''.yellow \n", "JOIN ''' + database_name + '''.paymenttype ON yellow.payment_type = paymenttype.id \n", "JOIN ''' + database_name + '''.ratecode ON yellow.ratecodeid = ratecode.id \n", "JOIN ''' + database_name + '''.taxi_zone_lookup AS pu_taxizone ON yellow.pulocationid = pu_taxizone.locationid \n", "JOIN ''' + database_name + '''.taxi_zone_lookup AS do_taxizone ON yellow.dolocationid = do_taxizone.locationid \n", "limit 10;''')\n", "\n", "df = as_pandas(cursor)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Complex Join Query With Where Clause\n", "\n", "Taking it a step further, we will now use the query with multiple joins to count the number of entries, looking at just the data from the first 10 days of January 2017.\n", "\n", "In our Glue job we could have taken it a step further and optimized queries like this by partitioning the data by date.\n", "\n", "#### What is data partitioning?\n", "\n", "A partition is a division of a logical database or its constituent elements into distinct independent parts. Database partitioning is normally done for manageability, performance or availability reasons, or for load balancing.\n", "\n", "Examples in S3 would utilize prefixes in the bucket for the partitions in key=value pairs.\n", "\n", "* s3://datalake/taxi/yellow/year=2018/month=1/\n", "* s3://datalake/taxi/yellow/year=2018/month=1/day=1/\n", " \n", "**Optional Exercise**\n", "If you would like to try this for yourself, you can change the Glue job above so that, when writing the data to S3, you select how to partition the data.\n", "\n", "#### Glue context writing partitions\n", "* Extract `year`, `month`, and `day` from the `tpep_pickup_datetime`. 
Look at [Pyspark documentation](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumn) for help.\n", "* Write back to S3 using [`glueContext.write_dynamic_frame.from_options`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_catalog) with options:\n", "    * [Partition](https://docs.aws.amazon.com/athena/latest/ug/partitions.html) the data based on columns: `connection_options = {\"path\": parquet_output_path, \"partitionKeys\": [\"year\", \"month\", \"day\"]}`\n", "    * Convert data to a [columnar format](https://docs.aws.amazon.com/athena/latest/ug/columnar-storage.html): `format=\"parquet\"`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "cursor.execute('''WITH yellow AS (SELECT date_parse(yellow.tpep_pickup_datetime,'%Y-%m-%d %H:%i:%s') AS pu_datetime, yellow.* FROM ''' + database_name + '''.yellow ) \n", "SELECT count(yellow.vendorid) as cnt FROM yellow \n", "JOIN ''' + database_name + '''.paymenttype ON yellow.payment_type = paymenttype.id \n", "JOIN ''' + database_name + '''.ratecode ON yellow.ratecodeid = ratecode.id \n", "JOIN ''' + database_name + '''.taxi_zone_lookup AS pu_taxizone ON yellow.pulocationid = pu_taxizone.locationid \n", "JOIN ''' + database_name + '''.taxi_zone_lookup AS do_taxizone ON yellow.dolocationid = do_taxizone.locationid \n", "WHERE year(pu_datetime) = 2017 \n", "AND month(pu_datetime) = 1 \n", "AND day(pu_datetime) BETWEEN 1 AND 10''')\n", "\n", "df = as_pandas(cursor)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.head(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Optimized queries using the Parquet yellow taxi data\n", "\n", "We will run the same queries again, but this time against the dataset stored in the Parquet format, to show the 
performance gains you get when converting." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "cursor.execute('select * from ' + database_name + '.yellow_parquet limit 10')\n", "df = as_pandas(cursor)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Same complex queries using the `yellow_parquet` table instead." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "cursor.execute('''\n", "WITH yellow AS (SELECT date_parse(yellow.tpep_pickup_datetime,'%Y-%m-%d %H:%i:%s') AS pu_datetime, yellow.* FROM ''' + database_name + '''.yellow_parquet as yellow ) \n", "select count( yellow.vendorid)\n", "FROM yellow\n", "Inner JOIN ''' + database_name + '''.paymenttype ON yellow.payment_type = paymenttype.id \n", "Inner JOIN ''' + database_name + '''.ratecode ON yellow.ratecodeid = ratecode.id \n", "Inner JOIN ''' + database_name + '''.taxi_zone_lookup AS pu_taxizone ON yellow.pulocationid = pu_taxizone.locationid \n", "Inner JOIN ''' + database_name + '''.taxi_zone_lookup AS do_taxizone ON yellow.dolocationid = do_taxizone.locationid \n", "WHERE year(pu_datetime) = 2017 \n", "AND month(pu_datetime) = 1 \n", "AND day(pu_datetime) BETWEEN 1 AND 10''')\n", "\n", "df = as_pandas(cursor)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Check status Redshift Cloudformation Stacks\n", "\n", "Let's check in on the status of the EMR and Redshift CloudFormation stacks. 
Now that we have shown how you can use Athena to query the raw and curated data, we want to dive into other analytics engines to show that you can keep all your data in your data lake and still pick the right tool for each job.\n", "\n", "Separating your storage from your compute allows you to scale each component independently. This gives you flexibility when selecting tools, as well as agility in upgrading to new tools and services as they come out, which helps future-proof your data lake solution." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = cfn.describe_stacks(\n", "    StackName=redshift_stack_name\n", ")\n", "\n", "\n", "if response['Stacks'][0]['StackStatus'] == 'CREATE_COMPLETE':\n", "    for output in response['Stacks'][0]['Outputs']:\n", "        if (output['OutputKey'] == 'RedshiftAddress'):\n", "            redshift_cluster_name = output['OutputValue'].split('.')[0]\n", "            print(redshift_cluster_name)\n", "else:\n", "    print('Not yet complete.')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = redshift_client.describe_clusters(\n", "    ClusterIdentifier=redshift_cluster_name\n", ")\n", "\n", "status = response['Clusters'][0]['ClusterStatus']\n", "\n", "if status == 'available':\n", "    redshift_address = response['Clusters'][0]['Endpoint']['Address']\n", "    print(redshift_address)\n", "    jdbc_url = 'jdbc:redshift://' + redshift_address + ':' + str(redshift_port) + '/' + redshift_database_name\n", "    print(jdbc_url)\n", "    iam_role = response['Clusters'][0]['IamRoles'][0]['IamRoleArn']\n", "    print(iam_role)\n", "else: \n", "    print('Not yet available. Current status is {}'.format(status))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Install the psycopg2 library to connect to Redshift\n", "\n", "Psycopg is the most popular PostgreSQL database adapter for the Python programming language. 
Its main features are the complete implementation of the Python DB API 2.0 specification and the thread safety. \n", "\n", "[psycopg2](http://initd.org/psycopg/)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install psycopg2" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create connection attributes" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "conn_string = { 'dbname': redshift_database_name, \n", "                'user': admin_user,\n", "                'pwd': admin_password,\n", "                'host': redshift_address,\n", "                'port': redshift_port\n", "              }" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import psycopg2\n", "\n", "def create_conn(config):\n", "    try:\n", "        # get a connection; if a connection cannot be made an exception will be raised here\n", "        con=psycopg2.connect(dbname=config['dbname'], host=config['host'], \n", "                             port=config['port'], user=config['user'], \n", "                             password=config['pwd'])\n", "        return con\n", "    except Exception as err:\n", "        # report the failure; the function then returns None\n", "        print(err)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "con = create_conn(config=conn_string)\n", "# create_conn returns None on failure, so only report success for a real connection\n", "if con is not None:\n", "    print(\"Connected to Redshift!\\n\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Redshift Spectrum external schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "statement = '''create external schema spectrum \n", "from data catalog \n", "database \\'''' + database_name + '''\\'\n", "iam_role \\'''' + iam_role + '''\\'\n", "create external database if not exists;'''\n", "\n", "print(statement)\n", "# con.cursor will return a cursor object, you can use this cursor to perform queries\n", "cur = con.cursor()\n", "cur.execute(statement)\n", "con.commit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get the count by vendor for Jan 1st - 10th of 
2017 using the CSV formatted data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "## Unoptimized\n", "\n", "statement = '''select count(yellow.vendorid)\n", "from spectrum.yellow\n", "Inner JOIN spectrum.paymenttype ON yellow.payment_type = paymenttype.id\n", "Inner JOIN spectrum.ratecode ON yellow.ratecodeid = ratecode.id\n", "Inner JOIN spectrum.taxi_zone_lookup AS pu_taxizone ON yellow.pulocationid =\n", "pu_taxizone.locationid\n", "Inner JOIN spectrum.taxi_zone_lookup AS do_taxizone ON yellow.dolocationid =\n", "do_taxizone.locationid\n", "where extract(month from cast(tpep_pickup_datetime as date)) = 1 and\n", "extract(year from cast(tpep_pickup_datetime as date)) = 2017 and\n", "extract(day from cast(tpep_pickup_datetime as date)) between 1 and 10;'''\n", "\n", "df = read_sql(statement, con=con)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### View results" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Get the count by vendor for Jan 1st - 10th on 2017 using the Parquet formatted data." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "## Optimized\n", "\n", "statement = '''select count(yellow.vendorid)\n", "from spectrum.yellow_parquet as yellow\n", "Inner JOIN spectrum.paymenttype ON yellow.payment_type = paymenttype.id\n", "Inner JOIN spectrum.ratecode ON yellow.ratecodeid = ratecode.id\n", "Inner JOIN spectrum.taxi_zone_lookup AS pu_taxizone ON yellow.pulocationid =\n", "pu_taxizone.locationid\n", "Inner JOIN spectrum.taxi_zone_lookup AS do_taxizone ON yellow.dolocationid =\n", "do_taxizone.locationid\n", "where extract(month from cast(tpep_pickup_datetime as date)) = 1 and\n", "extract(year from cast(tpep_pickup_datetime as date)) = 2017 and\n", "extract(day from cast(tpep_pickup_datetime as date)) between 1 and 10;'''\n", "\n", "df = read_sql(statement, con=con)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Check status EMR Cloudformation Stacks\n", "Let's check in on the status of the EMR cluster. If it's not yet finished please wait until it's ready." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = cfn.describe_stacks(\n", "    StackName=emr_stack_name\n", ")\n", "\n", "if response['Stacks'][0]['StackStatus'] == 'CREATE_COMPLETE':\n", "    for output in response['Stacks'][0]['Outputs']:\n", "        if (output['OutputKey'] == 'EMRClusterId'):\n", "            cluster_id = output['OutputValue']\n", "            print(cluster_id)\n", "else:\n", "    print('Not yet complete.')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "notebook_prefix = 's3://{0}/notebooks/'.format(bucket)\n", "emr_notebooks_file = 'TaxiEMRNotebook.ipynb'\n", "\n", "print('Notebook Name: {}'.format(emr_notebooks_file.split('.')[0]))\n", "print('Notebook Location: {}'.format(notebook_prefix))\n", "print('Notebook Cluster: {}'.format(cluster_id))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create an EMR Notebook\n", "\n", "Create a notebook in EMR for running Spark queries, using the attributes printed above." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('https://{0}.console.aws.amazon.com/elasticmapreduce/home?region={0}#create-notebook:'.format(region))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Find Notebook id and import TaxiEMRNotebook into EMR Notebook\n", "\n", "Download the notebook `TaxiEMRNotebook.ipynb` and import it into the EMR notebook you just created, then walk through the cells comparing the optimized and unoptimized schema formats."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Get Notebook Id\n", "notebook_id = '{{notebook_id}}'\n", "session.resource('s3').Bucket(bucket).Object(os.path.join('notebooks', notebook_id, emr_notebooks_file)).upload_file(emr_notebooks_file)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Open EMR Notebook and execute queries" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('https://{0}.console.aws.amazon.com/elasticmapreduce/home?region={0}#notebooks-list:'.format(region))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Congratulations!!!!** You have completed the workshops showing the capabilities of leveraging a Data Lake on AWS and the flexibility of choice when using analytics tools in AWS. Before you run the cleanup please delete the EMR Notebook you created above by selecting the notebook and clicking `Delete` in the toolbar on the EMR Notebook console." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Cleanup " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = cfn.delete_stack(StackName=redshift_stack_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = cfn.delete_stack(StackName=emr_stack_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.delete_crawler(Name=parq_crawler_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.delete_crawler(Name=crawler_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.delete_job(JobName=glue_job_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = glue.delete_database(\n", " CatalogId = account_id,\n", " Name = database_name\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 rb s3://$bucket --force " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "waiter = cfn.get_waiter('stack_delete_complete')\n", "waiter.wait(\n", " StackName=emr_stack_name\n", ")\n", "\n", "print('The wait is over for {0}'.format(emr_stack_name))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "waiter = cfn.get_waiter('stack_delete_complete')\n", "waiter.wait(\n", " StackName=redshift_stack_name\n", ")\n", "\n", "print('The wait is over for {0}'.format(redshift_stack_name))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not use_existing:\n", " workshop.vpc_cleanup(vpc_id)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { 
"display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 2 }