Hi there,

Here is a problem I ran into when executing a Spark job (Spark 1.3). The
job loads a bunch of Avro files through Spark SQL using the spark-avro 1.0.0
library, applies some filter/map transformations, repartitions to 1
partition, and writes the result to HDFS; this creates 2 stages. The input
is around 12000 HDFS blocks, so the load produces around 12000 partitions
and therefore around 12000 tasks for the first stage. I have 9 executors
launched, with 5 threads each. The job ran fine until the very end: when it
reached 19980/20000 tasks succeeded, the last 20 tasks suddenly failed and I
lost 2 executors. Spark did launch 2 new executors and eventually finished
the job by reprocessing those 20 tasks.
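For reference, here is a simplified sketch of what the job does (the paths
and the filter/map logic are placeholders, not my actual code; I am assuming
the avroFile implicit that spark-avro 1.0.0 adds to SQLContext):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.databricks.spark.avro._

val sc = new SparkContext(new SparkConf().setAppName("AvroRepartitionJob"))
val sqlContext = new SQLContext(sc)

// Load the Avro files; Spark creates roughly one partition per HDFS block.
val df = sqlContext.avroFile("hdfs:///path/to/avro/input")

// Placeholder filter/map, then collapse to a single partition (the shuffle
// introduced here is what splits the job into 2 stages) and write to HDFS.
df.rdd
  .filter(_ != null)
  .map(_.mkString("\t"))
  .repartition(1)
  .saveAsTextFile("hdfs:///path/to/output")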

I only run into this issue on the full dataset. When I run on 1/3 of the
dataset, everything finishes fine without error.

Question 1: What is the root cause of this issue? It looks similar to
http://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed
and https://issues.apache.org/jira/browse/SPARK-3052, but that ticket says
the issue has been fixed since 1.2.
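The workaround I saw mentioned in those threads is disabling the shared
Hadoop FileSystem cache. I have not tried it yet; I assume it would look
something like this:

val conf = new SparkConf()
  // Untested assumption: give each client its own FileSystem instance, so
  // one task closing it cannot break other tasks on the same executor.
  .set("spark.hadoop.fs.hdfs.impl.disable.cache", "true")

Is that the right fix here, or does it just mask the underlying problem?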
Question 2: I am a little surprised that after the 2 new executors were
launched to replace the 2 failed ones, they simply reprocessed the 20
failed tasks/partitions. What about the results of the other partitions
that the 2 failed executors had already processed? I assume the results of
those partitions were stored on local disk and thus did not need to be
recomputed by the new executors. When is that data stored locally? Is it
configurable? This question is just for my own understanding of the Spark
framework.
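To make my assumption concrete, my mental model is that the map output
files land under the directories in spark.local.dir, and that the external
shuffle service is what lets them outlive a lost executor, roughly as in
the settings below (the paths are hypothetical); please correct me if this
is wrong:

val conf = new SparkConf()
  // Hypothetical scratch dirs where shuffle map outputs are written.
  .set("spark.local.dir", "/data1/spark,/data2/spark")
  // External shuffle service, so shuffle files survive executor loss.
  .set("spark.shuffle.service.enabled", "true")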

The exception causing the executor failures is below:

org.apache.avro.AvroRuntimeException: java.io.IOException: Filesystem closed
at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:278)
at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:64)
at org.apache.avro.mapred.AvroRecordReader.next(AvroRecordReader.java:32)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:245)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:212)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.avro.mapred.FsInput.read(FsInput.java:54)
at org.apache.avro.file.DataFileReader$SeekableInputStream.read(DataFileReader.java:210)
at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.tryReadRaw(BinaryDecoder.java:839)
at org.apache.avro.io.BinaryDecoder.isEnd(BinaryDecoder.java:444)
at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:264)
