Hi Spark users & developers,

I've stumbled on a problem that seems to occur randomly.
My tasks sometimes (about 60% of the time) fail with errors like this:

java.io.StreamCorruptedException: invalid handle value: 006E0007
	at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1454)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1330)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:101)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
	at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:26)
	at scala.collection.Iterator$class.foreach(Iterator.scala:772)
	at org.apache.spark.util.CompletionIterator.foreach(CompletionIterator.scala:23)
	at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:92)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:92)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
	at org.apache.spark.scheduler.ResultTask.run(ResultTask.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)

The code that produces the error:

import java.io.{DataInput, DataOutput}

import org.apache.hadoop.io.{LongWritable, Text, Writable}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ItemCooccurrences {
  case class Cooccurrence(i: Int, j: Int)

  // Hadoop-serializable wrapper so cooccurrence counts can be saved to a SequenceFile
  class CooccurrenceWritable(var c: Cooccurrence) extends Writable {
    def this() = this(Cooccurrence(0, 0))
    override def readFields(in: DataInput): Unit = {
      val i = in.readInt()
      val j = in.readInt()
      c = Cooccurrence(i, j)
    }
    override def write(out: DataOutput): Unit = {
      out.writeInt(c.i)
      out.writeInt(c.j)
    }
    override def toString: String = "(" + c.i + "," + c.j + ")"
  }

  // Pair each word with the word `shift` positions to its right,
  // ordering each pair so the smaller id comes first
  def cooccurrencesFromArray(words: Array[Int], shift: Int = 1): List[Cooccurrence] = {
    val unmerged =
      for (i <- 0 until words.length - shift) yield {
        val wi = words(i)
        val wj = words(i + shift)
        Cooccurrence(math.min(wi, wj), math.max(wi, wj))
      }
    unmerged.toList
  }

  def main(args: Array[String]): Unit = {
    if (args.length <= 2) {
      System.err.println("Usage: ItemCooccurrences <master> <dataset> <result> [<shift>]")
      System.exit(1)
    }

    val sc = new SparkContext(args(0), "ItemCooccurrences", System.getenv("SPARK_HOME"))

    val shift = if (args.length > 3) args(3).toInt else 1

    // IntArrayWritable is our own Writable holding an Array[Int] of word ids
    val dataset = sc.newAPIHadoopFile[Text, IntArrayWritable,
      SequenceFileInputFormat[Text, IntArrayWritable]](args(1))
    val cooccurrences = dataset.
      flatMap(x => cooccurrencesFromArray(x._2.array, shift)).
      map(x => (x, 1L)).
      reduceByKey(_ + _).
      map { case (y, z) => (new CooccurrenceWritable(y), new LongWritable(z)) }
    cooccurrences.saveAsNewAPIHadoopFile(args(2) + "-cooccurrences.seq",
      classOf[CooccurrenceWritable], classOf[LongWritable],
      classOf[SequenceFileOutputFormat[CooccurrenceWritable, LongWritable]])

    sc.stop()
  }
}
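
For what it's worth, the pair extraction itself behaves as I expect (this is just a quick REPL check, not part of the job):

val pairs = ItemCooccurrences.cooccurrencesFromArray(Array(3, 1, 2))
// neighbouring pairs (3,1) and (1,2), each reordered (min, max):
// pairs == List(Cooccurrence(1,3), Cooccurrence(1,2))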

I've tried with both 0.8.0 and the current master.

When I use the KryoSerializer, the error does not seem to happen, but the
process takes significantly longer to finish, especially in the second
stage (after the reduceByKey).
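
For reference, this is roughly how I enable Kryo (Spark 0.8-style system properties; MyRegistrator is just my own registrator class):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Register the classes that go through the shuffle, so Kryo
// does not have to write full class names for every record
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[ItemCooccurrences.Cooccurrence])
  }
}

// set before creating the SparkContext
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "MyRegistrator")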
Any help would be much appreciated.

Cheers,
Guillaume
--
eXenSa
Guillaume PITEL, Président
+33(0)6 25 48 86 80

eXenSa S.A.S.
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
