## Orchestration with Step Functions

In this workshop we will automate many of the steps that were run manually in the `ny-taxi-right-tool.ipynb` and `ny-taxi-streaming.ipynb` notebooks. The way we will accomplish this is through the use of step functions to build a state machine that will execute an AWS Glue job and once finished run a Glue Crawler to update the schema and partitions. A common scenario will be used of a scheduled file drop in a raw location in your data lake that will trigger an S3 event to start the orchetration process. 

![Glue Orchestration](../../docs/assets/images/glue_orch.png)

In [None]:
import boto3
import botocore
import json
import time
import os
import pandas as pd

import project_path # path to helper methods
from lib import workshop
from pandas import read_sql

glue = boto3.client('glue')
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
cfn = boto3.client('cloudformation')
lambda_client = boto3.client('lambda')

session = boto3.session.Session()
region = session.region_name
account_id = boto3.client('sts').get_caller_identity().get('Account')

workshop_user = 'datalake'
database_name = 'taxi_' + workshop_user # AWS Glue Data Catalog Database Name
table_name = 'yellow'
email = 'user@example.com' #Replace with your own email address

s3_output_key_prefix = 'datalake/curated/yellow_parq/' # Output location of the data.

### [Create S3 Bucket](https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html)

We will create an S3 bucket that will be used throughout the workshop for storing our data.

[s3.create_bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket) boto3 documentation

In [None]:
bucket = workshop.create_bucket(region, session, 'taxi-')
print(bucket)

### Discover the data in your Data Lake

In this next section we will be using [AWS Glue](https://aws.amazon.com/glue/) to discover, catalog, and transform your data. Glue currently only supports `Python 2.7`, hence we'll write the script in `Python 2.7`.

### Permission setup for invoking AWS Glue from this Notebook
In order to enable this Notebook to run AWS Glue jobs, we need to add one additional permission to the default execution role of this notebook. We will be using SageMaker Python SDK to retrieve the default execution role and then you have to go to [IAM Dashboard](https://console.aws.amazon.com/iam/home) to edit the Role to add AWS Glue specific permission. 

### Finding out the current execution role of the Notebook
We are using SageMaker Python SDK to retrieve the current role for this Notebook which needs to be enhanced to support the functionality in AWS Glue.

In [None]:
# Import SageMaker Python SDK to get the Session and execution_role
import sagemaker
from sagemaker import get_execution_role
sess = sagemaker.Session()
role = get_execution_role()
role_name = role[role.rfind('/') + 1:]
print(role_name)


### Adding AWS Glue as an additional trusted entity to this role
This step is needed if you want to pass the execution role of this Notebook while calling Glue APIs as well without creating an additional **Role**. If you have not used AWS Glue before, then this step is mandatory. 

If you have used AWS Glue previously, then you should have an already existing role that can be used to invoke Glue APIs. In that case, you can pass that role while calling Glue (later in this notebook) and skip this next step.

On the IAM dashboard, please click on **Roles** on the left sidenav and search for this Role. Once the Role appears, click on the Role to go to its **Summary** page. Click on the **Trust relationships** tab on the **Summary** page to add AWS Glue as an additional trusted entity. 

Click on **Edit trust relationship** and replace the JSON with this JSON.
```
{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Principal": {
 "Service": [
 "sagemaker.amazonaws.com",
 "glue.amazonaws.com"
 ]
 },
 "Action": "sts:AssumeRole"
 }
 ]
}
```
Once this is complete, click on **Update Trust Policy** and you are done.

![IAM Roles](../../docs/assets/images/iam_roles_hl.png)

In [None]:
print("https://console.aws.amazon.com/iam/home?region={0}#/roles/{1}".format(region, role_name))

### [Copy Sample Data to S3 bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-example-download-file.html) 

We will download some files from New York City Taxi and Limousine Commission (TLC) Trip Record Data dataset available on the [AWS Open Data Registry](https://registry.opendata.aws/nyc-tlc-trip-records-pds/).


In [None]:
!aws s3 cp s3://nyc-tlc/trip\ data/yellow_tripdata_2017-01.csv s3://$bucket/datalake/raw/yellow/
!aws s3 cp s3://nyc-tlc/trip\ data/yellow_tripdata_2017-02.csv s3://$bucket/datalake/raw/yellow/

### Create the [AWS Glue Catalog Database](https://docs.aws.amazon.com/glue/latest/dg/define-database.html)

When you define a table in the AWS Glue Data Catalog, you add it to a database. A database is used to organize tables in AWS Glue. You can organize your tables using a crawler or using the AWS Glue console. A table can be in only one database at a time.

There is a central Glue Catalog for each AWS account. When creating the database you will use your account id declared above as `account_id`

[glue.create_database](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_database)

In [None]:
workshop.create_db(glue, account_id, database_name, 'New York City Taxi and Limousine Commission (TLC) Trip Record Data')

### Use a [Glue Crawler](https://docs.aws.amazon.com/glue/latest/dg/add-crawler.html) to Discover the transformed data

You can use a crawler to populate the AWS Glue Data Catalog with tables. This is the primary method used by most AWS Glue users. You add a crawler within your Data Catalog to traverse your data stores. The output of the crawler consists of one or more metadata tables that are defined in your Data Catalog. Extract, transform, and load (ETL) jobs that you define in AWS Glue use these metadata tables as sources and targets.

A crawler can crawl both file-based and table-based data stores. Crawlers can crawl the following data stores:

* Amazon Simple Storage Service (Amazon S3)
 * [Built-in Classifiers](https://docs.aws.amazon.com/glue/latest/dg/add-classifier.html#classifier-built-in)
 * [Custom Classifiers](https://docs.aws.amazon.com/glue/latest/dg/custom-classifier.html)
* Amazon Redshift
* Amazon Relational Database Service (Amazon RDS)
 * Amazon Aurora
 * MariaDB
 * Microsoft SQL Server
 * MySQL
 * Oracle
 * PostgreSQL
* Amazon DynamoDB
* Publicly accessible databases [Blog](https://aws.amazon.com/blogs/big-data/how-to-access-and-analyze-on-premises-data-stores-using-aws-glue/)
 * Aurora
 * MariaDB
 * SQL Server
 * MySQL
 * Oracle
 * PostgreSQL

[glue.create_crawler](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_crawler)

### Create the raw data crawler

In [None]:
crawler_name = 'NY-Taxi-Orch-Crawler-{0}'.format(workshop_user)
raw_prefix = 'datalake/raw/yellow/'
crawler_path = 's3://{0}/{1}'.format(bucket, raw_prefix)

response = glue.create_crawler(
 Name=crawler_name,
 Role=role,
 DatabaseName=database_name,
 Description='Crawler for NY Taxi Data',
 Targets={
 'S3Targets': [
 {
 'Path': crawler_path
 }
 ]
 },
 SchemaChangePolicy={
 'UpdateBehavior': 'UPDATE_IN_DATABASE',
 'DeleteBehavior': 'DEPRECATE_IN_DATABASE'
 }
)

### Create the curated crawler

In [None]:
parq_crawler_name = 'NY-Taxi-Orch-Parq-Crawler-{0}'.format(workshop_user)
parq_crawler_path = 's3://'+bucket+'/datalake/curated/yellow_parq'

response = glue.create_crawler(
 Name=parq_crawler_name,
 Role=role,
 DatabaseName=database_name,
 Description='Crawler for Curated NY Taxi Data',
 Targets={
 'S3Targets': [
 {
 'Path': parq_crawler_path
 }
 ]
 },
 SchemaChangePolicy={
 'UpdateBehavior': 'UPDATE_IN_DATABASE',
 'DeleteBehavior': 'DEPRECATE_IN_DATABASE'
 }
)

### Start the Raw Data Glue Crawler

We will start with populating the Glue Data Catalog with the raw database of NY Taxi data. 

You can use a crawler to populate the AWS Glue Data Catalog with tables. This is the primary method used by most AWS Glue users. You add a crawler within your Data Catalog to traverse your data stores. The output of the crawler consists of one or more metadata tables that are defined in your Data Catalog. Extract, transform, and load (ETL) jobs that you define in AWS Glue use these metadata tables as sources and targets.

[glue.start_crawler](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.start_crawler)

In [None]:
response = glue.start_crawler(
 Name=crawler_name
)

print ("Crawler: https://{0}.console.aws.amazon.com/glue/home?region={0}#crawler:name={1}".format(region, crawler_name))

### Monitor the status of the Parquet crawler

In [None]:
crawler_status = glue.get_crawler(Name=crawler_name)['Crawler']['State']

while crawler_status not in ('READY'):
 crawler_status = glue.get_crawler(Name=crawler_name)['Crawler']['State']
 print(crawler_status)
 time.sleep(30)

In [None]:
print('https://{0}.console.aws.amazon.com/glue/home?region={0}#database:name={1}'.format(region, database_name))

### Create Parquet version of the yellow CSV table

From [Wikipedia](https://en.wikipedia.org/wiki/Apache_Parquet), "Apache Parquet is a free and open-source column-oriented data storage format of the Apache Hadoop ecosystem. It is similar to the other columnar-storage file formats available in Hadoop namely RCFile and ORC. It is compatible with most of the data processing frameworks in the Hadoop environment. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk."

The key points in this code is how easy it is to get access to the AWS Glue Data Catalog leveraging the [Glue libraries](https://github.com/awslabs/aws-glue-libs). Some of the key concepts are below:

* [`glueContext.create_dynamic_frame.from_catalog`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-create_dynamic_frame_from_catalog) - Read table metadata from the Glue Data Catalog using Glue libs to load tables into the pyspark job.
* Writing back S3 [`glueContext.write_dynamic_frame.from_options`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-write_dynamic_frame_from_catalog) with options:
 * Convert data to different format `format="parquet"`. This format is [columnar](https://docs.aws.amazon.com/athena/latest/ug/columnar-storage.html) and provides [Snappy](https://en.wikipedia.org/wiki/Snappy_(compression)) compression by default.
 
You can find more best practices for Glue and Athena [here](https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html)

In [None]:
%%writefile yellow_parquet_orch_etl.py

import sys
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.context import SparkContext
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_OUTPUT_BUCKET', 'S3_OUTPUT_KEY_PREFIX', 'DATABASE_NAME', 'TABLE_NAME', 'REGION'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "taxi", table_name = "yellow", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database=args['DATABASE_NAME'], table_name=args['TABLE_NAME'], transformation_ctx = "datasource0")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice1"]
## @return: resolvechoice1
## @inputs: [frame = datasource0]
resolvechoice1 = ResolveChoice.apply(frame = datasource0, choice = "make_struct", transformation_ctx = "resolvechoice1")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields2"]
## @return: dropnullfields2
## @inputs: [frame = resolvechoice1]
dropnullfields2 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields2")

addcolumn_df = dropnullfields2.toDF()

addcolumn_df \
 .withColumn("pu_datetime", to_timestamp(col("tpep_pickup_datetime"), "yyyy-MM-dd HH:mm:ss")) \
 .withColumn("do_datetime", to_timestamp(col("tpep_dropoff_datetime"), "yyyy-MM-dd HH:mm:ss")) \
 .withColumn("pu_year", year("pu_datetime")) \
 .withColumn("pu_month", month("pu_datetime")) \
 .withColumn("pu_day", dayofmonth("pu_datetime"))

parquet_output_path = 's3://' + os.path.join(args['S3_OUTPUT_BUCKET'], args['S3_OUTPUT_KEY_PREFIX'])
print(parquet_output_path)

writeframe = DynamicFrame.fromDF(addcolumn_df, glueContext, "writeframe")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": ""}, format = "parquet", transformation_ctx = "datasink3"]
## @return: datasink3
## @inputs: [frame = dropnullfields2]
datasink3 = glueContext.write_dynamic_frame.from_options(frame = writeframe, connection_type = "s3", connection_options = {"path": parquet_output_path}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

### Upload the ETL script to S3
We will be uploading the `yellow_parquet_orch_etl` script to S3 so Glue can use it to run the PySpark job. You can replace it with your own script if needed. If your code has multiple files, you need to zip those files and upload to S3 instead of uploading a single file like it's being done here.

In [None]:
script_location = sess.upload_data(path='yellow_parquet_orch_etl.py', bucket=bucket, key_prefix='codes')

### [Authoring jobs with AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/author-job.html)

Next we'll be creating Glue client via Boto so that we can invoke the `create_job` API of Glue. `create_job` API will create a job definition which can be used to execute your jobs in Glue. The job definition created here is mutable. While creating the job, we are also passing the code location as well as the dependencies location to Glue.

`AllocatedCapacity` parameter controls the hardware resources that Glue will use to execute this job. It is measures in units of `DPU`. For more information on `DPU`, please see [here](https://docs.aws.amazon.com/glue/latest/dg/add-job.html).

[glue.create_job](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.create_job)

In [None]:
from time import gmtime, strftime
import time

timestamp_prefix = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

job_name = 'ny-yellow-parquet-orch-' + timestamp_prefix
response = glue.create_job(
 Name=job_name,
 Description='PySpark job to convert yellow taxi csv data to parquet',
 Role=role, # you can pass your existing AWS Glue role here if you have used Glue before
 ExecutionProperty={
 'MaxConcurrentRuns': 1
 },
 Command={
 'Name': 'glueetl',
 'ScriptLocation': script_location
 },
 DefaultArguments={
 '--job-language': 'python',
 '--job-bookmark-option': 'job-bookmark-enable'
 },
 AllocatedCapacity=5,
 Timeout=60,
)
glue_job_name = response['Name']
print(glue_job_name)

### Build Lambda functions for orchestration

In [None]:
!mkdir -p cfn/start_crawler
!mkdir -p cfn/poller

### Create Lambda function to start Glue crawler

This function will be triggered to start the crawler process when the Glue job has finished. It will crawl new data created from the job and update the schema and partitions of the parquet table.

In [None]:
%%writefile cfn/start_crawler/start_crawler.py
from __future__ import print_function
import datetime
import json
import boto3

glue = boto3.client('glue')

def handler(event, context):
 # Log the received event
 print("Received event: " + json.dumps(event, indent=2))
 # Get jobId from the event
 crawler_name = event['crawler_name']
 try:
 response = glue.get_crawler(Name=crawler_name)
 event['status'] = "UNKNOWN"
 
 if response['Crawler']['State'] == 'READY': 
 print("Execute crawler: " + crawler_name)
 
 # Call Start Crawler
 response = glue.start_crawler(Name=crawler_name)
 # Return the jobtatus
 event['status'] = "STARTED"
 else:
 event['status'] = "FAILED"
 event['status_message'] = 'Crawler in use' 
 except Exception as e:
 print(e)
 message = 'Error starting Glue Crawler'
 print(message)
 event['status'] = "FAILED"
 event['status_message'] = message + ' ' + str(e)
 
 return event

### Create Lambda function to poll crawler status

Crawler's are not a natively supported service with step functions, therefore we will need to monitor the activity of the crawler looking for it to finish. This function gets the crawler status and returns the avlue to the step function to proceed accordingly.

In [None]:
%%writefile cfn/poller/poll_crawler_status.py
from __future__ import print_function
import datetime
import json
import boto3
from datetime import timedelta 
import os

glue = boto3.client('glue')

def handler(event, context):
 # Log the received event
 print("RECEIVED EVENT: " + json.dumps(event, indent=2))
 crawler_name = event['crawler_name']
 
 try:
 response = glue.get_crawler(Name=crawler_name)
 event['status'] = response['Crawler']['State']
 except Exception as e:
 print(e)
 message = 'Error getting Glue Job status'
 print(message)
 event['status'] = "UNKNOWN"
 
 print("RESPONSE EVENT: " + json.dumps(event, indent=2))
 return event

### Create orchestration with [CloudFormation](https://aws.amazon.com/cloudformation/)

In the interest of time, we will use CloudFormation to create the state machine and hook up the required events. You can view the template below to get a better understanding of how CloudFormation works.

In [None]:
!cat cfn/sf_orch_glue.yaml

### [Package deployment](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-cli-package.html)

For some resource properties that require an Amazon S3 location (a bucket name and filename), you can specify local references instead. For example, you might specify the S3 location of your AWS Lambda function's source code or an Amazon API Gateway REST API's OpenAPI (formerly Swagger) file. Instead of manually uploading the files to an S3 bucket and then adding the location to your template, you can specify local references, called local artifacts, in your template and then use the package command to quickly upload them. A local artifact is a path to a file or folder that the package command uploads to Amazon S3. For example, an artifact can be a local path to your AWS Lambda function's source code or an Amazon API Gateway REST API's OpenAPI file.

With the yaml file created we can now `package` out CloudFormation template to prepare it for deployment and finally call the `deploy` function on the CloudFormation service to build our API service for out Inference endpoint.

In [None]:
!aws cloudformation package \
 --template-file cfn/sf_orch_glue.yaml \
 --output-template-file orch_packaged.yaml \
 --s3-bucket $bucket --s3-prefix cfn

### [Deploy Application](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-cli-deploy.html)

AWS CloudFormation requires you to use a change set to create a template that includes transforms. Instead of independently creating and then executing a change set, use the `aws cloudformation deploy` command. When you run this command, it creates a change set, executes the change set, and then terminates. This command reduces the numbers of required steps when you create or update a stack that includes transforms.

In [None]:
stack_name = 'glue-orch-{0}'.format(workshop_user)

!aws cloudformation deploy \
 --template-file orch_packaged.yaml \
 --stack-name $stack_name \
 --capabilities CAPABILITY_IAM \
 --parameter-overrides NotificationEmail=$email

### [Get Outputs of the CloudFormation template](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/outputs-section-structure.html)

The optional `Outputs` section declares output values that you can import into other stacks, return in response, or view on the AWS CloudFormation console. We provide outputs for the `Name` and `ARN`s for the required resources.

In [None]:
response = cfn.describe_stacks(StackName=stack_name)

outputs = response["Stacks"][0]["Outputs"]

for output in response['Stacks'][0]['Outputs']:
 if (output['OutputKey'] == 'StepFunctionsStateMachine'):
 sf_arn = output['OutputValue']
 if (output['OutputKey'] == 'StepFunctionsAPIUrl'):
 api = output['OutputValue']
 
pd.set_option('display.max_colwidth', -1)
pd.DataFrame(outputs, columns=["OutputKey", "OutputValue"])

### Create the request object for the API call to start the Glue Job

Now that the CloudFormation template is complete we can trigger the first run of the orchestration. The Step Function is proxied by API gateway to simplify calling the orchestration from 3rd party applications. [API Gateway](https://aws.amazon.com/api-gateway/) allows developers to quickly build REST API that are secure an scalable creating a "front door" to services with AWS.

The step function orchestration requires the below JSON input.

In [None]:
input = {
 "job_name": glue_job_name,
 "arguments": {
 "region": region,
 "database_name": database_name,
 "table_name": table_name,
 "s3_output_bucket": bucket,
 "s3_output_key_prefix": s3_output_key_prefix
 },
 "crawler_name": parq_crawler_name,
 "wait_time": 60,
 "name": "IngestionOrchestration",
 "stateMachineArn": sf_arn
}

payload = json.dumps(input, indent=4)
post_val = "'{0}'".format(payload)

print(post_val)

### Trigger first Glue Job

Simply using `curl` we can make the required `POST` call of the API to start orchestration.

In [None]:
!curl -X POST -d $post_val $api --header "Content-Type: application/json"

### Watch orchestration

With the link below find the `executionArn` above to see the status of the state machine. You can follow the orchestration to completion in the UI as well as get the status from the CLI and SDKs. Once complete we should now be able to query the newly added data in your data lake.

Once you click on the job run you will notice the steps to start and monitor the Glue job, run the curated crawler to populate new partitions and update schema changes and complete the job run. If at any point in the process the orchestration fails it will submit a notification to the email address used above.

In [None]:
print('https://{0}.console.aws.amazon.com/states/home?region={0}#/statemachines/view/{1}'.format(region, sf_arn)) 

### Parquet processed data set

Once you hit the link below click the `tables` link and see there is a new `yellow_parq` table that is utilizing the parquet schema.

In [None]:
print('https://{0}.console.aws.amazon.com/glue/home?region={0}#database:name={1}'.format(region, database_name))

### [Query the Data Lake with Athena](https://aws.amazon.com/athena/)

For the self-serve end users that need the ability to create ad-hoc queries against the data Athena is a great choice the utilizes Presto and ANSI SQL to query a number of file formats on S3.

To query the tables created by the crawler we will be installing a python library for querying the data in the Glue Data Catalog with Athena. For more information jump to [PyAthena](https://pypi.org/project/PyAthena/). You can also use the AWS console by browsing to the Athena service and run queries through the browser. Alternatively, you can also use the [JDBC/ODBC](https://docs.aws.amazon.com/athena/latest/ug/athena-bi-tools-jdbc-odbc.html) drivers available.

In [None]:
!pip install PyAthena

In [None]:
%%time
from pyathena import connect
from pyathena.util import as_pandas

cursor = connect(region_name=region, s3_staging_dir='s3://'+bucket+'/athena/temp').cursor()
cursor.execute('select * from ' + database_name + '.yellow_parq limit 10')

df = as_pandas(cursor)

In [None]:
df

### [Create S3 Event Notification to execute new job run](https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html)

In this example, we are abstracting the call to the Step Functions State machine behind an API gateway call. This makes it easy for 3rd parties to trigger the start of the orchestration by utilizing a REST API call. We could easily modify the below Lambda function to use the appropriate boto3 call to execution the state machine bypassing the API call. What we want to call out here is if you have a scheduled job to drop data into S3 that you want to automate on boarding you can do that with S3 events and Step Functions to automate the AWS Glue calls to register, update partitions, and transform into parquet.

In [None]:
!mkdir trigger

In [None]:
%%writefile trigger/trigger.py

import json
import os
import requests

def handler(event, context):
 bucket_name = event['Records'][0]['s3']['bucket']['name']
 s3_key_name = event['Records'][0]['s3']['object']['key']
 
 API = os.environ['API']
 data = {
 "job_name": os.environ['GLUE_JOB_NAME'],
 "arguments": {
 "region": os.environ['AWS_DEFAULT_REGION'],
 "database_name": os.environ['DATABASE_NAME'],
 "table_name": os.environ['TABLE_NAME'],
 "s3_output_bucket": os.environ['BUCKET'],
 "s3_output_key_prefix": os.environ['BUCKET_PREFIX']
 },
 "crawler_name": os.environ['CRAWLER_NAME'],
 "wait_time": os.environ['WAIT_TIME'],
 "name": "PartitionUpdateExecution",
 "stateMachineArn": os.environ['SF_ARN']
 }
 
 result = requests.post(API, headers={'content-type': 'application/json'}, data=json.dumps(data))
 return result.status_code

### Zip Lambda function to trigger the Glue Job State Machine

The Lambda function needs the `requests` library to make the API call to trigger the orchestration and the python file itself. We will zip them and create the new Lambda function.

In [None]:
%%bash
cd trigger
pip install requests --target .
zip trigger.zip -r6 .

### Create the Lamba function to trigger the orchestration

The trigger function needs a number of environment variables to trigger the appropriate state machine and pass the required Glue params for loading the data.

[lambda_client.create_function](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda.html#Lambda.Client.create_function)

In [None]:
fn_name = "NY-Taxi-Trigger-SF-{0}".format(workshop_user)
fn_role = 'arn:aws:iam::{0}:role/lambda_basic_execution'.format(account_id)

response = lambda_client.create_function(
 FunctionName=fn_name,
 Runtime='python2.7',
 Role=fn_role,
 Handler="trigger.handler",
 Code={'ZipFile': open("trigger/trigger.zip", 'rb').read(), },
 Environment={
 'Variables': {
 'API': api,
 'GLUE_JOB_NAME': glue_job_name,
 'DATABASE_NAME': database_name,
 'TABLE_NAME': table_name,
 'BUCKET': bucket,
 'BUCKET_PREFIX': s3_output_key_prefix,
 'CRAWLER_NAME': parq_crawler_name,
 'WAIT_TIME': '60',
 'SF_ARN': sf_arn
 }
 }, 
)

In [None]:
trigger_arn = response['FunctionArn']
print(trigger_arn)

### [Give S3 Bucket permission to execute Lambda function](https://docs.aws.amazon.com/lambda/latest/dg/lambda-permissions.html)

We need to grant S3 permission to use a function. You can apply the policy at the function level, or specify a qualifier to restrict access to a single version or alias. If you use a qualifier, the invoker must use the full Amazon Resource Name (ARN) of that version or alias to invoke the function.

[lambda_client.add_permission](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda.html#Lambda.Client.add_permission)

In [None]:
s3_arn = 'arn:aws:s3:::{0}'.format(bucket)

response = lambda_client.add_permission(
 FunctionName=fn_name,
 StatementId='1',
 Action='lambda:InvokeFunction',
 Principal='s3.amazonaws.com',
 SourceArn=s3_arn,
 SourceAccount=account_id
)

### [Create S3 Bucket Notification](https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html)

Amazon S3 can publish events (for example, when an object is created in a bucket) to AWS Lambda and invoke your Lambda function by passing the event data as a parameter. This integration enables you to write Lambda functions that process Amazon S3 events. In Amazon S3, you add bucket notification configuration that identifies the type of event that you want Amazon S3 to publish and the Lambda function that you want to invoke.

In [None]:
data = {}
data['LambdaFunctionConfigurations'] = [
 {
 'Id': "{0}-dl-notifications".format(bucket),
 'LambdaFunctionArn': trigger_arn,
 'Events': ["s3:ObjectCreated:*"],
 'Filter': {
 "Key": {
 "FilterRules": [
 {
 "Name": "prefix",
 "Value": raw_prefix
 }
 ]
 }
 }
 }
]

print(json.dumps(data, indent=4))
s3 = boto3.resource('s3')
bucket_notification = s3.BucketNotification(bucket)
try:
 response = bucket_notification.put(NotificationConfiguration=data)
 print('Bucket notification updated successfully')
except Exception as e:
 print(e)

### Add more raw data to trigger orchestration

Let's add another file to the raw S3 prefix and watch the orchestration be triggers to start the Glue job and crawler to update the schema and partitions in the curated table. Wait for the step function to complete and go to Athena to query for data from this file.

In [None]:
!aws s3 cp s3://nyc-tlc/trip\ data/yellow_tripdata_2017-03.csv s3://$bucket/datalake/raw/yellow/

### View Step Function state

The uploading of a new file to the `raw/yellow` prefix in the data lake triggered a new job run of the step function to update the data lake. When the state machine is complete you should be able to query for data on `2017-03` as well as the original files.

In [None]:
print('https://{0}.console.aws.amazon.com/states/home?region={0}#/statemachines/view/{1}'.format(region, sf_arn))

## Clean Up

In [None]:
response = cfn.delete_stack(StackName=stack_name)

In [None]:
response = glue.delete_crawler(Name=parq_crawler_name)

In [None]:
response = glue.delete_crawler(Name=crawler_name)

In [None]:
response = glue.delete_job(JobName=glue_job_name)

In [None]:
response = glue.delete_database(
 CatalogId = account_id,
 Name = database_name
)

In [None]:
response = lambda_client.delete_function(FunctionName=fn_name)

In [None]:
!aws s3 rb s3://$bucket --force 

In [None]:
waiter = cfn.get_waiter('stack_delete_complete')
waiter.wait(
 StackName=stack_name
)

print('The wait is over for {0}'.format(stack_name))