Hello,

I'm writing a Spark-based application that works on a pretty huge dataset
stored on S3, about **15 TB** uncompressed. The data is laid out across many
small LZO-compressed files, varying from 10 MB to 100 MB each.

By default the job spawns about 130k tasks while reading the dataset and
mapping it to the schema.

It then fails after roughly 70k task completions and ~20 task failures.

**Exception:**

    WARN lzo.LzopInputStream: IOException in getCompressedData; likely LZO corruption.
    org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body

It looks like the S3 connection is getting closed prematurely.

**I have tried nearly 40 different combos of configurations.**

**To summarize them:**

- 1 to 3 executors per node
- 18 GB to 42 GB `--executor-memory`
- 3 to 5 `--executor-cores`
- 1.8 GB to 4.0 GB `spark.yarn.executor.memoryOverhead`
- both Kryo and the default Java serializer
- 0.35 to 0.5 `spark.memory.storageFraction`
- default, 130,000, and 200,000 partitions for the bigger dataset
- default, 200, and 2001 `spark.sql.shuffle.partitions`

**And most importantly:** 100 to 2048 for the `fs.s3a.connection.maximum` property.

[This seems to be the property most relevant to the exception.]

[In all cases, the driver was set to memory = 51 GB, cores = 12, and
`MEMORY_AND_DISK_SER` was used as the caching level.]
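For concreteness, one such combination would be set up roughly like this (a sketch only, with values picked from the ranges above; the exact numbers varied from run to run):

```scala
// Sketch of one tried combination (illustrative values from the ranges
// above; not the exact settings of any single run).
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .config("spark.executor.instances", "120")   // 2 executors x 60 nodes
  .config("spark.executor.memory", "30g")
  .config("spark.executor.cores", "4")
  .config("spark.yarn.executor.memoryOverhead", "3g")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.memory.storageFraction", "0.4")
  .config("spark.sql.shuffle.partitions", "2001")
  .config("spark.driver.memory", "51g")
  .config("spark.driver.cores", "12")
  .getOrCreate()

// The s3a pool size is a Hadoop property, so it goes on hadoopConfiguration.
sparkSession.sparkContext.hadoopConfiguration
  .setInt("fs.s3a.connection.maximum", 1200)
```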

Nothing worked!

If I run the program on half of the bigger dataset (7.5 TB), it finishes
successfully in about 1.5 hours.

1. What could I be doing wrong?
2. How do I determine the optimal value for `fs.s3a.connection.maximum`?
3. Is it possible that the S3 clients are getting GCed?
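On question 2, my back-of-the-envelope reasoning so far (an assumption on my part, not something I've verified) is that `fs.s3a.connection.maximum` bounds the HTTP pool of a single `S3AFileSystem` instance, i.e. per executor JVM, so it only needs to cover the streams that JVM can hold open at once plus some margin:

```scala
// Back-of-the-envelope sizing (assumptions: the pool limit is per JVM, and
// each running task holds at most one S3 input stream at a time).
val coresPerExecutor = 5   // upper end of the --executor-cores range I tried
val headroomFactor   = 4   // assumed margin for retried / half-closed streams

val poolSizePerExecutor = coresPerExecutor * headroomFactor
println(poolSizePerExecutor) // prints 20
```

If that per-JVM reading is right, values like 1200 would be far more than any single executor can use, which is why I'm puzzled that raising it changed nothing.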

Any help will be appreciated!

***Environment:***

AWS EMR 5.7.0, 60 x i2.2xlarge SPOT Instances (16 vCPU, 61GB RAM, 2 x 800GB
SSD), Spark 2.1.0

YARN is used as resource manager.

***Code:***

It's a fairly simple job, doing something like this:

    val sl = StorageLevel.MEMORY_AND_DISK_SER

    sparkSession.sparkContext.hadoopConfiguration.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
    sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    sparkSession.sparkContext.hadoopConfiguration.setInt("fs.s3a.connection.maximum", 1200)

    val dataset_1: DataFrame = sparkSession
        .read
        .format("csv")
        .option("delimiter", ",")
        .schema(<schema: StructType>)
        .csv("s3a://...")
        .select("ID") // 15 TB

    dataset_1.persist(sl)

    print(dataset_1.count())

    val tmp = dataset_1.groupBy("ID").agg(count("*").alias("count_id"))
    val tmp2 = tmp.groupBy("count_id").agg(count("*").alias("count_count_id"))
    tmp2.write.csv(...)

    dataset_1.unpersist()

***Full Stacktrace:***

    17/08/21 20:02:36 INFO compress.CodecPool: Got brand-new decompressor [.lzo]
    17/08/21 20:06:18 WARN lzo.LzopInputStream: IOException in getCompressedData; likely LZO corruption.
    org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 79627927; received: 19388396)
            at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180)
            at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
            at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
            at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
            at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
            at com.amazonaws.services.s3.model.S3ObjectInputStream.read(S3ObjectInputStream.java:155)
            at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
            at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
            at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
            at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
            at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:108)
            at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
            at com.amazonaws.services.s3.model.S3ObjectInputStream.read(S3ObjectInputStream.java:155)
            at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
            at java.io.DataInputStream.read(DataInputStream.java:149)
            at com.hadoop.compression.lzo.LzopInputStream.readFully(LzopInputStream.java:73)
            at com.hadoop.compression.lzo.LzopInputStream.getCompressedData(LzopInputStream.java:321)
            at com.hadoop.compression.lzo.LzopInputStream.decompress(LzopInputStream.java:261)
            at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
            at java.io.InputStream.read(InputStream.java:101)
            at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
            at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
            at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
            at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:186)
            at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
            at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
            at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
            at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
            at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
            at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:99)
            at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryRelation.scala:91)
            at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:364)
            at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1021)
            at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
            at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
            at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
            at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
            at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
            at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
