For linear models, the 3rd option is by far the most efficient, and I suspect
it is what Evan is alluding to.

Unfortunately, it's not directly possible with the classes in MLlib right now,
so you'll have to roll your own using the underlying SGD / BFGS primitives.
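
To make "roll your own" a bit more concrete, here is a rough sketch of how I read the one-pass idea: every row carries a fold id, model j is trained on all rows outside fold j, and each gradient-descent iteration makes a single pass over the data that feeds all k gradients. This is not MLlib code. Plain Scala collections stand in for an RDD, logistic loss stands in for whatever loss you actually need, and Row, fit and validationErrors are names I made up for illustration.

```scala
// Sketch only: plain Scala, no Spark. All names here are hypothetical.
object OnePassKFold {
  type Vec = Array[Double]

  // label is 0.0 or 1.0; fold is the id of the fold this row is held out in.
  final case class Row(label: Double, features: Vec, fold: Int)

  private def dot(a: Vec, b: Vec): Double = {
    var s = 0.0
    var i = 0
    while (i < a.length) { s += a(i) * b(i); i += 1 }
    s
  }

  private def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  /** Fits k logistic-regression weight vectors with batch gradient descent.
    * Model j sees every row whose fold is not j. Each iteration walks the
    * data once and accumulates the gradients of all k models in that pass.
    * Assumes every model has at least one training row. */
  def fit(data: Seq[Row], k: Int, dim: Int, iterations: Int, stepSize: Double): Array[Vec] = {
    val weights = Array.fill(k)(new Array[Double](dim))
    val trainCounts = Array.tabulate(k)(j => data.count(_.fold != j).toDouble)
    for (_ <- 0 until iterations) {
      val grads = Array.fill(k)(new Array[Double](dim))
      // Single pass: each row contributes to the k - 1 models that train on it.
      for (row <- data; j <- 0 until k if j != row.fold) {
        val err = sigmoid(dot(weights(j), row.features)) - row.label
        var i = 0
        while (i < dim) { grads(j)(i) += err * row.features(i); i += 1 }
      }
      // Averaged gradient step for every model.
      for (j <- 0 until k; i <- 0 until dim)
        weights(j)(i) -= stepSize * grads(j)(i) / trainCounts(j)
    }
    weights
  }

  /** Misclassification rate of model j, measured only on the rows of fold j. */
  def validationErrors(data: Seq[Row], weights: Array[Vec]): Array[Double] =
    weights.indices.map { j =>
      val heldOut = data.filter(_.fold == j)
      val wrong = heldOut.count { r =>
        val pred = if (dot(weights(j), r.features) > 0.0) 1.0 else 0.0
        pred != r.label
      }
      wrong.toDouble / heldOut.size
    }.toArray
}
```

On a cluster, the inner loop would presumably become one aggregate (or mapPartitions plus reduce) per iteration with the k weight vectors broadcast, but I have not written or benchmarked that version.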

On Sat, Jul 5, 2014 at 10:45 AM, Christopher Nguyen <c...@adatao.com>
wrote:

> Hi sparkuser2345,
> I'm inferring the problem statement is something like "how do I make this
> complete faster (given my compute resources)?"
> Several comments.
> First, Spark only allows launching parallel tasks from the driver, not from
> workers, which is why you're seeing the exception when you try. Whether the
> latter is a sensible/doable idea is another discussion, but I can
> appreciate why many people assume this should be possible.
> Second, on optimization, you may be able to apply Sean's idea about
> (thread) parallelism at the driver, combined with the knowledge that often
> these cluster tasks bottleneck while competing for the same resources at
> the same time (CPU vs. disk vs. network, etc.). You may be able to achieve
> some performance optimization by randomizing these timings. This is not
> unlike Gmail randomizing user storage locations around the world for load
> balancing. Here, you would partition each of your RDDs into a different
> number of partitions, making some tasks larger than others, and thus some
> may be in a CPU-intensive map while others are shuffling data around the
> network. This is rather cluster-specific; I'd be interested in what you
> learn from such an exercise.
> Third, I find it useful always to consider doing as much as possible in one
> pass, subject to memory limits, e.g., mapPartitions() vs map(), thus
> minimizing map/shuffle/reduce boundaries with their context switches and
> data shuffling. In this case, notice how you're running the
> training+prediction k times over mostly the same rows, with map/reduce
> boundaries in between. While the training phase is sealed in this context,
> you may be able to improve performance by collecting all k models
> together and doing all [m x k] predictions at once, which may end up being
> faster.
> Finally, as implied from the above, for the very common k-fold
> cross-validation pattern, the algorithm itself might be written to be smart
> enough to take both train and test data and "do the right thing" within
> itself, thus obviating the need for the user to prepare k data sets and
> run over them serially, and likely saving a lot of repeated
> computations in the right internal places.
> Enjoy,
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
> On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen <so...@cloudera.com> wrote:
>> If you call .par on data_kfolded, it will become a parallel collection in
>> Scala, and so the maps will happen in parallel.
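
A note on the .par suggestion: it works because the jobs are still submitted from the driver, only from several threads, and Spark's scheduler accepts jobs submitted concurrently from separate driver threads. A similar effect can be sketched with plain Scala futures, which avoids depending on parallel collections (a separate module as of Scala 2.13). ParallelFolds and scoreAll are hypothetical names, and the function passed in is assumed to hold the body of the working data_kfolded.map version.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Sketch only: generic driver-side helper; nothing here is a Spark API.
object ParallelFolds {
  /** Starts `fitAndScore` for every fold on the global thread pool and
    * blocks until all of them finish. Results come back in fold order. */
  def scoreAll[A](folds: Seq[A])(fitAndScore: A => Double): Seq[Double] = {
    // Each Future begins running as soon as it is created.
    val pending = folds.map(fold => Future(fitAndScore(fold)))
    // Future.sequence keeps the input order regardless of completion order.
    Await.result(Future.sequence(pending), Duration.Inf)
  }
}
```

With the code from the original post, this would be called as ParallelFolds.scoreAll(data_kfolded.toSeq) with the fit-and-evaluate closure as the second argument. How much it helps depends on whether a single fold already saturates the cluster.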
>> On Jul 5, 2014 9:35 AM, "sparkuser2345" <hm.spark.u...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to fit a logistic regression model with cross validation in
>>> Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where
>>> each element is a pair of RDDs containing the training and test data:
>>>
>>> (training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint],
>>>  test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint])
>>>
>>> scala> data_kfolded
>>> res21: Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint],
>>>   org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] =
>>> Array((MappedRDD[9] at map at <console>:24,MappedRDD[7] at map at <console>:23),
>>>   (MappedRDD[13] at map at <console>:24,MappedRDD[11] at map at <console>:23),
>>>   (MappedRDD[17] at map at <console>:24,MappedRDD[15] at map at <console>:23))
>>>
>>> Everything works fine when using data_kfolded:
>>>
>>> val validationErrors =
>>> data_kfolded.map { datafold =>
>>>   val svmAlg = new SVMWithSGD()
>>>   val model_reg = svmAlg.run(datafold._1)   // fit on the training RDD
>>>   val labelAndPreds = datafold._2.map { point =>
>>>     val prediction = model_reg.predict(point.features)
>>>     (point.label, prediction)
>>>   }
>>>   // Fraction of misclassified points in the held-out (test) RDD
>>>   val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
>>>   testErr
>>> }
>>>
>>> scala> validationErrors
>>> res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
>>> 0.29833546734955185)
>>>
>>> However, I have understood that the models are not fitted in parallel as
>>> data_kfolded is not an RDD (although it's an array of pairs of RDDs). When
>>> running the same code where data_kfolded has been replaced with
>>> sc.parallelize(data_kfolded), I get a null pointer exception from the line
>>> where the run method of the SVMWithSGD object is called with the training
>>> data. I guess this is somehow related to the fact that RDDs can't be
>>> accessed from inside a closure. I fail to understand though why the first
>>> version works and the second doesn't. Most importantly, is there a way to
>>> fit the models in parallel? I would really appreciate your help.
>>>
>>> val validationErrors =
>>> sc.parallelize(data_kfolded).map { datafold =>
>>>   val svmAlg = new SVMWithSGD()
>>>   val model_reg = svmAlg.run(datafold._1) // This line gives the null pointer exception
>>>   val labelAndPreds = datafold._2.map { point =>
>>>     val prediction = model_reg.predict(point.features)
>>>     (point.label, prediction)
>>>   }
>>>   // Fraction of misclassified points in the held-out (test) RDD
>>>   val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
>>>   testErr
>>> }
>>> validationErrors.collect
>>>
>>> java.lang.NullPointerException
>>>         at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
>>>         at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>>>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
>>>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>>>         at scala.Option.getOrElse(Option.scala:120)
>>>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
>>>         at org.apache.spark.rdd.RDD.take(RDD.scala:824)
>>>         at org.apache.spark.rdd.RDD.first(RDD.scala:856)
>>>         at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
>>>         at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
>>>         at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
>>>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>         at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
>>>         at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>>>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>         at javax.security.auth.Subject.doAs(Subject.java:415)
>>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>>>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:744)
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-parallelize-model-fitting-with-different-cross-validation-folds-tp8839.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>
