I'm running into the following deserialization issue when trying to run a very simple Java-based application using a local Master (see stack trace below).
My code basically queries Solr using a custom Hadoop InputFormat. I've hacked my code to make sure the objects involved (PipelineDocumentWritable and SerIntWritable) are serializable, purely to rule out any weirdness around handling Hadoop Writable objects.

I've also tried the Kryo route, but it seems this is a problem with deserializing the task itself. In other words, I've tried doing:

  sparkConf.set("spark.serializer", KryoSerializer.class.getName());
  sparkConf.set("spark.kryo.registrator", LWKryoRegistrator.class.getName());

The same problem occurs with or without overriding to use Kryo. In fact, my LWKryoRegistrator impl never gets invoked, so the exception is happening lower down in the Spark stack. (A trimmed-down sketch of the Kryo wiring is included after the spark-submit command below.)

Here's the code I'm trying to run (basically query Solr through a custom InputFormat and then do a word count on the text in the tweet_s field that comes back in the results):

>>>>
JobConf jobConf = new JobConf();
jobConf.set(LWMapRedInputFormat.SOLR_ZKHOST, cli.getOptionValue("zkHost", "localhost:9983"));
jobConf.set(LWMapRedInputFormat.SOLR_COLLECTION, cli.getOptionValue("collection", "collection1"));
jobConf.set(LWMapRedInputFormat.SOLR_QUERY, cli.getOptionValue("query", "*:*"));
jobConf.set(LWMapRedInputFormat.SOLR_USE_CURSOR, cli.getOptionValue("useCursorMark", "false"));

JavaSparkContext jsc = new JavaSparkContext(conf);

// Read (key, document) pairs from Solr via the custom InputFormat.
JavaPairRDD<SerIntWritable, PipelineDocumentWritable> solrRDD =
    jsc.hadoopRDD(jobConf, LWSerMapRedInputFormat.class, SerIntWritable.class, PipelineDocumentWritable.class);

// Split the tweet_s field of each document into words.
JavaRDD<String> words = solrRDD.flatMap(new FlatMapFunction<Tuple2<SerIntWritable, PipelineDocumentWritable>, String>() {
  @Override
  public Iterable<String> call(Tuple2<SerIntWritable, PipelineDocumentWritable> arg) {
    String str = arg._2.getPipelineDocument().getFirstField("tweet_s").toString();
    str = str.toLowerCase().replaceAll("[.,!?\n]", " ");
    return Arrays.asList(str.split(" "));
  }
});

// Classic word count: map each word to (word, 1) and reduce by key.
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
  public Tuple2<String, Integer> call(String s) {
    return new Tuple2<String, Integer>(s, 1);
  }
});

JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer i1, Integer i2) {
    return i1 + i2;
  }
});

counts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
  public void call(Tuple2<String, Integer> pair) throws Exception {
    System.out.println("\n\n >> " + pair._1 + ": " + pair._2 + " \n");
  }
});
<<<<

This is on my local machine running Spark 1.1.0 pre-built for Hadoop 2.4. My custom InputFormat is also built against Hadoop 2.4. Here's how I run the application:

[~/tools/spark-1.1.0-bin-hadoop2.4/bin]$ ./spark-submit --master local --class com.lucidworks.spark.SparkApp \
    spark-proto-1.0-SNAPSHOT-with-deps.jar query-solr -zkHost=localhost:2181/local410 -collection=foo \
    -query="*:*" -useCursorMark -v

I've turned on DEBUG logging, and there's not much useful information beyond the stack trace.
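In case it matters, here's a trimmed-down sketch of the Kryo wiring I mentioned above (the outer class and method names in this snippet are just for illustration; the register calls are approximately what my LWKryoRegistrator does for the two custom Writables):

>>>>
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoRegistrator;
import org.apache.spark.serializer.KryoSerializer;

public class KryoSetupSketch {

  // Approximation of my LWKryoRegistrator: it just registers the two custom Writables
  // (SerIntWritable and PipelineDocumentWritable come from my own code).
  public static class LWKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
      kryo.register(SerIntWritable.class);
      kryo.register(PipelineDocumentWritable.class);
    }
  }

  // Builds the context with Kryo enabled; I see the same failure with or without
  // the two serializer settings.
  public static JavaSparkContext buildContext() {
    SparkConf sparkConf = new SparkConf()
        .setMaster("local")
        .setAppName("query-solr")
        .set("spark.serializer", KryoSerializer.class.getName())
        .set("spark.kryo.registrator", LWKryoRegistrator.class.getName());
    return new JavaSparkContext(sparkConf);
  }
}
<<<<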
Here's the exception I'm getting:

>>>
2014-10-01 13:21:06,295 [main] INFO DAGScheduler - Failed to run foreach at SolrQueryProcessor.java:83
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost): java.lang.IllegalStateException: unread block data
        java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
<<<

Does anyone have any suggestions on how to resolve this? I've seen other reports of this "unread block data" problem (e.g. https://issues.apache.org/jira/browse/SPARK-1867), but no real solutions beyond a mismatch between Hadoop versions, which I don't think is the case here since I'm not running a real cluster (master=local). Moreover, I get the same error when running this code in a JUnit test (a trimmed-down sketch of that test is in the P.S. below).

Thanks in advance for any guidance you can provide.
Cheers,
Tim
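P.S. Here's roughly what the JUnit repro looks like (the class and method names are just illustrative, and the connection values are the defaults from the code above; it's the same hadoopRDD call driven by a simple action):

>>>>
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Test;

public class QuerySolrReproTest {

  @Test
  public void reproUnreadBlockData() {
    // Local master, same as the spark-submit invocation above.
    SparkConf conf = new SparkConf().setMaster("local").setAppName("query-solr-test");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    try {
      JobConf jobConf = new JobConf();
      jobConf.set(LWMapRedInputFormat.SOLR_ZKHOST, "localhost:9983");
      jobConf.set(LWMapRedInputFormat.SOLR_COLLECTION, "collection1");
      jobConf.set(LWMapRedInputFormat.SOLR_QUERY, "*:*");

      JavaPairRDD<SerIntWritable, PipelineDocumentWritable> solrRDD =
          jsc.hadoopRDD(jobConf, LWSerMapRedInputFormat.class,
              SerIntWritable.class, PipelineDocumentWritable.class);

      // Any action that forces the tasks to run hits the same "unread block data" failure.
      solrRDD.count();
    } finally {
      jsc.stop();
    }
  }
}
<<<<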