Hi, 

I am trying to fit a logistic regression model with cross validation in
Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where
each element is a pair of RDDs containing the training and test data: 

(training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint],
 test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint])

scala> data_kfolded
res21:
Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint],
org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] =
Array((MappedRDD[9] at map at <console>:24,MappedRDD[7] at map at
<console>:23), (MappedRDD[13] at map at <console>:24,MappedRDD[11] at map at
<console>:23), (MappedRDD[17] at map at <console>:24,MappedRDD[15] at map at
<console>:23))

Everything works fine when using data_kfolded: 

// Validation over the folds in data_kfolded, a local Array of (training RDD, test RDD)
// pairs. For each fold, train an SVM on the training RDD and compute the fraction of
// test points that the model misclassifies. The result is an Array[Double], one error
// rate per fold.
val validationErrors = 
data_kfolded.map { datafold => 
  val svmAlg = new SVMWithSGD() 
  // Train on the first element of the pair (the training RDD).
  val model_reg = svmAlg.run(datafold._1)
  // Pair each test point's true label with the model's prediction for it.
  val labelAndPreds = datafold._2.map { point =>
    val prediction = model_reg.predict(point.features)
    (point.label, prediction)
  }
  // Misclassified test points divided by the total number of test points. Despite the
  // name, this error is measured on the test RDD (datafold._2), not the training RDD.
  val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
datafold._2.count
  // trainErr is already a Double, so this conversion does not change the value.
  trainErr.toDouble
}

scala> validationErrors
res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
0.29833546734955185)

However, as I understand it, the models are not fitted in parallel, because
data_kfolded is not an RDD (it is a plain array of pairs of RDDs). When I
run the same code with data_kfolded replaced by
sc.parallelize(data_kfolded), I get a null pointer exception from the line
where the run method of the SVMWithSGD object is called with the training
data. I guess this is somehow related to the fact that RDDs can't be
accessed from inside a closure. I fail to understand, though, why the first
version works and the second doesn't. Most importantly, is there a way to
fit the models in parallel? I would really appreciate your help.

// Per-fold computation is unchanged, but the array of (training RDD, test RDD) pairs is
// first turned into an RDD with sc.parallelize, so the closure passed to map runs inside
// a Spark task. The job started by collect fails with the NullPointerException whose
// stack trace follows this snippet.
val validationErrors = 
sc.parallelize(data_kfolded).map { datafold => 
  val svmAlg = new SVMWithSGD() 
  val model_reg = svmAlg.run(datafold._1) // This line gives a null pointer exception
  // Pair each test point's true label with the model's prediction for it.
  val labelAndPreds = datafold._2.map { point =>
    val prediction = model_reg.predict(point.features)
    (point.label, prediction)
  }
  // Misclassified test points divided by the total number of test points. Despite the
  // name, this error is measured on the test RDD (datafold._2), not the training RDD.
  val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
datafold._2.count
  trainErr.toDouble
}
// Running the job is what surfaces the exception.
validationErrors.collect

java.lang.NullPointerException
        at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
        at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
        at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
        at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
        at org.apache.spark.rdd.RDD.take(RDD.scala:824)
        at org.apache.spark.rdd.RDD.first(RDD.scala:856)
        at
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
        at
$line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
        at
$line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
        at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
        at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
        at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
        at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-parallelize-model-fitting-with-different-cross-validation-folds-tp8839.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Reply via email to