Re: Stream is corrupted in ShuffleBlockFetcherIterator
Thank you all for your help. The issue was caused by a few failed disks in
the cluster; as soon as they were replaced, everything worked fine. Looking
forward to moving to Spark 3.0, which is able to detect and handle corrupted
shuffle blocks (SPARK-26089).

Cheers,
Mike Pryakhin
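For anyone landing on this thread later: a minimal sketch of opting in to
the shuffle-corruption handling that SPARK-26089 added in Spark 3.0. The
app name is hypothetical, and the two config keys are my reading of that
ticket, not something confirmed in this thread:

    import org.apache.spark.sql.SparkSession

    // Assumed config names from SPARK-26089 (Spark 3.0+):
    //  - spark.shuffle.detectCorrupt: detect corrupt fetched shuffle
    //    blocks (on by default)
    //  - spark.shuffle.detectCorrupt.useExtraMemory: eagerly decompress
    //    fetched blocks to catch corruption early, at the cost of extra
    //    memory (off by default)
    val spark = SparkSession.builder()
      .appName("shuffle-corruption-check") // hypothetical app name
      .config("spark.shuffle.detectCorrupt", "true")
      .config("spark.shuffle.detectCorrupt.useExtraMemory", "true")
      .getOrCreate()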
Re: Stream is corrupted in ShuffleBlockFetcherIterator
On Wed, 28 Aug 2019 at 03:44, Darshan Pandya wrote:

You can also try setting "spark.io.compression.codec" to "snappy" to try a
different compression codec.
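For completeness, a minimal sketch of how that setting can be applied when
building the session (the app name is hypothetical); it can equally be
passed to spark-submit as --conf spark.io.compression.codec=snappy:

    import org.apache.spark.sql.SparkSession

    // Switch the shuffle/IO compression codec from the lz4 default to
    // snappy, per the suggestion above.
    val spark = SparkSession.builder()
      .appName("snappy-codec-run") // hypothetical app name
      .config("spark.io.compression.codec", "snappy")
      .getOrCreate()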
Re: Stream is corrupted in ShuffleBlockFetcherIterator
On Fri, Aug 16, 2019 at 10:14 AM, Vadim Semenov wrote:

This is what you're looking for:

Handle large corrupt shuffle blocks
https://issues.apache.org/jira/browse/SPARK-26089

So until 3.0, the only way I can think of is to reduce the block size or
split your job into many smaller ones. (A sketch of this appears at the
end of the thread.)

On Thu, Aug 15, 2019 at 4:47 PM, Mikhail Pryakhin wrote:

> Hello, Spark community!
>
> I've been struggling with a job that constantly fails because it is unable
> to uncompress some previously compressed blocks while shuffling data.
> I use Spark 2.2.0 with all configuration settings left at their defaults
> (no specific compression codec is specified); I've ascertained that
> LZ4CompressionCodec is used as the default codec. The job fails as soon as
> the limit of attempts is exceeded, with the following message:
>
> Caused by: java.io.IOException: Stream is corrupted
> at org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
> at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
> at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
> ... 28 more
> Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 14649 of input buffer
>
> Actually, I've stumbled upon a bug [1] that is not fixed yet. Any clue on
> how to work around this issue? I've tried the Snappy codec, but it fails
> likewise, with a slightly different message:
>
> org.apache.spark.shuffle.FetchFailedException: failed to uncompress the chunk: FAILED_TO_UNCOMPRESS(5)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: failed to uncompress the chunk: FAILED_TO_UNCOMPRESS(5)
> at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:361)
> at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:158)
> at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
> at java.io.InputStream.read(InputStream.java:101)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
> ...
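Picking up Vadim's interim workaround above: a minimal sketch, with
hypothetical names and partition counts, of shrinking individual shuffle
blocks by raising the shuffle parallelism. Smaller blocks are cheaper to
retry and, as I read SPARK-26089, more likely to be covered by the
corruption check that pre-3.0 Spark only applies to blocks small enough to
be fetched into memory:

    import org.apache.spark.sql.SparkSession

    // Hypothetical sketch: more shuffle partitions => smaller individual
    // shuffle blocks for the same total data volume.
    val spark = SparkSession.builder()
      .appName("smaller-shuffle-blocks")              // hypothetical app name
      .config("spark.sql.shuffle.partitions", "2000") // default is 200; tune to your data
      .getOrCreate()

    // For RDD-based jobs, the equivalent is to raise the partition count
    // explicitly, e.g. rdd.repartition(2000), or to split the job into
    // several smaller runs over subsets of the input.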