Hi, I am trying to fit a classification model with cross-validation in Spark 0.9.0 using SVMWithSGD (which trains a linear SVM rather than a logistic regression). I have created an array, data_kfolded, where each element is a pair of RDDs containing the training and test data for one fold:
  (training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint],
   test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint])

  scala> data_kfolded
  res21: Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint], org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] = Array((MappedRDD[9] at map at <console>:24,MappedRDD[7] at map at <console>:23), (MappedRDD[13] at map at <console>:24,MappedRDD[11] at map at <console>:23), (MappedRDD[17] at map at <console>:24,MappedRDD[15] at map at <console>:23))

Everything works fine when I map over data_kfolded directly:

  val validationErrors = data_kfolded.map { datafold =>
    val svmAlg = new SVMWithSGD()
    val model_reg = svmAlg.run(datafold._1)
    val labelAndPreds = datafold._2.map { point =>
      val prediction = model_reg.predict(point.features)
      (point.label, prediction)
    }
    val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
    trainErr
  }

  scala> validationErrors
  res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837, 0.29833546734955185)

However, as I understand it, the models are not fitted in parallel, because data_kfolded is a plain Scala array rather than an RDD (even though its elements are pairs of RDDs), so the map runs sequentially on the driver. When I run the same code with data_kfolded replaced by sc.parallelize(data_kfolded), I get a NullPointerException from the line where the run method of SVMWithSGD is called with the training data (code and full stack trace below). I guess this is somehow related to the fact that RDDs cannot be used from inside a closure that runs on the executors, but I fail to understand why the first version works and the second doesn't. Most importantly: is there a way to fit the models in parallel? I would really appreciate your help.
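My best guess at why the second version NPEs (an assumption on my part, not confirmed against the Spark source): an RDD's references back to driver-side state, such as its SparkContext and parent RDDs, are @transient, so when the closure passed to sc.parallelize(...).map captures the fold RDDs and is serialized out to an executor, those references come back null, and calling run on such a deserialized RDD then dereferences null, matching RDD.firstParent in the trace. The FakeRDD class below is a hypothetical stand-in to show the mechanism without Spark:

```scala
import java.io._

// Hypothetical stand-in for an RDD: the link back to driver-side state
// (like an RDD's SparkContext / parent RDDs) is @transient, so it is
// dropped when the object is serialized into a task closure.
class FakeRDD(@transient val sc: String) extends Serializable {
  def run(): Int = sc.length // NPEs once sc has been lost in transit
}

// Round-trip through Java serialization, as happens when a closure (and
// everything it captures) is shipped from the driver to an executor.
def roundTrip[T <: Serializable](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  new ObjectOutputStream(bytes).writeObject(obj)
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[T]
}

val onDriver = new FakeRDD("driver context")
val onExecutor = roundTrip(onDriver)

println(onDriver.run())        // fine on the driver: sc is present
println(onExecutor.sc == null) // true: the transient field did not survive
// onExecutor.run() would now throw java.lang.NullPointerException
```

This would also explain why the plain-array version works: there the closure runs on the driver, where the RDDs are still intact.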
Here is the failing version and the full stack trace:

  val validationErrors = sc.parallelize(data_kfolded).map { datafold =>
    val svmAlg = new SVMWithSGD()
    val model_reg = svmAlg.run(datafold._1) // this line gives the NullPointerException
    val labelAndPreds = datafold._2.map { point =>
      val prediction = model_reg.predict(point.features)
      (point.label, prediction)
    }
    val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
    trainErr
  }
  validationErrors.collect

  java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.RDD.take(RDD.scala:824)
    at org.apache.spark.rdd.RDD.first(RDD.scala:856)
    at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
    at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
    at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-parallelize-model-fitting-with-different-cross-validation-folds-tp8839.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.