How to parallelize model fitting with different cross-validation folds?
Hi, I am trying to fit a logistic regression model with cross-validation in Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where each element is a pair of RDDs containing the training and test data: (training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint], test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint])

scala> data_kfolded
res21: Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint], org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] = Array((MappedRDD[9] at map at <console>:24, MappedRDD[7] at map at <console>:23), (MappedRDD[13] at map at <console>:24, MappedRDD[11] at map at <console>:23), (MappedRDD[17] at map at <console>:24, MappedRDD[15] at map at <console>:23))

Everything works fine when using data_kfolded:

    val validationErrors = data_kfolded.map { datafold =>
      val svmAlg = new SVMWithSGD()
      val model_reg = svmAlg.run(datafold._1)
      val labelAndPreds = datafold._2.map { point =>
        val prediction = model_reg.predict(point.features)
        (point.label, prediction)
      }
      val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
      trainErr.toDouble
    }

scala> validationErrors
res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837, 0.29833546734955185)

However, as I understand it, the models are not fitted in parallel because data_kfolded is not an RDD (although it is an array of pairs of RDDs). When I run the same code with data_kfolded replaced by sc.parallelize(data_kfolded), I get a null pointer exception from the line where the run method of the SVMWithSGD object is called with the training data. I guess this is somehow related to the fact that RDDs can't be accessed from inside a closure. I fail to understand, though, why the first version works and the second doesn't. Most importantly, is there a way to fit the models in parallel? I would really appreciate your help.
    val validationErrors = sc.parallelize(data_kfolded).map { datafold =>
      val svmAlg = new SVMWithSGD()
      val model_reg = svmAlg.run(datafold._1) // This line gives the null pointer exception
      val labelAndPreds = datafold._2.map { point =>
        val prediction = model_reg.predict(point.features)
        (point.label, prediction)
      }
      val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
      trainErr.toDouble
    }
    validationErrors.collect

java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.RDD.take(RDD.scala:824)
    at org.apache.spark.rdd.RDD.first(RDD.scala:856)
    at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
    at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
    at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
Re: How to parallelize model fitting with different cross-validation folds?
If you call .par on data_kfolded it will become a parallel collection in Scala, and so the maps will happen in parallel.

On Jul 5, 2014 9:35 AM, sparkuser2345 hm.spark.u...@gmail.com wrote: Hi, I am trying to fit a logistic regression model with cross validation in Spark 0.9.0 using SVMWithSGD.
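A minimal sketch of this suggestion, reusing the data_kfolded array from the original post (the parallel collection runs the k fold fits concurrently from the driver; each svmAlg.run(...) is still a distributed Spark job):

    val validationErrors = data_kfolded.par.map { case (train, test) =>
      val svmAlg = new SVMWithSGD()
      val model = svmAlg.run(train)
      val labelAndPreds = test.map { point =>
        (point.label, model.predict(point.features))
      }
      labelAndPreds.filter(r => r._1 != r._2).count.toDouble / test.count
    }.toArray // back to a plain Array[Double]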
Spark 1.0 failed on HDP 2.0 with absurd exception
Hi all, I have a cluster with HDP 2.0. I built Spark 1.0 on the edge node and am trying to run it with the command

    ./bin/spark-submit --class test.etl.RunETL --master yarn-cluster --num-executors 14 --driver-memory 3200m --executor-memory 3g --executor-cores 2 my-etl-1.0-SNAPSHOT-hadoop2.2.0.jar

As a result I got a failed YARN application with the following stack trace:

Application application_1404481778533_0068 failed 3 times due to AM Container for appattempt_1404481778533_0068_03 exited with exitCode: 1 due to: Exception from container-launch:
org.apache.hadoop.util.Shell$ExitCodeException:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:464)
    at org.apache.hadoop.util.Shell.run(Shell.java:379)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:283)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:79)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
.Failing this attempt.. Failing the application

Log Type: stderr
Log Length: 686
Unknown/unsupported param List(--executor-memory, 3072, --executor-cores, 2, --num-executors, 14)
Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
Options:
  --jar JAR_PATH       Path to your application's JAR file (required)
  --class CLASS_NAME   Name of your application's main class (required)
  --args ARGS          Arguments to be passed to your application's main class. Multiple invocations are possible, each will be passed in order.
  --num-workers NUM    Number of workers to start (Default: 2)
  --worker-cores NUM   Number of cores for the workers (Default: 1)
  --worker-memory MEM  Memory per Worker (e.g. 1000M, 2G) (Default: 1G)

It looks like the old Spark option notation. Any ideas?

Thank you,
Konstantin Kudryavtsev
Re: Graphx traversal and merge interesting edges
Interesting problem! My understanding is that you want to (1) find paths matching a particular pattern, and (2) add edges between the start and end vertices of the matched paths. For (1), I implemented a pattern matcher for GraphX (https://github.com/ankurdave/spark/blob/PatternMatching/graphx/src/main/scala/org/apache/spark/graphx/lib/PatternMatching.scala) that iteratively accumulates partial pattern matches. I used your example in the unit test (https://github.com/ankurdave/spark/blob/PatternMatching/graphx/src/test/scala/org/apache/spark/graphx/lib/PatternMatchingSuite.scala). For (2), you can take the output of the pattern matcher (the set of matching paths organized by their terminal vertices) and construct a set of new edges using the initial and terminal vertices of each path. Then you can make a new graph consisting of the union of the original edge set and the new edges. Let me know if you'd like help with this. Ankur http://www.ankurdave.com/
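A rough sketch of step (2), assuming the matcher returns something like an RDD of (terminalVertexId, path) pairs where a path is the sequence of vertex IDs from the initial to the terminal vertex (the actual output type of the pattern matcher may differ, and the String attribute types are illustrative only):

    import org.apache.spark.graphx.{Edge, Graph, VertexId}
    import org.apache.spark.rdd.RDD

    // matches: RDD[(VertexId, Seq[VertexId])] -- (terminal vertex, full path)
    // For each matched path, add a "shortcut" edge from its first to its last vertex.
    def addShortcutEdges(
        graph: Graph[String, String],
        matches: RDD[(VertexId, Seq[VertexId])],
        newAttr: String): Graph[String, String] = {
      val newEdges = matches.map { case (terminal, path) =>
        Edge(path.head, terminal, newAttr)
      }
      // New graph: original vertices, original edges plus the shortcut edges.
      Graph(graph.vertices, graph.edges.union(newEdges))
    }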
Re: Graphx traversal and merge interesting edges
Thanks Ankur, I cannot thank you enough for this! I am still reading your example and digesting/grokking it :-) I was breaking my head over this for the past few hours. In my last futile attempts I was looking at Pregel, e.g. whether it could be used to track what step of a path match each vertex is in and send a message to the next vertex with the history of the traversal, and then, when merging messages, append the traversal history of each message :-P --Gautam

On 05-Jul-2014, at 3:23 pm, Ankur Dave ankurd...@gmail.com wrote: Interesting problem! My understanding is that you want to (1) find paths matching a particular pattern, and (2) add edges between the start and end vertices of the matched paths.
[no subject]
I ran into very strange behavior of a job that I ran on a YARN Hadoop cluster. One of the stages (a map function) was split into 80 tasks; 10 of them finished successfully in ~2 min, but all the other tasks have been running for 40 min and are still not finished... I suspect they have hung. Any ideas what's going on and how it can be fixed? Thank you, Konstantin Kudryavtsev
Re: How to parallelize model fitting with different cross-validation folds?
To be clear - each of the RDDs is still a distributed dataset, and each of the individual SVM models will be trained in parallel across the cluster. Sean's suggestion effectively has you submitting multiple Spark jobs simultaneously, which, depending on your cluster configuration and the size of your dataset, may or may not be a good idea. There are some tricks you can do to make training multiple models on the same dataset faster, which we're hoping to expose to users in an upcoming release. - Evan

On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen so...@cloudera.com wrote: If you call .par on data_kfolded it will become a parallel collection in Scala and so the maps will happen in parallel.
Re: taking top k values of rdd
To make it efficient in your case you may need to do a bit of custom code to emit the top k per partition and then only send those to the driver. On the driver you can just take the top k of the combined top-k lists from each partition (assuming you have (object, count) pairs for each top-k list). — Sent from Mailbox

On Sat, Jul 5, 2014 at 10:17 AM, Koert Kuipers ko...@tresata.com wrote: my initial approach to taking the top k values of an rdd was using a priority-queue monoid, along these lines:

    rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) }, false).reduce(monoid.plus)

this works fine, but looking at the code for reduce, it first reduces within a partition (which doesn't help me) and then sends the results to the driver where these again get reduced. this means that for every partition the (potentially very bulky) priority queue gets shipped to the driver. my driver is client side, not inside the cluster, and i cannot change this, so shipping all these queues to the driver can be expensive. is there a better way to do this? should i try a shuffle first to reduce the partitions to the minimal amount (since the number of queues shipped is equal to the number of partitions)? is there a way to reduce to a single-item RDD, so the queues stay inside the cluster and i can retrieve the final result with RDD.first?
Re: taking top k values of rdd
hey nick, you are right. i didn't explain myself well and my code example was wrong... i am keeping a priority queue with k items per partition (using com.twitter.algebird.mutable.PriorityQueueMonoid.build to limit the sizes of the queues). but this still means i am sending k items per partition to my driver, so k x p, while i only need k. thanks! koert

On Sat, Jul 5, 2014 at 1:21 PM, Nick Pentreath nick.pentre...@gmail.com wrote: To make it efficient in your case you may need to do a bit of custom code to emit the top k per partition and then only send those to the driver.
Re: taking top k values of rdd
Right. That is unavoidable unless, as you say, you repartition into 1 partition, which may do the trick. When I say send the top k per partition, I don't mean send the priority queue but the actual values. That may end up being relatively small if k and p are not too big. (I'm not sure how large a serialized priority queue is.) — Sent from Mailbox

On Sat, Jul 5, 2014 at 10:29 AM, Koert Kuipers ko...@tresata.com wrote: hey nick, you are right. i didn't explain myself well and my code example was wrong... i am keeping a priority queue with k items per partition, but this still means i am sending k items per partition to my driver, so k x p, while i only need k.
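A minimal sketch of that suggestion, assuming an RDD of (item, count) pairs (names are illustrative; a bounded priority queue per partition would avoid materializing each partition, the sort below is just the simplest thing that works):

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD

    // Emit only the k largest values from each partition, then finish the
    // top-k on the driver over at most k * numPartitions candidates.
    def topK[T: ClassTag](rdd: RDD[(T, Long)], k: Int): Array[(T, Long)] = {
      val perPartition = rdd.mapPartitions { iter =>
        iter.toArray.sortBy { case (_, count) => -count }.take(k).iterator
      }
      perPartition.collect().sortBy { case (_, count) => -count }.take(k)
    }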
Re: How to parallelize model fitting with different cross-validation folds?
Hi sparkuser2345, I'm inferring that the problem statement is something like "how do I make this complete faster (given my compute resources)?" Several comments.

First, Spark only allows launching parallel tasks from the driver, not from workers, which is why you're seeing the exception when you try. Whether the latter is a sensible/doable idea is another discussion, but I can appreciate why many people assume this should be possible.

Second, on optimization, you may be able to apply Sean's idea about (thread) parallelism at the driver, combined with the knowledge that these cluster tasks often bottleneck while competing for the same resources at the same time (CPU vs disk vs network, etc.). You may be able to achieve some performance optimization by randomizing these timings. This is not unlike GMail randomizing user storage locations around the world for load balancing. Here, you would partition each of your RDDs into a different number of partitions, making some tasks larger than others, so that some may be in a CPU-intensive map while others are shuffling data around the network. This is rather cluster-specific; I'd be interested in what you learn from such an exercise.

Third, I always find it useful to consider doing as much as possible in one pass, subject to memory limits, e.g., mapPartitions() vs map(), thus minimizing map/shuffle/reduce boundaries with their context switches and data shuffling. In this case, notice how you're running training+prediction k times over mostly the same rows, with map/reduce boundaries in between. While the training phase is sealed in this context, you may be able to improve performance by collecting all the k models together and doing [m x k] predictions all at once, which may end up being faster.

Finally, as implied by the above, for the very common k-fold cross-validation pattern, the algorithm itself might be written to be smart enough to take both train and test data and do the right thing within itself, thus obviating the need for the user to prepare k data sets and run over them serially, and likely saving a lot of repeated computations in the right internal places.

Enjoy, -- Christopher T. Nguyen Co-founder CEO, Adatao http://adatao.com linkedin.com/in/ctnguyen

On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen so...@cloudera.com wrote: If you call .par on data_kfolded it will become a parallel collection in Scala and so the maps will happen in parallel. On Jul 5, 2014 9:35 AM, sparkuser2345 hm.spark.u...@gmail.com wrote: Hi, I am trying to fit a logistic regression model with cross validation in Spark 0.9.0 using SVMWithSGD.
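A hedged sketch of one interpretation of the third point above: train the k models first, broadcast them, and score every test point in a single pass. It assumes a taggedData RDD where each point carries the index of the fold in which it is test data; taggedData and the other names are illustrative, not part of the original code.

    import org.apache.spark.SparkContext._   // pair-RDD operations (needed on older Spark versions)
    import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
    import org.apache.spark.mllib.regression.LabeledPoint

    // One (still sequential) training call per fold; each run is itself a distributed job.
    val models: Array[SVMModel] =
      data_kfolded.map { case (train, _) => new SVMWithSGD().run(train) }
    val bcModels = sc.broadcast(models)

    // taggedData: RDD[(Int, LabeledPoint)] = (test-fold index, point)
    val validationErrors = taggedData
      .map { case (fold, p) =>
        val wrong = if (bcModels.value(fold).predict(p.features) != p.label) 1L else 0L
        (fold, (wrong, 1L))
      }
      .reduceByKey { case ((w1, n1), (w2, n2)) => (w1 + w2, n1 + n2) }
      .mapValues { case (wrong, n) => wrong.toDouble / n }
      .collectAsMap()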
Re: How to parallelize model fitting with different cross-validation folds?
For linear models the third option is by far the most efficient, and I suspect it is what Evan is alluding to. Unfortunately it's not directly possible with the classes in MLlib now, so you'll have to roll your own using the underlying SGD / BFGS primitives. — Sent from Mailbox

On Sat, Jul 5, 2014 at 10:45 AM, Christopher Nguyen c...@adatao.com wrote: Hi sparkuser2345, I'm inferring that the problem statement is something like "how do I make this complete faster (given my compute resources)?"
Re: How to use groupByKey and CqlPagingInputFormat
Ah, I see. Thank you! As we are in the process of building the system we have not tried with any large amounts of data yet, but when the time comes I'll try both implementations and do a small benchmark.

On Fri, Jul 4, 2014 at 9:20 PM, Mohammed Guller moham...@glassbeam.com wrote: As far as I know, there is not much difference, except that the outer parentheses are redundant. The problem with your original code was that there was a mismatch between the opening and closing parentheses. Sometimes the error messages are misleading :-) Do you see any performance difference with the Datastax Spark driver? Mohammed

-----Original Message----- From: Martin Gammelsæter [mailto:martingammelsae...@gmail.com] Sent: Friday, July 4, 2014 12:43 AM To: user@spark.apache.org Subject: Re: How to use groupByKey and CqlPagingInputFormat

On Thu, Jul 3, 2014 at 10:29 PM, Mohammed Guller moham...@glassbeam.com wrote: Martin, 1) The first map contains the columns in the primary key, which could be a compound primary key containing multiple columns, and the second map contains all the non-key columns.

Ah, thank you, that makes sense.

2) try this fixed code:

    val navnrevmap = casRdd.map {
      case (key, value) =>
        (ByteBufferUtil.string(value.get("navn")),
         ByteBufferUtil.toInt(value.get("revisjon")))
    }.groupByKey()

I changed from CqlPagingInputFormat to the new Datastax cassandra-spark driver, which is a bit easier to work with, but thanks! I'm curious though, what is the semantic difference between map({}) and map{}? -- Mvh. Martin Gammelsæter 92209139
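On the map({}) vs map{} question, a small illustration (not from the thread): braces are just Scala's alternative delimiter for a single function argument, so the forms below compile to the same thing; the braces form is what makes multi-line bodies and `case` pattern literals read naturally.

    val xs = sc.parallelize(Seq(1, 2, 3))
    val a = xs.map(x => x * 2)       // parentheses
    val b = xs.map({ x => x * 2 })   // braces wrapped in (redundant) parentheses
    val c = xs.map { x => x * 2 }    // braces only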
Re: Spark 1.0 failed on HDP 2.0 with absurd exception
From looking at the exception message that was returned, I would try the following command for running the application:

    ./bin/spark-submit --class test.etl.RunETL --master yarn-cluster --num-workers 14 --driver-memory 3200m --worker-memory 3g --worker-cores 2 --jar my-etl-1.0-SNAPSHOT-hadoop2.2.0.jar

I didn't try this, so it may not work. Best, -Cesar

On Sat, Jul 5, 2014 at 2:48 AM, Konstantin Kudryavtsev kudryavtsev.konstan...@gmail.com wrote: Hi all, I have a cluster with HDP 2.0. I built Spark 1.0 on the edge node and am trying to run it with the command ./bin/spark-submit --class test.etl.RunETL --master yarn-cluster --num-executors 14 --driver-memory 3200m --executor-memory 3g --executor-cores 2 my-etl-1.0-SNAPSHOT-hadoop2.2.0.jar

-- Cesar Arevalo Software Engineer ❘ Zephyr Health 450 Mission Street, Suite #201 ❘ San Francisco, CA 94105 m: +1 415-571-7687 ❘ s: arevalocesar | t: @zephyrhealth https://twitter.com/zephyrhealth o: +1 415-529-7649 ❘ f: +1 415-520-9288 http://www.zephyrhealth.com
Re: [mllib] strange/buggy results with RidgeRegressionWithSGD
You may try LBFGS to get more stable convergence. In Spark 1.1 we will be able to use LBFGS instead of GD in the training process.

On Jul 4, 2014 1:23 PM, Thomas Robert tho...@creativedata.fr wrote: Hi all, I too am having some issues with *RegressionWithSGD algorithms. Concerning your issue Eustache, this could be due to the fact that these regression algorithms use a fixed step (that is divided by sqrt(iteration)). During my tests, quite often, the algorithm diverged to an infinite cost, I guess because the step was too big. I reduced it and managed to get good results on a very simple generated dataset. But I was wondering if anyone here had some advice concerning the use of these regression algorithms, for example how to choose a good step size and number of iterations? I wonder if I'm using those right... Thanks, -- *Thomas ROBERT* www.creativedata.fr

2014-07-03 16:16 GMT+02:00 Eustache DIEMERT eusta...@diemert.fr: Printing the model shows the intercept is always 0 :( Should I open a bug for that?

2014-07-02 16:11 GMT+02:00 Eustache DIEMERT eusta...@diemert.fr: Hi list, I'm benchmarking MLlib for a regression task [1] and get strange results. Namely, using RidgeRegressionWithSGD it seems the predicted points miss the intercept:

{code}
val trainedModel = RidgeRegressionWithSGD.train(trainingData, 1000)
...
valuesAndPreds.take(10).map(t => println(t))
{code}

output:

(2007.0,-3.784588726958493E75)
(2003.0,-1.9562390324037716E75)
(2005.0,-4.147413202985629E75)
(2003.0,-1.524938024096847E75)
...

If I change the parameters (step size, regularization and iterations) I get NaNs more often than not:

(2007.0,NaN)
(2003.0,NaN)
(2005.0,NaN)
...

On the other hand the DecisionTree model gives sensible results. I see there is a `setIntercept()` method in the abstract class GeneralizedLinearAlgorithm that seems to trigger the use of the intercept, but I'm unable to use it from the public interface :( Any help appreciated :) Eustache

[1] https://archive.ics.uci.edu/ml/datasets/YearPredictionMSD
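A small sketch of the "smaller fixed step" workaround Thomas describes, assuming your MLlib version exposes the train overload that takes a step size and regularization parameter; the concrete values are illustrative, not recommendations:

    import org.apache.spark.mllib.regression.RidgeRegressionWithSGD

    val numIterations = 1000
    val stepSize = 0.01   // much smaller than the library default, to avoid the diverging/NaN cost
    val regParam = 0.01
    val model = RidgeRegressionWithSGD.train(trainingData, numIterations, stepSize, regParam)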
Re: window analysis with Spark and Spark streaming
The key idea is to simulate your application time as you feed in the data. So you can connect Spark Streaming to a queue and insert data into it spaced by time. Easier said than done :). What are the parallelism issues you are hitting with your static approach?

On Friday, July 4, 2014, alessandro finamore alessandro.finam...@polito.it wrote: Thanks for the replies. What is not completely clear to me is how time is managed. I can create a DStream from a file. But if I set the window property, that will be bound to the application time, right? If I got it right, with a receiver I can control the way DStreams are created. But how can I then apply the windowing already shipped with the framework if it is bound to the application time? I would like to define a window of N files, but the window() function requires a duration as input... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/window-analysis-with-Spark-and-Spark-streaming-tp8806p8824.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Sent from Gmail Mobile
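A hedged sketch of the queue idea (the file list, batch interval, and pacing are illustrative; this is just one way to make the streaming clock stand in for "one file per batch"):

    import scala.collection.mutable
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val batchInterval = Seconds(10)
    val ssc = new StreamingContext(sc, batchInterval)

    // Each queued RDD becomes (roughly) one batch of the stream.
    val queue = new mutable.Queue[RDD[String]]()
    val lines = ssc.queueStream(queue)

    // Windowing now follows the streaming clock: e.g. the last 3 "files".
    lines.window(Seconds(30), batchInterval).count().print()

    ssc.start()
    for (path <- filePaths) {   // filePaths: your N input files, in order (hypothetical)
      queue += sc.textFile(path)
      Thread.sleep(batchInterval.milliseconds)
    }
    ssc.awaitTermination()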
Re: graphx Joining two VertexPartitions with different indexes is slow.
thanks for replying. why is joining two VertexRDDs without caching slow? what is recomputed unnecessarily? i am not sure what is different here from joining two regular RDDs (where nobody seems to recommend caching before joining, i think...)

On Thu, Jul 3, 2014 at 10:52 PM, Ankur Dave ankurd...@gmail.com wrote: Oh, I just read your message more carefully and noticed that you're joining a regular RDD with a VertexRDD. In that case I'm not sure why the warning is occurring, but it might be worth caching both operands (graph.vertices and the regular RDD) just to be sure. Ankur http://www.ankurdave.com/
Re: reading compress lzo files
On Fri, Jul 4, 2014 at 3:33 PM, Gurvinder Singh gurvinder.si...@uninett.no wrote:

    csv = sc.newAPIHadoopFile(opts.input, "com.hadoop.mapreduce.LzoTextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text").count()

Does anyone know what the rough equivalent of this would be in the Scala API? I am trying the following, but the first import yields an error on my spark-ec2 cluster:

    import com.hadoop.mapreduce.LzoTextInputFormat
    import org.apache.hadoop.io.LongWritable
    import org.apache.hadoop.io.Text

    sc.newAPIHadoopFile("s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram/data", LzoTextInputFormat, LongWritable, Text)

    scala> import com.hadoop.mapreduce.LzoTextInputFormat
    <console>:12: error: object hadoop is not a member of package com
           import com.hadoop.mapreduce.LzoTextInputFormat

Nick
Re: graphx Joining two VertexPartitions with different indexes is slow.
When joining two VertexRDDs with identical indexes, GraphX can use a fast code path (a zip join without any hash lookups). However, the check for identical indexes is performed using reference equality. Without caching, two copies of the index are created. Although the two indexes are structurally identical, they fail reference equality, and so GraphX mistakenly uses the slow path involving a hash lookup per joined element. I'm working on a patch https://github.com/apache/spark/pull/1297 that attempts an optimistic zip join with per-element fallback to hash lookups, which would improve this situation. Ankur http://www.ankurdave.com/
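A minimal sketch of the caching suggestion from earlier in the thread; whether it helps depends on the situation described above, and graph and otherPairs are illustrative stand-ins for the user's data:

    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    // Cache both sides so repeated joins reuse the same materialized copies
    // (and, for VertexRDD-to-VertexRDD joins, the same index object).
    val verts = graph.vertices.cache()
    val other: RDD[(VertexId, Long)] = otherPairs.cache()

    // innerJoin keeps only the vertices present on both sides.
    val joined = verts.innerJoin(other) { (id, attr, value) => (attr, value) }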
Re: reading compress lzo files
The package com.hadoop.mapreduce certainly looks wrong. If it is a Hadoop class it starts with org.apache.hadoop.

On Jul 6, 2014 4:20 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Does anyone know what the rough equivalent of this would be in the Scala API? I am trying the following, but the first import yields an error on my spark-ec2 cluster: import com.hadoop.mapreduce.LzoTextInputFormat
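A hedged sketch of the Scala equivalent: the new-API Hadoop file method takes Class objects, so pass classOf[...] rather than bare type names. The "object hadoop is not a member of package com" error usually means the jar providing com.hadoop.mapreduce.LzoTextInputFormat (the hadoop-lzo library referenced by the original PySpark call) is not on the classpath; this sketch assumes it has been added to the driver and executors.

    import com.hadoop.mapreduce.LzoTextInputFormat
    import org.apache.hadoop.io.{LongWritable, Text}

    val rdd = sc.newAPIHadoopFile(
      "s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram/data",
      classOf[LzoTextInputFormat],   // input format
      classOf[LongWritable],         // key type
      classOf[Text])                 // value type
    println(rdd.count())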