{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## FastQC Batch Workshop\n", "\n", "In this workshop we will develop an AWS Batch environment to submit FastQC jobs to levraging cloud native job scheduling services with [AWS Batch](https://docs.aws.amazon.com/batch/latest/userguide/what-is-batch.html). We will be leveraging the [AWS Open Data Registry](https://registry.opendata.aws/) to use the [1000 Genomes](https://registry.opendata.aws/1000-genomes/) to execute FastQC against a [FASTQ formatted](https://en.wikipedia.org/wiki/FASTQ_format) file from the data set.\n", "\n", "## **If multiple users are running in the same account update `workshop_user` with your unique username to help avoid collisions**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import botocore\n", "import json\n", "import time\n", "import os\n", "import base64\n", "import docker\n", "import pandas as pd\n", "\n", "import project_path # path to helper methods\n", "from lib import workshop\n", "from botocore.exceptions import ClientError\n", "\n", "ecr = boto3.client('ecr')\n", "cfn = boto3.client('cloudformation')\n", "ec2_client = boto3.client('ec2')\n", "batch = boto3.client('batch')\n", "iam = boto3.client('iam')\n", "ssm = boto3.client('ssm')\n", "\n", "session = boto3.session.Session()\n", "region = session.region_name\n", "account_id = boto3.client('sts').get_caller_identity().get('Account')\n", "\n", "workshop_user = 'hpc' # no capitals all lower case\n", "batch_sec_group_name = 'FastQBatchSG_' + workshop_user\n", "repo = 'fastqc_demo_' + workshop_user\n", "job_def_name = 'fastqc_demo_job_' + workshop_user\n", "instance_profile_name = 'FastQInstanceProfile_' + workshop_user\n", "iam_stack_name = 'FastQCIAMRolesStack-' + workshop_user \n", "default_env = 'FastQCEnvironment' + '_' + workshop_user\n", "bid_percentage = 100\n", "desired_cpu = 4\n", "\n", "use_existing = True" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create VPC](https://docs.aws.amazon.com/vpc/index.html) \n", "\n", "In order to simulate a Greengrass device on an EC2 instance we will create a new VPC with a public subnet by running the code below. As you can see to make a subnet public an Internet Gateway is attached to the VPC and a routing table is created with and entry to route all traffic at `0.0.0.0/0` to the Internet gateway. We will store the VPC and Subnet Id's to be used later in the notebook." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if use_existing:\n", " vpc_filter = [{'Name':'isDefault', 'Values':['true']}]\n", " default_vpc = ec2_client.describe_vpcs(Filters=vpc_filter)\n", " vpc_id = default_vpc['Vpcs'][0]['VpcId']\n", "\n", " subnet_filter = [{'Name':'vpc-id', 'Values':[vpc_id]}]\n", " subnets = ec2_client.describe_subnets(Filters=subnet_filter)\n", " subnet1_id = subnets['Subnets'][0]['SubnetId']\n", " subnet2_id = subnets['Subnets'][1]['SubnetId']\n", "else: \n", " vpc, subnet1, subnet2 = workshop.create_and_configure_vpc()\n", " vpc_id = vpc.id\n", " subnet1_id = subnet1.id\n", " subnet2_id = subnet2.id" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(vpc_id)\n", "print(subnet1_id)\n", "print(subnet2_id)\n", "print(region)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create S3 Bucket](https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html)\n", "\n", "We will create an S3 bucket that will be used throughout the workshop for storing our data.\n", "\n", "[s3.create_bucket](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.create_bucket) boto3 documentation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bucket = workshop.create_bucket_name('batch-')\n", "#bucket = workshop.create_bucket(region, session, \"batch-fastq-hpc\", with_uuid=False)\n", "#session.resource('s3').create_bucket(Bucket=bucket, CreateBucketConfiguration={'LocationConstraint': region})\n", "session.resource('s3').create_bucket(Bucket=bucket)\n", "\n", "print(bucket)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the bash script to run in container\n", "\n", "Create the shell script used to run fastqc and send the output to our S3 bucket for analysis. Replace the **`{{bucket}}`** and **`{{region}}`** variables. This script runs the fastqc process on the fastq file from 1000 genomes data set and sends the results to an S3 bucket for further inspection." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from IPython.core.magic import register_line_cell_magic\n", "\n", "@register_line_cell_magic\n", "def writetemplate(line, cell):\n", " with open(line, 'w+') as f:\n", " f.write(cell.format(**globals()))\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writetemplate fastqc.sh\n", "#! /bin/bash\n", "aws s3 cp $1 .\n", "filename=$(basename $1)\n", "fastqc $filename\n", "report=$(ls *.html)\n", "aws s3 mv *.zip s3://{bucket} --acl public-read\n", "aws s3 mv *.html s3://{bucket} --acl public-read\n", "rm $filename\n", "echo OUTPUT: https://s3.{region}.amazonaws.com/{bucket}/$report" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create the Dockerfile for FastQC\n", "\n", "Note: the CMD line will be overriden during docker run with the desired filename as the input parameter. ENTRYPOINT, however ,can not be overriden. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile Dockerfile\n", "FROM biocontainers/fastqc:v0.11.5_cv3\n", "USER root\n", "ADD fastqc.sh /home/biodocker/bin/fastqc.sh\n", "RUN chown -v biodocker /home/biodocker/bin/fastqc.sh && chmod -v 764 /home/biodocker/bin/fastqc.sh && pip install awscli\n", "USER biodocker\n", "ENV PATH /home/biodocker/.local/bin:$PATH\n", "CMD fastqc.sh s3://1000genomes/phase3/data/NA21144/sequence_read/ERR047877.filt.fastq.gz\n", " " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create the ECR Repository](https://docs.aws.amazon.com/AmazonECR/latest/userguide/Repositories.html)\n", "\n", "Amazon Elastic Container Registry (Amazon ECR) provides API operations to create, monitor, and delete image repositories and set permissions that control who can access them. You can perform the same actions in the Repositories section of the Amazon ECR console. Amazon ECR also integrates with the Docker CLI allowing you to push and pull images from your development environments to your repositories.\n", "\n", "[ecr.create_repository](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecr.html#ECR.Client.create_repository)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", " response = ecr.create_repository(\n", " repositoryName=repo\n", " )\n", "except ClientError as e:\n", " if e.response['Error']['Code'] == 'RepositoryAlreadyExistsException':\n", " print(\"Repo exists, skip\")\n", " else:\n", " raise e" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Build container image and upload to ECR\n", "\n", "In order to work with ECR you need to retrieve a token, and that token is valid for a specified registry for 12 hours. This command allows you to use the `docker` CLI to push and pull images with Amazon ECR. If you do not specify a registry, the default registry is assumed.\n", "\n", "We will use the [Docker SDK for Python](https://docker-py.readthedocs.io/en/stable/) to build and push the image to the ECR repository.\n", "\n", "[ecr.get_authorization_token](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecr.html#ECR.Client.get_authorization_token)\n", "\n", "Note: Sagemaker notebook runs on instances that already has docker installed and dockerd runing. This part will not work if you are using SageMaker Studio, where docker command is not installed. You will have to use AWS CodeBuild to build and register the image to ECR. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "login = ecr.get_authorization_token()\n", "b64token = login['authorizationData'][0]['authorizationToken'].encode('utf-8')\n", "username, password = base64.b64decode(b64token).decode('utf-8').split(':')\n", "registry = login['authorizationData'][0]['proxyEndpoint']\n", "\n", "client = docker.from_env()\n", "client.login(username, password, registry=registry)\n", "\n", "img, logs = client.images.build(path='.', tag=repo)\n", "registry_with_name = registry.replace('https://', '') + '/' + repo\n", "print(registry_with_name)\n", "img.tag(registry_with_name, tag='latest')\n", "client.images.push(registry_with_name, tag='latest')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('https://{0}.console.aws.amazon.com/ecr/repositories/{1}/?region={0}'.format(region, repo))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create the IAM roles required for AWS Batch](https://docs.aws.amazon.com/batch/latest/userguide/IAM_policies.html)\n", "\n", "By default, IAM users don't have permission to create or modify AWS Batch resources, or perform tasks using the AWS Batch API. This means that they also can't do so using the AWS Batch console or the AWS CLI. To allow IAM users to create or modify resources and submit jobs, you must create IAM policies that grant IAM users permission to use the specific resources and API actions they need. Then, attach those policies to the IAM users or groups that require those permissions." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload [CloudFormation](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/GettingStarted.html) template\n", "\n", "In the interest of time we will generate the IAM Roles required with a CloudFormation template." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!cat fastq-batch-roles.yaml" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "iam_file = 'fastq-batch-roles.yaml'\n", "session.resource('s3').Bucket(bucket).Object(os.path.join('cfn', iam_file)).upload_file(iam_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Execute CloudFormation Stack for IAM Roles\n", "\n", "Creates a stack as specified in the template. After the call completes successfully, the stack creation starts. 
You can check the status of the stack via the DescribeStacks API.\n", "\n", "[cfn.create_stack](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudformation.html#CloudFormation.Client.create_stack)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# The CloudFormation template we use needs the AmazonEC2SpotFleetRole, ref: https://docs.aws.amazon.com/batch/latest/userguide/spot_fleet_IAM_role.html#spot-fleet-roles-console\n", "iam_client = session.client('iam')\n", "\n", "try:\n", "    resp = iam_client.create_role(RoleName='AmazonEC2SpotFleetRole',\n", "                                  AssumeRolePolicyDocument='{\"Version\":\"2012-10-17\",\"Statement\":[{\"Sid\":\"\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"spotfleet.amazonaws.com\"},\"Action\":\"sts:AssumeRole\"}]}')\n", "except ClientError as e:\n", "    if e.response['Error']['Code'] == 'EntityAlreadyExists':\n", "        print('AmazonEC2SpotFleetRole already exists, ignore')\n", "    else:\n", "        raise e\n", "\n", "try:\n", "    resp = iam_client.attach_role_policy(PolicyArn='arn:aws:iam::aws:policy/service-role/AmazonEC2SpotFleetTaggingRole',\n", "                                         RoleName='AmazonEC2SpotFleetRole')\n", "except ClientError as e:\n", "    if e.response['Error']['Code'] == 'EntityAlreadyExists':\n", "        print('Policy already attached to AmazonEC2SpotFleetRole, ignore')\n", "    else:\n", "        raise e\n", "\n", "# Service-linked roles for EC2 Spot and EC2 Spot Fleet.\n", "try:\n", "    resp = iam_client.create_service_linked_role(AWSServiceName='spot.amazonaws.com')\n", "except ClientError as e:\n", "    if e.response['Error']['Code'] == 'InvalidInput':\n", "        print('Spot service-linked role already exists, ignore')\n", "    else:\n", "        raise\n", "\n", "try:\n", "    resp = iam_client.create_service_linked_role(AWSServiceName='spotfleet.amazonaws.com')\n", "except ClientError as e:\n", "    if e.response['Error']['Code'] == 'InvalidInput':\n", "        print('Spot Fleet service-linked role already exists, ignore')\n", "    else:\n", "        raise\n", "\n", "resp = iam_client.get_role(RoleName='AmazonEC2SpotFleetRole')\n", "batch_spot_role_arn = resp['Role']['Arn']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Use TemplateBody instead of pulling the template from S3\n", "#cfn_template = 'https://s3-{0}.amazonaws.com/{1}/cfn/{2}'.format(region, bucket, iam_file)\n", "#print(cfn_template)\n", "\n", "with open(iam_file) as tf:\n", "    template_data = tf.read()\n", "    cfn.validate_template(TemplateBody=template_data)\n", "\n", "    response = cfn.create_stack(\n", "        StackName=iam_stack_name,\n", "#        TemplateURL=cfn_template,\n", "        TemplateBody=template_data,\n", "        Capabilities=[\"CAPABILITY_NAMED_IAM\"],\n", "        Parameters=[\n", "            {\n", "                'ParameterKey': 'S3Bucket',\n", "                'ParameterValue': bucket\n", "            }\n", "        ]\n", "    )\n", "\n", "    print(response)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('waiting for stack complete...')\n", "waiter = cfn.get_waiter('stack_create_complete')\n", "waiter.wait(\n", "    StackName=iam_stack_name\n", ")\n", "print('stack complete.')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Get Outputs of the CloudFormation template](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/outputs-section-structure.html)\n", "\n", "The optional `Outputs` section declares output values that you can import into other stacks, return in responses to describe-stack calls, or view on the AWS CloudFormation console. We provide outputs for the `Name` and `ARN` of each role required by the AWS Batch services."
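, "\n", "As a compact alternative to the key-by-key checks in the next cell, the stack outputs can also be flattened into a dict for lookup. A small sketch using the `cfn` client and `iam_stack_name` from above:\n", "\n", "```python\n", "# Sketch: flatten the stack's Outputs into a dict keyed by OutputKey.\n", "resp = cfn.describe_stacks(StackName=iam_stack_name)\n", "outputs_map = {o['OutputKey']: o['OutputValue'] for o in resp['Stacks'][0]['Outputs']}\n", "print(outputs_map.get('BatchServiceRoleArn'))\n", "```"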
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = cfn.describe_stacks(StackName=iam_stack_name)\n", "\n", "outputs = response[\"Stacks\"][0][\"Outputs\"]\n", "\n", "for output in response['Stacks'][0]['Outputs']:\n", " if (output['OutputKey'] == 'BatchTaskRole'):\n", " batch_task_role = output['OutputValue']\n", " if (output['OutputKey'] == 'BatchTaskRoleArn'):\n", " batch_task_role_arn = output['OutputValue']\n", " if (output['OutputKey'] == 'BatchInstanceRole'):\n", " batch_instance_role = output['OutputValue']\n", " if (output['OutputKey'] == 'BatchInstanceRoleArn'):\n", " batch_instance_role_arn = output['OutputValue']\n", " if (output['OutputKey'] == 'BatchServiceRole'):\n", " batch_service_role = output['OutputValue']\n", " if (output['OutputKey'] == 'BatchServiceRoleArn'):\n", " batch_service_role_arn = output['OutputValue']\n", "# if (output['OutputKey'] == 'BatchSpotFleetRole'):\n", "# batch_spot_role = output['OutputValue']\n", "# if (output['OutputKey'] == 'BatchSpotFleetRoleArn'):\n", "# batch_spot_role_arn = output['OutputValue']\n", "\n", "pd.set_option('display.max_colwidth', -1)\n", "pd.DataFrame(outputs, columns=[\"OutputKey\", \"OutputValue\"])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create Instance Profile for Batch instances](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use_switch-role-ec2_instance-profiles.html)\n", "\n", "An instance profile is a container for an IAM role that you can use to pass role information to an EC2 instance when the instance starts.\n", "\n", "[iam.create_instance_profile](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html#IAM.Client.create_instance_profile)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "iam.create_instance_profile(\n", " InstanceProfileName=instance_profile_name\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Associate IAM Role with instance profile\n", "\n", "[iam.add_role_to_instance_profile](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html#IAM.Client.add_role_to_instance_profile)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "iam.add_role_to_instance_profile(\n", " InstanceProfileName=instance_profile_name,\n", " RoleName=batch_instance_role\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create Security Group](https://docs.aws.amazon.com/vpc/latest/userguide/VPC_SecurityGroups.html)\n", "\n", "\n", "A security group acts as a virtual firewall for your instance to control inbound and outbound traffic. When you launch an instance in a VPC, you can assign up to five security groups to the instance. Security groups act at the instance level, not the subnet level. Therefore, each instance in a subnet in your VPC could be assigned to a different set of security groups. 
If you don't specify a particular group at launch time, the instance is automatically assigned to the default security group for the VPC.\n", "\n", "[ec2_client.create_security_group](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2.html#EC2.Client.create_security_group) boto3 documentation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try:\n", "    sg = ec2_client.create_security_group(\n", "        Description='security group for Compute Environment',\n", "        GroupName=batch_sec_group_name,\n", "        VpcId=vpc_id\n", "    )\n", "    batch_sec_group_id = sg[\"GroupId\"]\n", "except ClientError as e:\n", "    if e.response['Error']['Code'] == 'InvalidGroup.Duplicate':\n", "        print(\"SG already exists, reusing it\")\n", "        resp = ec2_client.describe_security_groups(Filters=[dict(Name='group-name', Values=[batch_sec_group_name])])\n", "        batch_sec_group_id = resp['SecurityGroups'][0]['GroupId']\n", "    else:\n", "        raise e\n", "\n", "print('Batch security group ' + batch_sec_group_name)\n", "print(batch_sec_group_id)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create the Batch Environment](https://docs.opendata.aws/genomics-workflows/aws-batch/configure-aws-batch-cfn/)\n", "\n", "We will create the required AWS Batch environment for genomics workflows in the next few cells. This is where job requests for the FastQC container will be submitted.\n", "\n", "[batch.create_compute_environment](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html#Batch.Client.create_compute_environment)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_compute_environment(computeEnvironmentName, computeType, unitVCpus, imageId, serviceRole, instanceRole,\n", "                               subnets, securityGroups, bidPercentage=None, spotFleetRole=None):\n", "\n", "    compute_resources = {\n", "        'type': computeType,\n", "        'imageId': imageId,\n", "        'minvCpus': unitVCpus * 1,\n", "        'maxvCpus': unitVCpus * 16,\n", "        'desiredvCpus': unitVCpus * 1,\n", "        'instanceTypes': ['optimal'],\n", "        'subnets': subnets,\n", "        'securityGroupIds': securityGroups,\n", "        'instanceRole': instanceRole\n", "    }\n", "\n", "    # Spot environments additionally need a bid percentage and a Spot fleet role.\n", "    if computeType == 'SPOT':\n", "        compute_resources['bidPercentage'] = bidPercentage\n", "        compute_resources['spotIamFleetRole'] = spotFleetRole\n", "\n", "    response = batch.create_compute_environment(\n", "        computeEnvironmentName=computeEnvironmentName,\n", "        type='MANAGED',\n", "        serviceRole=serviceRole,\n", "        computeResources=compute_resources\n", "    )\n", "\n", "    while True:\n", "        describe = batch.describe_compute_environments(computeEnvironments=[computeEnvironmentName])\n", "        computeEnvironment = describe['computeEnvironments'][0]\n", "        status = computeEnvironment['status']\n", "        if status == 'VALID':\n", "            print('\\rSuccessfully created compute environment {}'.format(computeEnvironmentName))\n", "            break\n", "        elif status == 'INVALID':\n", "            reason = computeEnvironment['statusReason']\n", "            raise Exception('Failed to create compute environment: {}'.format(reason))\n", "        print('\\rCreating compute environment...')\n", "        time.sleep(5)\n", "\n", "    return response" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ 
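"The helper above also supports on-demand environments: when `computeType` is not `'SPOT'`, the Spot-specific fields are simply omitted. For illustration only (not executed here, and assuming `ami` has been resolved as in the cell below):\n", "\n", "```python\n", "# Hypothetical on-demand variant -- no bid percentage or Spot fleet role needed.\n", "resp = create_compute_environment(default_env + '_ondemand', 'EC2', desired_cpu, ami,\n", "                                  batch_service_role_arn, instance_profile_name,\n", "                                  [subnet1_id], [batch_sec_group_id])\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ 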
"### Get Latest [Amazon Linux AMI](https://aws.amazon.com/amazon-linux-ami/)\n", "\n", "The Amazon Linux 2 AMI is a supported and maintained Linux image provided by Amazon Web Services for use on Amazon Elastic Compute Cloud (Amazon EC2). It is designed to provide a stable, secure, and high performance execution environment for applications running on Amazon EC2. It supports the latest EC2 instance type features and includes packages that enable easy integration with AWS. Amazon Web Services provides ongoing security and maintenance updates to all instances running the Amazon Linux AMI. The Amazon Linux AMI is provided at no additional charge to Amazon EC2 users. The specific AMI we are using is teh ECS optimized version that is needed for AWS Batch." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = ssm.get_parameters(Names=['/aws/service/ecs/optimized-ami/amazon-linux-2/recommended'])\n", "ami = json.loads(response['Parameters'][0]['Value'])['image_id']\n", "print(ami)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create Batch Compute Environment](https://docs.aws.amazon.com/batch/latest/userguide/compute_environments.html)\n", "\n", "Compute environments contain the Amazon ECS container instances that are used to run containerized batch jobs. A given compute environment can also be mapped to one or many job queues. Within a job queue, the associated compute environments each have an order that is used by the scheduler to determine where to place jobs that are ready to be executed.\n", "\n", "[batch.create_compute_environment](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html#Batch.Client.create_compute_environment)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "security_groups = [batch_sec_group_id]\n", "\n", "resp = create_compute_environment(default_env, 'SPOT', desired_cpu, ami, batch_service_role_arn, instance_profile_name, \\\n", " [subnet1_id], security_groups, bid_percentage, batch_spot_role_arn)\n", "\n", "default_ce_arn = resp['computeEnvironmentArn']\n", "default_ce = resp['computeEnvironmentName']\n", "print(default_ce_arn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create the AWS Batch Job Queue](https://docs.aws.amazon.com/batch/latest/userguide/create-job-queue.html)\n", "\n", "Jobs are submitted to a job queue, where they reside until they are able to be scheduled to run in a compute environment. An AWS account can have multiple job queues. For example, you might create a queue that uses Amazon EC2 On-Demand instances for high priority jobs and another queue that uses Amazon EC2 Spot Instances for low-priority jobs. 
Job queues have a priority that is used by the scheduler to determine which jobs in which queue should be evaluated for execution first.\n", "\n", "[batch.create_job_queue](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html#Batch.Client.create_job_queue)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def create_job_queue(computeEnvironmentName, priority):\n", "    jobQueueName = computeEnvironmentName + '_queue'\n", "    response = batch.create_job_queue(jobQueueName=jobQueueName,\n", "                                      priority=priority,\n", "                                      computeEnvironmentOrder=[\n", "                                          {\n", "                                              'order': 1,\n", "                                              'computeEnvironment': computeEnvironmentName\n", "                                          }\n", "                                      ])\n", "\n", "    while True:\n", "        describe = batch.describe_job_queues(jobQueues=[jobQueueName])\n", "        jobQueue = describe['jobQueues'][0]\n", "        status = jobQueue['status']\n", "        if status == 'VALID':\n", "            print('\\rSuccessfully created job queue {}'.format(jobQueueName))\n", "            break\n", "        elif status == 'INVALID':\n", "            reason = jobQueue['statusReason']\n", "            raise Exception('Failed to create job queue: {}'.format(reason))\n", "        print('\\rCreating job queue... ')\n", "        time.sleep(5)\n", "\n", "    return response" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "resp = create_job_queue(default_env, 1)\n", "fastq_queue_arn = resp['jobQueueArn']\n", "fastq_queue = resp['jobQueueName']\n", "print(fastq_queue_arn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### [Create AWS Batch Job Definition](https://docs.aws.amazon.com/batch/latest/userguide/create-job-definition.html)\n", "\n", "AWS Batch job definitions specify how jobs are to be run. While each job must reference a job definition, many of the parameters that are specified in the job definition can be overridden at runtime.\n", "\n", "[batch.register_job_definition](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html#Batch.Client.register_job_definition)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "job_def = batch.register_job_definition(\n", "    jobDefinitionName=job_def_name,\n", "    type='container',\n", "    parameters={\n", "        'InputFile': 's3://1000genomes/phase3/data/NA21144/sequence_read/ERR047877.filt.fastq.gz'\n", "    },\n", "    containerProperties={\n", "        'image': registry_with_name,\n", "        'vcpus': 1,\n", "        'memory': 512,\n", "        'command': [\n", "            'fastqc.sh',\n", "            'Ref::InputFile'\n", "        ],\n", "        'jobRoleArn': batch_task_role_arn\n", "    }\n", ")\n", "\n", "print(job_def)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Submit Job from the console\n", "\n", "We will use the console to submit the job, but you can also use the CLI and SDKs. On the link below click `Create Job` and fill in the parameters below:\n", "\n", "* Job name: `FASTQCJobDemo`\n", "* Job definition: `fastqc_demo_job_<workshop_user>:1` (e.g. `fastqc_demo_job_hpc:1`)\n", "* Job queue: `FastQCEnvironment_<workshop_user>_queue` (e.g. `FastQCEnvironment_hpc_queue`)\n", "* Job type: `Single`\n", "* Job attempts: `1`\n", "\n", "All other settings can be left as the defaults. For the parameters:\n", "\n", "* InputFile: `s3://1000genomes/phase3/data/NA21144/sequence_read/ERR047877.filt.fastq.gz`\n", "\n", "Finally, the command should already be populated and should read:\n", "\n", "* `fastqc.sh Ref::InputFile` in the space-delimited field."
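, "\n", "The console submission above is equivalent to a single `batch.submit_job` API call. A sketch using the queue and job definition created earlier (the cells further below submit several files exactly this way):\n", "\n", "```python\n", "# Equivalent API call to the console submission described above.\n", "resp = batch.submit_job(\n", "    jobName='FASTQCJobDemo',\n", "    jobQueue=fastq_queue,\n", "    jobDefinition=job_def_name,\n", "    parameters={'InputFile': 's3://1000genomes/phase3/data/NA21144/sequence_read/ERR047877.filt.fastq.gz'})\n", "print(resp['jobId'])\n", "```"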
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('https://{0}.console.aws.amazon.com/batch/home?region={0}#/jobs/new'.format(region))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### View results of FastQC job " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('https://s3.{0}.amazonaws.com/{1}/ERR047877.filt_fastqc.html'.format(region, bucket))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Submit multiple files from the CLI\n", "\n", "You can use boto3 api or batch CLI to submit the jobs\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "files_to_process = ['ERR047877', 'ERR047878', 'ERR047879', 'ERR048950', 'ERR048951', 'ERR048952', 'ERR251691', 'ERR251692']\n", "for fn in files_to_process:\n", " print(f\"Submit {fn}\")\n", " sb_resp = batch.submit_job(jobName=f\"FastQC-API-{fn}\", \n", " jobQueue=fastq_queue, \n", " jobDefinition=job_def_name, \n", " parameters={'InputFile':f\"s3://1000genomes/phase3/data/NA21144/sequence_read/{fn}.filt.fastq.gz\"})\n", " print(sb_resp['jobName'], sb_resp['jobId'] )" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws batch submit-job --job-name FastQC-CLI1 --job-queue $fastq_queue --job-definition $job_def_name --parameters InputFile=s3://1000genomes/phase3/data/NA21144/sequence_read/ERR047877.filt.fastq.gz\n", "!aws batch submit-job --job-name FastQC-CLI2 --job-queue $fastq_queue --job-definition $job_def_name --parameters InputFile=s3://1000genomes/phase3/data/NA21144/sequence_read/ERR047878.filt.fastq.gz\n", "!aws batch submit-job --job-name FastQC-CLI3 --job-queue $fastq_queue --job-definition $job_def_name --parameters InputFile=s3://1000genomes/phase3/data/NA21144/sequence_read/ERR047879.filt.fastq.gz\n", "!aws batch submit-job --job-name FastQC-CLI4 --job-queue $fastq_queue --job-definition $job_def_name --parameters InputFile=s3://1000genomes/phase3/data/NA21144/sequence_read/ERR048950.filt.fastq.gz\n", "!aws batch submit-job --job-name FastQC-CLI5 --job-queue $fastq_queue --job-definition $job_def_name --parameters InputFile=s3://1000genomes/phase3/data/NA21144/sequence_read/ERR048951.filt.fastq.gz\n", "!aws batch submit-job --job-name FastQC-CLI6 --job-queue $fastq_queue --job-definition $job_def_name --parameters InputFile=s3://1000genomes/phase3/data/NA21144/sequence_read/ERR048952.filt.fastq.gz\n", "!aws batch submit-job --job-name FastQC-CLI7 --job-queue $fastq_queue --job-definition $job_def_name --parameters InputFile=s3://1000genomes/phase3/data/NA21144/sequence_read/ERR251691.filt.fastq.gz\n", "!aws batch submit-job --job-name FastQC-CLI8 --job-queue $fastq_queue --job-definition $job_def_name --parameters InputFile=s3://1000genomes/phase3/data/NA21144/sequence_read/ERR251692.filt.fastq.gz" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Monitor results of the jobs in the AWS Batch Dashboard" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('https://{0}.console.aws.amazon.com/batch/home?region={0}#/dashboard'.format(region))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cleanup" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def delete_compute_environment(computeEnvironment):\n", " response = batch.update_compute_environment(\n", " 
computeEnvironment=computeEnvironment,\n", "        state='DISABLED',\n", "    )\n", "\n", "    while True:\n", "        response = batch.describe_compute_environments(\n", "            computeEnvironments=[computeEnvironment])\n", "        assert len(response['computeEnvironments']) == 1\n", "        env = response['computeEnvironments'][0]\n", "        state = env['state']\n", "        status = env['status']\n", "        if status == 'UPDATING':\n", "            print(\"Environment %r is updating, waiting...\" % (computeEnvironment,))\n", "        elif state == 'DISABLED':\n", "            break\n", "        else:\n", "            raise RuntimeError('Expected status=UPDATING or state=DISABLED, '\n", "                               'but status=%r and state=%r' % (status, state))\n", "\n", "        # wait a little bit before checking again.\n", "        time.sleep(15)\n", "\n", "    response = batch.delete_compute_environment(\n", "        computeEnvironment=computeEnvironment\n", "    )\n", "\n", "    time.sleep(5)\n", "    response = describe_compute_environments([computeEnvironment])\n", "\n", "    # Once deleted, the environment disappears from the describe call,\n", "    # so guard the status check against an empty result list.\n", "    while len(response['computeEnvironments']) == 1 and response['computeEnvironments'][0]['status'] == 'DELETING':\n", "        time.sleep(5)\n", "        response = describe_compute_environments([computeEnvironment])\n", "\n", "    return response\n", "\n", "\n", "def describe_compute_environments(compute_envs):\n", "    try:\n", "        response = batch.describe_compute_environments(\n", "            computeEnvironments=compute_envs,\n", "        )\n", "    except ClientError as e:\n", "        print(e.response['Error']['Message'])\n", "        raise\n", "\n", "    return response\n", "\n", "\n", "def delete_job_queue(job_queue):\n", "    job_queues = [job_queue]\n", "    response = describe_job_queues(job_queues)\n", "\n", "    if response['jobQueues'][0]['state'] != 'DISABLED':\n", "        try:\n", "            batch.update_job_queue(\n", "                jobQueue=job_queue,\n", "                state='DISABLED'\n", "            )\n", "        except ClientError as e:\n", "            print(e.response['Error']['Message'])\n", "            raise\n", "\n", "    terminate_jobs(job_queue)\n", "\n", "    # Wait until the job queue is DISABLED\n", "    response = describe_job_queues(job_queues)\n", "    while response['jobQueues'][0]['state'] != 'DISABLED':\n", "        time.sleep(5)\n", "        response = describe_job_queues(job_queues)\n", "\n", "    time.sleep(10)\n", "    if response['jobQueues'][0]['status'] != 'DELETING':\n", "        try:\n", "            batch.delete_job_queue(\n", "                jobQueue=job_queue,\n", "            )\n", "        except ClientError as e:\n", "            print(e.response['Error']['Message'])\n", "            raise\n", "\n", "    response = describe_job_queues(job_queues)\n", "    # Guard against the queue disappearing from the describe call once deleted.\n", "    while len(response['jobQueues']) == 1 and response['jobQueues'][0]['status'] == 'DELETING':\n", "        time.sleep(5)\n", "        response = describe_job_queues(job_queues)\n", "\n", "\n", "def describe_job_queues(job_queues):\n", "    try:\n", "        response = batch.describe_job_queues(\n", "            jobQueues=job_queues\n", "        )\n", "    except ClientError as e:\n", "        print(e.response['Error']['Message'])\n", "        raise\n", "\n", "    return response\n", "\n", "\n", "def terminate_jobs(job_queue):\n", "    response = list_jobs(job_queue)\n", "    for job in response['jobSummaryList']:\n", "        batch.terminate_job(\n", "            jobId=job['jobId'],\n", "            reason='Removing Batch Environment'\n", "        )\n", "    while response.get('nextToken', None) is not None:\n", "        response = list_jobs(job_queue, response['nextToken'])\n", "        for job in response['jobSummaryList']:\n", "            batch.terminate_job(\n", "                jobId=job['jobId'],\n", "                reason='Removing Batch Environment'\n", "            )\n", "\n", "\n", "def list_jobs(job_queue, next_token=\"\"):\n", "    try:\n", "        if next_token:\n", "            response = batch.list_jobs(\n", "                
jobQueue=job_queue,\n", "                nextToken=next_token\n", "            )\n", "        else:\n", "            response = batch.list_jobs(\n", "                jobQueue=job_queue,\n", "            )\n", "    except ClientError as e:\n", "        print(e.response['Error']['Message'])\n", "        raise\n", "\n", "    return response" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(job_def_name)\n", "response = batch.deregister_job_definition(jobDefinition=job_def_name+':1')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "resp = delete_job_queue(fastq_queue)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "resp = delete_compute_environment(default_ce)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = ec2_client.delete_security_group(GroupId=batch_sec_group_id)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = iam.remove_role_from_instance_profile(\n", "    InstanceProfileName=instance_profile_name,\n", "    RoleName=batch_instance_role\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = iam.delete_instance_profile(\n", "    InstanceProfileName=instance_profile_name\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = cfn.delete_stack(StackName=iam_stack_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "waiter = cfn.get_waiter('stack_delete_complete')\n", "waiter.wait(\n", "    StackName=iam_stack_name\n", ")\n", "\n", "print('The wait is over for {0}'.format(iam_stack_name))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = ecr.delete_repository(\n", "    registryId=account_id,\n", "    repositoryName=repo,\n", "    force=True\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 rb s3://$bucket --force " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not use_existing:\n", "    workshop.vpc_cleanup(vpc_id)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_python3", "language": "python", "name": "conda_python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" } }, "nbformat": 4, "nbformat_minor": 4 }