It's a bit hard to tell from the snippets of code, but it's likely related
to the fact that when you serialize instances, the enclosing class, if any,
also gets serialized, as does anything else that the fields used in the
closure come from... e.g. check this discussion:
http://stackoverflow.com/questions/9747443/java-io-invalidclassexception-no-valid-constructor
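To illustrate the capture problem (a minimal sketch, not your code; the
Helper class and its threshold field are hypothetical): a closure that
touches a field of an enclosing class implicitly captures `this`, so Spark
has to serialize the whole enclosing instance. In the shell, the implicit
$iwC wrappers you see in your stack trace play the role of that enclosing
class.

import org.apache.spark.rdd.RDD

class Helper {                 // hypothetical enclosing class, not Serializable
  val threshold = 10

  def countBig(rdd: RDD[Int]): Long =
    // `threshold` here really means `this.threshold`, so the closure captures
    // the whole Helper instance and Spark must serialize it to ship the task.
    rdd.filter(_ > threshold).count()
}

The usual workaround is to copy the field into a local val before
referencing it inside the closure, or to make the enclosing type cleanly
Serializable.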

The programming guide also has good advice on serialization issues. I would
particularly check how Shortsale/Nomatch/Foreclosure are declared (I'd
advise making them top-level case classes)...
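For instance (a minimal sketch using the names from your snippet; I'm
assuming ClientPath is the abstract parent you mentioned, and the validate
bodies are placeholders):

import org.apache.spark.sql.Row

// Declared at the top level of a file (not inside another class, object, or
// shell block), so each case object serializes and deserializes on its own.
sealed trait ClientPath extends Serializable {
  def validate(row: Row): Boolean
}

case object Shortsale extends ClientPath {
  def validate(row: Row): Boolean = false // placeholder: your real rule here
}

case object Foreclosure extends ClientPath {
  def validate(row: Row): Boolean = false // placeholder: your real rule here
}

case object Nomatch extends ClientPath {
  def validate(row: Row): Boolean = true  // placeholder
}

Declared this way, the objects don't drag any REPL wrapper ($iwC$...) along
when Spark serializes the closure, which appears to be what the
InvalidClassException in your trace is complaining about.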

On Mon, Jul 13, 2015 at 12:32 PM, <saif.a.ell...@wellsfargo.com> wrote:

>  Hi,
>
> For some experiment I am doing, I am trying to do the following.
>
> 1. Created an abstract class Validator, and case objects extending Validator
> with a validate(row: Row): Boolean method.
>
> 2. Added all the case objects to a list.
>
> 3. Each validator takes a Row and returns *“itself”* if its validation
> succeeds, so I do this to return an arbitrary number for each match:
>  def evaluate_paths(row: Row, validators: List[ClientPath]): Int = {
>
>     var result: Int = -1
>
>     for (validator <- validators) {
>         validator.validate(row) match {
>             case Shortsale =>  result = 0
>             case Foreclosure => result = 1
>             case Nomatch => result = 99
>             //...
>         }
>     }
>     result
> }
>
> val validators = List[ClientPath](
>     Shortsale,
>     Foreclosure)
>
> 4. Then I run the map[Int](row => evaluate_paths(row, validators)) on the RDD.
>
> But this blows up: it does not like the creation of the list of validators
> when executing an action on the RDD such as take(1).
> I have also tried an Iterator and an Array instead of a List, but no luck.
> I also replaced the for loop with a while loop.
> Curiously, when I call evaluate_paths(some_row, validators) directly on
> custom-made Rows, the execution works properly.
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 125.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 125.0 (TID 830, localhost): java.io.InvalidClassException:
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Nomatch$;
> no valid constructor
>   at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
>   at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:768)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   ...
>   at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
>   at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>   at scala.Option.foreach(Option.scala:236)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>  ------
>
> Grateful for any advice.
> Saif
>
>
