How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread sparkuser2345
Hi, 

I am trying to fit a logistic regression model with cross validation in
Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where
each element is a pair of RDDs containing the training and test data: 

(training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint],
test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint])

scala> data_kfolded
res21:
Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint],
org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] =
Array((MappedRDD[9] at map at <console>:24,MappedRDD[7] at map at
<console>:23), (MappedRDD[13] at map at <console>:24,MappedRDD[11] at map at
<console>:23), (MappedRDD[17] at map at <console>:24,MappedRDD[15] at map at
<console>:23))

Everything works fine when using data_kfolded: 

val validationErrors =
  data_kfolded.map { datafold =>
    val svmAlg = new SVMWithSGD()
    val model_reg = svmAlg.run(datafold._1)
    val labelAndPreds = datafold._2.map { point =>
      val prediction = model_reg.predict(point.features)
      (point.label, prediction)
    }
    val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
      datafold._2.count
    trainErr.toDouble
  }

scala> validationErrors
res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
0.29833546734955185)

However, I have understood that the models are not fitted in parallel as
data_kfolded is not an RDD (although it's an array of pairs of RDDs). When
running the same code where data_kfolded has been replaced with
sc.parallelize(data_kfolded), I get a null pointer exception from the line
where the run method of the SVMWithSGD object is called with the training
data. I guess this is somehow related to the fact that RDDs can't be
accessed from inside a closure. I fail to understand though why the first
version works and the second doesn't. Most importantly, is there a way to
fit the models in parallel? I would really appreciate your help. 

val validationErrors =
  sc.parallelize(data_kfolded).map { datafold =>
    val svmAlg = new SVMWithSGD()
    val model_reg = svmAlg.run(datafold._1) // This line gives a null pointer exception
    val labelAndPreds = datafold._2.map { point =>
      val prediction = model_reg.predict(point.features)
      (point.label, prediction)
    }
    val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
      datafold._2.count
    trainErr.toDouble
  }
validationErrors.collect

java.lang.NullPointerException
at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.RDD.take(RDD.scala:824)
at org.apache.spark.rdd.RDD.first(RDD.scala:856)
at
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
at
$line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:36)
at
$line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:34)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
at java.security.AccessController.doPrivileged(Native Method)
   

Re: How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread Sean Owen
If you call .par on data_kfolded it will become a parallel collection in
Scala and so the maps will happen in parallel.
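A minimal sketch of that suggestion (the code from the original post, with .par added so the driver launches the fold-training jobs concurrently; each job itself still runs distributed on the cluster):

val validationErrors = data_kfolded.par.map { datafold =>
  val svmAlg = new SVMWithSGD()
  val model_reg = svmAlg.run(datafold._1)
  val labelAndPreds = datafold._2.map { point =>
    (point.label, model_reg.predict(point.features))
  }
  labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
}.seq.toArray  // back to a plain Array[Double]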
On Jul 5, 2014 9:35 AM, sparkuser2345 hm.spark.u...@gmail.com wrote:


Spark 1.0 failed on HDP 2.0 with absurd exception

2014-07-05 Thread Konstantin Kudryavtsev
Hi all,

I have a cluster with HDP 2.0. I built Spark 1.0 on the edge node and am trying to
run it with the command
./bin/spark-submit --class test.etl.RunETL --master yarn-cluster
--num-executors 14 --driver-memory 3200m --executor-memory 3g
--executor-cores 2 my-etl-1.0-SNAPSHOT-hadoop2.2.0.jar

As a result I got a failed YARN application with the following stack trace:

Application application_1404481778533_0068 failed 3 times due to AM
Container for appattempt_1404481778533_0068_03 exited with exitCode: 1
due to: Exception from container-launch:
org.apache.hadoop.util.Shell$ExitCodeException:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:464)
at org.apache.hadoop.util.Shell.run(Shell.java:379)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:283)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:79)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
 .Failing this attempt.. Failing the application

Log Type: stderr

Log Length: 686

Unknown/unsupported param List(--executor-memory, 3072,
--executor-cores, 2, --num-executors, 14)
Usage: org.apache.spark.deploy.yarn.ApplicationMaster [options]
Options:
  --jar JAR_PATH   Path to your application's JAR file (required)
  --class CLASS_NAME   Name of your application's main class (required)
  --args ARGS  Arguments to be passed to your application's main class.
   Mutliple invocations are possible, each will be
passed in order.
  --num-workers NUMNumber of workers to start (Default: 2)
  --worker-cores NUM   Number of cores for the workers (Default: 1)
  --worker-memory MEM  Memory per Worker (e.g. 1000M, 2G) (Default: 1G)


Seems like the old Spark notation. Any ideas?

Thank you,
Konstantin Kudryavtsev


Re: Graphx traversal and merge interesting edges

2014-07-05 Thread Ankur Dave
Interesting problem! My understanding is that you want to (1) find paths
matching a particular pattern, and (2) add edges between the start and end
vertices of the matched paths.

For (1), I implemented a pattern matcher for GraphX
https://github.com/ankurdave/spark/blob/PatternMatching/graphx/src/main/scala/org/apache/spark/graphx/lib/PatternMatching.scala
that iteratively accumulates partial pattern matches. I used your example
in the unit test
https://github.com/ankurdave/spark/blob/PatternMatching/graphx/src/test/scala/org/apache/spark/graphx/lib/PatternMatchingSuite.scala
.

For (2), you can take the output of the pattern matcher (the set of
matching paths organized by their terminal vertices) and construct a set of
new edges using the initial and terminal vertices of each path. Then you
can make a new graph consisting of the union of the original edge set and
the new edges. Let me know if you'd like help with this.

Ankur http://www.ankurdave.com/


Re: Graphx traversal and merge interesting edges

2014-07-05 Thread HHB
Thanks Ankur,

Cannot thank you enough for this!!! I am reading your example, still digesting &
grokking it though :-)

I was breaking my head over this for the past few hours.

In my last futile attempts over the past few hours I was looking at Pregel, e.g.
whether it could be used to see what step of a path match a vertex is in and
send a message to the next vertex with the history of the traversal, then, for
merging messages, append the historical traversal path of each message :-P.

--Gautam

On 05-Jul-2014, at 3:23 pm, Ankur Dave ankurd...@gmail.com wrote:




[no subject]

2014-07-05 Thread Konstantin Kudryavtsev
I faced very strange behavior of a job that I ran on a YARN Hadoop
cluster. One of the stages (a map function) was split into 80 tasks; 10 of them
finished successfully in ~2 min, but all the other tasks have been running for
more than 40 min and are still not finished... I suspect they hung.
Any ideas what's going on and how it can be fixed?


Thank you,
Konstantin Kudryavtsev


Re: How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread Evan R. Sparks
To be clear - each of the RDDs is still a distributed dataset and each of
the individual SVM models will be trained in parallel across the cluster.
Sean's suggestion effectively has you submitting multiple Spark jobs
simultaneously, which, depending on your cluster configuration and the size
of your dataset, may or may not be a good idea.

There are some tricks you can do to make training multiple models on the
same dataset faster, which we're hoping to expose to users in an upcoming
release.

- Evan


On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen so...@cloudera.com wrote:

 If you call .par on data_kfolded it will become a parallel collection in
 Scala and so the maps will happen in parallel .
 On Jul 5, 2014 9:35 AM, sparkuser2345 hm.spark.u...@gmail.com wrote:


Re: taking top k values of rdd

2014-07-05 Thread Nick Pentreath
To make it efficient in your case you may need to do a bit of custom code to
emit the top k per partition and then only send those to the driver. On the
driver you can then take the top k of the combined per-partition top k lists
(assuming you have (object, count) pairs for each top k list).
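A rough sketch of that pattern (not from the thread; the (String, Long) element type and the sample data are made up, and a bounded priority queue would avoid sorting whole partitions):

val k = 2
val counts = sc.parallelize(Seq(("a", 5L), ("b", 42L), ("c", 7L), ("d", 1L)), 2)
val byCount: Ordering[(String, Long)] = Ordering.by((p: (String, Long)) => p._2)

val candidates = counts.mapPartitions { it =>
  // at most k pairs leave each partition
  it.toSeq.sorted(byCount.reverse).take(k).iterator
}
val topK = candidates.collect().sorted(byCount.reverse).take(k)

As far as I know, RDD.top(num) implements essentially this per-partition-then-driver pattern.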

—
Sent from Mailbox

On Sat, Jul 5, 2014 at 10:17 AM, Koert Kuipers ko...@tresata.com wrote:

 my initial approach to taking top k values of a rdd was using a
 priority-queue monoid. along these lines:
 rdd.mapPartitions({ items => Iterator.single(new PriorityQueue(...)) },
 false).reduce(monoid.plus)
 this works fine, but looking at the code for reduce it first reduces within
 a partition (which doesnt help me) and then sends the results to the driver
 where these again get reduced. this means that for every partition the
 (potentially very bulky) priorityqueue gets shipped to the driver.
 my driver is client side, not inside the cluster, and i cannot change this, so
 this shipping to the driver of all these queues can be expensive.
 is there a better way to do this? should i try a shuffle first to reduce
 the partitions to the minimal amount (since the number of queues shipped is
 equal to the number of partitions)?
 is there a way to reduce to a single-item RDD, so the queues stay inside the
 cluster and i can retrieve the final result with RDD.first?

Re: taking top k values of rdd

2014-07-05 Thread Koert Kuipers
hey nick,
you are right. i didnt explain myself well and my code example was wrong...
i am keeping a priority-queue with k items per partition (using
com.twitter.algebird.mutable.PriorityQueueMonoid.build to limit the sizes
of the queues).
but this still means i am sending k items per partition to my driver, so k
x p, while i only need k.
thanks! koert



On Sat, Jul 5, 2014 at 1:21 PM, Nick Pentreath nick.pentre...@gmail.com
wrote:






Re: taking top k values of rdd

2014-07-05 Thread Nick Pentreath
Right. That is unavoidable unless, as you say, you repartition into 1 partition,
which may do the trick.


When I say send the top k per partition I don't mean send the pq but the actual
values. This may end up being relatively small if k and p are not too big. (I'm
not sure how large the serialized pq is.)
—
Sent from Mailbox

On Sat, Jul 5, 2014 at 10:29 AM, Koert Kuipers ko...@tresata.com wrote:





Re: How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread Christopher Nguyen
Hi sparkuser2345,

I'm inferring the problem statement is something like "how do I make this
complete faster (given my compute resources)?"

Several comments.

First, Spark only allows launching parallel tasks from the driver, not from
workers, which is why you're seeing the exception when you try. Whether the
latter is a sensible/doable idea is another discussion, but I can
appreciate why many people assume this should be possible.

Second, on optimization, you may be able to apply Sean's idea about
(thread) parallelism at the driver, combined with the knowledge that often
these cluster tasks bottleneck while competing for the same resources at
the same time (cpu vs disk vs network, etc.) You may be able to achieve
some performance optimization by randomizing these timings. This is not
unlike GMail randomizing user storage locations around the world for load
balancing. Here, you would partition each of your RDDs into a different
number of partitions, making some tasks larger than others, and thus some
may be in cpu-intensive map while others are shuffling data around the
network. This is rather cluster-specific; I'd be interested in what you
learn from such an exercise.

Third, I find it useful always to consider doing as much as possible in one
pass, subject to memory limits, e.g., mapPartitions() vs map(), thus
minimizing map/shuffle/reduce boundaries with their context switches and
data shuffling. In this case, notice how you're running the
training+prediction k times over mostly the same rows, with map/reduce
boundaries in between. While the training phase is sealed in this context,
you may be able to improve performance by collecting all the k models
together, and do the [m x k] predictions all at once, which may end up being
faster.
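A rough sketch of that idea (not from the thread; the iteration count is arbitrary, and the full dataset is reconstructed here by unioning the k test folds, which partition the data in k-fold CV):

import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// train the k fold models first (serially here; the .par approach above would overlap them)
val models: Array[SVMModel] =
  data_kfolded.map { case (train, _) => SVMWithSGD.train(train, 100) }
val bcModels = sc.broadcast(models)

// the k test folds partition the data, so their union recovers the full dataset
val allData: RDD[LabeledPoint] = data_kfolded.map(_._2).reduce(_ union _)

// one pass produces one row of the m x k prediction matrix per point
val predsPerPoint = allData.map { p =>
  (p.label, bcModels.value.map(_.predict(p.features)))
}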

Finally, as implied from the above, for the very common k-fold
cross-validation pattern, the algorithm itself might be written to be smart
enough to take both train and test data and do the right thing within
itself, thus obviating the need for the user to prepare k data sets and
running over them serially, and likely saving a lot of repeated
computations in the right internal places.

Enjoy,
--
Christopher T. Nguyen
Co-founder & CEO, Adatao http://adatao.com
linkedin.com/in/ctnguyen



On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen so...@cloudera.com wrote:

 If you call .par on data_kfolded it will become a parallel collection in
 Scala and so the maps will happen in parallel .
 On Jul 5, 2014 9:35 AM, sparkuser2345 hm.spark.u...@gmail.com wrote:


Re: How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread Nick Pentreath
For linear models the 3rd option is by far the most efficient, and I suspect it is
what Evan is alluding to.


Unfortunately it's not directly possible with the classes in MLlib now, so
you'll have to roll your own using the underlying SGD / BFGS primitives.
—
Sent from Mailbox

On Sat, Jul 5, 2014 at 10:45 AM, Christopher Nguyen c...@adatao.com
wrote:


Re: How to use groupByKey and CqlPagingInputFormat

2014-07-05 Thread Martin Gammelsæter
Ah, I see. Thank you!

As we are in the process of building the system we have not tried with
any large amounts of data yet, but when the time comes I'll try both
implementations and do a small benchmark.

On Fri, Jul 4, 2014 at 9:20 PM, Mohammed Guller moham...@glassbeam.com wrote:
 As far as I know, there is not much difference, except that the outer
 parentheses are redundant. The problem with your original code was that there
 was a mismatch in the opening and closing parentheses. Sometimes the error
 messages are misleading :-)
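A minimal illustration of the point (not from the thread):

val xs = List(1, 2, 3)
val a = xs.map({ x => x + 1 })  // function literal wrapped in parentheses
val b = xs.map { x => x + 1 }   // braces alone; the outer parentheses are redundant
// a == b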

 Do you see any performance difference with the Datastax spark driver?

 Mohammed

 -Original Message-
 From: Martin Gammelsæter [mailto:martingammelsae...@gmail.com]
 Sent: Friday, July 4, 2014 12:43 AM
 To: user@spark.apache.org
 Subject: Re: How to use groupByKey and CqlPagingInputFormat

 On Thu, Jul 3, 2014 at 10:29 PM, Mohammed Guller moham...@glassbeam.com 
 wrote:
 Martin,

 1) The first map contains the columns in the primary key, which could be a 
 compound primary key containing multiple columns,  and the second map 
 contains all the non-key columns.

 Ah, thank you, that makes sense.

 2) try this fixed code:
 val navnrevmap = casRdd.map {
   case (key, value) =>
     (ByteBufferUtil.string(value.get("navn")),
      ByteBufferUtil.toInt(value.get("revisjon")))
 }.groupByKey()

 I changed from CqlPagingInputFormat to the new Datastax cassandra-spark 
 driver, which is a bit easier to work with, but thanks! I'm curious though, 
 what is the semantic difference between
 map({}) and map{}?



-- 
Mvh.
Martin Gammelsæter
92209139


Re: Spark 1.0 failed on HDP 2.0 with absurd exception

2014-07-05 Thread Cesar Arevalo
From looking at the exception message that was returned, I would try the
following command for running the application:

./bin/spark-submit --class test.etl.RunETL --master yarn-cluster
--num-workers 14 --driver-memory 3200m --worker-memory 3g --worker-cores 2
--jar my-etl-1.0-SNAPSHOT-hadoop2.2.0.jar


I didn't try this, so it may not work.

Best,
-Cesar



On Sat, Jul 5, 2014 at 2:48 AM, Konstantin Kudryavtsev 
kudryavtsev.konstan...@gmail.com wrote:





-- 
Cesar Arevalo
Software Engineer ❘ Zephyr Health
450 Mission Street, Suite #201 ❘ San Francisco, CA 94105
m: +1 415-571-7687 ❘ s: arevalocesar | t: @zephyrhealth
https://twitter.com/zephyrhealth
o: +1 415-529-7649 ❘ f: +1 415-520-9288
http://www.zephyrhealth.com


Re: [mllib] strange/buggy results with RidgeRegressionWithSGD

2014-07-05 Thread DB Tsai
You may try LBFGS for more stable convergence. In Spark 1.1, we will be
able to use LBFGS instead of GD in the training process.
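For the step-size issue raised in the quoted discussion below, a sketch (not from the thread; the values are arbitrary) of shrinking the SGD step through the public optimizer handle:

import org.apache.spark.mllib.regression.RidgeRegressionWithSGD

val alg = new RidgeRegressionWithSGD()
alg.optimizer
  .setStepSize(0.01)      // much smaller than the default of 1.0
  .setNumIterations(500)
  .setRegParam(0.1)
val model = alg.run(trainingData)  // trainingData: RDD[LabeledPoint], as in the quoted code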
On Jul 4, 2014 1:23 PM, Thomas Robert tho...@creativedata.fr wrote:

 Hi all,

 I too am having some issues with *RegressionWithSGD algorithms.

 Concerning your issue Eustache, this could be due to the fact that these
 regression algorithms use a fixed step (that is divided by
 sqrt(iteration)). During my tests, quite often, the algorithm diverged to an
 infinite cost, I guess because the step was too big. I reduced it and
 managed to get good results on a very simple generated dataset.

 But I was wondering if anyone here had some advice concerning the use of
 these regression algorithms, for example how to choose a good step size and
 number of iterations? I wonder if I'm using them right...

 Thanks,

 --

 *Thomas ROBERT*
 www.creativedata.fr


 2014-07-03 16:16 GMT+02:00 Eustache DIEMERT eusta...@diemert.fr:

 Printing the model shows the intercept is always 0 :(

 Should I open a bug for that?


 2014-07-02 16:11 GMT+02:00 Eustache DIEMERT eusta...@diemert.fr:

 Hi list,

 I'm benchmarking MLlib for a regression task [1] and get strange
 results.

 Namely, using RidgeRegressionWithSGD it seems the predicted points miss
 the intercept:

 {code}
 val trainedModel = RidgeRegressionWithSGD.train(trainingData, 1000)
 ...
 valuesAndPreds.take(10).map(t => println(t))
 {code}

 output:

 (2007.0,-3.784588726958493E75)
 (2003.0,-1.9562390324037716E75)
 (2005.0,-4.147413202985629E75)
 (2003.0,-1.524938024096847E75)
 ...

 If I change the parameters (step size, regularization and iterations) I
 get NaNs more often than not:
 (2007.0,NaN)
 (2003.0,NaN)
 (2005.0,NaN)
 ...

 On the other hand DecisionTree model give sensible results.

 I see there is a `setIntercept()` method in abstract class
 GeneralizedLinearAlgorithm that seems to trigger the use of the intercept
 but I'm unable to use it from the public interface :(

 Any help appreciated :)

 Eustache

 [1] https://archive.ics.uci.edu/ml/datasets/YearPredictionMSD





Re: window analysis with Spark and Spark streaming

2014-07-05 Thread Mayur Rustagi
The key idea is to simulate your app time as you enter data. So you can
connect Spark Streaming to a queue and insert data into it spaced by time.
Easier said than done :). What are the parallelism issues you are hitting
with your static approach?
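A rough sketch of the queue idea (not from the thread; paths, batch and window sizes are made up): pre-load one RDD per input file into a queue and let the streaming context consume one per batch, so the framework's windowing lines up with the data rather than with wall-clock arrival time.

import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))

// one RDD per input file, queued up front
val rddQueue = new Queue[RDD[String]]()
(1 to 5).foreach(i => rddQueue += ssc.sparkContext.textFile(s"/data/part-$i.txt"))

val files = ssc.queueStream(rddQueue, oneAtATime = true) // one queued RDD per batch
val lastThreeFiles = files.window(Seconds(30))           // 3 batches = a window of 3 "files"
lastThreeFiles.count().print()

ssc.start()
ssc.awaitTermination()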

On Friday, July 4, 2014, alessandro finamore alessandro.finam...@polito.it
wrote:

 Thanks for the replies

 What is not completely clear to me is how time is managed.
 I can create a DStream from files.
 But if I set the window property, that will be bound to the application
 time, right?

 If I got it right, with a receiver I can control the way DStreams are
 created.
 But how can I then apply the windowing already shipped with the framework if
 this is bound to the application time?
 I would like to define a window of N files but the window() function
 requires a duration as input...




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/window-analysis-with-Spark-and-Spark-streaming-tp8806p8824.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



-- 
Sent from Gmail Mobile


Re: graphx Joining two VertexPartitions with different indexes is slow.

2014-07-05 Thread Koert Kuipers
thanks for replying. why is joining two VertexRDDs without caching slow?
what is recomputed unnecessarily?
i am not sure what is different here from joining 2 regular RDDs (where
nobody seems to recommend caching before joining, i think...)


On Thu, Jul 3, 2014 at 10:52 PM, Ankur Dave ankurd...@gmail.com wrote:

 Oh, I just read your message more carefully and noticed that you're
 joining a regular RDD with a VertexRDD. In that case I'm not sure why the
 warning is occurring, but it might be worth caching both operands
 (graph.vertices and the regular RDD) just to be sure.

 Ankur http://www.ankurdave.com/




Re: reading compress lzo files

2014-07-05 Thread Nicholas Chammas
On Fri, Jul 4, 2014 at 3:33 PM, Gurvinder Singh gurvinder.si...@uninett.no
wrote:

csv = sc.newAPIHadoopFile(opts.input, "com.hadoop.mapreduce.LzoTextInputFormat",
                          "org.apache.hadoop.io.LongWritable",
                          "org.apache.hadoop.io.Text").count()

Does anyone know what the rough equivalent of this would be in the Scala
API?

I am trying the following, but the first import yields an error on my
spark-ec2 cluster:

import com.hadoop.mapreduce.LzoTextInputFormat
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text

sc.newAPIHadoopFile("s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram/data",
  LzoTextInputFormat, LongWritable, Text)

scala> import com.hadoop.mapreduce.LzoTextInputFormat
<console>:12: error: object hadoop is not a member of package com
       import com.hadoop.mapreduce.LzoTextInputFormat
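One possible Scala form (a sketch, untested here; it assumes the hadoop-lzo jar that provides com.hadoop.mapreduce.LzoTextInputFormat is on the driver and executor classpaths, which the import error above suggests is not yet the case on this cluster):

import com.hadoop.mapreduce.LzoTextInputFormat
import org.apache.hadoop.io.{LongWritable, Text}

val path = "s3n://datasets.elasticmapreduce/ngrams/books/20090715/eng-us-all/1gram/data"
val count = sc.newAPIHadoopFile[LongWritable, Text, LzoTextInputFormat](path).count()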

Nick
​


Re: graphx Joining two VertexPartitions with different indexes is slow.

2014-07-05 Thread Ankur Dave
When joining two VertexRDDs with identical indexes, GraphX can use a fast
code path (a zip join without any hash lookups). However, the check for
identical indexes is performed using reference equality.

Without caching, two copies of the index are created. Although the two
indexes are structurally identical, they fail reference equality, and so
GraphX mistakenly uses the slow path involving a hash lookup per joined
element.

I'm working on a patch https://github.com/apache/spark/pull/1297 that
attempts an optimistic zip join with per-element fallback to hash lookups,
which would improve this situation.
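A sketch of the caching workaround discussed in this thread (toy data, made up just to keep the example self-contained; innerJoin is one of several join methods on VertexRDD):

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq((1L, "a"), (2L, "b")))
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 1)))
val graph = Graph(vertices, edges)

// cache both operands so neither is recomputed; recomputation is what produces
// the structurally identical but reference-unequal indexes described above
val verts = graph.vertices.cache()
val extra = sc.parallelize(Seq((1L, 10), (2L, 20))).cache()

val joined = verts.innerJoin(extra) { (id, name, score) => (name, score) }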

Ankur http://www.ankurdave.com/


Re: reading compress lzo files

2014-07-05 Thread Sean Owen
The package com.hadoop.mapreduce certainly looks wrong. If it is a Hadoop
class it starts with org.apache.hadoop.
On Jul 6, 2014 4:20 AM, Nicholas Chammas nicholas.cham...@gmail.com
wrote:

 ​