// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT-0 package com.amazonaws.gdcreplication.util; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import com.amazonaws.services.glue.model.Database; import com.amazonaws.services.glue.model.Table; import com.amazonaws.services.sns.AmazonSNS; import com.amazonaws.services.sns.model.MessageAttributeValue; import com.amazonaws.services.sns.model.PublishRequest; import com.amazonaws.services.sns.model.PublishResult; import com.google.gson.Gson; /** * This is class has utility methods to work with Amazon SNS * @author Ravi Itha, Amazon Web Services, Inc. * */ public class SNSUtil { public PublishResult publishLargeTableSchemaToSNS(AmazonSNS sns, String topicArn, String region, String bucketName, String message, String sourceGlueCatalogId, String exportBatchId, String messageType) { PublishResult publishResponse = null; PublishRequest publishRequest = new PublishRequest(topicArn, message); Map<String, MessageAttributeValue> messageAttributes = new HashMap<String, MessageAttributeValue>(); messageAttributes.put("source_catalog_id", createStringAttribute(sourceGlueCatalogId)); messageAttributes.put("message_type", createStringAttribute(messageType)); messageAttributes.put("export_batch_id", createStringAttribute(exportBatchId)); messageAttributes.put("bucket_name", createStringAttribute(bucketName)); messageAttributes.put("region_name", createStringAttribute(region)); publishRequest.setMessageAttributes(messageAttributes); try { publishResponse = sns.publish(publishRequest); } catch (Exception e) { e.printStackTrace(); System.out.println("Large Table message could not be published to SNS Topic. Topic ARN: " + topicArn); System.out.println("Message to be published: " + message); } return publishResponse; } /** * This method publishes one Table Schema (DDL) to SNS Topic * * @param sns * @param topicArn * @param databaseDDL * @param sourceGlueCatalogId * @return */ public PublishResult publishDatabaseSchemaToSNS(AmazonSNS sns, String topicArn, String databaseDDL, String sourceGlueCatalogId, String exportBatchId) { PublishResult publishResponse = null; PublishRequest publishRequest = new PublishRequest(topicArn, databaseDDL); Map<String, MessageAttributeValue> messageAttributes = new HashMap<String, MessageAttributeValue>(); messageAttributes.put("source_catalog_id", createStringAttribute(sourceGlueCatalogId)); messageAttributes.put("message_type", createStringAttribute("database")); messageAttributes.put("export_batch_id", createStringAttribute(exportBatchId)); publishRequest.setMessageAttributes(messageAttributes); try { publishResponse = sns.publish(publishRequest); } catch (Exception e) { System.out.println("Database schema could not be published to SNS Topic."); } return publishResponse; } /** * This method publishes all Database Schemas (DDL) to SNS Topic and tracks the * status in a DynamoDB table. * * @param sns * @param dBList * @param snsTopicArn * @param ddbUtil * @param ddbTblName * @param sourceGlueCatalogId * @return */ public int publishDatabaseSchemasToSNS(AmazonSNS sns, List<Database> masterDBList, String snsTopicArn, DDBUtil ddbUtil, String ddbTblName, String sourceGlueCatalogId) { long exportRunId = System.currentTimeMillis(); String exportBatchId = Long.toString(exportRunId); AtomicInteger numberOfDatabasesExported = new AtomicInteger(); // Create Message Attributes MessageAttributeValue sourceCatalogIdMA = createStringAttribute(sourceGlueCatalogId); MessageAttributeValue msgTypeMA = createStringAttribute("database"); MessageAttributeValue exportBatchIdMA = createStringAttribute(exportBatchId); // Convert databases to JSON Messages and publish them to SNS Topic for (Database db : masterDBList) { // Convert Glue Database to JSON String Gson gson = new Gson(); String databaseDDL = gson.toJson(db); // Publish JSON String to Amazon SNS topic PublishRequest publishRequest = new PublishRequest(snsTopicArn, databaseDDL); Map<String, MessageAttributeValue> messageAttributes = new HashMap<String, MessageAttributeValue>(); messageAttributes.put("source_catalog_id", sourceCatalogIdMA); messageAttributes.put("message_type", msgTypeMA); messageAttributes.put("export_batch_id", exportBatchIdMA); publishRequest.setMessageAttributes(messageAttributes); try { PublishResult publishResponse = sns.publish(publishRequest); numberOfDatabasesExported.getAndIncrement(); System.out.printf("Schema for Database '%s' published to SNS Topic. Message_Id: %s. \n", db.getName(), publishResponse.getMessageId()); ddbUtil.trackDatabaseExportStatus(ddbTblName, db.getName(), databaseDDL, publishResponse.getMessageId(), sourceGlueCatalogId, exportRunId, exportBatchId, true); } catch (Exception e) { e.printStackTrace(); System.out.printf( "Schema for Database '%s' could not be published to SNS Topic. It will be audited in DynamoDB table. \n", db.getName()); ddbUtil.trackDatabaseExportStatus(ddbTblName, db.getName(), databaseDDL, "", sourceGlueCatalogId, exportRunId, exportBatchId, false); } } System.out.println("Number of databases exported to SNS: " + numberOfDatabasesExported.get()); return numberOfDatabasesExported.get(); } /** * This method publishes Table Schema (DDL) to SNS Topic * * @param sns * @param topicArn * @param tableDDL * @param sourceGlueCatalogId * @return */ public PublishResult publishTableSchemaToSNS(AmazonSNS sns, String topicArn, Table table, String tableDDL, String sourceGlueCatalogId, String exportBatchId) { PublishResult publishResponse = null; PublishRequest publishRequest = new PublishRequest(topicArn, tableDDL); Map<String, MessageAttributeValue> messageAttributes = new HashMap<String, MessageAttributeValue>(); messageAttributes.put("source_catalog_id", createStringAttribute(sourceGlueCatalogId)); messageAttributes.put("message_type", createStringAttribute("table")); messageAttributes.put("export_batch_id", createStringAttribute(exportBatchId)); publishRequest.setMessageAttributes(messageAttributes); try { publishResponse = sns.publish(publishRequest); System.out.printf("Table schema for Table '%s' of database '%s' published to SNS Topic. Message_Id: %s. \n", table.getName(), table.getDatabaseName(), publishResponse.getMessageId()); } catch (Exception e) { e.printStackTrace(); System.out.printf( "Table schema for Table '%s' of database '%s' could not be published to SNS Topic. This will be tracked in DynamoDB table. \n", table.getName(), table.getDatabaseName()); } return publishResponse; } /** * This method creates MessageAttributeValue using a String value * * @param attributeValue * @return */ public MessageAttributeValue createStringAttribute(final String attributeValue) { final MessageAttributeValue messageAttributeValue = new MessageAttributeValue().withDataType("String") .withStringValue(attributeValue); return messageAttributeValue; } }