import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.log.GlueLogger
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.CassandraConnector
import scala.collection.JavaConverters._

object GlueApp {

  def main(sysArgs: Array[String]): Unit = {

    // Resolve the parameters passed to the Glue job.
    val args = GlueArgParser.getResolvedOptions(sysArgs,
      Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "DRIVER_CONF", "FORMAT", "S3_URI").toArray)

    val driverConfFileName = args("DRIVER_CONF")

    // Spark Cassandra Connector settings tuned for Amazon Keyspaces:
    // generous task and query retries, LOCAL_ONE reads, and IN-clause
    // conversion thresholds disabled.
    val conf = new SparkConf()
      .setAll(
        Seq(
          ("spark.task.maxFailures", "10"),
          ("spark.cassandra.connection.config.profile.path", driverConfFileName),
          ("spark.cassandra.query.retry.count", "100"),
          ("spark.cassandra.input.consistency.level", "LOCAL_ONE"),
          ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
          ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
          ("spark.cassandra.concurrent.reads", "512")
        ))

    val spark: SparkContext = new SparkContext(conf)
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession
    import sparkSession.implicits._

    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    val logger = new GlueLogger

    // Validation steps: confirm the cluster exposes peers and uses a
    // partitioner Spark can work with before starting the export.
    val connector = CassandraConnector(conf)
    val session = connector.openSession()
    val peersCount = session.execute("SELECT * FROM system.peers").all().size()
    val partitioner = session.execute("SELECT partitioner FROM system.local").one().getString("partitioner")

    logger.info("Total number of peers: " + peersCount)
    logger.info("Configured partitioner: " + partitioner)

    if (peersCount == 0) {
      throw new Exception("No system peers found. Check required permissions to read from the system.peers table. If using VPCE check permissions for describing VPCE endpoints. https://docs.aws.amazon.com/keyspaces/latest/devguide/vpc-endpoints.html")
    }

    if (partitioner.equals("com.amazonaws.cassandra.DefaultPartitioner")) {
      throw new Exception("Spark requires the use of RandomPartitioner or Murmur3Partitioner. See Working with partitioners in the Amazon Keyspaces documentation. https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-partitioners.html")
    }

    val tableName = args("TABLE_NAME")
    val keyspaceName = args("KEYSPACE_NAME")
    val backupS3 = args("S3_URI")
    val backupFormat = args("FORMAT")

    // Read the full Keyspaces table into a DataFrame, then write it to
    // S3 in the requested format, failing if the target already exists.
    val tableDf = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map(
        "table" -> tableName,
        "keyspace" -> keyspaceName))
      .load()

    tableDf.write.format(backupFormat).mode(SaveMode.ErrorIfExists).save(backupS3)

    Job.commit()
  }
}
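
// For reference, a minimal sketch of the driver configuration profile this
// job expects at DRIVER_CONF. The endpoint, region, and SigV4 auth plugin
// shown here are assumptions for an Amazon Keyspaces deployment in
// us-east-1; adapt them to your environment:
//
//   datastax-java-driver {
//     basic.contact-points = ["cassandra.us-east-1.amazonaws.com:9142"]
//     basic.load-balancing-policy {
//       local-datacenter = us-east-1
//     }
//     basic.request.consistency = LOCAL_ONE
//     advanced {
//       auth-provider = {
//         class = software.aws.mcs.auth.SigV4AuthProvider
//         aws-region = us-east-1
//       }
//       ssl-engine-factory {
//         class = DefaultSslEngineFactory
//       }
//     }
//   }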