java.io.StreamCorruptedException: invalid handle value: 006E0007
at java.io.ObjectInputStream.readHandle(ObjectInputStream.java:1454)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1330)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:101)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:26)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at org.apache.spark.util.CompletionIterator.foreach(CompletionIterator.scala:23)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:92)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$combineByKey$2.apply(PairRDDFunctions.scala:92)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:36)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.scheduler.ResultTask.run(ResultTask.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:158)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
The code that produces the error:
import java.io.{DataInput, DataOutput}

import org.apache.hadoop.io.{LongWritable, Text, Writable}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object ItemCooccurrences {
  case class Cooccurrence(i: Int, j: Int)

  // Hadoop Writable wrapper so cooccurrence pairs can be stored in a SequenceFile.
  class CooccurrenceWritable(var c: Cooccurrence) extends Writable {
    def this() = this(Cooccurrence(0, 0))

    override def readFields(in: DataInput): Unit = {
      val i = in.readInt()
      val j = in.readInt()
      c = Cooccurrence(i, j)
    }

    override def write(out: DataOutput): Unit = {
      out.writeInt(c.i)
      out.writeInt(c.j)
    }

    override def toString: String = "(" + c.i + "," + c.j + ")"
  }

  // Pairs each word with the word `shift` positions ahead, ordering the ids
  // so that (i, j) and (j, i) count as the same cooccurrence.
  def cooccurrencesFromArray(words: Array[Int], shift: Int = 1): List[Cooccurrence] = {
    val unmerged =
      for (i <- 0 until words.length - shift) yield {
        val wi = words(i)
        val wj = words(i + shift)
        Cooccurrence(Math.min(wi, wj), Math.max(wi, wj))
      }
    unmerged.toList
  }

  def main(args: Array[String]): Unit = {
    if (args.length <= 2) {
      System.err.println("Usage: ItemCooccurrences <master> <dataset> <result> [<shift>]")
      System.exit(1)
    }
    val sc = new SparkContext(args(0), "ItemCooccurrences",
      System.getenv("SPARK_HOME"))
    val shift = if (args.length > 3) args(3).toInt else 1
    // IntArrayWritable is our own Writable holding an Array[Int] (defined elsewhere).
    val dataset = sc.newAPIHadoopFile[Text, IntArrayWritable,
      SequenceFileInputFormat[Text, IntArrayWritable]](args(1))
    val cooccurrences = dataset.
      flatMap(x => cooccurrencesFromArray(x._2.array, shift)).
      map(x => (x, 1L)).
      reduceByKey(_ + _).
      map { case (y, z) => (new CooccurrenceWritable(y), new LongWritable(z)) }
    cooccurrences.saveAsNewAPIHadoopFile(args(2) + "-cooccurrences.seq",
      classOf[CooccurrenceWritable], classOf[LongWritable],
      classOf[SequenceFileOutputFormat[CooccurrenceWritable, LongWritable]])
    sc.stop()
  }
}
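To rule out a bug in the Writable itself, here is a quick local round-trip sanity check (just a sketch, not part of the job; WritableRoundTrip is a name I made up for it) against the class above:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import ItemCooccurrences.{Cooccurrence, CooccurrenceWritable}

// Writes a CooccurrenceWritable to an in-memory buffer and reads it back,
// checking that write() and readFields() are inverses of each other.
object WritableRoundTrip {
  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    new CooccurrenceWritable(Cooccurrence(3, 7)).write(out)
    out.flush()

    val in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray))
    val restored = new CooccurrenceWritable()
    restored.readFields(in)
    assert(restored.c == Cooccurrence(3, 7)) // case class equality on the payload
  }
}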
I've tried this with both Spark 0.8.0 and the current master.
When I use the KryoSerializer the error does not seem to happen, but
the process takes significantly longer to finish, especially in the
second stage (after the reduceByKey).
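For reference, the Kryo setup I mean looks roughly like this (a sketch using the Spark 0.8-style system properties; MyRegistrator and KryoSetup are just placeholder names):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import ItemCooccurrences.Cooccurrence

// Registers the shuffled key type with Kryo.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Cooccurrence])
  }
}

object KryoSetup {
  // Call before creating the SparkContext so the executors pick up the settings.
  def configure(): Unit = {
    System.setProperty("spark.serializer",
      "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "MyRegistrator")
  }
}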
Any help would be much appreciated.
Cheers, Guillaume