Thank you all for your help.
The issue was caused by a few failed disks in the cluster. Right after they
had been replaced, everything worked well. Looking forward to moving to
Spark 3.0, which is able to handle corrupted shuffle blocks.

Cheers, Mike Pryakhin.


On Wed, 28 Aug 2019 at 03:44, Darshan Pandya <darshanpan...@gmail.com>
wrote:

> you can also try setting "spark.io.compression.codec" to "snappy" to use a
> different compression codec
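>
> For example, a minimal sketch of switching the codec when building the
> session (property name as in the Spark 2.x configuration docs; the app name
> here is just a placeholder):
>
>   import org.apache.spark.sql.SparkSession
>
>   val spark = SparkSession.builder()
>     .appName("shuffle-codec-test")
>     .config("spark.io.compression.codec", "snappy")
>     .getOrCreate()
>
> or, equivalently, at submit time:
>
>   spark-submit --conf spark.io.compression.codec=snappy ...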
>
> On Fri, Aug 16, 2019 at 10:14 AM Vadim Semenov <va...@datadoghq.com.invalid>
> wrote:
>
>> This is what you're looking for:
>>
>> Handle large corrupt shuffle blocks
>> https://issues.apache.org/jira/browse/SPARK-26089
>>
>> So until 3.0, the only way I can think of is to reduce the shuffle block
>> size / split your job into many smaller ones.
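>>
>> For example, a rough sketch (assuming the oversized/corrupt blocks come
>> from having too few shuffle partitions) is to raise the shuffle partition
>> count so each block stays small:
>>
>>   // Spark SQL / DataFrame shuffles (the default is 200 partitions)
>>   spark.conf.set("spark.sql.shuffle.partitions", "2000")
>>
>>   // RDD shuffles: pass an explicit partition count
>>   rdd.reduceByKey(_ + _, 2000)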
>>
>> On Thu, Aug 15, 2019 at 4:47 PM Mikhail Pryakhin <m.prya...@gmail.com>
>> wrote:
>>
>>> Hello, Spark community!
>>>
>>> I've been struggling with my job, which constantly fails due to an
>>> inability to uncompress some previously compressed blocks while shuffling
>>> data. I use Spark 2.2.0 with all configuration settings left at their
>>> defaults (no specific compression codec is specified); I've ascertained
>>> that LZ4CompressionCodec is used as the default codec. The job fails as
>>> soon as the limit of attempts is exceeded, with the following message:
>>>
>>> Caused by: java.io.IOException: Stream is corrupted
>>> at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
>>> at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
>>> at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
>>> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
>>> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
>>> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
>>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>>> at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
>>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
>>> ... 28 more
>>> Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 14649 of input buffer
>>>
>>>
>>> Actually, I've stumbled upon a bug [1] which is not fixed yet. Any clue on
>>> how to work around this issue? I've tried the Snappy codec, but it fails
>>> likewise, with a slightly different message:
>>>
>>> org.apache.spark.shuffle.FetchFailedException: failed to uncompress the chunk: FAILED_TO_UNCOMPRESS(5)
>>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
>>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
>>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
>>> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>>> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>>> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>>> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>>> at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>>> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:108)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.IOException: failed to uncompress the chunk: FAILED_TO_UNCOMPRESS(5)
>>> at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:361)
>>> at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:158)
>>> at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
>>> at java.io.InputStream.read(InputStream.java:101)
>>> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
>>> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
>>> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
>>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
>>> at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
>>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
>>> ... 27 more
>>>
>>>
>>> The option of using no compression seems the only feasible one for me at
>>> this point.
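>>> As a last-resort sketch (assuming shuffle data is the only thing that
>>> needs to go uncompressed), I guess that would look like:
>>>
>>>   spark-submit \
>>>     --conf spark.shuffle.compress=false \
>>>     --conf spark.shuffle.spill.compress=false \
>>>     ...
>>>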
>>> I really need your expert assistance, thank you very much in advance!
>>> Any help is greatly appreciated!
>>>
>>>
>>> [1] https://issues.apache.org/jira/browse/SPARK-18105
>>>
>>>
>>> Cheers,
>>> Mike Pryakhin
>>>
>>>
>>
>> --
>> Sent from my iPhone
>>
>
>
> --
> Sincerely,
> Darshan
>
> --
Regards, Mikhail Pryakhin.
