If you call .par on data_kfolded, it becomes a parallel collection in Scala, so the maps will run in parallel.
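Something like this should work -- a minimal sketch, assuming data_kfolded is the Array[(RDD[LabeledPoint], RDD[LabeledPoint])] from your post:

import org.apache.spark.mllib.classification.SVMWithSGD

// .par turns the Array into a ParArray, so each fold is handled by its own
// driver-side thread, and the Spark jobs for the folds can run concurrently.
val validationErrors = data_kfolded.par.map { case (training, test) =>
  val model = new SVMWithSGD().run(training)  // fit one model per fold
  val labelAndPreds = test.map { point =>
    (point.label, model.predict(point.features))
  }
  // fraction of misclassified test points for this fold
  labelAndPreds.filter(r => r._1 != r._2).count.toDouble / test.count
}.toArray

Note that this keeps all the RDDs on the driver; only the loop over the array is parallelized. sc.parallelize(data_kfolded) can't work, because an RDD is just a driver-side handle: shipping it inside another RDD's elements or closures leaves its transient internals null on the executors, which is why run() throws the NullPointerException you saw.

On Jul 5, 2014 9:35 AM, "sparkuser2345" <hm.spark.u...@gmail.com> wrote: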
> Hi,
>
> I am trying to fit an SVM classification model with cross validation in
> Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where
> each element is a pair of RDDs containing the training and test data:
>
> (training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint],
>  test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint])
>
> scala> data_kfolded
> res21: Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint],
> org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] =
> Array((MappedRDD[9] at map at <console>:24,MappedRDD[7] at map at <console>:23),
> (MappedRDD[13] at map at <console>:24,MappedRDD[11] at map at <console>:23),
> (MappedRDD[17] at map at <console>:24,MappedRDD[15] at map at <console>:23))
>
> Everything works fine when using data_kfolded:
>
> val validationErrors =
>   data_kfolded.map { datafold =>
>     val svmAlg = new SVMWithSGD()
>     val model_reg = svmAlg.run(datafold._1)
>     val labelAndPreds = datafold._2.map { point =>
>       val prediction = model_reg.predict(point.features)
>       (point.label, prediction)
>     }
>     val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
>       datafold._2.count
>     trainErr.toDouble
>   }
>
> scala> validationErrors
> res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
> 0.29833546734955185)
>
> However, I understand that the models are not fitted in parallel, because
> data_kfolded is not an RDD (although it is an array of pairs of RDDs). When
> I run the same code with data_kfolded replaced by
> sc.parallelize(data_kfolded), I get a null pointer exception from the line
> where the run method of the SVMWithSGD object is called with the training
> data. I guess this is somehow related to the fact that RDDs can't be
> accessed from inside a closure. I fail to understand, though, why the first
> version works and the second doesn't. Most importantly, is there a way to
> fit the models in parallel? I would really appreciate your help.
>
> val validationErrors =
>   sc.parallelize(data_kfolded).map { datafold =>
>     val svmAlg = new SVMWithSGD()
>     val model_reg = svmAlg.run(datafold._1) // This line gives the null pointer exception
>     val labelAndPreds = datafold._2.map { point =>
>       val prediction = model_reg.predict(point.features)
>       (point.label, prediction)
>     }
>     val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
>       datafold._2.count
>     trainErr.toDouble
>   }
> validationErrors.collect
>
> java.lang.NullPointerException
>     at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
>     at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>     at scala.Option.getOrElse(Option.scala:120)
>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
>     at org.apache.spark.rdd.RDD.take(RDD.scala:824)
>     at org.apache.spark.rdd.RDD.first(RDD.scala:856)
>     at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
>     at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
>     at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>     at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
>     at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
>     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
>     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>     at org.apache.spark.scheduler.Task.run(Task.scala:53)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:744)