import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.SaveMode import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.from_json import org.apache.spark.sql.streaming.Trigger import scala.collection.JavaConverters._ import com.datastax.spark.connector._ import org.apache.spark.sql.cassandra._ import org.apache.spark.sql.SaveMode._ import com.datastax.spark.connector._ import org.apache.spark.sql.cassandra._ import com.datastax.spark.connector.cql._ import com.datastax.oss.driver.api.core._ import org.apache.spark.sql.functions.rand import com.amazonaws.services.glue.log.GlueLogger import scala.util.Random object GlueApp { def main(sysArgs: Array[String]) { val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "DRIVER_CONF").toArray) val driverConfFileName = args("DRIVER_CONF") val conf = new SparkConf() .setAll( Seq( ("spark.task.maxFailures", "10"), ("spark.cassandra.connection.config.profile.path", driverConfFileName), ("spark.cassandra.query.retry.count", "1000"), ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"), ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"), ("spark.cassandra.concurrent.reads", "512"), ("spark.cassandra.output.concurrent.writes", "8"), ("spark.cassandra.output.batch.grouping.key", "none"), ("spark.cassandra.output.batch.size.rows", "1") )) val spark: SparkContext = new SparkContext(conf) val glueContext: GlueContext = new GlueContext(spark) val sparkSession: SparkSession = glueContext.getSparkSession import sparkSession.implicits._ Job.init(args("JOB_NAME"), glueContext, args.asJava) val logger = new GlueLogger //validation steps for peers and partitioner val connector = CassandraConnector.apply(conf); val session = connector.openSession(); val peersCount = session.execute("SELECT * FROM system.peers").all().size() val partitioner = session.execute("SELECT partitioner from system.local").one().getString("partitioner") logger.info("Total number of seeds: " + peersCount) logger.info("Configured partitioner: " + partitioner) if(partitioner.equals("com.amazonaws.cassandra.DefaultPartitioner")){ throw new Exception("Sark requires the use of RandomPartitioner or Murmur3Partitioner. See Working with partioners in Amazon Keyspaces documentation. https://docs.aws.amazon.com/keyspaces/latest/devguide/working-with-partitioners.html") } val tableName = args("TABLE_NAME") val keyspaceName = args("KEYSPACE_NAME") val today = 1677271870000L val randomData = (1 to 1000000) .map(_ => (Random.nextLong+"",today, Random.nextLong+"", Random.nextLong)) .toDF("id","create_date","data", "count") randomData.write.format("org.apache.spark.sql.cassandra").mode("append").option("keyspace", keyspaceName).option("table", tableName).option("output.consistency.level","LOCAL_QUORUM").save() Job.commit() } }