package software.amazon.msk.serverlesscluster; import software.amazon.awssdk.services.kafka.KafkaClient; import software.amazon.awssdk.services.kafka.model.ClusterState; import software.amazon.awssdk.services.kafka.model.ConflictException; import software.amazon.awssdk.services.kafka.model.CreateClusterV2Request; import software.amazon.awssdk.services.kafka.model.CreateClusterV2Response; import software.amazon.cloudformation.exceptions.CfnAlreadyExistsException; import software.amazon.cloudformation.exceptions.CfnNotStabilizedException; import software.amazon.cloudformation.proxy.AmazonWebServicesClientProxy; import software.amazon.cloudformation.proxy.Logger; import software.amazon.cloudformation.proxy.ProgressEvent; import software.amazon.cloudformation.proxy.ProxyClient; import software.amazon.cloudformation.proxy.ResourceHandlerRequest; public class CreateHandler extends BaseHandlerStd { private Logger logger; protected ProgressEvent<ResourceModel, CallbackContext> handleRequest( final AmazonWebServicesClientProxy proxy, final ResourceHandlerRequest<ResourceModel> request, final CallbackContext callbackContext, final ProxyClient<KafkaClient> proxyClient, final Logger logger) { this.logger = logger; final ResourceModel model = request.getDesiredResourceState(); final String clientRequestToken = request.getClientRequestToken(); logger.log( String.format("[Request: %s] Handling create operation, resource model: %s", clientRequestToken, model)); model.setTags(TagHelper.generateTagsForCreate(request)); return ProgressEvent.progress(model, callbackContext) .then(progress -> proxy.initiate("AWS-MSK-ServerlessCluster::Create", proxyClient, model, callbackContext) .translateToServiceRequest(Translator::translateToCreateRequest) .backoffDelay(STABILIZATION_DELAY_CREATE) .makeServiceCall(this::createResource) .stabilize(this::stabilizedOnCreate) .handleError((createClusterRequest, exception, _proxyClient, _resourceModel, _callbackContext) -> handleError(exception, model, callbackContext, logger, clientRequestToken)) .progress()) .then(progress -> new ReadHandler().handleRequest(proxy, request, callbackContext, proxyClient, logger)); } /** * Handler execute operation to call create cluster api * @param createClusterRequest the aws service request to create a resource * @param proxyClient the aws service client to make the call * @return awsResponse create resource response */ private CreateClusterV2Response createResource( final CreateClusterV2Request createClusterRequest, final ProxyClient<KafkaClient> proxyClient) { try { return proxyClient .injectCredentialsAndInvokeV2(createClusterRequest,proxyClient.client()::createClusterV2); } catch (final ConflictException e) { logger.log(String.format("Cluster with name %s already exists: %s ", createClusterRequest.clusterName(), e.getMessage())); throw new CfnAlreadyExistsException(ResourceModel.TYPE_NAME, createClusterRequest.clusterName(), e); } } /** * Handler stabilize operation to wait till resource reaches terminal state by calling DescribeClusterV2 api * @param createClusterRequest the aws service request to create a resource * @param createClusterResponse the aws service response to create a resource * @param proxyClient the aws service client to make the call * @param model resource model * @param callbackContext callback context * @return boolean state of stabilized or not */ private boolean stabilizedOnCreate( final CreateClusterV2Request createClusterRequest, final CreateClusterV2Response createClusterResponse, final ProxyClient<KafkaClient> proxyClient, final ResourceModel model, final CallbackContext callbackContext) { if (model.getArn() == null) { model.setArn(createClusterResponse.clusterArn()); } final String clusterArn = model.getArn(); final ClusterState currentClusterState = proxyClient.injectCredentialsAndInvokeV2(Translator.translateToReadRequest(model), proxyClient.client()::describeClusterV2).clusterInfo().state(); switch (currentClusterState) { case ACTIVE: logger.log(String.format("Cluster %s is stabilized, current state is %s", clusterArn, currentClusterState)); return true; case CREATING: logger.log(String.format("Cluster %s is stabilizing, current state is %s", clusterArn, currentClusterState)); return false; default: logger.log(String.format("Cluster %s reached unexpected state %s", clusterArn, currentClusterState)); throw new CfnNotStabilizedException(ResourceModel.TYPE_NAME, model.getArn()); } } }