---
description: "(Operations Conductor) Adjusts the throughput on a DynamoDB table or its global secondary indexes"
schemaVersion: "0.3"
assumeRole: "%%OperationsConductorSharedRoleArn%%"
parameters:
  SQSMsgBody:
    type: "String"
    description: "JSON-stringified version of the message body that was read off the Resource Queue"
  SQSMsgReceiptHandle:
    type: "String"
    description: "Receipt handle of the SQS message that was read off the queue"
  TargetResourceType:
    type: "String"
    description: "The AWS resource type to which this automation applies. The format of this value should be: service:resourceType"
    default: "dynamodb:table"
  TableReadCapacityUnits:
    type: "String"
    description: "(Optional) Read capacity for the table"
    default: ""
  TableWriteCapacityUnits:
    type: "String"
    description: "(Optional) Write capacity for the table"
    default: ""
  GlobalSecondaryIndexName:
    type: "String"
    description: "(Optional) Name of the Global Secondary Index to update. If provided, either GlobalSecondaryIndexReadCapacityUnits or GlobalSecondaryIndexWriteCapacityUnits must also be supplied"
    default: ""
  GlobalSecondaryIndexReadCapacityUnits:
    type: "String"
    description: "(Optional) Read capacity for the Global Secondary Index"
    default: ""
  GlobalSecondaryIndexWriteCapacityUnits:
    type: "String"
    description: "(Optional) Write capacity for the Global Secondary Index"
    default: ""
mainSteps:
  - name: "VALIDATE_MSG_CONTENTS"
    action: aws:executeScript
    timeoutSeconds: 30
    description: "Validates contents of the message from the Resource Queue and parses out parameters"
    inputs:
      Runtime: python3.6
      Handler: script_handler
      InputPayload:
        SQSMsgBody: "{{ SQSMsgBody }}"
        TableReadCapacityUnits: "{{ TableReadCapacityUnits }}"
        TableWriteCapacityUnits: "{{ TableWriteCapacityUnits }}"
        GlobalSecondaryIndexName: "{{ GlobalSecondaryIndexName }}"
        GlobalSecondaryIndexReadCapacityUnits: "{{ GlobalSecondaryIndexReadCapacityUnits }}"
        GlobalSecondaryIndexWriteCapacityUnits: "{{ GlobalSecondaryIndexWriteCapacityUnits }}"
      Script: |-
        import boto3
        import json

        def script_handler(events, context):
            output = {"statusCode": 200}

            # Every Resource Queue message must carry these identifying fields
            sqs_msg_body = json.loads(events["SQSMsgBody"])

            if "ResourceId" not in sqs_msg_body:
                raise Exception("ResourceId was not found in the SQS Message Body.")
            output["resource_id"] = sqs_msg_body["ResourceId"]

            if "ResourceRegion" not in sqs_msg_body:
                raise Exception("ResourceRegion was not found in the SQS Message Body.")
            output["source_region"] = sqs_msg_body["ResourceRegion"]

            if "ResourceAccount" not in sqs_msg_body:
                raise Exception("ResourceAccount was not found in the SQS Message Body.")
            output["source_account_id"] = sqs_msg_body["ResourceAccount"]

            if "TargetTag" not in sqs_msg_body:
                raise Exception("TargetTag was not found in the SQS Message Body.")
            output["target_tag_name"] = sqs_msg_body["TargetTag"]

            if "TaskId" not in sqs_msg_body:
                raise Exception("TaskId was not found in the SQS Message Body.")
            output["task_id"] = sqs_msg_body["TaskId"]

            if "ParentExecutionId" not in sqs_msg_body:
                raise Exception("ParentExecutionId was not found in the SQS Message Body.")
            output["parent_execution_id"] = sqs_msg_body["ParentExecutionId"]

            output["table_read_capacity_units"] = ""
            if events["TableReadCapacityUnits"].strip() != "":
                output["table_read_capacity_units"] = int(events["TableReadCapacityUnits"].strip())
                if output["table_read_capacity_units"] < 1 or output["table_read_capacity_units"] > 40000:
                    raise Exception("Invalid value for TableReadCapacityUnits. Must be between 1 and 40000")
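            # The remaining capacity parameters below follow the same pattern: an empty string means
            # "leave this value unchanged", otherwise the value must parse as an integer between 1 and
            # 40000 (assumed here to correspond to the default per-table provisioned-throughput quota).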
            output["table_write_capacity_units"] = ""
            if events["TableWriteCapacityUnits"].strip() != "":
                output["table_write_capacity_units"] = int(events["TableWriteCapacityUnits"].strip())
                if output["table_write_capacity_units"] < 1 or output["table_write_capacity_units"] > 40000:
                    raise Exception("Invalid value for TableWriteCapacityUnits. Must be between 1 and 40000")

            output["gsi_name"] = ""
            if events["GlobalSecondaryIndexName"].strip() != "":
                output["gsi_name"] = events["GlobalSecondaryIndexName"].strip()

            output["gsi_read_capacity_units"] = ""
            if events["GlobalSecondaryIndexReadCapacityUnits"].strip() != "":
                output["gsi_read_capacity_units"] = int(events["GlobalSecondaryIndexReadCapacityUnits"].strip())
                if output["gsi_read_capacity_units"] < 1 or output["gsi_read_capacity_units"] > 40000:
                    raise Exception("Invalid value for GlobalSecondaryIndexReadCapacityUnits. Must be between 1 and 40000")

            output["gsi_write_capacity_units"] = ""
            if events["GlobalSecondaryIndexWriteCapacityUnits"].strip() != "":
                output["gsi_write_capacity_units"] = int(events["GlobalSecondaryIndexWriteCapacityUnits"].strip())
                if output["gsi_write_capacity_units"] < 1 or output["gsi_write_capacity_units"] > 40000:
                    raise Exception("Invalid value for GlobalSecondaryIndexWriteCapacityUnits. Must be between 1 and 40000")

            # A GSI name only makes sense if at least one GSI capacity value was supplied
            if output["gsi_name"] != "":
                if output["gsi_read_capacity_units"] == "" and output["gsi_write_capacity_units"] == "":
                    raise Exception("A Global Secondary Index name was provided but neither Read nor Write Capacity Units were specified")

            if output["table_read_capacity_units"] == "" and output["table_write_capacity_units"] == "" and output["gsi_read_capacity_units"] == "" and output["gsi_write_capacity_units"] == "":
                raise Exception("No values to update were supplied")

            # Convert back to strings so the values can be passed as String outputs to later steps
            output["table_read_capacity_units"] = str(output["table_read_capacity_units"])
            output["table_write_capacity_units"] = str(output["table_write_capacity_units"])
            output["gsi_read_capacity_units"] = str(output["gsi_read_capacity_units"])
            output["gsi_write_capacity_units"] = str(output["gsi_write_capacity_units"])

            return output
    outputs:
      - Name: "resource_id"
        Selector: "$.Payload.resource_id"
        Type: "String"
      - Name: "source_region"
        Selector: "$.Payload.source_region"
        Type: "String"
      - Name: "source_account_id"
        Selector: "$.Payload.source_account_id"
        Type: "String"
      - Name: "target_tag_name"
        Selector: "$.Payload.target_tag_name"
        Type: "String"
      - Name: "task_id"
        Selector: "$.Payload.task_id"
        Type: "String"
      - Name: "parent_execution_id"
        Selector: "$.Payload.parent_execution_id"
        Type: "String"
      - Name: "table_read_capacity_units"
        Selector: "$.Payload.table_read_capacity_units"
        Type: "String"
      - Name: "table_write_capacity_units"
        Selector: "$.Payload.table_write_capacity_units"
        Type: "String"
      - Name: "gsi_name"
        Selector: "$.Payload.gsi_name"
        Type: "String"
      - Name: "gsi_read_capacity_units"
        Selector: "$.Payload.gsi_read_capacity_units"
        Type: "String"
      - Name: "gsi_write_capacity_units"
        Selector: "$.Payload.gsi_write_capacity_units"
        Type: "String"
  - name: "CREATE_PERFORM_ACTION_AUTOMATION_EXECUTION_RECORD"
    action: aws:executeAwsApi
    timeoutSeconds: 30
    description: "Creates a record of this automation execution in the Operations Conductor Automation Executions Table"
    inputs: {
      "Service": "dynamodb",
      "Api": "PutItem",
      "TableName": "%%AutomationExecutionsTableName%%",
      "Item": {
        "parentExecutionId": { "S": "{{ VALIDATE_MSG_CONTENTS.parent_execution_id }}" },
"automationExecutionId": { "S": "{{ automation:EXECUTION_ID }}" }, "status": { "S": "InProgress" } } } - name: "PERFORM_ACTION_ON_RESOURCE" action: aws:executeScript timeoutSeconds: 30 description: "Adjusts the provisioned throughput for the supplied DynamoDB table and/or Global Secondary Index" inputs: Runtime: python3.6 Handler: script_handler InputPayload: source_account_id: "{{ VALIDATE_MSG_CONTENTS.source_account_id }}" source_region: "{{ VALIDATE_MSG_CONTENTS.source_region }}" table_read_capacity_units: "{{ VALIDATE_MSG_CONTENTS.table_read_capacity_units }}" table_write_capacity_units: "{{ VALIDATE_MSG_CONTENTS.table_write_capacity_units }}" gsi_read_capacity_units: "{{ VALIDATE_MSG_CONTENTS.gsi_read_capacity_units }}" gsi_write_capacity_units: "{{ VALIDATE_MSG_CONTENTS.gsi_write_capacity_units }}" gsi_name: "{{ VALIDATE_MSG_CONTENTS.gsi_name }}" resource_id: "{{ VALIDATE_MSG_CONTENTS.resource_id }}" task_id: "{{ VALIDATE_MSG_CONTENTS.task_id }}" Script: |- import boto3 import json def script_handler(events, context): response = { "statusCode": 200, "next_step": "REMOVE_MSG_FROM_RESOURCE_QUEUE" } task_id = events["task_id"] source_account_id = events["source_account_id"] source_region = events["source_region"] table_read_capacity_units = None if events["table_read_capacity_units"] != "": table_read_capacity_units = int(events["table_read_capacity_units"]) table_write_capacity_units = None if events["table_write_capacity_units"] != "": table_write_capacity_units = int(events["table_write_capacity_units"]) gsi_read_capacity_units = None if events["gsi_read_capacity_units"] != "": gsi_read_capacity_units = int(events["gsi_read_capacity_units"]) gsi_write_capacity_units = None if events["gsi_write_capacity_units"] != "": gsi_write_capacity_units = int(events["gsi_write_capacity_units"]) # Assume role in source account sts_connection = boto3.client('sts') assumed_role = sts_connection.assume_role( RoleArn=f"arn:aws:iam::{source_account_id}:role/{source_account_id}-{source_region}-{task_id}", RoleSessionName="ops_conductor_set_dynamodb_capacity" ) dynamodb_client = boto3.client( 'dynamodb', region_name=source_region, aws_access_key_id=assumed_role['Credentials']['AccessKeyId'], aws_secret_access_key=assumed_role['Credentials']['SecretAccessKey'], aws_session_token=assumed_role['Credentials']['SessionToken'] ) print(f"Looking up Table Name { events['resource_id'] }") desc_table_response = dynamodb_client.describe_table(TableName=events['resource_id']) if "Table" not in desc_table_response: raise Exception(f"Table { events['resource_id'] } was not found") print(f"Table { events['resource_id'] } found") table = desc_table_response["Table"] if table["TableStatus"] != "ACTIVE": raise Exception("Table is not in an ACTIVE status. Exiting.") # If a GSI Name was supplied, verify that the index exists on the table global_secondary_index = None if events["gsi_name"] != "": for gsi in table["GlobalSecondaryIndexes"]: if gsi["IndexName"] == events["gsi_name"]: global_secondary_index = gsi break if global_secondary_index is None: raise Exception(f"Global Secondary Index ({ events['gsi_name'] }) was not found on this table.") if global_secondary_index["IndexStatus"] != "ACTIVE": raise Exception(f"Global Secondary Index ({ events['gsi_name'] }) is not in an ACTIVE status. 
Exiting.") update_params = { "TableName": events["resource_id"] } if table_read_capacity_units is not None or table_write_capacity_units is not None: new_table_throughput = { "ReadCapacityUnits": table["ProvisionedThroughput"]["ReadCapacityUnits"], "WriteCapacityUnits": table["ProvisionedThroughput"]["WriteCapacityUnits"] } found_different_table_throughput_value = False if table_read_capacity_units is not None and table_read_capacity_units != table["ProvisionedThroughput"]["ReadCapacityUnits"]: new_table_throughput["ReadCapacityUnits"] = table_read_capacity_units found_different_table_throughput_value = True print(f"Going to update the table's ReadCapacityUnits to {table_read_capacity_units}") if table_write_capacity_units is not None and table_write_capacity_units != table["ProvisionedThroughput"]["WriteCapacityUnits"]: new_table_throughput["WriteCapacityUnits"] = table_write_capacity_units found_different_table_throughput_value = True print(f"Going to update the table's WriteCapacityUnits to {table_write_capacity_units}") if found_different_table_throughput_value: update_params["ProvisionedThroughput"] = new_table_throughput if global_secondary_index is not None: gsi_updates = [ { "Update": { "IndexName": events["gsi_name"], "ProvisionedThroughput": { "ReadCapacityUnits": global_secondary_index["ProvisionedThroughput"]["ReadCapacityUnits"], "WriteCapacityUnits": global_secondary_index["ProvisionedThroughput"]["WriteCapacityUnits"] } } } ] found_different_gsi_throughput_value = False if gsi_read_capacity_units is not None and gsi_read_capacity_units != global_secondary_index["ProvisionedThroughput"]["ReadCapacityUnits"]: gsi_updates[0]["Update"]["ProvisionedThroughput"]["ReadCapacityUnits"] = gsi_read_capacity_units found_different_gsi_throughput_value = True print(f"Going to update GSI ({ events['gsi_name'] }) and set ReadCapacityUnits to { gsi_read_capacity_units }") if gsi_write_capacity_units is not None and gsi_write_capacity_units != global_secondary_index["ProvisionedThroughput"]["WriteCapacityUnits"]: gsi_updates[0]["Update"]["ProvisionedThroughput"]["WriteCapacityUnits"] = gsi_write_capacity_units found_different_gsi_throughput_value = True print(f"Going to update GSI ({ events['gsi_name'] }) and set WriteCapacityUnits to { gsi_write_capacity_units }") if found_different_gsi_throughput_value: update_params["GlobalSecondaryIndexUpdates"] = gsi_updates if "ProvisionedThroughput" not in update_params and "GlobalSecondaryIndexUpdates" not in update_params: print("After inspecting the table, did not find anything that needed to be updated. Exiting.") response["next_step"] = "NO_ACTION_PERFORMED_REMOVE_MSG_FROM_RESOURCE_QUEUE" else: print("Performing update on table") update_response = dynamodb_client.update_table(**update_params) print("Update successful") return response outputs: - Name: "next_step" Selector: "$.Payload.next_step" Type: "String" # Decision point for whether the action was performed and needs to be verified or whether # the action was skipped (i.e. 
  - name: "determineNextStep"
    action: "aws:branch"
    inputs:
      Choices:
        - NextStep: "REMOVE_MSG_FROM_RESOURCE_QUEUE"
          Variable: "{{ PERFORM_ACTION_ON_RESOURCE.next_step }}"
          StringEquals: "REMOVE_MSG_FROM_RESOURCE_QUEUE"
        - NextStep: "NO_ACTION_PERFORMED_REMOVE_MSG_FROM_RESOURCE_QUEUE"
          Variable: "{{ PERFORM_ACTION_ON_RESOURCE.next_step }}"
          StringEquals: "NO_ACTION_PERFORMED_REMOVE_MSG_FROM_RESOURCE_QUEUE"
    isEnd: true
  - name: "REMOVE_MSG_FROM_RESOURCE_QUEUE"
    action: "aws:executeAwsApi"
    inputs: {
      "Service": "sqs",
      "Api": "DeleteMessage",
      "QueueUrl": "%%ResourceQueueUrl%%",
      "ReceiptHandle": "{{ SQSMsgReceiptHandle }}"
    }
  - name: "UPDATE_AUTOMATION_EXECUTION_RECORD"
    action: aws:executeAwsApi
    timeoutSeconds: 30
    description: "Updates the record of this automation execution in the Operations Conductor Automation Executions Table to mark it as successfully completed"
    inputs: {
      "Service": "dynamodb",
      "Api": "UpdateItem",
      "TableName": "%%AutomationExecutionsTableName%%",
      "Key": {
        "parentExecutionId": { "S": "{{ VALIDATE_MSG_CONTENTS.parent_execution_id }}" },
        "automationExecutionId": { "S": "{{ automation:EXECUTION_ID }}" }
      },
      "UpdateExpression": "SET #stat = :val1",
      "ExpressionAttributeNames": { "#stat": "status" },
      "ExpressionAttributeValues": { ":val1": { "S": "Success" } }
    }
  - name: "UPDATE_TASK_EXECUTIONS_RECORD"
    action: aws:executeAwsApi
    timeoutSeconds: 30
    description: "Updates the record for the overall execution of the Operations Conductor Task that spawned this automation"
    inputs: {
      "Service": "dynamodb",
      "Api": "UpdateItem",
      "TableName": "%%TaskExecutionsTableName%%",
      "Key": {
        "taskId": { "S": "{{ VALIDATE_MSG_CONTENTS.task_id }}" },
        "parentExecutionId": { "S": "{{ VALIDATE_MSG_CONTENTS.parent_execution_id }}" }
      },
      "UpdateExpression": "SET completedResourceCount = completedResourceCount + :incr, lastUpdateTime = :uptime",
      "ExpressionAttributeValues": { ":incr": { "N": "1" }, ":uptime": { "S": "{{ global:DATE_TIME }}" } }
    }
  - name: "CHECK_FOR_TASK_EXECUTION_COMPLETION"
    action: aws:executeAwsApi
    timeoutSeconds: 30
    description: "Marks the overall task execution as Success if all resources have been successfully acted on"
    inputs: {
      "Service": "dynamodb",
      "Api": "UpdateItem",
      "TableName": "%%TaskExecutionsTableName%%",
      "Key": {
        "taskId": { "S": "{{ VALIDATE_MSG_CONTENTS.task_id }}" },
        "parentExecutionId": { "S": "{{ VALIDATE_MSG_CONTENTS.parent_execution_id }}" }
      },
      "UpdateExpression": "SET #s = :stat",
      "ConditionExpression": "completedResourceCount = totalResourceCount",
      "ExpressionAttributeNames": { "#s": "status" },
      "ExpressionAttributeValues": { ":stat": { "S": "Success" } }
    }
    onFailure: Continue
    isEnd: true
  # Cleanup path in case the action was skipped (i.e. it did not need to be performed)
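  # The NO_ACTION_PERFORMED_* steps intentionally mirror the steps above: the two branches of
  # determineNextStep run as separate sequential chains, so each needs its own copy of the queue
  # cleanup and bookkeeping updates.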
  - name: "NO_ACTION_PERFORMED_REMOVE_MSG_FROM_RESOURCE_QUEUE"
    action: "aws:executeAwsApi"
    inputs: {
      "Service": "sqs",
      "Api": "DeleteMessage",
      "QueueUrl": "%%ResourceQueueUrl%%",
      "ReceiptHandle": "{{ SQSMsgReceiptHandle }}"
    }
  - name: "NO_ACTION_PERFORMED_UPDATE_AUTOMATION_EXECUTION_RECORD"
    action: aws:executeAwsApi
    timeoutSeconds: 30
    description: "Updates the record of this automation execution in the Operations Conductor Automation Executions Table to mark it as successfully completed"
    inputs: {
      "Service": "dynamodb",
      "Api": "UpdateItem",
      "TableName": "%%AutomationExecutionsTableName%%",
      "Key": {
        "parentExecutionId": { "S": "{{ VALIDATE_MSG_CONTENTS.parent_execution_id }}" },
        "automationExecutionId": { "S": "{{ automation:EXECUTION_ID }}" }
      },
      "UpdateExpression": "SET #stat = :val1",
      "ExpressionAttributeNames": { "#stat": "status" },
      "ExpressionAttributeValues": { ":val1": { "S": "Success" } }
    }
  - name: "NO_ACTION_PERFORMED_UPDATE_TASK_EXECUTIONS_RECORD"
    action: aws:executeAwsApi
    timeoutSeconds: 30
    description: "Updates the record for the overall execution of the Operations Conductor Task that spawned this automation"
    inputs: {
      "Service": "dynamodb",
      "Api": "UpdateItem",
      "TableName": "%%TaskExecutionsTableName%%",
      "Key": {
        "taskId": { "S": "{{ VALIDATE_MSG_CONTENTS.task_id }}" },
        "parentExecutionId": { "S": "{{ VALIDATE_MSG_CONTENTS.parent_execution_id }}" }
      },
      "UpdateExpression": "SET completedResourceCount = completedResourceCount + :incr, lastUpdateTime = :uptime",
      "ExpressionAttributeValues": { ":incr": { "N": "1" }, ":uptime": { "S": "{{ global:DATE_TIME }}" } }
    }
  - name: "NO_ACTION_PERFORMED_CHECK_FOR_TASK_EXECUTION_COMPLETION"
    action: aws:executeAwsApi
    timeoutSeconds: 30
    description: "Marks the overall task execution as Success if all resources have been successfully acted on"
    inputs: {
      "Service": "dynamodb",
      "Api": "UpdateItem",
      "TableName": "%%TaskExecutionsTableName%%",
      "Key": {
        "taskId": { "S": "{{ VALIDATE_MSG_CONTENTS.task_id }}" },
        "parentExecutionId": { "S": "{{ VALIDATE_MSG_CONTENTS.parent_execution_id }}" }
      },
      "UpdateExpression": "SET #s = :stat",
      "ConditionExpression": "completedResourceCount = totalResourceCount",
      "ExpressionAttributeNames": { "#s": "status" },
      "ExpressionAttributeValues": { ":stat": { "S": "Success" } }
    }
    onFailure: Continue
    isEnd: true
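# For reference, the Resource Queue message that VALIDATE_MSG_CONTENTS expects to parse contains at
# least the following keys (the values shown here are illustrative only):
#   {
#     "ResourceId": "my-example-table",
#     "ResourceRegion": "us-east-1",
#     "ResourceAccount": "111122223333",
#     "TargetTag": "example-tag",
#     "TaskId": "example-task-id",
#     "ParentExecutionId": "example-parent-execution-id"
#   }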