Hello, I'm writing a Spark-based application which works on a pretty huge dataset stored on S3: about **15 TB** uncompressed. The data is laid out across many small LZO-compressed files, each varying from 10 to 100 MB.
By default the job spawns ~130k tasks while reading the dataset and mapping it to the schema. It then fails after roughly 70k task completions and ~20 task failures.

**Exception:**

```
WARN lzo.LzopInputStream: IOException in getCompressedData; likely LZO corruption.
org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body
```

It looks like the S3 connection is being closed prematurely.

**I have tried nearly 40 different combinations of configurations.** To summarize them:

- 1 to 3 executors per node
- 18 GB to 42 GB `--executor-memory`
- 3 to 5 `--executor-cores`
- 1.8 GB to 4.0 GB `spark.yarn.executor.memoryOverhead`
- both Kryo and the default Java serializer
- 0.35 to 0.5 `spark.memory.storageFraction`
- default, 130000, and 200000 partitions for the bigger dataset
- default, 200, and 2001 `spark.sql.shuffle.partitions`

**And most importantly:** 100 to 2048 for the `fs.s3a.connection.maximum` property. [This seems to be the property most relevant to the exception.]

[In all cases, the driver was set to memory = 51 GB, cores = 12, with the `MEMORY_AND_DISK_SER` storage level for caching.]

Nothing worked! If I run the program on half of the bigger dataset (7.5 TB), it finishes successfully in 1.5 hr.

1. What could I be doing wrong?
2. How do I determine the optimal value for `fs.s3a.connection.maximum`?
3. Is it possible that the S3 clients are getting GCed?

Any help will be appreciated!

***Environment:***

AWS EMR 5.7.0, 60 x i2.2xlarge SPOT instances (16 vCPU, 61 GB RAM, 2 x 800 GB SSD), Spark 2.1.0. YARN is used as the resource manager.
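For reference, this is roughly how I toggle the S3A connection knobs between runs. The three properties besides `fs.s3a.connection.maximum` are ones I have *not* varied yet (property names are from the Hadoop S3A documentation; the values here are just illustrative, not recommendations):

```scala
val hc = sparkSession.sparkContext.hadoopConfiguration

hc.setInt("fs.s3a.connection.maximum", 1200)            // HTTP connection pool size (per JVM)
hc.setInt("fs.s3a.attempts.maximum", 20)                // retries on recoverable request errors
hc.setInt("fs.s3a.connection.establish.timeout", 5000)  // connect timeout, ms
hc.setInt("fs.s3a.connection.timeout", 200000)          // socket read timeout, ms
```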
***Code:***

It's a fairly simple job, doing something like this:

```scala
val sl = StorageLevel.MEMORY_AND_DISK_SER

sparkSession.sparkContext.hadoopConfiguration.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sparkSession.sparkContext.hadoopConfiguration.setInt("fs.s3a.connection.maximum", 1200)

// 15 TB
val dataset_1: DataFrame = sparkSession
  .read
  .format("csv")
  .option("delimiter", ",")
  .schema(<schema: StructType>)
  .csv("s3a://...")
  .select("ID")

dataset_1.persist(sl)
print(dataset_1.count())

val tmp = dataset_1.groupBy("ID").agg(count("*").alias("count_id"))
val tmp2 = tmp.groupBy("count_id").agg(count("*").alias("count_count_id"))
tmp2.write.csv(…)

dataset_1.unpersist()
```

***Full Stacktrace:***

```
17/08/21 20:02:36 INFO compress.CodecPool: Got brand-new decompressor [.lzo]
17/08/21 20:06:18 WARN lzo.LzopInputStream: IOException in getCompressedData; likely LZO corruption.
org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 79627927; received: 19388396)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180)
    at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
    at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
    at com.amazonaws.services.s3.model.S3ObjectInputStream.read(S3ObjectInputStream.java:155)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
    at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
    at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:108)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
    at com.amazonaws.services.s3.model.S3ObjectInputStream.read(S3ObjectInputStream.java:155)
    at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at com.hadoop.compression.lzo.LzopInputStream.readFully(LzopInputStream.java:73)
    at com.hadoop.compression.lzo.LzopInputStream.getCompressedData(LzopInputStream.java:321)
    at com.hadoop.compression.lzo.LzopInputStream.decompress(LzopInputStream.java:261)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:186)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:99)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:91)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:364)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1021)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
```