# ** 5 - Spark **

## ** BP 5.1 - Determine right infrastructure for your Spark workloads **

Spark workloads may require different types of hardware for different job characteristics to ensure optimal performance. EMR supports [several instance types](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-supported-instance-types.html) to cover all types of processing requirements. While onboarding new workloads, start your benchmarking with general purpose instance types like m5 or m6g. Monitor the OS and YARN metrics from Ganglia and CloudWatch to determine the system bottlenecks at peak load. Bottlenecks include CPU, memory, storage and I/O. Once identified, choose the appropriate hardware type for your job’s needs.

* **Memory-optimized** instances like r5 and r4 are good candidates for memory intensive workloads. Spark jobs that cache large DataFrames, Datasets or RDDs, perform operations like joins and unions on large tables, use a lot of internal or user-defined broadcast variables or accumulators, go through many GC cycles, or perform massive shuffles (when dynamic allocation is disabled) are likely to be memory intensive. The following diagram shows the YARN memory available percentage and the aggregated OS memory utilization from the CloudWatch EMR namespace and Ganglia respectively.

![BP - 1](images/spark-bp-1.png)

* **CPU-optimized** instances like c5 and c4 are good candidates for CPU intensive workloads. Spark jobs that run complex aggregate operations with many built-in arithmetic functions or UDFs, and jobs that use a lot of LRU caches, are likely to be CPU intensive. The following screenshot shows the aggregated CPU utilization of the EMR cluster from Ganglia.

![BP - 2](images/spark-bp-2.png)

* **General purpose** instances like m5 and m4 are good candidates for a mix of CPU and memory intensive workloads. They are also great for benchmarking and onboarding your Spark applications.

The following table outlines the CPU:memory ratio of 3 example instances at a similar price.

| Instance Type | Instance | EC2 price | EMR price | Cores | Memory in GiB | CPU:memory ratio |
|--------|----------------|------------|-----------|--------|------------|------------|
| Compute | c5.18xlarge | \$3.06 | \$0.27 | 72 | 144 | 1:2 |
| Memory | r5.12xlarge | \$3.02 | \$0.27 | 48 | 384 | 1:8 |
| General | m5.16xlarge | \$3.07 | \$0.27 | 64 | 256 | 1:4 |

* **Storage-optimized** instances like i3en and d2 are good candidates for I/O intensive workloads. If your use case is CPU or memory bound but also consumes a lot of I/O, and demands high disk throughput and low read or write latencies from transient HDFS storage, consider instances backed by SSD storage like r5d, c5d or m5d. For jobs that perform massive shuffles (when dynamic allocation is enabled), the Spark external shuffle service will write the shuffle data blocks to the local disks of each node running executors.
* **GPU instances** such as the p3 family are good candidates for Spark ML and intensive analytical workloads like image processing. From EMR 6.2, you can also use the [Nvidia RAPIDS accelerator](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-rapids.html) plugin to improve your GPU instance performance without any changes to your code or data processing pipelines.

Starting with EMR 5.31 and 6.1, Graviton instances (r6g, m6g, c6g) are supported. They offer up to 15% improvement in performance and 30% reduction in cost based on our derived benchmarks, and are a great choice to replace your legacy instances to observe improved performance at a lower cost.

![BP - 3](images/spark-bp-3.png)

## ** BP 5.2 - Choose the right deploy mode **

Spark offers two deploy modes: client and cluster. The deploy mode determines where your Spark driver runs. The Spark driver is the cockpit for your application. It hosts the SparkContext (or SparkSession) for your application.
It keeps track of all tasks executed by the executors and the state of the executors via heartbeats. The driver also fetches the results from the executors running tasks. Choose the right deploy mode based on your workload requirements.

### Client deploy mode

This is the default deploy mode for Spark applications in EMR. In this deploy mode, the driver process is launched within your Spark client, whether you submit your job from the EMR master node (using an EMR step or spark-submit) or from a client external to EMR. The Spark driver is a single point of failure. A failed driver JVM will not be relaunched in client deploy mode. Also, when the client loses connectivity with YARN, your job will be killed.

![BP - 4](images/spark-bp-4.png)

Use this deploy mode if:

* You are running only one or two Spark applications at a time in a cluster. This deploy mode is not ideal if you are running multiple applications at the same time on the same cluster, since all those drivers running on a single node can lead to resource contention.
* You want to be more in control of your Spark driver configurations. In client mode, Spark driver resources will not compete with YARN resources and can be adjusted separately without affecting the resource procurement of your applications.
* You are running a very large number of executors (1000+). Since the Spark driver manages and monitors tasks and executors, too many executors will slow down the driver because they all send heartbeats, report task status etc. Since EMR allows you to specify a different instance type for the master instance, you can choose a very powerful instance like z1d and allocate its memory and CPU resources to Spark drivers, especially if you are running a very high number of executors.
* You want to print output to the console.

### Cluster deploy mode

In cluster deploy mode, your Spark driver will be colocated within the Application Master (AM) container from YARN regardless of where you submit your Spark application from.

![BP - 5](images/spark-bp-5.png)

Use cluster deploy mode if:

* You are submitting multiple applications at the same time or have higher job or EMR step concurrency. Since drivers are launched within the AM, the driver resources of multiple applications will be spread across the cluster, because each AM is launched on one of the worker nodes.
* You have a relatively small number of executors per application, i.e., the Spark driver process does not have to do intensive work to manage and monitor too many executors.
* You are storing results in S3/HDFS and there is no need to print output to the console.
* You want to specify detailed instructions on where the AM runs. You can launch AMs on the CORE partition or on both CORE and TASK partitions based on where you want your AM and executors to launch. For example, you can run the AM only on on-demand CORE nodes and executors only on Spot task nodes.
* You want to relaunch a failed driver JVM. By default, YARN re-attempts AM loss twice based on the property `spark.yarn.maxAppAttempts`. You can increase this value further if needed.
* You want to ensure that termination of your client will not terminate your application.
* You want to return success from your client after job submission based on the property `spark.yarn.submit.waitAppCompletion`.

Regardless of which deploy mode you choose, make sure that your driver Spark configs are tuned for your workload needs.

## ** BP 5.3 - Use right file formats and compression type **

It is highly recommended that you use the right file formats for optimal performance. Do not use legacy file formats like CSV, JSON, text files etc., since the read/write performance will be much slower. Use columnar file formats like Parquet or ORC instead. For Spark in particular, Parquet would be the best choice. When using the Parquet file format, Spark will use EMRFSOutputCommitter, which is an optimized file committer for writing Parquet data files to S3 from Spark.
Also, using the Parquet file format is great for schema evolution, filter push downs and integration with applications with transactional support like Apache Hudi, Iceberg etc.

Also, it’s recommended to use an optimal compression format. Avoid using unsplittable compression formats like GZIP. Since they are not splittable, a large GZIP compressed file can only be processed by a single task/executor, which can lead to OOM errors.

![BP - 6](images/spark-bp-6.png)

Use splittable compression formats like BZ2, LZO etc. Parquet uses Snappy compression by default. ORC uses ZLIB compression by default. Both compression types are good choices and you can continue to use the defaults.

![BP - 7](images/spark-bp-7.png)

You can also apply columnar encryption using KMS.

```
// Use the mock in-memory KMS client (for demonstration only)
sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" ,
                           "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")
// Explicit master keys (base64 encoded) - required only for mock InMemoryKMS
sc.hadoopConfiguration.set("parquet.encryption.key.list" ,
                           "keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA==")
// Activate Parquet encryption, driven by Hadoop properties
sc.hadoopConfiguration.set("parquet.crypto.factory.class" ,
                           "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
// Write encrypted dataframe files.
// Column "square" will be protected with master key "keyA".
// Parquet file footers will be protected with master key "keyB"
squaresDF.write
  .option("parquet.encryption.column.keys" , "keyA:square")
  .option("parquet.encryption.footer.key" , "keyB")
  .parquet("/path/to/table.parquet.encrypted")
// Read encrypted dataframe files
val df2 = spark.read.parquet("/path/to/table.parquet.encrypted")
```

## ** BP 5.4 - Partitioning **

Partitioning your data is very important if you are going to run your code or queries with filter conditions. Partitioning helps you arrange your data files into S3 prefixes based on the partition key.
It helps minimize the read/write access footprint, i.e., you will be able to read only the files from the partition folder specified in your where clause, thus skipping a full read. Partitioning can also help if your data ingestion is incremental in nature. However, partitioning can reduce read throughput if you are performing full table scans.

You can choose your partition field based on:

* Query pattern. i.e., if you find that workloads use 1-2 columns as filter fields more frequently than other columns, consider using them as partitioning fields.
* Ingestion pattern. i.e., if you load data into your table once every day and want to avoid re-writing historical data, you can partition your data based on a date field (typically in YYYY-MM-DD format).
* Cardinality of the partitioning column. For partitioning, cardinality should not be too high.
* File sizes per partition. It is highly recommended that the individual files within each partition are around 128 MB, and not much smaller, since that size is optimal for a Spark executor JVM to process.

Also, the number of shuffle partitions will determine the number of output files per partition.

```
df.repartition(400).write.partitionBy("datecol").parquet("s3://bucket/output/")
```

The above code will create a maximum of 400 files per datecol partition. You can use repartitioning to control the file size in the destination, i.e., for merging smaller files. For splitting large files, you can use the property `spark.sql.files.maxPartitionBytes`.

Partitioning ensures that pruning takes place during reads and writes. Pruning makes sure that only the necessary partition(s) are read from S3 or HDFS. You can study the query plan in the Spark UI to confirm that pruning takes place while reading from and writing to partitioned tables from Spark.

## ** BP 5.5 - Tune driver/executor memory, cores and spark.sql.shuffle.partitions to fully utilize cluster resources **

EMR defaults are aimed at smaller JVM sizes.
For example, following are the default configuration for

## ** BP 5.6 - Use Kryo serializer by registering custom classes especially for Dataset schemas **

Spark uses the Java serializer by default. From Spark 2.0+, Spark internally uses the Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type. It is highly recommended that you use the Kryo serializer and also register your classes in the application.

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("my spark application name")
  // getConfig is assumed to return a SparkConf defined elsewhere in your application
  .config(getConfig)
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // use this if you need to increase the Kryo buffer size. Default 64k
  .config("spark.kryoserializer.buffer", "1024k")
  // use this if you need to increase the Kryo buffer max size. Default 64m
  .config("spark.kryoserializer.buffer.max", "1024m")
  /*
   * Use this if you need to register all Kryo required classes.
   * If it is false, you do not need to register any class for Kryo,
   * but it will increase your data size when the data is serialized.
   */
  .config("spark.kryo.registrationRequired", "true")
  .getOrCreate
```

If you do not specify `classesToRegister`, then there will be a Kryo conversion overhead. So, it is highly recommended to register classes in your application. In particular, if you are using Datasets, consider registering your Dataset schema classes along with the classes used internally by Spark, based on the data types and structures used in your program.

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.registerKryoClasses(
  Array(
    // Application-specific Dataset schema classes
    classOf[org.myPackage.FlightDataset],
    classOf[org.myPackage.BookingDataset],
    // Classes used internally by Spark for the data types in this program
    classOf[scala.collection.mutable.WrappedArray.ofRef[_]],
    classOf[org.apache.spark.sql.types.StructType],
    classOf[Array[org.apache.spark.sql.types.StructType]],
    classOf[org.apache.spark.sql.types.StructField],
    classOf[Array[org.apache.spark.sql.types.StructField]],
    Class.forName("org.apache.spark.sql.types.StringType$"),
    Class.forName("org.apache.spark.sql.types.LongType$"),
    Class.forName("org.apache.spark.sql.types.BooleanType$"),
    Class.forName("org.apache.spark.sql.types.DoubleType$"),
    classOf[org.apache.spark.sql.types.Metadata],
    classOf[org.apache.spark.sql.types.ArrayType],
    Class.forName("org.apache.spark.sql.execution.joins.UnsafeHashedRelation"),
    classOf[org.apache.spark.sql.catalyst.InternalRow],
    classOf[Array[org.apache.spark.sql.catalyst.InternalRow]],
    classOf[org.apache.spark.sql.catalyst.expressions.UnsafeRow],
    Class.forName("org.apache.spark.sql.execution.joins.LongHashedRelation"),
    Class.forName("org.apache.spark.sql.execution.joins.LongToUnsafeRowMap"),
    classOf[org.apache.spark.util.collection.BitSet],
    classOf[org.apache.spark.sql.types.DataType],
    classOf[Array[org.apache.spark.sql.types.DataType]],
    Class.forName("org.apache.spark.sql.types.NullType$"),
    Class.forName("org.apache.spark.sql.types.IntegerType$"),
    Class.forName("org.apache.spark.sql.types.TimestampType$"),
    Class.forName("org.apache.spark.sql.execution.datasources.FileFormatWriter$WriteTaskResult"),
    Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
    Class.forName("scala.collection.immutable.Set$EmptySet$"),
    Class.forName("scala.reflect.ClassTag$$anon$1"),
    Class.forName("java.lang.Class")
  )
)
```

You can also fine tune the following Kryo configs:

* *`spark.kryo.unsafe`* - Set to true to use the Unsafe-based Kryo serializer, which can be faster.
This is not unsafe when all nodes run on the same platform, but it should not be used if your EMR cluster has a mix of processor types (AMD and Intel, for example).
* *`spark.kryoserializer.buffer.max`* - Maximum size of the Kryo buffer. Default is 64m. It is recommended to increase this up to 1024m; the value must be below 2048m.
* *`spark.kryoserializer.buffer`* - Initial size of Kryo's serialization buffer. Default is 64k. It is recommended to increase this up to 1024k.

## ** BP 5.7 - Use appropriate garbage collector **

By default, EMR Spark uses the Parallel Garbage Collector, which works well in most cases. You can change the GC to G1GC if your GC cycles are slow, since G1GC may provide better performance in some cases, specifically by reducing GC pause times. Also, since G1GC has been the default garbage collector since Java 9, you may want to switch to G1GC for forward compatibility.

Following is the Spark configuration:

```
[{
    "classification": "spark-defaults",
    "properties": {
        "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
        "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'"
    },
    "configurations": []
}]
```

You can also tune the GC parameters for GC performance. You can see the comprehensive list of parameters [here](https://www.oracle.com/technical-resources/articles/java/g1gc.html) for G1GC and [here](https://docs.oracle.com/en/java/javase/11/gctuning/parallel-collector1.html) for ParallelGC. Some useful ones are below:

```
-XX:ConcGCThreads=n
-XX:ParallelGCThreads=n
-XX:InitiatingHeapOccupancyPercent=45
-XX:MaxGCPauseMillis=200
```

You can also monitor GC performance using the Spark UI. The GC time should ideally be <= 1% of total task runtime.
If not, tune the GC settings or executor size. For example, we see below in the Spark UI that GC takes almost 25% of task runtime, which is poor GC performance.

![BP - 8](images/spark-bp-8.png)

## ** BP 5.8 - Use appropriate APIs wherever possible **

When using Spark APIs, try to go with the most optimal choice if your use case permits. Following are a few examples.

### repartition vs coalesce

Both repartition and coalesce are used for changing the number of shuffle partitions. Repartition is used for both increasing and decreasing the number of shuffle partitions, whereas coalesce is used only for decreasing it. If your goal is to decrease the number of shuffle partitions, consider using coalesce instead of repartition. The reason is that repartition triggers a full shuffle, whereas coalesce triggers only a partial shuffle and thus minimizes the amount of data shuffled by keeping a few nodes as receivers of shuffle data.

```
df.coalesce(1) //instead of df.repartition(1)
```

But please note that when you coalesce (or repartition) to a very small number, each JVM will process a lot of data, which can lead to OOM issues or disk space issues due to shuffle spill.

### groupByKey vs reduceByKey

Use `reduceByKey` instead of `groupByKey` wherever possible. With `groupByKey`, all data is transferred over the network and collected on the reducer workers, which can lead to OOMs. With `reduceByKey`, data is first combined at the partition level, so that each partition sends only one output per key over the network. `reduceByKey` requires combining all your values into another value of the exact same type.

### orderBy vs sortBy or sortWithinPartitions

`orderBy` does global sorting, i.e., the whole dataset is shuffled so that the output is totally ordered. In contrast, sortBy or `sortWithinPartitions` does local sorting, i.e., data is sorted within each partition but global ordering is not preserved.
Use sortBy or `sortWithinPartitions` if global sorting is not necessary, especially during writes. Try to avoid the `orderBy` clause. Values can be aggregated across partitions in your queries if needed.

## ** BP 5.9 - Leverage spot nodes with managed autoscaling **

Enable managed autoscaling for your EMR clusters. From EMR 5.32 and EMR 6.2, there have been optimizations made to managed scaling to make it more resilient for your Spark workloads. Try to leverage task instance fleets with many instance types per fleet and Spot requests, since this would give both cost and performance gains. However, in this case, make sure that your output is being written directly to EMRFS, since core node capacity will be limited.

The following policy limits the core nodes to a maximum of 2 and requests them as on-demand, which is highly recommended. The rest of the nodes are Spot task nodes.

![BP - 9](images/spark-bp-9.png)

The following experiment illustrates the performance gains from using managed autoscaling.

![BP - 10](images/spark-bp-10.png)

For Spark workloads, we observed ~50% gains compared to custom autoscaling clusters.

![BP - 11](images/spark-bp-11.png)

## ** BP 5.10 - For workloads with fixed/predictable pattern, disable dynamic allocation **

Dynamic allocation is enabled in EMR by default. It is a great feature when your cluster is set for autoscaling. For workloads with a fixed or predictable pattern, you can disable it and size the executors explicitly, for example:

```
[{
    "classification": "spark-defaults",
    "properties": {
        "spark.dynamicAllocation.enabled": "false",
        "spark.executor.instances": "12",
        "spark.executor.memory": "4G",
        "spark.executor.cores": "2"
    },
    "configurations": []
}]
```

## ** BP 5.11 - Leverage HDFS as temporary storage for I/O intensive workloads **

Many EMR users directly read and write data in S3. This is generally suited for most types of use cases. However, for I/O intensive workflows, this approach could be slower, especially for heavy writes.

![BP - 12](images/spark-bp-12.png)

For very I/O intensive workloads, or for workloads where the temporary transform data is much larger than the final output, you can leverage HDFS as temporary storage and then use S3DistCp to copy the data into its final location. For example, in a fraud detection use case you could be performing transforms on TBs of data while your final output report is only a few KBs. In such scenarios, leveraging HDFS will give you better performance and also helps you avoid S3 throttling.

![BP - 13](images/spark-bp-13.png)

Following is a typical application of HDFS for transient storage. A Spark context could be shared between multiple workflows, with each workflow comprising multiple transformations. After all of its transformations are complete, each workflow would write its output to an HDFS location. Once all transforms are complete, you can save the final output to S3 using either S3DistCp or the SDK, depending on the number of files and the output size.

![BP - 14](images/spark-bp-14.png)

Even if you are using S3 directly to store your data, if your workloads are intensive, use disk optimized instances or SSD storage (i.e., r5d and r6gd instead of r5 and r6g). This is because dynamic allocation uses the Spark external shuffle service, which spills data to local disks when the executor JVM cannot hold any more shuffle data.

However, while using this architecture, please make sure that you are sizing your HDFS properly to prevent job failures due to lack of storage space while the job is running. Refer to best practice BP 3.14 in the Reliability section.

## ** BP 5.12 - Spark speculation with EMRFS **

In Hadoop/Spark, speculative execution is a concept where a slower task will be launched in parallel on another node using a different JVM (based on resource availability). Whichever task completes first (original or speculated task) will write the output to S3. This works well for HDFS based writes.
However, for EMRFS, turning on `spark.speculation` may lead to data loss or duplicate data. By default, `spark.speculation` is turned off. Only enable `spark.speculation` if you are doing one of the following:

* Writing Parquet files to S3 using EMRFSOutputCommitter
* Using HDFS as temporary storage (with the understanding that the final output will be written to S3 using S3DistCp)
* Using HDFS as storage

You can consider enabling `spark.speculation` especially while running workloads on very large clusters, provided you are performing one of the above actions. The reason is that, due to hardware issues, one node out of 500+ nodes could be slower and may run tasks slowly even if the data size being processed is the same as for other tasks. The chances of this happening are higher in larger clusters. In that case, `spark.speculation` will help relaunch those slow tasks on other nodes, providing SLA consistency (as long as the above criteria are met).

Please do not enable `spark.speculation` if you are not using EMRFSOutputCommitter to write Parquet files or if you are not using HDFS to write the output, since it may lead to incorrect, missing or duplicate data in your destination.

## ** BP 5.13 - Data quality and integrity checks with deequ **

Spark and Hadoop frameworks do not inherently guarantee data integrity. While it is very rare, you may observe some data corruption, missing data or duplicate data due to unexpected errors in the hardware and software stack. In order to check your data integrity, consider using [Deequ](https://github.com/awslabs/deequ) for your Spark workloads. Following are the blogs that can help you get started with Deequ for Spark workloads.

[Test data quality at scale with Deequ | AWS Big Data Blog](https://aws.amazon.com/blogs/big-data/test-data-quality-at-scale-with-deequ/)

[Testing data quality at scale with PyDeequ | AWS Big Data Blog](https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/)

## ** BP 5.14 - Use DataFrames wherever possible **

We know that we should use DataFrames and Datasets instead of RDDs, since both have several enhancements over RDDs like the Catalyst optimizer and adaptive query execution. Between Datasets and DataFrames, however, DataFrames benefit from certain additional optimizations during DAG creation and execution. These optimizations can be identified by inspecting the query plan. For example:

* Datasets perform many serializations and deserializations that DataFrames do not perform.
* DataFrames perform better push downs. For example, if there is a filter operation, it is applied early on in the query plan so that the in-memory data transfer is reduced.
* DataFrames avoid unnecessary exchanges. For example, a distinct after a join will be accomplished with two exchanges in Datasets but with only one exchange in DataFrames.

The only downside of using DataFrames instead of Datasets is that, with a Dataset, you generally define the schema in a class.

```
case class DeviceIoTData (
  battery_level: Long,
  c02_level: Long,
  cca2: String,
  cca3: String,
  cn: String,
  device_id: Long,
  device_name: String,
  humidity: Long,
  ip: String,
  latitude: Double,
  longitude: Double,
  scale: String,
  temp: Long,
  timestamp: Long
)
```

This provides you with type-safety. When there are changes to your schema, they can be consolidated and tracked in a single class. This can be considered an industry standard. While using Spark DataFrames, you can achieve something similar by maintaining the table columns in a list.

## ** BP 5.15 - Data Skew **

Data skew can significantly slow down the processing since a single JVM could be handling a large amount of data.
In the case below, observed in the Spark UI, a single task is processing 25 times more data than other tasks. This can inevitably lead to slowness, OOMs and disk space filling issues.

![BP - 15](images/spark-bp-15.png)

When there is data skew, it is best handled at the code level since very little can be done in terms of configuration. You can increase the JVM size, but that will impact other tasks and is not the best approach. Some common approaches include salting,

## ** BP 5.16 - Use right type of join **

There are several types of joins in Spark

### Broadcast Join

* Broadcast joins are the most optimal options

### Shuffle Hash Join

### Sort Merge Join

### Broadcast Nested Loop Join

## ** BP 5.17 - Configure observability **

Choose an observability platform based on your requirements.

## ** BP 5.18 - Debugging and monitoring Spark applications **

## ** BP 5.19 - Common Errors **

1. **Avoid 503 slow downs**

    * For mitigating S3 throttling errors, consider increasing `fs.s3.maxRetries` in the emrfs-site configuration. By default, it is set to 15 and you may need to increase it based on your workload needs.
    * You can also increase the multipart upload threshold in EMRFS. The default value at which MPU triggers is 128 MB.

    ```
    [{
        "classification": "emrfs-site",
        "properties": {
            "fs.s3.maxRetries": "20",
            "fs.s3n.multipart.uploads.split.size": "268435456"
        },
        "configurations": []
    }]
    ```

    * Consider using Iceberg tables’ ObjectStoreLocationProvider to store data under [0*7FFFFF] prefixes and thus help Amazon S3 learn the write pattern and scale accordingly.

    ```
    CREATE TABLE my_catalog.my_ns.my_table (
        id bigint,
        data string,
        category string)
    USING iceberg
    OPTIONS (
        'write.object-storage.enabled'=true,
        'write.data.path'='s3://my-table-data-bucket')
    PARTITIONED BY (category);
    ```

    Your S3 files will be arranged under MURMUR3 hash prefixes like below.

    ```
    2021-11-01 05:39:24  809.4 KiB 7ffbc860/my_ns/my_table/00328-1642-5ce681a7-dfe3-4751-ab10-37d7e58de08a-00015.parquet
    2021-11-01 06:00:10    6.1 MiB 7ffc1730/my_ns/my_table/00460-2631-983d19bf-6c1b-452c-8195-47e450dfad9d-00001.parquet
    2021-11-01 04:33:24    6.1 MiB 7ffeeb4e/my_ns/my_table/00156-781-9dbe3f08-0a1d-4733-bd90-9839a7ceda00-00002.parquet
    ```

    * If using Iceberg is not an option and if above approaches don’t resolve the issue, you can create an AWS support case to partition your S3 prefixes. But the prefix pattern needs to be known in advance for eg: s3://bucket/000-fff/ or s3://bucket//

2. **Increase event queue size and heartbeat interval for large number of executors**

    - If the size of “dropped events

3. **Increase HADOOP, YARN and HDFS heap sizes for intensive workflows**

4. **Use spark.yarn.archive to avoid compressing dependencies especially for high concurrency workloads**
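
As a rough illustration of the salting approach named under BP 5.15, the sketch below shows the idea on plain Scala collections rather than with Spark APIs. The names `SaltingSketch`, `saltedCounts`, `mergeCounts` and `numSalts` are hypothetical and do not come from this guide or from Spark. It assumes a simple count per key: a random salt first splits each key into several sub-keys so that a hot key is spread over several groups, and a second step removes the salt and combines the partial results.

```scala
import scala.util.Random

// Hypothetical, Spark-free illustration of key salting for a skewed aggregation.
object SaltingSketch {

  // Stage 1: attach a random salt in [0, numSalts) to every key and count per (key, salt).
  // A hot key is thereby split into up to numSalts smaller groups.
  def saltedCounts(keys: Seq[String], numSalts: Int, rng: Random): Map[(String, Int), Long] =
    keys
      .map(k => (k, rng.nextInt(numSalts)))
      .groupBy(identity)
      .map { case (saltedKey, group) => saltedKey -> group.size.toLong }

  // Stage 2: drop the salt and add up the partial counts of each original key.
  def mergeCounts(partial: Map[(String, Int), Long]): Map[String, Long] =
    partial
      .groupBy { case ((key, _), _) => key }
      .map { case (key, parts) => key -> parts.values.sum }

  def main(args: Array[String]): Unit = {
    // One heavily skewed key ("hot") next to a few rare keys.
    val keys = Seq.fill(1000)("hot") ++ Seq("a", "b", "c")
    val partial = saltedCounts(keys, numSalts = 8, rng = new Random(42))
    val totals = mergeCounts(partial)
    println(s"partial groups for hot key: ${partial.keys.count(_._1 == "hot")}")
    println(s"total for hot key: ${totals("hot")}")
  }
}
```

In a Spark job, the same idea would typically be expressed by adding a salt column before the first aggregation (or to both sides of a join) and aggregating again without it. The number of salts is a tuning choice that depends on how skewed the key is.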