package software.amazon.redshift.clusterparametergroup; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import software.amazon.awssdk.services.redshift.RedshiftClient; import software.amazon.awssdk.services.redshift.model.ClusterParameterGroupNotFoundException; import software.amazon.awssdk.services.redshift.model.CreateTagsResponse; import software.amazon.awssdk.services.redshift.model.DescribeClusterParametersRequest; import software.amazon.awssdk.services.redshift.model.DescribeClusterParametersResponse; import software.amazon.awssdk.services.redshift.model.DescribeTagsRequest; import software.amazon.awssdk.services.redshift.model.DescribeTagsResponse; import software.amazon.awssdk.services.redshift.model.InvalidClusterParameterGroupStateException; import software.amazon.awssdk.services.redshift.model.InvalidClusterStateException; import software.amazon.awssdk.services.redshift.model.InvalidTagException; import software.amazon.awssdk.services.redshift.model.ModifyClusterParameterGroupRequest; import software.amazon.awssdk.services.redshift.model.ModifyClusterParameterGroupResponse; import software.amazon.awssdk.services.redshift.model.ResetClusterParameterGroupRequest; import software.amazon.awssdk.services.redshift.model.ResetClusterParameterGroupResponse; import software.amazon.awssdk.services.redshift.model.ResourceNotFoundException; import software.amazon.cloudformation.proxy.AmazonWebServicesClientProxy; import software.amazon.cloudformation.proxy.HandlerErrorCode; import software.amazon.cloudformation.proxy.Logger; import software.amazon.cloudformation.proxy.OperationStatus; import software.amazon.cloudformation.proxy.ProgressEvent; import software.amazon.cloudformation.proxy.ProxyClient; import software.amazon.cloudformation.proxy.ResourceHandlerRequest; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; public class UpdateHandler extends BaseHandlerStd { public static final String NEED_TO_BE_RESET = "needToBeReset"; private static final String WLM_JSON_CONFIGURATION = "wlm_json_configuration"; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private Logger logger; protected ProgressEvent handleRequest( final AmazonWebServicesClientProxy proxy, final ResourceHandlerRequest request, final CallbackContext callbackContext, final ProxyClient proxyClient, final Logger logger) { this.logger = logger; final String resourceName = String.format("arn:%s:redshift:%s:%s:parametergroup:%s", request.getAwsPartition(), request.getRegion(), request.getAwsAccountId(), request.getDesiredResourceState().getParameterGroupName()); return ProgressEvent.progress(request.getDesiredResourceState(), callbackContext) .then(progress -> proxy.initiate(String.format("%s::Update::ReadTags", CALL_GRAPH_TYPE_NAME), proxyClient, progress.getResourceModel(), progress.getCallbackContext()) .translateToServiceRequest(resourceModel -> Translator.translateToReadTagsRequest(resourceName)) .makeServiceCall(this::readTags) .handleError(this::operateTagsErrorHandler) .done((tagsRequest, tagsResponse, client, model, context) -> ProgressEvent.builder() .callbackContext(callbackContext) .callbackDelaySeconds(0) .resourceModel(Translator.translateFromReadTagsResponse(tagsResponse)) .status(OperationStatus.IN_PROGRESS) .build())) .then(progress -> proxy.initiate(String.format("%s::Update::UpdateTags", CALL_GRAPH_TYPE_NAME), proxyClient, progress.getResourceModel(), progress.getCallbackContext()) .translateToServiceRequest(resourceModel -> Translator.translateToUpdateTagsRequest(request.getDesiredResourceState(), resourceModel, resourceName)) .makeServiceCall(this::updateTags) .handleError(this::operateTagsErrorHandler) .done((tagsRequest, tagsResponse, client, model, context) -> ProgressEvent.builder() .callbackContext(callbackContext) .callbackDelaySeconds(0) .resourceModel(request.getDesiredResourceState()) .status(OperationStatus.IN_PROGRESS) .build())) .then(progress -> proxy.initiate(String.format("%s::Update::ReadParameters", CALL_GRAPH_TYPE_NAME), proxyClient, progress.getResourceModel(), progress.getCallbackContext()) .translateToServiceRequest(Translator::translateToReadParametersRequest) .makeServiceCall(this::describeClusterParameters) .handleError(this::describeClusterParametersErrorHandler) .done((readRequest, readResponse, client, model, context) -> ProgressEvent.builder() .callbackContext(context) .callbackDelaySeconds(0) .resourceModel(getUpdatableResourceModel(model, Translator.translateFromReadParametersResponse(readResponse, model))) .status(OperationStatus.IN_PROGRESS) .build())) .then(progress -> proxy.initiate(String.format("%s::Update::ResetParameters", CALL_GRAPH_TYPE_NAME), proxyClient, progress.getResourceModel(), progress.getCallbackContext()) .translateToServiceRequest(Translator::translateToResetRequest) .makeServiceCall(this::resetClusterParameterGroup) .handleError(this::resetClusterParameterGroupErrorHandler) .progress()) .then(progress -> proxy.initiate(String.format("%s::Update::UpdateParameters", CALL_GRAPH_TYPE_NAME), proxyClient, progress.getResourceModel(), progress.getCallbackContext()) .translateToServiceRequest(Translator::translateToUpdateRequest) .makeServiceCall(this::modifyClusterParameterGroup) .handleError(this::modifyClusterParameterGroupErrorHandler) .progress()) .then(progress -> new ReadHandler().handleRequest(proxy, request, callbackContext, proxyClient, logger)); } /* compares the desired parameters and the previous parameters, calculates which parameters need to be reset (value set to NEED_TO_BE_RESET), and which parameter values need to be updated */ private ResourceModel getUpdatableResourceModel(ResourceModel desiredModel, ResourceModel previousModel) { logger.log("DesiredModel parameters: " + desiredModel.getParameters() + "\nPreviousModel parameters: " + previousModel.getParameters()); Function, List> lowerCaseParameterName = (raw) -> Optional.ofNullable(raw) .map(parameters -> parameters .stream() .map(parameter -> Parameter.builder() .parameterName(StringUtils.lowerCase(parameter.getParameterName())) .parameterValue(parameter.getParameterValue()) .build()) .collect(Collectors.toList())) .orElse(Collections.emptyList()); List desiredParameters = lowerCaseParameterName.apply(desiredModel.getParameters()); List previousParameters = lowerCaseParameterName.apply(previousModel.getParameters()); /* like before, we assume there's no duplicated parameters like [{key1: value1}, {key1: value2}] */ Function, Map> paramsToMap = list -> list.stream() .collect(Collectors.toMap(Parameter::getParameterName, Parameter::getParameterValue)); Map desiredParamKeyValueMap = paramsToMap.apply(desiredParameters); Map previousParamKeyValueMap = paramsToMap.apply(previousParameters); /* previousParameters [ { "auto_analyze": true }, { "date_style": "ISO, MDY" }, { "wlm_json_configuration": "[{key1: value1}]" } ] desiredParameters [ { "statement_timeout": 1000 }, { "date_style": "ISO, MDY" }, // value stays the same, will be ignored, { "wlm_json_configuration": "[{key1: value1}]" } // needs to `JSON` compare with previous wlm_json_configuration (see code) ] updatableParameters [ { "auto_analyze": NEED_TO_BE_RESET }, // exists in previous not in desired, value will be set to NEED_TO_BE_RESET ] */ List updatableParameters = CollectionUtils.disjunction(desiredParameters, previousParameters) .stream() .filter(parameter -> { /* We only specially handle when wlm_json_configuration exists in both previous and desired parameters. When both previous and desired parameters contain wlm_json_configuration, we'll need to first sanitize both wlm_json_configurations (stringified JSON) then check if they're equal. For example, [{key:value}] is the same as [ {key: value}] When only previous parameters contains wlm_json_configuration, it means it needs to be reset. When only desired parameters contains wlm_json_configuration, we only need to apply the new one. */ if (parameter.getParameterName().equals(WLM_JSON_CONFIGURATION) && desiredParamKeyValueMap.containsKey(WLM_JSON_CONFIGURATION) && previousParamKeyValueMap.containsKey(WLM_JSON_CONFIGURATION) ) { try { final String desiredWlm = getSanitizedString(desiredParamKeyValueMap.get(WLM_JSON_CONFIGURATION)); final String previousWlm = getSanitizedString(previousParamKeyValueMap.get(WLM_JSON_CONFIGURATION)); /* only when their sanitized values don't match, we update */ return !desiredWlm.equals(previousWlm); } catch (JsonProcessingException e) { /* ignore this exception for now to be consistent with existing behavior, this invalid JSON will fail modifyParameterGroup API's JSON validation. When I get a chance to verify the behaviors we should for sure throw invalid JSON exception as soon as we can: throw new CfnInvalidRequestException(e); */ return true; // true meaning we'll modify wlm_json_configuration using the new one } } // don't filter out any parameters by default return true; }) .map(parameter -> Parameter.builder() .parameterName(parameter.getParameterName()) .parameterValue(desiredParamKeyValueMap.getOrDefault(parameter.getParameterName(), NEED_TO_BE_RESET)) .build()) /* needs distinct because doing the map operation above can introduce the same Parameter, this happens when the same parameter value changes, in which this parameter key exists in both previous and desired parameters, but w/ diff values, after map() we'll have 2 parameters, whose values are all set to desired parameter's value */ .distinct() .collect(Collectors.toList()); return desiredModel.toBuilder() .parameters(updatableParameters) .build(); } private String getSanitizedString(final String str) throws JsonProcessingException { return OBJECT_MAPPER.readValue(str, JsonNode.class).toString(); } private DescribeClusterParametersResponse describeClusterParameters(final DescribeClusterParametersRequest awsRequest, final ProxyClient proxyClient) { DescribeClusterParametersResponse awsResponse; awsResponse = proxyClient.injectCredentialsAndInvokeV2(awsRequest, proxyClient.client()::describeClusterParameters); logger.log(String.format("%s's Parameters has successfully been read.", ResourceModel.TYPE_NAME)); if (awsResponse.hasParameters()) { for (software.amazon.awssdk.services.redshift.model.Parameter param : awsResponse.parameters()) { logger.log("Parameter from describeClusterParameter: " + param.toString()); } } return awsResponse; } private ProgressEvent describeClusterParametersErrorHandler(final DescribeClusterParametersRequest awsRequest, final Exception exception, final ProxyClient client, final ResourceModel model, final CallbackContext context) { if (exception instanceof ClusterParameterGroupNotFoundException) { return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.NotFound); } else { return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.GeneralServiceException); } } private DescribeTagsResponse readTags(final DescribeTagsRequest awsRequest, final ProxyClient proxyClient) { DescribeTagsResponse awsResponse; awsResponse = proxyClient.injectCredentialsAndInvokeV2(awsRequest, proxyClient.client()::describeTags); logger.log(String.format("%s's tags have successfully been read.", ResourceModel.TYPE_NAME)); return awsResponse; } private CreateTagsResponse updateTags(final ModifyTagsRequest awsRequest, final ProxyClient proxyClient) { CreateTagsResponse awsResponse = null; if (awsRequest.getDeleteOldTagsRequest().tagKeys().isEmpty()) { logger.log(String.format("No tags would be deleted for the resource: %s.", ResourceModel.TYPE_NAME)); } else { proxyClient.injectCredentialsAndInvokeV2(awsRequest.getDeleteOldTagsRequest(), proxyClient.client()::deleteTags); logger.log(String.format("Delete tags for the resource: %s.", ResourceModel.TYPE_NAME)); } if (awsRequest.getCreateNewTagsRequest().tags().isEmpty()) { logger.log(String.format("No tags would be created for the resource: %s.", ResourceModel.TYPE_NAME)); } else { awsResponse = proxyClient.injectCredentialsAndInvokeV2(awsRequest.getCreateNewTagsRequest(), proxyClient.client()::createTags); logger.log(String.format("Create tags for the resource: %s.", ResourceModel.TYPE_NAME)); } return awsResponse; } private ProgressEvent operateTagsErrorHandler(final Object awsRequest, final Exception exception, final ProxyClient client, final ResourceModel model, final CallbackContext context) { if (exception instanceof ResourceNotFoundException) { return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.NotFound); } else if (exception instanceof InvalidTagException || exception instanceof InvalidClusterStateException) { return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.InvalidRequest); } else { return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.GeneralServiceException); } } private ResetClusterParameterGroupResponse resetClusterParameterGroup(final ResetClusterParameterGroupRequest awsRequest, final ProxyClient proxyClient) { return Optional.of(awsRequest) .filter(r -> !CollectionUtils.isEmpty(r.parameters())) .map(r -> { ResetClusterParameterGroupResponse awsResponse; awsResponse = proxyClient.injectCredentialsAndInvokeV2(r, proxyClient.client()::resetClusterParameterGroup); logger.log(String.format("%s's Parameters has successfully been reset.", ResourceModel.TYPE_NAME)); return awsResponse; }) .orElseGet(() -> { logger.log(String.format("%s's Parameters has nothing to be reset.", ResourceModel.TYPE_NAME)); return ResetClusterParameterGroupResponse.builder().build(); }); } private ProgressEvent resetClusterParameterGroupErrorHandler(final ResetClusterParameterGroupRequest awsRequest, final Exception exception, final ProxyClient client, final ResourceModel model, final CallbackContext context) { if (exception instanceof ClusterParameterGroupNotFoundException) { return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.NotFound); } else if (exception instanceof InvalidClusterParameterGroupStateException) { return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.ResourceConflict); } else { return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.GeneralServiceException); } } private ModifyClusterParameterGroupResponse modifyClusterParameterGroup(final ModifyClusterParameterGroupRequest awsRequest, final ProxyClient proxyClient) { return Optional.of(awsRequest) .filter(r -> !CollectionUtils.isEmpty(r.parameters())) .map(r -> { ModifyClusterParameterGroupResponse awsResponse; awsResponse = proxyClient.injectCredentialsAndInvokeV2(r, proxyClient.client()::modifyClusterParameterGroup); logger.log(String.format("%s's Parameters has successfully been updated.", ResourceModel.TYPE_NAME)); return awsResponse; }) .orElseGet(() -> { logger.log(String.format("%s's Parameters has nothing to be updated.", ResourceModel.TYPE_NAME)); return ModifyClusterParameterGroupResponse.builder().build(); }); } private ProgressEvent modifyClusterParameterGroupErrorHandler(final ModifyClusterParameterGroupRequest awsRequest, final Exception exception, final ProxyClient client, final ResourceModel model, final CallbackContext context) { if (exception instanceof ClusterParameterGroupNotFoundException) { return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.NotFound); } else if (exception instanceof InvalidClusterParameterGroupStateException) { return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.ResourceConflict); } else { return ProgressEvent.defaultFailureHandler(exception, HandlerErrorCode.GeneralServiceException); } } }