## AWS Fargate Autoscaling Pipeline

Fargate autoscaling pipeline for batch processing events from SQS.

## License Summary

This sample code is made available under a modified MIT license. See the LICENSE file.

## Background

This sample deploys an end-to-end solution using SQS, ECS/Fargate and Lambda. A common use case is one where a massive number of files in S3 need to be pre-processed before they can be consumed by another job. For example, image files in non-standard aspect ratios need to be converted into a standard 224x224 or 299x299 format for deep learning.

The example in this repository expects files to be uploaded to an S3 bucket (source). Each upload triggers an S3 event notification, which publishes the file metadata to SQS. A CloudWatch event rule invokes a Lambda function every 2 minutes; the function reads the `ApproximateNumberOfMessages` attribute of the SQS queue and calls an ECS API to scale the number of Fargate tasks accordingly.

Fargate tasks keep processing until the SQS queue is drained and then terminate, scaling down to 0 tasks automatically. This eliminates the need to keep a fixed pool of ECS tasks running (in the case of cluster mode).

When scaling up from 0 tasks, the initial latency before processing begins is a few minutes: about 2 minutes for the Lambda trigger (configurable) plus the time to provision Fargate tasks. Each Lambda invocation can add up to 10 new Fargate tasks, up to a maximum of 100 concurrent Fargate tasks with this solution.

All configuration for the service is stored in SSM Parameters.

## Pre-requisites

* Create the `AWSServiceRoleForECS` [service role](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/check-service-role.html) if one does not already exist.
* Create an S3 bucket which will contain your unprocessed data (source bucket).
* Create an S3 bucket which will contain the processed data (destination bucket). You could also use the previously created source bucket.
* Create an SQS queue and attach a resource policy to it which allows the source bucket ARN to publish event notifications to the queue. In this example, the bucket called `my-source-s3-bucket` is allowed to `SendMessage` to the SQS queue called `incomingqueue`. This can be done via the Console or the CLI.

  ```
  {
    "Version": "2012-10-17",
    "Id": "arn:aws:sqs:us-west-2:123456789012:incomingqueue/SQSDefaultPolicy",
    "Statement": [
      {
        "Sid": "SQSDefaultPolicy-SID1",
        "Effect": "Allow",
        "Principal": {
          "AWS": "*"
        },
        "Action": "SQS:SendMessage",
        "Resource": "arn:aws:sqs:us-west-2:123456789012:incomingqueue",
        "Condition": {
          "ArnLike": {
            "aws:SourceArn": "arn:aws:s3:*:*:my-source-s3-bucket"
          }
        }
      }
    ]
  }
  ```

* On the source S3 bucket, create an [Event notification](https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html) to publish to your SQS queue on Put, Post and Copy. This can be done via the Console or the CLI. In this example, the event notification is on the path prefix `ecsproc/` and will only notify for `.jpg` files under this path.

  ```
  $ aws s3api get-bucket-notification-configuration --bucket my-source-s3-bucket
  {
    "QueueConfigurations": [
      {
        "Id": "mys3sqseventnotification",
        "QueueArn": "arn:aws:sqs:us-west-2:123456789012:incomingqueue",
        "Events": [
          "s3:ObjectCreated:Put",
          "s3:ObjectCreated:Post",
          "s3:ObjectCreated:Copy"
        ],
        "Filter": {
          "Key": {
            "FilterRules": [
              {
                "Name": "Prefix",
                "Value": "ecsproc/"
              },
              {
                "Name": "Suffix",
                "Value": ".jpg"
              }
            ]
          }
        }
      }
    ]
  }
  ```

## Deployment

Deployment for the solution is done via CloudFormation in the following order.

### Deploy the template `cft-fargate-task.yaml`

The template expects a VPC, Subnet and Security Group to be created prior to deploying the stack. If you plan to SSH into the Fargate tasks for debugging, ensure that port 22 is allowed in the Security Group. Generate your own SSH key pair and replace `build/ecs_key.pub` with the newly generated public key.

Template parameters -

- VPC, Subnet and Security Group for the Fargate tasks.
- The Unprocessed Bucket and Processed Bucket names to be added to the role policy for the Fargate tasks.
- The name of the ECR Repository to be created (default: ecs-pipeline).
- TaskCPU and TaskMemory are the vCPUs and memory assigned to each Fargate task. Refer to https://aws.amazon.com/fargate/pricing/ for supported configurations.
- The name of the previously created SQS queue (default: incomingqueue).

This stack will deploy an ECS Cluster, Service, Task Definition, ECR Repository (default: ecs-pipeline), the required Roles and Policies, and various SSM parameters.

### Build and push the Docker image

Publish to the previously created ECR repo. From the root folder of this source tree -

```
docker build --build-arg AWS_REGION=us-west-2 -t 123456789012.dkr.ecr.us-west-2.amazonaws.com/ecs-pipeline:latest -f build/Dockerfile .
# Log in to ECR. `aws ecr get-login` exists only in AWS CLI v1; with AWS CLI v2 use instead:
#   aws ecr get-login-password --region us-west-2 | docker login --username AWS --password-stdin 123456789012.dkr.ecr.us-west-2.amazonaws.com
$(aws ecr get-login --no-include-email)
docker push 123456789012.dkr.ecr.us-west-2.amazonaws.com/ecs-pipeline:latest
```

Alternatively, refer to `build/buildspec.yml` for a CodeBuild-compatible buildspec to perform the Docker build and push. Ensure that you are publishing to the ECR repo created by the previous stack.

### Deploy the template `cft-fargate-trigger.yaml`

This template expects the previous stack to have been deployed successfully. This stack will deploy a Lambda function, a CloudWatch event rule, and the roles and policies needed to trigger the ECS Fargate tasks.

Template parameters -

1. ScheduleExpression - the CloudWatch schedule expression that sets the trigger frequency for the Lambda function. Default - 2 minutes.
2. ECSClusterName - Obtained automatically from SSM using the parameter value.
3. The name of the previously created SQS queue (default: incomingqueue).

## SSM Parameters

The following SSM parameters are created automatically by the `cft-fargate-task` stack.
Do **not** manually update any of these parameters:

```
PIPELINE_UNPROCESSED_SQS_URL
PIPELINE_ECS_CLUSTER
PIPELINE_ECS_TASK_CONTAINER
PIPELINE_ECS_TASK_DEFINITON
PIPELINE_ECS_TASK_SECURITYGROUP
PIPELINE_ECS_TASK_SUBNET
PIPELINE_S3_DEST_BUCKET
PIPELINE_S3_DEST_PREFIX
```

The following parameters can be updated to change the operation of the pipeline:

```
PIPELINE_ENABLED - Enable or Disable the ECS Pipeline Tasks [0 or 1].
    Disabling pauses the pipeline from processing any further events from SQS
    until it is enabled again. This is useful when deploying a new version of
    the Docker image or performing any cleanup tasks as needed. SQS events
    continue to pile up and will be processed once this parameter is updated
    to enable the pipeline again.
PIPELINE_ECS_MAX_TASKS - Maximum number of ECS Fargate Tasks to run concurrently [ default 10, maximum 100 ]
```

## Testing the solution

The sample code (in `ingestion.sh` and `preprocess_job.py`) resizes JPEG images to 224x224 using the Python Pillow library.

Drop sample images into your source S3 bucket under the key prefix defined above (`ecsproc/` in this example). This will immediately trigger 2 messages to the SQS queue. Within 2 minutes, CloudWatch will trigger the Lambda function. The Lambda function will start a maximum of 10 Fargate tasks per invocation, up to the maximum specified by the SSM parameter `PIPELINE_ECS_MAX_TASKS`. The processed data will appear in the destination S3 bucket and the SQS queue will drain to 0. The Lambda trigger does not create additional Fargate tasks when the SQS queue is empty.

The processed data folder is accompanied by a `logs/` folder which contains the stdout/stderr log of each processing task. Log files have a `.success` or `.failed` extension depending on the exit code.

After the queue is drained, the Fargate tasks terminate and the task count drops to 0.

## Future improvements

- Handle pagination of the ECS list_tasks API in Lambda to support more than 100 maximum tasks.

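
As a rough illustration of the pagination improvement, the sketch below follows `nextToken` across pages of an ECS `list_tasks`-style call until every task ARN has been collected. It is not the code from this repository's Lambda function: the helper name `list_all_task_arns` and the stubbed pages are hypothetical, and the caller is assumed to pass in the `list_tasks` method of a boto3 ECS client.

```python
from typing import Any, Callable, Dict, List, Optional


def list_all_task_arns(list_tasks: Callable[..., Dict[str, Any]], cluster: str) -> List[str]:
    """Collect all RUNNING task ARNs in `cluster`, following nextToken.

    `list_tasks` is assumed to behave like the boto3 ECS client's
    list_tasks method: it returns a dict with a "taskArns" list and,
    when more results remain, a "nextToken" string.
    """
    arns: List[str] = []
    token: Optional[str] = None
    while True:
        kwargs: Dict[str, Any] = {"cluster": cluster, "desiredStatus": "RUNNING"}
        if token:
            kwargs["nextToken"] = token
        page = list_tasks(**kwargs)
        arns.extend(page.get("taskArns", []))
        token = page.get("nextToken")
        if not token:
            return arns


if __name__ == "__main__":
    # Hypothetical in-memory stub standing in for the ECS API: two pages.
    pages = {
        None: {"taskArns": ["task-1", "task-2"], "nextToken": "page-2"},
        "page-2": {"taskArns": ["task-3"]},
    }

    def fake_list_tasks(**kwargs: Any) -> Dict[str, Any]:
        return pages[kwargs.get("nextToken")]

    print(len(list_all_task_arns(fake_list_tasks, "my-cluster")))
```

In a Lambda function this could be called as `list_all_task_arns(boto3.client("ecs").list_tasks, cluster_name)`, where `cluster_name` is a placeholder for the value read from `PIPELINE_ECS_CLUSTER`. boto3 also provides a built-in paginator for this operation via `get_paginator("list_tasks")`, which may be preferable to a hand-written loop.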