Thank you very much for your time. Here is how I designed the case classes; as far as I know, they are declared properly.

PS: By the way, what do you mean by “the programming guide”?

import org.apache.spark.sql.Row

abstract class Validator {

    // column positions to read with Row.getInt(x) / Row.getDouble(x)
    val shortsale_in_pos = 10
    val month_pos = 11
    val foreclosure_start_dt_pos = 14
    val filemonth_dt_pos = 12
    val reo_start_dt_pos = 14
    // ..

    // redesign into Iterable of Rows -->
    def validate(input: Row): Validator

}

case object Nomatch extends Validator {
    def validate(input: Row): Validator = this
}

case object Shortsale extends Validator {
    def validate(input: Row): Validator = {
        val check1: Boolean = input.getDouble(shortsale_in_pos) > 140.0
        if (check1) this else Nomatch
    }
}

Saif

From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com]
Sent: Monday, July 13, 2015 2:16 PM
To: Ellafi, Saif A.
Cc: user@spark.apache.org
Subject: Re: java.io.InvalidClassException

It's a bit hard to tell from the snippets of code, but it's likely related to the fact that when you serialize an instance, its enclosing class, if any, also gets serialized, as well as any other place the fields used in the closure come from. See e.g. this discussion:
http://stackoverflow.com/questions/9747443/java-io-invalidclassexception-no-valid-constructor
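For illustration only (the class name Pipeline is hypothetical, not from your code): if the case objects are declared inside another class, or inside the spark-shell REPL wrappers, each object carries a reference to its enclosing instance, and serializing a closure that touches it drags that outer instance along:

class Pipeline {                        // hypothetical enclosing class (the REPL wrapper plays the same role)
  case object Nomatch extends Validator {
    // Nomatch holds an outer reference to Pipeline, so shipping a closure
    // that mentions Nomatch to executors also tries to serialize Pipeline
    def validate(input: org.apache.spark.sql.Row): Validator = this
  }
}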

The programming guide also has good advice on serialization issues. I would particularly check how Shortsale/Nomatch/Foreclosure are declared (I'd advise making them top-level case classes)...
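For example, something along these lines (a sketch only, assuming nothing in the hierarchy actually needs the enclosing scope; extending Serializable is just defensive, the key point is that the definitions are top-level and compiled rather than nested in the REPL):

// top-level definitions, compiled in their own file
abstract class Validator extends Serializable {
  def validate(input: org.apache.spark.sql.Row): Validator
}

case object Nomatch extends Validator {
  def validate(input: org.apache.spark.sql.Row): Validator = this
}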

On Mon, Jul 13, 2015 at 12:32 PM, <saif.a.ell...@wellsfargo.com> wrote:
Hi,

For some experiment I am doing, I am trying to do the following.

1. Created an abstract class Validator, and case objects extending Validator, each with a validate(input: Row): Validator method.

2. Added all the case objects to a list.

3. Each validate looks at a Row and returns the object itself when the row matches; I then map each match to an arbitrary number:
def evaluate_paths(row: Row, validators: List[ClientPath]): Int = {

    var result: Int = -1

    for (validator <- validators) {
        validator.validate(row) match {
            case Shortsale => result = 0
            case Foreclosure => result = 1
            case Nomatch => result = 99
            //...
        }
    }
    result
}

val validators = List[ClientPath](
    Shortsale,
    Foreclosure)

4. Then I run map[Int](row => evaluate_paths(row, validators)).
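Roughly, the call looks like this (df is just an illustrative name for the DataFrame, not from my actual code):

val labels = df.map(row => evaluate_paths(row, validators))
labels.take(1)   // the action that triggers the exception below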

But this blows up: it does not like the creation of the list of validators when an action such as take(1) is executed on the RDD.
I have also tried an Iterator and an Array instead of a List, with no luck, and replaced the for loop with a while loop.
Curiously, when I call evaluate_paths(some_row, validators) directly on custom-made Rows, the execution works properly.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 125.0 failed 1 times, most recent failure: Lost task 0.0 in stage 125.0 (TID 830, localhost): java.io.InvalidClassException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Nomatch$; no valid constructor
 at java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
 at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:768)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1775)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
...
...
...
 at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
 at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
 at scala.Option.foreach(Option.scala:236)
 at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

------

Grateful for any advice,
Saif

