For linear models the 3rd option is by far the most efficient, and I suspect it's what Evan is alluding to.
Unfortunately it's not directly possible with the classes in MLlib now, so you'll have to roll your own using the underlying SGD / BFGS primitives.

— Sent from Mailbox

On Sat, Jul 5, 2014 at 10:45 AM, Christopher Nguyen <c...@adatao.com> wrote:

> Hi sparkuser2345,
>
> I'm inferring that the problem statement is something like "how do I make this complete faster (given my compute resources)?" Several comments.
>
> First, Spark only allows launching parallel tasks from the driver, not from workers, which is why you're seeing the exception when you try. Whether the latter is a sensible/doable idea is another discussion, but I can appreciate why many people assume this should be possible.
>
> Second, on optimization, you may be able to apply Sean's idea about (thread) parallelism at the driver, combined with the knowledge that these cluster tasks often bottleneck while competing for the same resources at the same time (CPU vs. disk vs. network, etc.). You may be able to achieve some performance gain by randomizing these timings. This is not unlike Gmail randomizing user storage locations around the world for load balancing. Here, you would partition each of your RDDs into a different number of partitions, making some tasks larger than others, so that some may be in a CPU-intensive map while others are shuffling data around the network. This is rather cluster-specific; I'd be interested in what you learn from such an exercise.
>
> Third, I find it useful to always consider doing as much as possible in one pass, subject to memory limits, e.g., mapPartitions() vs. map(), thus minimizing map/shuffle/reduce boundaries with their context switches and data shuffling. In this case, notice how you're running the training+prediction k times over mostly the same rows, with map/reduce boundaries in between. While the training phase is sealed in this context, you may be able to improve performance by collecting all k models together and doing the [m x k] predictions all at once, which may end up being faster.
>
> Finally, as implied by the above, for the very common k-fold cross-validation pattern, the algorithm itself might be written to be smart enough to take both train and test data and "do the right thing" within itself, thus obviating the need for the user to prepare k data sets and run over them serially, and likely saving a lot of repeated computation in the right internal places.
>
> Enjoy,
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
> On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen <so...@cloudera.com> wrote:
>
>> If you call .par on data_kfolded it will become a parallel collection in Scala, and so the maps will happen in parallel.
>>
>> On Jul 5, 2014 9:35 AM, "sparkuser2345" <hm.spark.u...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to fit a logistic regression model with cross-validation in Spark 0.9.0 using SVMWithSGD.
>>> I have created an array data_kfolded where each element is a pair of RDDs containing the training and test data:
>>>
>>> (training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint],
>>>  test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint])
>>>
>>> scala> data_kfolded
>>> res21: Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint], org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] =
>>> Array((MappedRDD[9] at map at <console>:24,MappedRDD[7] at map at <console>:23), (MappedRDD[13] at map at <console>:24,MappedRDD[11] at map at <console>:23), (MappedRDD[17] at map at <console>:24,MappedRDD[15] at map at <console>:23))
>>>
>>> Everything works fine when using data_kfolded:
>>>
>>> val validationErrors =
>>>   data_kfolded.map { datafold =>
>>>     val svmAlg = new SVMWithSGD()
>>>     val model_reg = svmAlg.run(datafold._1)
>>>     val labelAndPreds = datafold._2.map { point =>
>>>       val prediction = model_reg.predict(point.features)
>>>       (point.label, prediction)
>>>     }
>>>     val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
>>>     trainErr.toDouble
>>>   }
>>>
>>> scala> validationErrors
>>> res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837, 0.29833546734955185)
>>>
>>> However, I understand that the models are not fitted in parallel, as data_kfolded is not an RDD (although it is an array of pairs of RDDs). When I run the same code with data_kfolded replaced by sc.parallelize(data_kfolded), I get a null pointer exception from the line where the run method of the SVMWithSGD object is called with the training data. I guess this is somehow related to the fact that RDDs can't be accessed from inside a closure. I fail to understand, though, why the first version works and the second doesn't. Most importantly, is there a way to fit the models in parallel? I would really appreciate your help.
>>>
>>> val validationErrors =
>>>   sc.parallelize(data_kfolded).map { datafold =>
>>>     val svmAlg = new SVMWithSGD()
>>>     val model_reg = svmAlg.run(datafold._1) // This line gives a null pointer exception
>>>     val labelAndPreds = datafold._2.map { point =>
>>>       val prediction = model_reg.predict(point.features)
>>>       (point.label, prediction)
>>>     }
>>>     val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
>>>     trainErr.toDouble
>>>   }
>>> validationErrors.collect
>>>
>>> java.lang.NullPointerException
>>>     at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
>>>     at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>>>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
>>>     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>>>     at scala.Option.getOrElse(Option.scala:120)
>>>     at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
>>>     at org.apache.spark.rdd.RDD.take(RDD.scala:824)
>>>     at org.apache.spark.rdd.RDD.first(RDD.scala:856)
>>>     at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
>>>     at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
>>>     at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
>>>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>>     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>     at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
>>>     at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
>>>     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
>>>     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>>>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>>>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:744)
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-parallelize-model-fitting-with-different-cross-validation-folds-tp8839.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
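
To make Sean's .par suggestion concrete, a minimal sketch against the loop quoted above (assuming the same data_kfolded array of (train, test) RDD pairs and Spark 0.9's MLlib API) could look like this; the only change is mapping over a Scala parallel collection so the k training jobs are submitted from separate driver threads:

import org.apache.spark.mllib.classification.SVMWithSGD

// Map over a parallel collection: each fold is trained and evaluated in its
// own driver thread, and Spark schedules the resulting jobs concurrently.
val validationErrors = data_kfolded.par.map { case (train, test) =>
  val model = new SVMWithSGD().run(train)
  val labelAndPreds = test.map(p => (p.label, model.predict(p.features)))
  labelAndPreds.filter { case (label, pred) => label != pred }.count.toDouble / test.count
}.toArray

The result is an ordinary Array[Double] of per-fold validation errors, as in the serial version.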
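
Christopher's second point, staggering resource usage by giving each fold a different partition count, might be sketched like this (the partition counts are purely illustrative and would need tuning to the cluster):

// Hedged sketch: repartition each fold's RDDs with a different count so the
// k concurrent jobs are less likely to hit the same bottleneck at once.
val staggered = data_kfolded.zipWithIndex.map { case ((train, test), i) =>
  val numPartitions = 8 + 4 * i   // illustrative per-fold partition count
  (train.repartition(numPartitions), test.repartition(numPartitions))
}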
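
Christopher's third point, doing the [m x k] predictions in one pass, might look roughly like the sketch below. It assumes each point can be tagged with the index of the fold in which it is held out; the helper name foldErrors and the (Int, LabeledPoint) tagging are illustrative, not from the original post, and the k models are still trained as before:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.classification.SVMModel
import org.apache.spark.mllib.regression.LabeledPoint

// One map + reduceByKey over the full dataset replaces k separate
// prediction/count jobs: each point is scored by the model of the fold in
// which it is held out, and errors are accumulated per fold.
def foldErrors(models: Array[SVMModel],
               tagged: RDD[(Int, LabeledPoint)]): Map[Int, Double] = {
  tagged.map { case (fold, point) =>
    val wrong = if (models(fold).predict(point.features) != point.label) 1L else 0L
    (fold, (wrong, 1L))
  }.reduceByKey { case ((w1, n1), (w2, n2)) => (w1 + w2, n1 + n2) }
   .mapValues { case (wrong, n) => wrong.toDouble / n }
   .collectAsMap()
   .toMap
}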