// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT-0 package com.amazonaws.gdcreplication.util; import java.util.List; import java.util.Optional; import com.amazonaws.services.glue.AWSGlue; import com.amazonaws.services.glue.model.Database; import com.amazonaws.services.glue.model.Partition; import com.amazonaws.services.glue.model.Table; import com.amazonaws.services.sqs.AmazonSQS; public class GDCUtil { /** * This method processes a Message that belongs to Table schema * @param glue * @param sqs * @param targetGlueCatalogId * @param sourceGlueCatalogId * @param tableWithPartitions * @param message * @param ddbTblNameForTableStatusTracking * @param sqsQueueURL * @param exportBatchId * @param skipTableArchive */ public void processTableSchema(AWSGlue glue, AmazonSQS sqs, String targetGlueCatalogId, String sourceGlueCatalogId, TableWithPartitions tableWithPartitions, String message, String ddbTblNameForTableStatusTracking, String sqsQueueURL, String exportBatchId, boolean skipTableArchive) { DDBUtil ddbUtil = new DDBUtil(); SQSUtil sqsUtil = new SQSUtil(); GlueUtil glueUtil = new GlueUtil(); long importRunId = System.currentTimeMillis(); // Get Table and its Partitions from Input JSON Table table = tableWithPartitions.getTable(); List<Partition> partitionListFromExport = tableWithPartitions.getPartitionList(); // Create or update table TableReplicationStatus tableStatus = glueUtil.createOrUpdateTable(glue, table, targetGlueCatalogId, skipTableArchive); // If database not found then create one if (tableStatus.isDbNotFoundError()) { System.out.printf("Creating Database with name: '%s'. \n", table.getDatabaseName()); DBReplicationStatus dbStatus = glueUtil.createGlueDatabase(glue, targetGlueCatalogId, table.getDatabaseName(), "Database Imported from Glue Data Catalog of AWS Account Id: ".concat(sourceGlueCatalogId)); // Now, try to create / update table again. if (dbStatus.isCreated()) { tableStatus = glueUtil.createOrUpdateTable(glue, tableWithPartitions.getTable(), targetGlueCatalogId, skipTableArchive); } } tableStatus.setTableSchema(message); // Update table partitions if (!tableStatus.isError()) { // Get table partitions from Target Account List<Partition> partitionsB4Replication = glueUtil.getPartitions(glue, targetGlueCatalogId, table.getDatabaseName(), table.getName()); System.out.println("Number of partitions before replication: " + partitionsB4Replication.size()); // Add Partitions to the table if the export has Partitions if (partitionListFromExport.size() > 0) { tableStatus.setExportHasPartitions(true); if (partitionsB4Replication.size() == 0) { System.out.println("Adding partitions based on the export."); boolean partitionsAdded = glueUtil.addPartitions(glue, partitionListFromExport, targetGlueCatalogId, table.getDatabaseName(), table.getName()); if (partitionsAdded) tableStatus.setPartitionsReplicated(true); } else { System.out.println( "Table has partitions. They will be deleted first before adding partitions based on Export."); // delete partitions in batch mode boolean partitionsDeleted = glueUtil.deletePartitions(glue, targetGlueCatalogId, table.getDatabaseName(), table.getName(), partitionsB4Replication); // Enable the below code for debugging purpose. Check number of table partitions after deletion // List<Partition> partitionsAfterDeletion = glueUtil.getPartitions(glue, targetGlueCatalogId, // table.getDatabaseName(), table.getName()); // System.out.println("Number of partitions after deletion: " + partitionsAfterDeletion.size()); // add partitions from S3 object boolean partitionsAdded = glueUtil.addPartitions(glue, partitionListFromExport, targetGlueCatalogId, table.getDatabaseName(), table.getName()); if (partitionsDeleted && partitionsAdded) tableStatus.setPartitionsReplicated(true); // Enable the below code for debugging purpose. Check number of table partitions after addition // List<Partition> partitionsAfterAddition = glueUtil.getPartitions(glue, targetGlueCatalogId, // table.getDatabaseName(), table.getName()); // System.out.println("Number of partitions after addition: " + partitionsAfterAddition.size()); } } else if (partitionListFromExport.size() == 0) { tableStatus.setExportHasPartitions(false); if (partitionsB4Replication.size() > 0) { // Export has no partitions but table already has some partitions. Those // partitions will be deleted in batch mode. boolean partitionsDeleted = glueUtil.deletePartitions(glue, targetGlueCatalogId, table.getDatabaseName(), table.getName(), partitionsB4Replication); if (partitionsDeleted) tableStatus.setPartitionsReplicated(true); } } } // If there is any error in creating/updating table then send it to DLQ else { System.out.println("Error in creating/updating table in the Glue Data Catalog. It will be send to DLQ."); sqsUtil.sendTableSchemaToDeadLetterQueue(sqs, sqsQueueURL, tableStatus, exportBatchId, sourceGlueCatalogId); } // Track status in DynamoDB ddbUtil.trackTableImportStatus(tableStatus, sourceGlueCatalogId, targetGlueCatalogId, importRunId, exportBatchId, ddbTblNameForTableStatusTracking); System.out.printf( "Processing of Table shcema completed. Result: Table replicated: %b, Export has partitions: %b, " + "Partitions replicated: %b, error: %b \n", tableStatus.isReplicated(), tableStatus.isExportHasPartitions(), tableStatus.isPartitionsReplicated(), tableStatus.isError()); } /** * This method processes a Message that belongs to Database schema * @param glue * @param sqs * @param targetGlueCatalogId * @param db * @param message * @param sqsQueueURL * @param sourceGlueCatalogId * @param exportBatchId * @param ddbTblNameForDBStatusTracking */ public void processDatabseSchema(AWSGlue glue, AmazonSQS sqs, String targetGlueCatalogId, Database db, String message, String sqsQueueURL, String sourceGlueCatalogId, String exportBatchId, String ddbTblNameForDBStatusTracking) { DDBUtil ddbUtil = new DDBUtil(); GlueUtil glueUtil = new GlueUtil(); SQSUtil sqsUtil = new SQSUtil(); boolean isDBCreated = false; long importRunId = System.currentTimeMillis(); Database database = glueUtil.getDatabaseIfExist(glue, targetGlueCatalogId, db); boolean dbExist = Optional.ofNullable(database).isPresent(); if (!dbExist) { DBReplicationStatus dbStatus = glueUtil.createGlueDatabase(glue, targetGlueCatalogId, db); if (dbStatus.isError()) { System.out.println("Error in creating database in the Glue Data Catalog. It will be send to DLQ."); sqsUtil.sendDatabaseSchemaToDeadLetterQueue(sqs, sqsQueueURL, message, db.getName(), exportBatchId, sourceGlueCatalogId); } else isDBCreated = true; } else System.out.printf( "Database with name '%s' exist already in target Glue Data Catalog. No action will be taken. \n", database.getName()); // Track status in DynamoDB ddbUtil.trackDatabaseImportStatus(sourceGlueCatalogId, targetGlueCatalogId, ddbTblNameForDBStatusTracking, db.getName(), importRunId, exportBatchId, isDBCreated); System.out.printf("Processing of Database shcema completed. Result: DB already exist: %b, DB created: %b. \n", dbExist, isDBCreated); } }