I'm running into the following deserialization issue when trying to
run a very simple Java-based application using a local Master (see
stack trace below).

My code basically queries Solr using a custom Hadoop InputFormat. I've
hacked my code to make sure the objects involved
(PipelineDocumentWritable and SerIntWritable) implement
java.io.Serializable, purely to rule out any weirdness around handling
Hadoop Writable objects. I've also tried Kryo, but it seems this is a
problem with deserializing the task itself. In other words, I've tried
doing:

    sparkConf.set("spark.serializer", KryoSerializer.class.getName());
    sparkConf.set("spark.kryo.registrator", LWKryoRegistrator.class.getName());

I get the same problem with or without overriding to use Kryo. In
fact, my LWKryoRegistrator impl never gets invoked, so the exception
is happening lower down in the Spark stack.
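
For context, the "make the Writables serializable" hack is just the
standard trick of delegating Java's serialization hooks to the
Writable-style write/readFields methods. Here's a minimal, hypothetical
sketch of that pattern (not the real SerIntWritable; no Hadoop
dependency, and the class name is illustrative):

```java
import java.io.*;

// Hypothetical sketch of the wrapper pattern: a Writable-style class made
// java.io.Serializable by routing its serialized form through the
// write()/readFields() methods. Not the real SerIntWritable.
class SerIntWritableSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private transient int value; // transient: serialized manually below

    public SerIntWritableSketch() {}
    public SerIntWritableSketch(int value) { this.value = value; }

    public int get() { return value; }

    // Hadoop Writable contract (simplified, no org.apache.hadoop dependency)
    public void write(DataOutput out) throws IOException { out.writeInt(value); }
    public void readFields(DataInput in) throws IOException { value = in.readInt(); }

    // Java serialization hooks delegate to the Writable-style methods
    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        write(out); // ObjectOutputStream implements DataOutput
    }
    private void readObject(ObjectInputStream in)
            throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        readFields(in); // ObjectInputStream implements DataInput
    }

    public static void main(String[] args) throws Exception {
        // Round-trip through plain Java serialization to show the hooks work
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(new SerIntWritableSketch(42));
        oos.flush();
        ObjectInputStream ois =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        SerIntWritableSketch copy = (SerIntWritableSketch) ois.readObject();
        System.out.println(copy.get()); // prints 42
    }
}
```

A class like this round-trips fine through plain ObjectOutputStream /
ObjectInputStream in isolation, which is why I don't think the Writables
themselves are the problem.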

Here's the code I'm trying to run (basically query Solr through a
custom InputFormat and then do word count on the text in the tweet_s
field that comes back in the results):

>>>>
    JobConf jobConf = new JobConf();
    jobConf.set(LWMapRedInputFormat.SOLR_ZKHOST, cli.getOptionValue("zkHost", "localhost:9983"));
    jobConf.set(LWMapRedInputFormat.SOLR_COLLECTION, cli.getOptionValue("collection", "collection1"));
    jobConf.set(LWMapRedInputFormat.SOLR_QUERY, cli.getOptionValue("query", "*:*"));
    jobConf.set(LWMapRedInputFormat.SOLR_USE_CURSOR, cli.getOptionValue("useCursorMark", "false"));

    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    JavaPairRDD<SerIntWritable, PipelineDocumentWritable> solrRDD = jsc.hadoopRDD(jobConf,
        LWSerMapRedInputFormat.class, SerIntWritable.class, PipelineDocumentWritable.class);

    JavaRDD<String> words = solrRDD.flatMap(
        new FlatMapFunction<Tuple2<SerIntWritable, PipelineDocumentWritable>, String>() {
          @Override
          public Iterable<String> call(Tuple2<SerIntWritable, PipelineDocumentWritable> arg) {
            String str = arg._2.getPipelineDocument().getFirstField("tweet_s").toString();
            str = str.toLowerCase().replaceAll("[.,!?\n]", " ");
            return Arrays.asList(str.split(" "));
          }
        });

    JavaPairRDD<String, Integer> ones = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        });

    JavaPairRDD<String, Integer> counts = ones.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });

    counts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
      public void call(Tuple2<String, Integer> pair) throws Exception {
        System.out.println("\n\n >> "+pair._1+": "+pair._2+" \n");
      }
    });

<<<<
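
For reference, the tokenize-and-count logic in that
flatMap/mapToPair/reduceByKey chain boils down to this plain-Java
equivalent (no Spark or Solr involved; the input string stands in for a
tweet_s field value), which behaves as expected on its own:

```java
import java.util.*;

// Plain-Java equivalent of the flatMap/mapToPair/reduceByKey chain above,
// as a sanity check that the tokenize-and-count logic itself is sound.
class WordCountSketch {
    static Map<String, Integer> count(String text) {
        // Same normalization as the flatMap: lowercase, strip punctuation
        String cleaned = text.toLowerCase().replaceAll("[.,!?\n]", " ");
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String word : cleaned.split(" ")) {
            Integer prev = counts.get(word);
            counts.put(word, prev == null ? 1 : prev + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count("Spark is fun. Spark is fast!");
        System.out.println(counts.get("spark")); // prints 2
    }
}
```

So the failure isn't in the word-count logic; it happens before any of
my functions ever run.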

This is on my local machine running Spark 1.1.0 pre-built for Hadoop
2.4. My custom InputFormat is also built against Hadoop 2.4. Here's
how I run this application:

[~/tools/spark-1.1.0-bin-hadoop2.4/bin]$ ./spark-submit --master local \
  --class com.lucidworks.spark.SparkApp \
  spark-proto-1.0-SNAPSHOT-with-deps.jar query-solr \
  -zkHost=localhost:2181/local410 -collection=foo -query="*:*" \
  -useCursorMark -v

I've turned on DEBUG logging and there's not much useful information
beyond the stack trace.

Here's the exception I'm getting:
>>>

2014-10-01 13:21:06,295 [main] INFO  DAGScheduler  - Failed to run foreach at SolrQueryProcessor.java:83

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost): java.lang.IllegalStateException: unread block data
        java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

<<<


Anyone have any suggestions on how to resolve this? I've seen other
reports of this "unread block data" problem (e.g.
https://issues.apache.org/jira/browse/SPARK-1867), but no real
solutions beyond a suspected mismatch between Hadoop versions, which I
don't think applies here since I'm not running a real cluster
(master=local). Moreover, I get the same error when running this code
in a JUnit test.

Thanks in advance for any guidance you can provide.

Cheers,
Tim
