Re: possible bug in Spark's ALS implementation...

2014-03-11 Thread Xiangrui Meng
Hi Michael,

I can help check the current implementation. Would you please go to
https://spark-project.atlassian.net/browse/SPARK and create a ticket
about this issue with component MLlib? Thanks!

Best,
Xiangrui

On Tue, Mar 11, 2014 at 3:18 PM, Michael Allman m...@allman.ms wrote:
 Hi,

 I'm implementing a recommender based on the algorithm described in
 http://www2.research.att.com/~yifanhu/PUB/cf.pdf. This algorithm forms the
 basis for Spark's ALS implementation for data sets with implicit features.
 The data set I'm working with is proprietary and I cannot share it, however
 I can say that it's based on the same kind of data in the paper---relative
 viewing time of videos. (Specifically, the rating for each video is
 defined as total viewing time across all visitors divided by video
 duration).

 I'm seeing counterintuitive, sometimes nonsensical recommendations. For
 comparison, I've run the training data through Oryx's in-VM implementation
 of implicit ALS with the same parameters. Oryx uses the same algorithm.
 (Source in this file:
 https://github.com/cloudera/oryx/blob/master/als-common/src/main/java/com/cloudera/oryx/als/common/factorizer/als/AlternatingLeastSquares.java)

 The recommendations made by each system compared to one another are very
 different---more so than I think could be explained by differences in initial
 state. The recommendations made by the Oryx models look much better,
 especially as I increase the number of latent factors and the iterations.
 The Spark models' recommendations don't improve with increases in either
 latent factors or iterations. Sometimes, they get worse.

 Because of the (understandably) highly-optimized and terse style of Spark's
 ALS implementation, I've had a very hard time following it well enough to
 debug the issue definitively. However, I have found a section of code that
 looks incorrect. As described in the paper, part of the implicit ALS
 algorithm involves computing a matrix product YtCuY (equation 4 in the
 paper). To optimize this computation, this expression is rewritten as YtY +
 Yt(Cu - I)Y. I believe that's what should be happening here:

 https://github.com/apache/incubator-spark/blob/v0.9.0-incubating/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L376

 However, it looks like this code is in fact computing YtY + YtY(Cu - I),
 which is the same as YtYCu. If so, that's a bug. Can someone familiar with
 this code evaluate my claim?
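
 For reference, a minimal sketch of the intended per-user term YtY + Yt(Cu - I)Y
 (written with Breeze and hypothetical names; an illustration of the math in the
 paper, not the actual MLlib code):

   import breeze.linalg._

   // YtY: the k x k Gram matrix of the item factors, computed once globally.
   // ratedFactors: (item factor y_i, confidence c_ui) for the items user u touched;
   // Yt (Cu - I) Y only involves those items, since c_ui - 1 = 0 elsewhere.
   def userGram(YtY: DenseMatrix[Double],
                ratedFactors: Seq[(DenseVector[Double], Double)]): DenseMatrix[Double] = {
     val A = YtY.copy
     for ((yi, cui) <- ratedFactors) {
       A += (yi * yi.t) * (cui - 1.0)  // rank-1 update per observed item
     }
     A  // equals Yt Cu Y; not the same as YtY * Cu
   }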

 Cheers,

 Michael


Re: possible bug in Spark's ALS implementation...

2014-03-17 Thread Xiangrui Meng
The factor matrix Y is used twice in the implicit ALS computation: once to
compute the global Y^T Y, and again to compute the local Y_i^T C_i Y_i.
-Xiangrui

On Sun, Mar 16, 2014 at 1:18 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 On Mar 14, 2014, at 5:52 PM, Michael Allman m...@allman.ms wrote:

 I also found that the product and user RDDs were being rebuilt many times
 over in my tests, even for tiny data sets. By persisting the RDD returned
 from updateFeatures() I was able to avoid a raft of duplicate computations.
 Is there a reason not to do this?


 This sounds like a good thing to add, though I'd like to understand why
 these are being recomputed (it seemed that the code would only use each one
 once). Do you have any sense why that is?

 Matei


Re: possible bug in Spark's ALS implementation...

2014-03-17 Thread Xiangrui Meng
Hi Michael,

I made a couple of changes to implicit ALS. One gives faster construction
of YtY (https://github.com/apache/spark/pull/161), which was merged
into master. The other caches intermediate matrix factors properly
(https://github.com/apache/spark/pull/165). They should give you the
same result as before, but faster (~2x in my local tests). If you have
time to try the improved version, please let me know the speed-up on
your data. Thanks!

Best,
Xiangrui

On Mon, Mar 17, 2014 at 5:07 PM, Michael Allman m...@allman.ms wrote:
 I've created https://spark-project.atlassian.net/browse/SPARK-1263 to address
 the issue of the factor matrix recomputation. I'm planning to submit a
 related pull request shortly.





Re: possible bug in Spark's ALS implementation...

2014-03-18 Thread Xiangrui Meng
Sorry, the link was wrong. Should be
https://github.com/apache/spark/pull/131 -Xiangrui


On Tue, Mar 18, 2014 at 10:20 AM, Michael Allman m...@allman.ms wrote:
 Hi Xiangrui,

 I don't see how https://github.com/apache/spark/pull/161 relates to ALS. Can
 you explain?

 Also, thanks for addressing the issue with factor matrix persistence in PR
 165. I was probably not going to get to that for a while.

 I will try to test your changes today for speed improvements.

 Cheers,

 Michael





Re: Feed KMeans algorithm with a row major matrix

2014-03-18 Thread Xiangrui Meng
Hi Jaonary,

With the current implementation, you need to call Array.slice to make
each row an Array[Double] and cache the result RDD. There is a plan to
support block-wise input data and I will keep you informed.
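
For example, a minimal sketch (assuming each flattened array packs complete
rows of a known dimension `dim`; the names are hypothetical):

  val dim = 100  // number of columns per data point (assumption)
  // flat: RDD[Array[Double]], each array holding several rows in row-major order
  val rows = flat.flatMap { a =>
    (0 until a.length / dim).iterator.map(i => a.slice(i * dim, (i + 1) * dim))
  }.cache()
  // rows now has one Array[Double] per data point and can be fed to KMeans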

Best,
Xiangrui

On Tue, Mar 18, 2014 at 2:46 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:
 Dear All,

 I'm trying to cluster data from native library code with Spark Kmeans||. In
 my native library the data are represented as a matrix (row = number of data
 and col = dimension). For efficiency reasons, they are copied row-major into a
 one-dimensional Scala Array, so after the computation I have an
 RDD[Array[Double]] where each array holds a set of data points rather than a
 single point. I need to transform these arrays into Array[Array[Double]]
 before running the KMeans|| algorithm.

 How can I do this efficiently?



 Best regards,


Re: possible bug in Spark's ALS implementation...

2014-03-18 Thread Xiangrui Meng
Glad to hear about the speed-up. Hopefully we can improve the implementation
further in the future. -Xiangrui

On Tue, Mar 18, 2014 at 1:55 PM, Michael Allman m...@allman.ms wrote:
 I just ran a runtime performance comparison between 0.9.0-incubating and your
 als branch. I saw a 1.5x improvement in performance.





Re: Kmeans example reduceByKey slow

2014-03-24 Thread Xiangrui Meng
Hi Tsai,

Could you share more information about the machine you used and the
training parameters (runs, k, and iterations)? It can help solve your
issues. Thanks!

Best,
Xiangrui

On Sun, Mar 23, 2014 at 3:15 AM, Tsai Li Ming mailingl...@ltsai.com wrote:
 Hi,

 At the reduceByKey stage, it takes a few minutes before the tasks start 
 working.

 I have -Dspark.default.parallelism=127 cores (n-1).

 CPU/Network/IO is idling across all nodes when this is happening.

 And there is nothing particular on the master log file. From the spark-shell:

 14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:124 as TID 538 on 
 executor 2: XXX (PROCESS_LOCAL)
 14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:124 as 38765155 
 bytes in 193 ms
 14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:125 as TID 539 on 
 executor 1: XXX (PROCESS_LOCAL)
 14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:125 as 38765155 
 bytes in 96 ms
 14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:126 as TID 540 on 
 executor 0: XXX (PROCESS_LOCAL)
 14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:126 as 38765155 
 bytes in 100 ms

 But it stops there for some significant time before any movement.

 In the stage detail of the UI, I can see that there are 127 tasks running but 
 the duration each is at least a few minutes.

 I'm working off local storage (not hdfs) and the kmeans data is about 6.5GB 
 (50M rows).

 Is this a normal behaviour?

 Thanks!


Re: Kmeans example reduceByKey slow

2014-03-24 Thread Xiangrui Meng
K = 50 is certainly a large number for k-means. If there is no
particular reason to have 50 clusters, could you try to reduce it
to, e.g., 100 or 1000? Also, the example code is not for large-scale
problems. You should use the KMeans algorithm in mllib clustering for
your problem.
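
A minimal sketch of calling the MLlib KMeans directly (Spark 1.0-style API; the
input path and parsing are assumptions):

  import org.apache.spark.mllib.clustering.KMeans
  import org.apache.spark.mllib.linalg.Vectors

  val data = sc.textFile("kmeans_data.txt")
    .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
    .cache()

  val model = KMeans.train(data, k = 50, maxIterations = 10)
  println("Within-set sum of squared errors: " + model.computeCost(data))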

-Xiangrui

On Sun, Mar 23, 2014 at 11:53 PM, Tsai Li Ming mailingl...@ltsai.com wrote:
 Hi,

 This is on a 4 nodes cluster each with 32 cores/256GB Ram.

 (0.9.0) is deployed in standalone mode.

 Each worker is configured with 192GB. Spark executor memory is also 192GB.

 This is on the first iteration. K=50. Here's the code I use:
 http://pastebin.com/2yXL3y8i , which is a copy-and-paste of the example.

 Thanks!



 On 24 Mar, 2014, at 2:46 pm, Xiangrui Meng men...@gmail.com wrote:

 Hi Tsai,

 Could you share more information about the machine you used and the
 training parameters (runs, k, and iterations)? It can help solve your
 issues. Thanks!

 Best,
 Xiangrui

 On Sun, Mar 23, 2014 at 3:15 AM, Tsai Li Ming mailingl...@ltsai.com wrote:
 Hi,

  At the reduceByKey stage, it takes a few minutes before the tasks start 
 working.

 I have -Dspark.default.parallelism=127 cores (n-1).

 CPU/Network/IO is idling across all nodes when this is happening.

 And there is nothing particular on the master log file. From the 
 spark-shell:

 14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:124 as TID 538 on 
 executor 2: XXX (PROCESS_LOCAL)
 14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:124 as 38765155 
 bytes in 193 ms
 14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:125 as TID 539 on 
 executor 1: XXX (PROCESS_LOCAL)
 14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:125 as 38765155 
 bytes in 96 ms
 14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:126 as TID 540 on 
 executor 0: XXX (PROCESS_LOCAL)
 14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:126 as 38765155 
 bytes in 100 ms

 But it stops there for some significant time before any movement.

 In the stage detail of the UI, I can see that there are 127 tasks running 
 but the duration each is at least a few minutes.

 I'm working off local storage (not hdfs) and the kmeans data is about 6.5GB 
 (50M rows).

 Is this a normal behaviour?

 Thanks!



Re: Kmeans example reduceByKey slow

2014-03-24 Thread Xiangrui Meng
Number of rows doesn't matter much as long as you have enough workers
to distribute the work. K-means has complexity O(n * d * k), where n
is the number of points, d is the dimension, and k is the number of
clusters. If you use the KMeans implementation from MLlib, the
initialization stage is done on master, so a large k would slow down
the initialization stage. If your data is sparse, the latest change to
KMeans will help with the speed, depending on how sparse your data is.
-Xiangrui

On Mon, Mar 24, 2014 at 12:44 AM, Tsai Li Ming mailingl...@ltsai.com wrote:
 Thanks, Let me try with a smaller K.

 Does the size of the input data matter for the example? Currently I have 50M 
 rows. What is a reasonable size to demonstrate the capability of Spark?





 On 24 Mar, 2014, at 3:38 pm, Xiangrui Meng men...@gmail.com wrote:

 K = 50 is certainly a large number for k-means. If there is no
 particular reason to have 50 clusters, could you try to reduce it
 to, e.g, 100 or 1000? Also, the example code is not for large-scale
 problems. You should use the KMeans algorithm in mllib clustering for
 your problem.

 -Xiangrui

 On Sun, Mar 23, 2014 at 11:53 PM, Tsai Li Ming mailingl...@ltsai.com wrote:
 Hi,

 This is on a 4 nodes cluster each with 32 cores/256GB Ram.

 (0.9.0) is deployed in a stand alone mode.

 Each worker is configured with 192GB. Spark executor memory is also 192GB.

 This is on the first iteration. K=50. Here's the code I use:
 http://pastebin.com/2yXL3y8i , which is a copy-and-paste of the example.

 Thanks!



 On 24 Mar, 2014, at 2:46 pm, Xiangrui Meng men...@gmail.com wrote:

 Hi Tsai,

 Could you share more information about the machine you used and the
 training parameters (runs, k, and iterations)? It can help solve your
 issues. Thanks!

 Best,
 Xiangrui

 On Sun, Mar 23, 2014 at 3:15 AM, Tsai Li Ming mailingl...@ltsai.com 
 wrote:
 Hi,

  At the reduceByKey stage, it takes a few minutes before the tasks start 
 working.

 I have -Dspark.default.parallelism=127 cores (n-1).

 CPU/Network/IO is idling across all nodes when this is happening.

 And there is nothing particular on the master log file. From the 
 spark-shell:

 14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:124 as TID 538 
 on executor 2: XXX (PROCESS_LOCAL)
 14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:124 as 
 38765155 bytes in 193 ms
 14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:125 as TID 539 
 on executor 1: XXX (PROCESS_LOCAL)
 14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:125 as 
 38765155 bytes in 96 ms
 14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:126 as TID 540 
 on executor 0: XXX (PROCESS_LOCAL)
 14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:126 as 
 38765155 bytes in 100 ms

 But it stops there for some significant time before any movement.

 In the stage detail of the UI, I can see that there are 127 tasks running 
 but the duration each is at least a few minutes.

 I'm working off local storage (not hdfs) and the kmeans data is about 
 6.5GB (50M rows).

 Is this a normal behaviour?

 Thanks!




Re: Kmeans example reduceByKey slow

2014-03-24 Thread Xiangrui Meng
Sorry, I meant the master branch of https://github.com/apache/spark. -Xiangrui

On Mon, Mar 24, 2014 at 6:27 PM, Tsai Li Ming mailingl...@ltsai.com wrote:
 Thanks again.

 If you use the KMeans implementation from MLlib, the
 initialization stage is done on master,

 The master here is the app/driver/spark-shell?

 Thanks!

 On 25 Mar, 2014, at 1:03 am, Xiangrui Meng men...@gmail.com wrote:

 Number of rows doesn't matter much as long as you have enough workers
 to distribute the work. K-means has complexity O(n * d * k), where n
 is number of points, d is the dimension, and k is the number of
 clusters. If you use the KMeans implementation from MLlib, the
 initialization stage is done on master, so a large k would slow down
 the initialization stage. If your data is sparse, the latest change to
 KMeans will help with the speed, depending on how sparse your data is.
 -Xiangrui

 On Mon, Mar 24, 2014 at 12:44 AM, Tsai Li Ming mailingl...@ltsai.com wrote:
 Thanks, Let me try with a smaller K.

  Does the size of the input data matter for the example? Currently I have 
 50M rows. What is a reasonable size to demonstrate the capability of Spark?





 On 24 Mar, 2014, at 3:38 pm, Xiangrui Meng men...@gmail.com wrote:

 K = 50 is certainly a large number for k-means. If there is no
 particular reason to have 50 clusters, could you try to reduce it
 to, e.g, 100 or 1000? Also, the example code is not for large-scale
 problems. You should use the KMeans algorithm in mllib clustering for
 your problem.

 -Xiangrui

 On Sun, Mar 23, 2014 at 11:53 PM, Tsai Li Ming mailingl...@ltsai.com 
 wrote:
 Hi,

 This is on a 4 nodes cluster each with 32 cores/256GB Ram.

 (0.9.0) is deployed in a stand alone mode.

 Each worker is configured with 192GB. Spark executor memory is also 192GB.

 This is on the first iteration. K=50. Here's the code I use:
 http://pastebin.com/2yXL3y8i , which is a copy-and-paste of the example.

 Thanks!



 On 24 Mar, 2014, at 2:46 pm, Xiangrui Meng men...@gmail.com wrote:

 Hi Tsai,

 Could you share more information about the machine you used and the
 training parameters (runs, k, and iterations)? It can help solve your
 issues. Thanks!

 Best,
 Xiangrui

 On Sun, Mar 23, 2014 at 3:15 AM, Tsai Li Ming mailingl...@ltsai.com 
 wrote:
 Hi,

  At the reduceByKey stage, it takes a few minutes before the tasks 
 start working.

 I have -Dspark.default.parallelism=127 cores (n-1).

 CPU/Network/IO is idling across all nodes when this is happening.

 And there is nothing particular on the master log file. From the 
 spark-shell:

 14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:124 as TID 538 
 on executor 2: XXX (PROCESS_LOCAL)
 14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:124 as 
 38765155 bytes in 193 ms
 14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:125 as TID 539 
 on executor 1: XXX (PROCESS_LOCAL)
 14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:125 as 
 38765155 bytes in 96 ms
 14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:126 as TID 540 
 on executor 0: XXX (PROCESS_LOCAL)
 14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:126 as 
 38765155 bytes in 100 ms

 But it stops there for some significant time before any movement.

 In the stage detail of the UI, I can see that there are 127 tasks 
 running but the duration each is at least a few minutes.

 I'm working off local storage (not hdfs) and the kmeans data is about 
 6.5GB (50M rows).

 Is this a normal behaviour?

 Thanks!





Re: Issue with zip and partitions

2014-04-02 Thread Xiangrui Meng
From the API docs: "Zips this RDD with another one, returning key-value
pairs with the first element in each RDD, second element in each RDD,
etc. Assumes that the two RDDs have the *same number of partitions*
and the *same number of elements in each partition* (e.g. one was made
through a map on the other)."

Basically, one RDD should be a mapped RDD of the other, or both RDDs
are mapped RDDs of the same RDD.
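
For example, a minimal sketch of a valid zip (hypothetical names):

  val a = sc.parallelize(1 to 1000000, 8)
  val b = a.map(x => x * x)   // same number of partitions and same counts per partition
  val pairs = a.zip(b)        // OK because b was derived from a via map
  pairs.count()

Two independently created RDDs, even with the same total number of elements,
may split them differently across partitions and fail this precondition.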

Btw, your message says Dell - Internal Use - Confidential...

Best,
Xiangrui

On Tue, Apr 1, 2014 at 7:27 PM,  patrick_nico...@dell.com wrote:
 Dell - Internal Use - Confidential

 I got an exception "Can't zip RDDs with unequal numbers of partitions" when
 I apply any action (reduce, collect) on a dataset created by zipping two
 datasets of 10 million entries each.  The problem occurs independently of the
 number of partitions, or when I let Spark create those partitions.



 Interestingly enough, I do not have a problem zipping datasets of 1 and 2.5
 million entries.

 A similar problem was reported on this board with 0.8, but I don't remember if
 the problem was fixed.



 Any idea? Any workaround?



 I appreciate.


Re: ui broken in latest 1.0.0

2014-04-08 Thread Xiangrui Meng
That commit did work for me. Could you confirm the following:

1) After you called cache(), did you run any actions such as count() or
reduce()? If you don't materialize the RDD, it won't show up in the
storage tab.

2) Did you run ./make-distribution.sh after you switched to the current master?

Xiangrui

On Tue, Apr 8, 2014 at 9:33 AM, Koert Kuipers ko...@tresata.com wrote:
 i tried again with latest master, which includes commit below, but ui page
 still shows nothing on storage tab.
 koert



 commit ada310a9d3d5419e101b24d9b41398f609da1ad3
 Author: Andrew Or andrewo...@gmail.com
 Date:   Mon Mar 31 23:01:14 2014 -0700

 [Hot Fix #42] Persisted RDD disappears on storage page if re-used

 If a previously persisted RDD is re-used, its information disappears
 from the Storage page.

 This is because the tasks associated with re-using the RDD do not report
 the RDD's blocks as updated (which is correct). On stage submit, however, we
 overwrite any existing

 Author: Andrew Or andrewo...@gmail.com

 Closes #281 from andrewor14/ui-storage-fix and squashes the following
 commits:

 408585a [Andrew Or] Fix storage UI bug



 On Mon, Apr 7, 2014 at 4:21 PM, Koert Kuipers ko...@tresata.com wrote:

 got it thanks


 On Mon, Apr 7, 2014 at 4:08 PM, Xiangrui Meng men...@gmail.com wrote:

 This is fixed in https://github.com/apache/spark/pull/281. Please try
 again with the latest master. -Xiangrui

 On Mon, Apr 7, 2014 at 1:06 PM, Koert Kuipers ko...@tresata.com wrote:
  i noticed that for spark 1.0.0-SNAPSHOT which i checked out a few days
  ago
  (apr 5) that the application detail ui no longer shows any RDDs on
  the
  storage tab, despite the fact that they are definitely cached.
 
  i am running spark in standalone mode.





Re: ui broken in latest 1.0.0

2014-04-08 Thread Xiangrui Meng
That commit fixed the exact problem you described. That is why I want to
confirm that you switched to the master branch. bin/spark-shell doesn't
detect code changes, so you need to run ./make-distribution.sh to
re-compile Spark first. -Xiangrui


On Tue, Apr 8, 2014 at 9:57 AM, Koert Kuipers ko...@tresata.com wrote:

 sorry, i meant to say: note that for a cached rdd in the spark shell it
 all works fine. but something is going wrong with the SPARK-APPLICATION-UI
 in our applications that extensively cache and re-use RDDs


 On Tue, Apr 8, 2014 at 12:55 PM, Koert Kuipers ko...@tresata.com wrote:

 note that for a cached rdd in the spark shell it all works fine. but
 something is going wrong with the spark-shell in our applications that
 extensively cache and re-use RDDs


 On Tue, Apr 8, 2014 at 12:33 PM, Koert Kuipers ko...@tresata.com wrote:

 i tried again with latest master, which includes commit below, but ui
 page still shows nothing on storage tab.
  koert



 commit ada310a9d3d5419e101b24d9b41398f609da1ad3
 Author: Andrew Or andrewo...@gmail.com
 Date:   Mon Mar 31 23:01:14 2014 -0700

 [Hot Fix #42] Persisted RDD disappears on storage page if re-used

 If a previously persisted RDD is re-used, its information disappears
 from the Storage page.

 This is because the tasks associated with re-using the RDD do not
 report the RDD's blocks as updated (which is correct). On stage submit,
 however, we overwrite any existing

 Author: Andrew Or andrewo...@gmail.com

 Closes #281 from andrewor14/ui-storage-fix and squashes the
 following commits:

 408585a [Andrew Or] Fix storage UI bug



 On Mon, Apr 7, 2014 at 4:21 PM, Koert Kuipers ko...@tresata.com wrote:

 got it thanks


 On Mon, Apr 7, 2014 at 4:08 PM, Xiangrui Meng men...@gmail.com wrote:

 This is fixed in https://github.com/apache/spark/pull/281. Please try
 again with the latest master. -Xiangrui

 On Mon, Apr 7, 2014 at 1:06 PM, Koert Kuipers ko...@tresata.com
 wrote:
  i noticed that for spark 1.0.0-SNAPSHOT which i checked out a few
 days ago
  (apr 5) that the application detail ui no longer shows any RDDs on
 the
  storage tab, despite the fact that they are definitely cached.
 
  i am running spark in standalone mode.








Re: Error when compiling spark in IDEA and best practice to use IDE?

2014-04-09 Thread Xiangrui Meng
After sbt/sbt gen-idea, do not import as an SBT project but choose
"Open Project" and point it to the spark folder. -Xiangrui

On Tue, Apr 8, 2014 at 10:45 PM, Sean Owen so...@cloudera.com wrote:
 I let IntelliJ read the Maven build directly and that works fine.
 --
 Sean Owen | Director, Data Science | London


 On Wed, Apr 9, 2014 at 6:14 AM, Dong Mo monted...@gmail.com wrote:
 Dear list,

 SBT compiles fine, but when I do the following:
 sbt/sbt gen-idea
 import project as SBT project to IDEA 13.1
 Make Project
 and these errors show up:

 Error:(28, 8) object FileContext is not a member of package
 org.apache.hadoop.fs
 import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path,
 FileUtil}
^
 Error:(31, 8) object Master is not a member of package
 org.apache.hadoop.mapred
 import org.apache.hadoop.mapred.Master
^
 Error:(34, 26) object yarn is not a member of package org.apache.hadoop
 import org.apache.hadoop.yarn.api._
  ^
 Error:(35, 26) object yarn is not a member of package org.apache.hadoop
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
  ^
 Error:(36, 26) object yarn is not a member of package org.apache.hadoop
 import org.apache.hadoop.yarn.api.protocolrecords._
  ^
 Error:(37, 26) object yarn is not a member of package org.apache.hadoop
 import org.apache.hadoop.yarn.api.records._
  ^
 Error:(38, 26) object yarn is not a member of package org.apache.hadoop
 import org.apache.hadoop.yarn.client.YarnClientImpl
  ^
 Error:(39, 26) object yarn is not a member of package org.apache.hadoop
 import org.apache.hadoop.yarn.conf.YarnConfiguration
  ^
 Error:(40, 26) object yarn is not a member of package org.apache.hadoop
 import org.apache.hadoop.yarn.ipc.YarnRPC
  ^
 Error:(41, 26) object yarn is not a member of package org.apache.hadoop
 import org.apache.hadoop.yarn.util.{Apps, Records}
  ^
 Error:(49, 11) not found: type YarnClientImpl
   extends YarnClientImpl with Logging {
   ^
 Error:(48, 20) not found: type ClientArguments
 class Client(args: ClientArguments, conf: Configuration, sparkConf:
 SparkConf)
^
 Error:(51, 18) not found: type ClientArguments
   def this(args: ClientArguments, sparkConf: SparkConf) =
  ^
 Error:(54, 18) not found: type ClientArguments
   def this(args: ClientArguments) = this(args, new SparkConf())
  ^
 Error:(56, 12) not found: type YarnRPC
   var rpc: YarnRPC = YarnRPC.create(conf)
^
 Error:(56, 22) not found: value YarnRPC
   var rpc: YarnRPC = YarnRPC.create(conf)
  ^
 Error:(57, 17) not found: type YarnConfiguration
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 ^
 Error:(57, 41) not found: type YarnConfiguration
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
 ^
 Error:(58, 59) value getCredentials is not a member of
 org.apache.hadoop.security.UserGroupInformation
   val credentials = UserGroupInformation.getCurrentUser().getCredentials()
   ^
 Error:(60, 34) not found: type ClientDistributedCacheManager
   private val distCacheMgr = new ClientDistributedCacheManager()
  ^
 Error:(72, 5) not found: value init
 init(yarnConf)
 ^
 Error:(73, 5) not found: value start
 start()
 ^
 Error:(76, 24) value getNewApplication is not a member of
 org.apache.spark.Logging
 val newApp = super.getNewApplication()
^
 Error:(137, 35) not found: type GetNewApplicationResponse
   def verifyClusterResources(app: GetNewApplicationResponse) = {
   ^
 Error:(156, 65) not found: type ApplicationSubmissionContext
   def createApplicationSubmissionContext(appId: ApplicationId):
 ApplicationSubmissionContext = {
 ^
 Error:(156, 49) not found: type ApplicationId
   def createApplicationSubmissionContext(appId: ApplicationId):
 ApplicationSubmissionContext = {
 ^
 Error:(118, 31) not found: type ApplicationId
   def getAppStagingDir(appId: ApplicationId): String = {
   ^
 Error:(224, 69) not found: type LocalResource
   def prepareLocalResources(appStagingDir: String): HashMap[String,
 LocalResource] = {
 ^
 Error:(307, 39) not found: type LocalResource
   localResources: HashMap[String, LocalResource],
   ^
 Error:(343, 38) not found: type ContainerLaunchContext
   env: HashMap[String, String]): ContainerLaunchContext = {
  ^
 

Re: SVD under spark/mllib/linalg

2014-04-11 Thread Xiangrui Meng
It was moved to mllib.linalg.distributed.RowMatrix. With RowMatrix,
you can compute column summary statistics, gram matrix, covariance,
SVD, and PCA. We will provide multiplication for distributed matrices,
but not in v1.0. -Xiangrui
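
For example, a minimal sketch with the new API (assuming Spark 1.0 and a
whitespace-separated text file; the path is an assumption):

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.linalg.distributed.RowMatrix

  val rows = sc.textFile("data.txt")
    .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  val mat = new RowMatrix(rows)

  val summary = mat.computeColumnSummaryStatistics()  // column means, variances, ...
  val gram = mat.computeGramianMatrix()               // A^T A
  val svd = mat.computeSVD(20, computeU = true)       // top-20 singular values/vectors
  val pc = mat.computePrincipalComponents(10)         // top-10 principal components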

On Fri, Apr 11, 2014 at 9:12 PM, wxhsdp wxh...@gmail.com wrote:
 Hi, all
 the code under
 https://github.com/apache/spark/tree/master/mllib/src/main/scala/org/apache/spark/mllib/linalg
 has changed. The previous matrix classes are all removed, like MatrixEntry and
 MatrixSVD. Instead, Breeze matrix definitions appear. Do we move to Breeze
 Linear Algebra when implementing linear algebra algorithms?

 another question: are there any optimized matrix multiplication routines in
 spark?
 i only see the outer product method in the removed SVD.scala

 // Compute A^T A, assuming rows are sparse enough to fit in memory
 val rows = data.map(entry => (entry.i, (entry.j, entry.mval))).groupByKey()
 val emits = rows.flatMap { case (rowind, cols) =>
   cols.flatMap { case (colind1, mval1) =>
     cols.map { case (colind2, mval2) =>
       ((colind1, colind2), mval1 * mval2) } } // colind1: col index, colind2: row index
 }.reduceByKey(_ + _)

 thank you!





Re: checkpointing without streaming?

2014-04-21 Thread Xiangrui Meng
Checkpoint clears dependencies. You might need checkpoint to cut a
long lineage in iterative algorithms. -Xiangrui
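
For example, a minimal sketch (the checkpoint directory and iteration counts
are assumptions):

  sc.setCheckpointDir("hdfs:///tmp/checkpoints")

  var rdd = sc.parallelize(1 to 1000000, 8)
  for (i <- 1 to 100) {
    rdd = rdd.map(_ + 1)
    if (i % 20 == 0) {
      rdd.checkpoint()  // cut the lineage every 20 iterations
      rdd.count()       // checkpointing happens when the RDD is materialized
    }
  }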

On Mon, Apr 21, 2014 at 11:34 AM, Diana Carroll dcarr...@cloudera.com wrote:
 I'm trying to understand when I would want to checkpoint an RDD rather than
 just persist to disk.

 Every reference I can find to checkpoint relates to Spark Streaming.  But
 the method is defined in the core Spark library, not Streaming.

 Does it exist solely for streaming, or are there circumstances unrelated to
 streaming in which I might want to checkpoint...and if so, like what?

 Thanks,
 Diana


Re: skip lines in spark

2014-04-23 Thread Xiangrui Meng
If the first partition doesn't have enough records, then it may not
drop enough lines. Try

rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)

It might trigger a job.

Best,
Xiangrui

On Wed, Apr 23, 2014 at 9:46 AM, DB Tsai dbt...@stanford.edu wrote:
 Hi Chengi,

 If you just want to skip first n lines in RDD, you can do

 rddData.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String])
 => {
   if (partitionIdx == 0) lines.drop(n) else lines
 })


 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Wed, Apr 23, 2014 at 9:18 AM, Chengi Liu chengi.liu...@gmail.com wrote:

 Hi,
   What is the easiest way to skip the first n lines in an RDD?
 I am not able to figure this one out?
 Thanks




Re: Spark hangs when i call parallelize + count on an ArrayList<byte[]> having 40k elements

2014-04-23 Thread Xiangrui Meng
How big is each entry, and how much memory do you have on each
executor? You generated all data on driver and
sc.parallelize(bytesList) will send the entire dataset to a single
executor. You may run into I/O or memory issues. If the entries are
generated, you should create a simple RDD sc.parallelize(0 until 20,
20) and call mapPartitions to generate them in parallel. -Xiangrui
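
For example, a minimal sketch in Scala (the entry count and size are assumptions):

  // Generate the entries on the executors instead of shipping them from the driver.
  val rdd = sc.parallelize(0 until 20, 20).mapPartitions { _ =>
    Iterator.fill(2000)(Array.fill[Byte](1024)(0.toByte))  // 2000 x 1 KB per partition
  }
  println("Count=" + rdd.count())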

On Wed, Apr 23, 2014 at 9:23 AM, amit karmakar
amit.codenam...@gmail.com wrote:
 Spark hangs after i perform the following operations


 ArrayList<byte[]> bytesList = new ArrayList<byte[]>();
 /*
add 40k entries to bytesList
 */

 JavaRDD<byte[]> rdd = sparkContext.parallelize(bytesList);
 System.out.println("Count=" + rdd.count());


 If i add just one entry it works.

 It works if i modify,
 JavaRDD<byte[]> rdd = sparkContext.parallelize(bytesList)
 to
 JavaRDD<byte[]> rdd = sparkContext.parallelize(bytesList, 20);

 There is nothing in the logs that can help understand the reason.

 What could be the reason for this?


 Regards,
 Amit Kumar Karmakar


Re: skip lines in spark

2014-04-23 Thread Xiangrui Meng
Sorry, I didn't realize that zipWithIndex() is not in v0.9.1. It is in
the master branch and will be included in v1.0. It first counts the number
of records per partition and then assigns indices starting from 0.
-Xiangrui

On Wed, Apr 23, 2014 at 9:56 AM, Chengi Liu chengi.liu...@gmail.com wrote:
 Also, zipWithIndex() is not valid... Did you mean zipPartitions?


 On Wed, Apr 23, 2014 at 9:55 AM, Chengi Liu chengi.liu...@gmail.com wrote:

 Xiangrui,
   So, is it that full code suggestion is :
  val trigger = rddData.zipWithIndex().filter(
  _._2 >= 10L).map(_._1)

 and then what DB Tsai recommended
  trigger.mapPartitionsWithIndex((partitionIdx: Int, lines:
  Iterator[String]) => {
    if (partitionIdx == 0) lines.drop(n) else lines
  })

  Is that the full operation?

  What happens if I have to drop so many records that the number exceeds
  partition 0? How do I handle that case?




 On Wed, Apr 23, 2014 at 9:51 AM, Xiangrui Meng men...@gmail.com wrote:

 If the first partition doesn't have enough records, then it may not
 drop enough lines. Try

  rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)

 It might trigger a job.

 Best,
 Xiangrui

 On Wed, Apr 23, 2014 at 9:46 AM, DB Tsai dbt...@stanford.edu wrote:
  Hi Chengi,
 
  If you just want to skip first n lines in RDD, you can do
 
   rddData.mapPartitionsWithIndex((partitionIdx: Int, lines:
   Iterator[String]) => {
     if (partitionIdx == 0) lines.drop(n) else lines
   })
 
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
  On Wed, Apr 23, 2014 at 9:18 AM, Chengi Liu chengi.liu...@gmail.com
  wrote:
 
  Hi,
What is the easiest way to skip first n lines in rdd??
  I am not able to figure this one out?
  Thanks
 
 





Re: Hadoop—streaming

2014-04-23 Thread Xiangrui Meng
PipedRDD is an RDD[String]. If you know how to parse each result line
into (key, value) pairs, then you can call reduceByKey afterwards:

piped.map(x => (key, value)).reduceByKey((v1, v2) => v)
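
For example, a minimal sketch (assuming the native program reads lines on stdin
and writes lines like "key<TAB>value"; the command and paths are assumptions):

  val piped = sc.textFile("input.txt").pipe("./my_native_program")
  val reduced = piped.map { line =>
    val Array(k, v) = line.split('\t')
    (k, v.toDouble)
  }.reduceByKey(_ + _)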

-Xiangrui

On Wed, Apr 23, 2014 at 2:09 AM, zhxfl 291221...@qq.com wrote:
 Hello, we know Hadoop streaming is used to run native programs on Hadoop.
 Hadoop streaming supports both Map and Reduce logic. Reduce logic means Hadoop
 collects all values with the same key and feeds that stream to the native
 application.
 Spark has PipedRDD too, but PipedRDD doesn't support Reduce logic. So it's
 difficult for us to port our application from Hadoop to Spark.
 Can anyone give me advice? Thanks!


Re: Spark mllib throwing error

2014-04-24 Thread Xiangrui Meng
Could you share the command you used and more of the error message?
Also, is it an MLlib specific problem? -Xiangrui

On Thu, Apr 24, 2014 at 11:49 AM, John King
usedforprinting...@gmail.com wrote:
 ./spark-shell: line 153: 17654 Killed
 $FWDIR/bin/spark-class org.apache.spark.repl.Main $@


 Any ideas?


Re: Trying to use pyspark mllib NaiveBayes

2014-04-24 Thread Xiangrui Meng
Is your Spark cluster running? Try to start with generating simple
RDDs and counting. -Xiangrui

On Thu, Apr 24, 2014 at 11:38 AM, John King
usedforprinting...@gmail.com wrote:
 I receive this error:

 Traceback (most recent call last):

   File stdin, line 1, in module

   File
 /home/ubuntu/spark-1.0.0-rc2/python/pyspark/mllib/classification.py, line
 178, in train

 ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)

   File
 /home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
 line 535, in __call__

   File
 /home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
 line 368, in send_command

   File
 /home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
 line 361, in send_command

   File
 /home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
 line 317, in _get_connection

   File
 /home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
 line 324, in _create_connection

   File
 /home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
 line 431, in start

 py4j.protocol.Py4JNetworkError: An error occurred while trying to connect to
 the Java server


Re: Trying to use pyspark mllib NaiveBayes

2014-04-24 Thread Xiangrui Meng
I tried locally with the example described in the latest guide:
http://54.82.157.211:4000/mllib-naive-bayes.html , and it worked fine.
Do you mind sharing the code you used? -Xiangrui

On Thu, Apr 24, 2014 at 1:57 PM, John King usedforprinting...@gmail.com wrote:
 Yes, I got it running for large RDD (~7 million lines) and mapping. Just
 received this error when trying to classify.


 On Thu, Apr 24, 2014 at 4:32 PM, Xiangrui Meng men...@gmail.com wrote:

 Is your Spark cluster running? Try to start with generating simple
 RDDs and counting. -Xiangrui

 On Thu, Apr 24, 2014 at 11:38 AM, John King
 usedforprinting...@gmail.com wrote:
  I receive this error:
 
  Traceback (most recent call last):
 
File stdin, line 1, in module
 
File
  /home/ubuntu/spark-1.0.0-rc2/python/pyspark/mllib/classification.py,
  line
  178, in train
 
  ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd,
  lambda_)
 
File
 
  /home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
  line 535, in __call__
 
File
 
  /home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
  line 368, in send_command
 
File
 
  /home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
  line 361, in send_command
 
File
 
  /home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
  line 317, in _get_connection
 
File
 
  /home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
  line 324, in _create_connection
 
File
 
  /home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
  line 431, in start
 
  py4j.protocol.Py4JNetworkError: An error occurred while trying to
  connect to
  the Java server




Re: Spark mllib throwing error

2014-04-24 Thread Xiangrui Meng
Do you mind sharing more code and error messages? The information you
provided is too little to identify the problem. -Xiangrui

On Thu, Apr 24, 2014 at 1:55 PM, John King usedforprinting...@gmail.com wrote:
 Last command was:

 val model = new NaiveBayes().run(points)



 On Thu, Apr 24, 2014 at 4:27 PM, Xiangrui Meng men...@gmail.com wrote:

 Could you share the command you used and more of the error message?
 Also, is it an MLlib specific problem? -Xiangrui

 On Thu, Apr 24, 2014 at 11:49 AM, John King
 usedforprinting...@gmail.com wrote:
  ./spark-shell: line 153: 17654 Killed
  $FWDIR/bin/spark-class org.apache.spark.repl.Main $@
 
 
  Any ideas?




Re: Spark mllib throwing error

2014-04-24 Thread Xiangrui Meng
I don't see anything wrong with your code. Could you do points.count()
to see how many training examples you have? Also, make sure you don't
have negative feature values. The error message you sent did not say that
NaiveBayes went wrong, only that the Spark shell process was killed. -Xiangrui

On Thu, Apr 24, 2014 at 4:05 PM, John King usedforprinting...@gmail.com wrote:
 In the other thread I had an issue with Python. In this issue, I tried
 switching to Scala. The code is:

 import org.apache.spark.mllib.regression.LabeledPoint;

 import org.apache.spark.mllib.linalg.SparseVector;

 import org.apache.spark.mllib.classification.NaiveBayes;

 import scala.collection.mutable.ArrayBuffer



 def isEmpty(a: String): Boolean =
   a != null && !a.replaceAll("(?m)\\s+$", "").isEmpty()

 def parsePoint(a: String): LabeledPoint = {
   val values = a.split('\t')
   val feat = values(1).split(' ')
   val indices = ArrayBuffer.empty[Int]
   val featValues = ArrayBuffer.empty[Double]
   for (f <- feat) {
     val q = f.split(':')
     if (q.length == 2) {
       indices += q(0).toInt
       featValues += q(1).toDouble
     }
   }
   val vector = new SparseVector(2357815, indices.toArray, featValues.toArray)
   return LabeledPoint(values(0).toDouble, vector)
 }

 val data = sc.textFile("data.txt")
 val empty = data.filter(isEmpty)
 val points = empty.map(parsePoint)
 points.cache()
 val model = new NaiveBayes().run(points)



 On Thu, Apr 24, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote:

 Do you mind sharing more code and error messages? The information you
 provided is too little to identify the problem. -Xiangrui

 On Thu, Apr 24, 2014 at 1:55 PM, John King usedforprinting...@gmail.com
 wrote:
  Last command was:
 
  val model = new NaiveBayes().run(points)
 
 
 
  On Thu, Apr 24, 2014 at 4:27 PM, Xiangrui Meng men...@gmail.com wrote:
 
  Could you share the command you used and more of the error message?
  Also, is it an MLlib specific problem? -Xiangrui
 
  On Thu, Apr 24, 2014 at 11:49 AM, John King
  usedforprinting...@gmail.com wrote:
   ./spark-shell: line 153: 17654 Killed
   $FWDIR/bin/spark-class org.apache.spark.repl.Main $@
  
  
   Any ideas?
 
 




Re: Spark mllib throwing error

2014-04-24 Thread Xiangrui Meng
I only see one risk: if your feature indices are not sorted, it might
have undefined behavior. Other than that, I don't see anything
suspicious. -Xiangrui
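
For example, in parsePoint the (index, value) pairs could be sorted by index
before building the vector (a sketch reusing the names from the code below):

  val sorted = indices.zip(featValues).sortBy(_._1)  // SparseVector expects increasing indices
  val vector = new SparseVector(2357815, sorted.map(_._1).toArray, sorted.map(_._2).toArray)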

On Thu, Apr 24, 2014 at 4:56 PM, John King usedforprinting...@gmail.com wrote:
 It just displayed this error and stopped on its own. Do the lines of code
 mentioned in the error have anything to do with it?


 On Thu, Apr 24, 2014 at 7:54 PM, Xiangrui Meng men...@gmail.com wrote:

 I don't see anything wrong with your code. Could you do points.count()
 to see how many training examples you have? Also, make sure you don't
 have negative feature values. The error message you sent did not say
 NaiveBayes went wrong, but the Spark shell was killed. -Xiangrui

 On Thu, Apr 24, 2014 at 4:05 PM, John King usedforprinting...@gmail.com
 wrote:
  In the other thread I had an issue with Python. In this issue, I tried
  switching to Scala. The code is:
 
  import org.apache.spark.mllib.regression.LabeledPoint;
 
  import org.apache.spark.mllib.linalg.SparseVector;
 
  import org.apache.spark.mllib.classification.NaiveBayes;
 
  import scala.collection.mutable.ArrayBuffer
 
 
 
  def isEmpty(a: String): Boolean =
    a != null && !a.replaceAll("(?m)\\s+$", "").isEmpty()

  def parsePoint(a: String): LabeledPoint = {
    val values = a.split('\t')
    val feat = values(1).split(' ')
    val indices = ArrayBuffer.empty[Int]
    val featValues = ArrayBuffer.empty[Double]
    for (f <- feat) {
      val q = f.split(':')
      if (q.length == 2) {
        indices += q(0).toInt
        featValues += q(1).toDouble
      }
    }
    val vector = new SparseVector(2357815, indices.toArray, featValues.toArray)
    return LabeledPoint(values(0).toDouble, vector)
  }

  val data = sc.textFile("data.txt")
  val empty = data.filter(isEmpty)
  val points = empty.map(parsePoint)
  points.cache()
  val model = new NaiveBayes().run(points)
 
 
 
  On Thu, Apr 24, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote:
 
  Do you mind sharing more code and error messages? The information you
  provided is too little to identify the problem. -Xiangrui
 
  On Thu, Apr 24, 2014 at 1:55 PM, John King
  usedforprinting...@gmail.com
  wrote:
   Last command was:
  
   val model = new NaiveBayes().run(points)
  
  
  
   On Thu, Apr 24, 2014 at 4:27 PM, Xiangrui Meng men...@gmail.com
   wrote:
  
   Could you share the command you used and more of the error message?
   Also, is it an MLlib specific problem? -Xiangrui
  
   On Thu, Apr 24, 2014 at 11:49 AM, John King
   usedforprinting...@gmail.com wrote:
./spark-shell: line 153: 17654 Killed
$FWDIR/bin/spark-class org.apache.spark.repl.Main $@
   
   
Any ideas?
  
  
 
 




Re: Running out of memory Naive Bayes

2014-04-26 Thread Xiangrui Meng
How many labels does your dataset have? -Xiangrui

On Sat, Apr 26, 2014 at 6:03 PM, DB Tsai dbt...@stanford.edu wrote:
 Which version of mllib are you using? For Spark 1.0, mllib will
 support sparse feature vectors, which will improve performance a lot
 when computing the distance between points and centroids.

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Sat, Apr 26, 2014 at 5:49 AM, John King usedforprinting...@gmail.com 
 wrote:
 I'm just wondering: are the SparseVector calculations really taking the
 sparsity into account, or just converting to dense?


 On Fri, Apr 25, 2014 at 10:06 PM, John King usedforprinting...@gmail.com
 wrote:

 I've been trying to use the Naive Bayes classifier. Each example in the
 dataset has about 2 million features, only about 20-50 of which are non-zero,
 so the vectors are very sparse. I keep running out of memory though, even
 for about 1000 examples on 30gb RAM while the entire dataset is 4 million
 examples. And I would also like to note that I'm using the sparse vector
 class.




Re: running SparkALS

2014-04-28 Thread Xiangrui Meng
Hi Diana,

SparkALS is an example implementation of ALS. It doesn't call the ALS
algorithm implemented in MLlib. M, U, and F are used to generate
synthetic data.

I'm updating the examples. In the meantime, you can take a look at the
updated MLlib guide:
http://50.17.120.186:4000/mllib-collaborative-filtering.html and try
the example code there.
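
For example, a minimal sketch of the MLlib ALS API itself (the input path and
format are assumptions):

  import org.apache.spark.mllib.recommendation.{ALS, Rating}

  val ratings = sc.textFile("ratings.csv").map { line =>
    val Array(user, product, rating) = line.split(',')
    Rating(user.toInt, product.toInt, rating.toDouble)
  }
  val model = ALS.train(ratings, rank = 10, iterations = 10, lambda = 0.01)
  val predictions = model.predict(ratings.map(r => (r.user, r.product)))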

Thanks,
Xiangrui

On Mon, Apr 28, 2014 at 10:30 AM, Diana Carroll dcarr...@cloudera.com wrote:
 Hi everyone.  I'm trying to run some of the Spark example code, and most of
 it appears to be undocumented (unless I'm missing something).  Can someone
 help me out?

 I'm particularly interested in running SparkALS, which wants parameters:
 M U F iter slices

 What are these variables?  They appear to be integers and the default values
 are 100, 500 and 10 respectively but beyond that...huh?

 Thanks!

 Diana


Re: Turn BLAS on MacOSX

2014-05-12 Thread Xiangrui Meng
Those are warning messages instead of errors. You need to add
netlib-java:all to use native BLAS/LAPACK. But it won't work if you
include netlib-java:all in an assembly jar. It has to be a separate
jar when you submit your job. For SGD, we only use level-1 BLAS, so I
don't think native code is called. -Xiangrui

On Sun, May 11, 2014 at 9:32 AM, DB Tsai dbt...@stanford.edu wrote:
 Hi Debasish,

 In the Dependencies section of
 https://github.com/apache/spark/blob/master/docs/mllib-guide.md,
 the document talks about the native BLAS dependency issue.

 For netlib which breeze uses internally, if the native library isn't found,
 the jblas implementation will be used.

 Here is more detail about how to install native library in different
 platform.
 https://github.com/fommil/netlib-java/blob/master/README.md#machine-optimised-system-libraries


 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Wed, May 7, 2014 at 10:52 AM, Debasish Das debasish.da...@gmail.com
 wrote:

 Hi,

 How do I load native BLAS libraries on Mac ?

 I am getting the following errors while running LR and SVM with SGD:

 14/05/07 10:48:13 WARN BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeSystemBLAS

 14/05/07 10:48:13 WARN BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeRefBLAS

 On CentOS it was fine, but on Mac I am getting these warnings.

 Also, when it fails to load native BLAS, does it use Java code for BLAS
 operations?

 Maybe after the addition of Breeze, we should add these details to a page as
 well, so that users are aware of them before they report any performance
 results.

 Thanks.

 Deb




Re: Accuracy in mllib BinaryClassificationMetrics

2014-05-13 Thread Xiangrui Meng
Hi Deb, feel free to add accuracy along with precision and recall. -Xiangrui

On Mon, May 12, 2014 at 1:26 PM, Debasish Das debasish.da...@gmail.com wrote:
 Hi,

 I see precision and recall but no accuracy in mllib.evaluation.binary.

 Is it already under development, or does it need to be added?

 Thanks.
 Deb



Re: Reading from .bz2 files with Spark

2014-05-13 Thread Xiangrui Meng
Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
the problem you described, but it does contain several fixes to bzip2
format. -Xiangrui

On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote:
 Hi all,

 Is anyone reading and writing to .bz2 files stored in HDFS from Spark with
 success?


 I'm finding the following results on a recent commit (756c96 from 24hr ago)
 and CDH 4.4.0:

 Works: val r = sc.textFile("/user/aa/myfile.bz2").count
 Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String) =>
 s + "| ").count

 Specifically, I'm getting an exception coming out of the bzip2 libraries
 (see below stacktraces), which is unusual because I'm able to read from that
 file without an issue using the same libraries via Pig.  It was originally
 created from Pig as well.

 Digging a little deeper I found this line in the .bz2 decompressor's javadoc
 for CBZip2InputStream:

 Instances of this class are not threadsafe. [source]


 My current working theory is that Spark has a much higher level of
 parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds
 exceptions much more frequently (as in can't finish a run over a little 2M
 row file) vs hardly at all in other libraries.

 The only other reference I could find to the issue was in presto-users, but
 the recommendation to leave .bz2 for .lzo doesn't help if I actually do want
 the higher compression levels of .bz2.


 Would love to hear if I have some kind of configuration issue or if there's
 a bug in .bz2 that's fixed in later versions of CDH, or generally any other
 thoughts on the issue.


 Thanks!
 Andrew



 Below are examples of some exceptions I'm getting:

 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
 java.lang.ArrayIndexOutOfBoundsException
 java.lang.ArrayIndexOutOfBoundsException: 65535
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
 org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
 org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)




 java.lang.ArrayIndexOutOfBoundsException: 90
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
 org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
 org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
 at
 org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
 at
 org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)



 

Re: Distribute jar dependencies via sc.AddJar(fileName)

2014-05-14 Thread Xiangrui Meng
I don't know whether this would fix the problem. In v0.9, you need
`yarn-standalone` instead of `yarn-cluster`.

See 
https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08

On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng men...@gmail.com wrote:
 Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in
 v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui

 On Mon, May 12, 2014 at 11:14 AM, DB Tsai dbt...@stanford.edu wrote:
 We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar
 dependencies on the command line with the --addJars option. However, those
 external jars are only available in the driver (application running in
 hadoop), and not available in the executors (workers).

 After doing some research, we realized that we have to push those jars to the
 executors from the driver via sc.addJar(fileName). Although in the driver's log
 (see the following), the jar is successfully added to the HTTP server in the
 driver, and I confirm that it's downloadable from any machine in the
 network, I still get `java.lang.NoClassDefFoundError` in the executors.

 14/05/09 14:51:41 INFO spark.SparkContext: Added JAR
 analyticshadoop-eba5cdce1.jar at
 http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with timestamp
 1399672301568

 Then I checked the logs on the executors, and I don't find anything like
 `Fetching <file> with timestamp <timestamp>`, which implies something is wrong; the
 executors are not downloading the external jars.

 Any suggestion what we can look at?

 After digging into how Spark distributes external jars, I wonder about the
 scalability of this approach. What if there are thousands of nodes
 downloading the jar from a single HTTP server in the driver? Why don't we push
 the jars into the HDFS distributed cache by default instead of distributing them
 via an HTTP server?

 Thanks.

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


Re: spark on yarn-standalone, throws StackOverflowError and fails somtimes and succeed for the rest

2014-05-16 Thread Xiangrui Meng
Could you try `println(result.toDebugString)` right after `val
result = ...` and attach the result? -Xiangrui

On Fri, May 9, 2014 at 8:20 AM, phoenix bai mingzhi...@gmail.com wrote:
 after a couple of tests, I find that, if I use:

  val result = model.predict(prdctpairs)
  result.map(x =>
  x.user + "," + x.product + "," + x.rating).saveAsTextFile(output)

  it always fails with the above error, and the exception seems iterative.

 but if I do:

  val result = model.predict(prdctpairs)
  result.cache()
  result.map(x =>
  x.user + "," + x.product + "," + x.rating).saveAsTextFile(output)

 it succeeds.

  could anyone help explain why the cache() is necessary?

 thanks



 On Fri, May 9, 2014 at 6:45 PM, phoenix bai mingzhi...@gmail.com wrote:

 Hi all,

 My spark code is running on yarn-standalone.

 the last three lines of the code as below,

  val result = model.predict(prdctpairs)
  result.map(x =>
  x.user + "," + x.product + "," + x.rating).saveAsTextFile(output)
 sc.stop()

  the same code sometimes runs successfully and gives the right result, while
  from time to time it throws a StackOverflowError and fails.

  and I don't have a clue how I should debug it.

 below is the error, (the start and end portion to be exact):


 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
 MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
 44 to sp...@rxx43.mc10.site.net:43885
 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
 MapOutputTrackerMaster: Size of output statuses for shuffle 44 is 148 bytes
 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-35]
 MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
 45 to sp...@rxx43.mc10.site.net:43885
 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-35]
 MapOutputTrackerMaster: Size of output statuses for shuffle 45 is 453 bytes
 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-20]
 MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
 44 to sp...@rxx43.mc10.site.net:56767
 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29]
 MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
 45 to sp...@rxx43.mc10.site.net:56767
 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29]
 MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
 44 to sp...@rxx43.mc10.site.net:49879
 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29]
 MapOutputTrackerMasterActor: Asked to send map output locations for shuffle
 45 to sp...@rxx43.mc10.site.net:49879
 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
 TaskSetManager: Starting task 946.0:17 as TID 146 on executor 6:
 rx15.mc10.site.net (PROCESS_LOCAL)
 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17]
 TaskSetManager: Serialized task 946.0:17 as 6414 bytes in 0 ms
 14-05-09 17:55:51 WARN [Result resolver thread-0] TaskSetManager: Lost TID
 133 (task 946.0:4)
 14-05-09 17:55:51 WARN [Result resolver thread-0] TaskSetManager: Loss was
 due to java.lang.StackOverflowError
 java.lang.StackOverflowError
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:615)
 at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
 at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
 at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:615)

 

 at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
 at
 

Re: How to run the SVM and LogisticRegression

2014-05-16 Thread Xiangrui Meng
If you check out the master branch, there are some examples that can
be used as templates under

examples/src/main/scala/org/apache/spark/examples/mllib

Best,
Xiangrui

On Wed, May 14, 2014 at 1:36 PM, yxzhao yxz...@ualr.edu wrote:

 Hello,
 I found the classification algorithms SVM and LogisticRegression implemented
 in the following directory. How do I run them? What should the command line
 be? Thanks.
 spark-0.9.0-incubating/mllib/src/main/scala/org/apache/spark/mllib/classification



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-run-the-SVM-and-LogisticRegression-tp5720.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andrew,

Could you try varying the minPartitions parameter? For example:

val r = sc.textFile("/user/aa/myfile.bz2", 4).count
val r = sc.textFile("/user/aa/myfile.bz2", 8).count

Best,
Xiangrui

On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote:
 Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
 the problem you described, but it does contain several fixes to bzip2
 format. -Xiangrui

 On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote:
 Hi all,

 Is anyone reading and writing to .bz2 files stored in HDFS from Spark with
 success?


 I'm finding the following results on a recent commit (756c96 from 24hr ago)
 and CDH 4.4.0:

 Works: val r = sc.textFile("/user/aa/myfile.bz2").count
 Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String) =>
 s + "| ").count

 Specifically, I'm getting an exception coming out of the bzip2 libraries
 (see below stacktraces), which is unusual because I'm able to read from that
 file without an issue using the same libraries via Pig.  It was originally
 created from Pig as well.

 Digging a little deeper I found this line in the .bz2 decompressor's javadoc
 for CBZip2InputStream:

 Instances of this class are not threadsafe. [source]


 My current working theory is that Spark has a much higher level of
 parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds
 exceptions much more frequently (as in can't finish a run over a little 2M
 row file) vs hardly at all in other libraries.

 The only other reference I could find to the issue was in presto-users, but
 the recommendation to leave .bz2 for .lzo doesn't help if I actually do want
 the higher compression levels of .bz2.


 Would love to hear if I have some kind of configuration issue or if there's
 a bug in .bz2 that's fixed in later versions of CDH, or generally any other
 thoughts on the issue.


 Thanks!
 Andrew



 Below are examples of some exceptions I'm getting:

 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
 java.lang.ArrayIndexOutOfBoundsException
 java.lang.ArrayIndexOutOfBoundsException: 65535
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
 org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
 org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)




 java.lang.ArrayIndexOutOfBoundsException: 90
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
 org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
 org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
 at
 org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
 at
 org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327

Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andrew,

I verified that this is due to thread safety. I changed
SPARK_WORKER_CORES to 1 in spark-env.sh, so there is only 1 thread per
worker. Then I can load the file without any problem with different
values of minPartitions. I will submit a JIRA to both Spark and
Hadoop.

Best,
Xiangrui

On Thu, May 15, 2014 at 3:48 PM, Xiangrui Meng men...@gmail.com wrote:
 Hi Andrew,

 Could you try varying the minPartitions parameter? For example:

 val r = sc.textFile("/user/aa/myfile.bz2", 4).count
 val r = sc.textFile("/user/aa/myfile.bz2", 8).count

 Best,
 Xiangrui

 On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote:
 Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
 the problem you described, but it does contain several fixes to bzip2
 format. -Xiangrui

 On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote:
 Hi all,

 Is anyone reading and writing to .bz2 files stored in HDFS from Spark with
 success?


 I'm finding the following results on a recent commit (756c96 from 24hr ago)
 and CDH 4.4.0:

 Works: val r = sc.textFile("/user/aa/myfile.bz2").count
 Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String) =>
 s + "| ").count

 Specifically, I'm getting an exception coming out of the bzip2 libraries
 (see below stacktraces), which is unusual because I'm able to read from that
 file without an issue using the same libraries via Pig.  It was originally
 created from Pig as well.

 Digging a little deeper I found this line in the .bz2 decompressor's javadoc
 for CBZip2InputStream:

 Instances of this class are not threadsafe. [source]


 My current working theory is that Spark has a much higher level of
 parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds
 exceptions much more frequently (as in can't finish a run over a little 2M
 row file) vs hardly at all in other libraries.

 The only other reference I could find to the issue was in presto-users, but
 the recommendation to leave .bz2 for .lzo doesn't help if I actually do want
 the higher compression levels of .bz2.


 Would love to hear if I have some kind of configuration issue or if there's
 a bug in .bz2 that's fixed in later versions of CDH, or generally any other
 thoughts on the issue.


 Thanks!
 Andrew



 Below are examples of some exceptions I'm getting:

 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
 java.lang.ArrayIndexOutOfBoundsException
 java.lang.ArrayIndexOutOfBoundsException: 65535
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
 org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
 org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)




 java.lang.ArrayIndexOutOfBoundsException: 90
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
 org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
 org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
 at
 org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
 at
 org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181

Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andrew,

This is the JIRA I created:
https://issues.apache.org/jira/browse/MAPREDUCE-5893 . Hopefully
someone wants to work on it.

Best,
Xiangrui

On Fri, May 16, 2014 at 6:47 PM, Xiangrui Meng men...@gmail.com wrote:
 Hi Andrew,

 I could reproduce the bug with Hadoop 2.2.0. Some older versions of
 Hadoop do not support splittable compression, so you ended up with
 sequential reads. It is easy to reproduce the bug with the following
 setup:

 1) Workers are configured with multiple cores.
 2) BZip2 files are big enough or minPartitions is large enough when
 you load the file via sc.textFile(), so that one worker has more than
 one task.

 Best,
 Xiangrui

 On Fri, May 16, 2014 at 4:06 PM, Andrew Ash and...@andrewash.com wrote:
 Hi Xiangrui,

 // FYI I'm getting your emails late due to the Apache mailing list outage

 I'm using CDH4.4.0, which I think uses the MapReduce v2 API.  The .jars are
 named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar

 I'm also glad you were able to reproduce!  Please paste a link to the Hadoop
 bug you file so I can follow along.

 Thanks!
 Andrew


 On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote:

 Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
 the problem you described, but it does contain several fixes to bzip2
 format. -Xiangrui

 On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote:
  Hi all,
 
  Is anyone reading and writing to .bz2 files stored in HDFS from Spark
  with
  success?
 
 
  I'm finding the following results on a recent commit (756c96 from 24hr
  ago)
  and CDH 4.4.0:
 
  Works: val r = sc.textFile("/user/aa/myfile.bz2").count
  Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String)
  =>
  s + "| ").count
 
  Specifically, I'm getting an exception coming out of the bzip2 libraries
  (see below stacktraces), which is unusual because I'm able to read from
  that
  file without an issue using the same libraries via Pig.  It was
  originally
  created from Pig as well.
 
  Digging a little deeper I found this line in the .bz2 decompressor's
  javadoc
  for CBZip2InputStream:
 
  Instances of this class are not threadsafe. [source]
 
 
  My current working theory is that Spark has a much higher level of
  parallelism than Pig/Hadoop does and thus I get these wild
  IndexOutOfBounds
  exceptions much more frequently (as in can't finish a run over a little
  2M
  row file) vs hardly at all in other libraries.
 
  The only other reference I could find to the issue was in presto-users,
  but
  the recommendation to leave .bz2 for .lzo doesn't help if I actually do
  want
  the higher compression levels of .bz2.
 
 
  Would love to hear if I have some kind of configuration issue or if
  there's
  a bug in .bz2 that's fixed in later versions of CDH, or generally any
  other
  thoughts on the issue.
 
 
  Thanks!
  Andrew
 
 
 
  Below are examples of some exceptions I'm getting:
 
  14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
  java.lang.ArrayIndexOutOfBoundsException
  java.lang.ArrayIndexOutOfBoundsException: 65535
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
  at
 
  org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
  at java.io.InputStream.read(InputStream.java:101)
  at
  org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
  at
  org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
  at
 
  org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
  at
  org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
 
 
 
 
  java.lang.ArrayIndexOutOfBoundsException: 90
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397

Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andrew,

I could reproduce the bug with Hadoop 2.2.0. Some older versions of
Hadoop do not support splittable compression, so you ended up with
sequential reads. It is easy to reproduce the bug with the following
setup:

1) Workers are configured with multiple cores.
2) BZip2 files are big enough or minPartitions is large enough when
you load the file via sc.textFile(), so that one worker has more than
one task.

Best,
Xiangrui

On Fri, May 16, 2014 at 4:06 PM, Andrew Ash and...@andrewash.com wrote:
 Hi Xiangrui,

 // FYI I'm getting your emails late due to the Apache mailing list outage

 I'm using CDH4.4.0, which I think uses the MapReduce v2 API.  The .jars are
 named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar

 I'm also glad you were able to reproduce!  Please paste a link to the Hadoop
 bug you file so I can follow along.

 Thanks!
 Andrew


 On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote:

 Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
 the problem you described, but it does contain several fixes to bzip2
 format. -Xiangrui

 On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote:
  Hi all,
 
  Is anyone reading and writing to .bz2 files stored in HDFS from Spark
  with
  success?
 
 
  I'm finding the following results on a recent commit (756c96 from 24hr
  ago)
  and CDH 4.4.0:
 
  Works: val r = sc.textFile("/user/aa/myfile.bz2").count
  Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String)
  =>
  s + "| ").count
 
  Specifically, I'm getting an exception coming out of the bzip2 libraries
  (see below stacktraces), which is unusual because I'm able to read from
  that
  file without an issue using the same libraries via Pig.  It was
  originally
  created from Pig as well.
 
  Digging a little deeper I found this line in the .bz2 decompressor's
  javadoc
  for CBZip2InputStream:
 
  Instances of this class are not threadsafe. [source]
 
 
  My current working theory is that Spark has a much higher level of
  parallelism than Pig/Hadoop does and thus I get these wild
  IndexOutOfBounds
  exceptions much more frequently (as in can't finish a run over a little
  2M
  row file) vs hardly at all in other libraries.
 
  The only other reference I could find to the issue was in presto-users,
  but
  the recommendation to leave .bz2 for .lzo doesn't help if I actually do
  want
  the higher compression levels of .bz2.
 
 
  Would love to hear if I have some kind of configuration issue or if
  there's
  a bug in .bz2 that's fixed in later versions of CDH, or generally any
  other
  thoughts on the issue.
 
 
  Thanks!
  Andrew
 
 
 
  Below are examples of some exceptions I'm getting:
 
  14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
  java.lang.ArrayIndexOutOfBoundsException
  java.lang.ArrayIndexOutOfBoundsException: 65535
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
  at
 
  org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
  at java.io.InputStream.read(InputStream.java:101)
  at
  org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
  at
  org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
  at
 
  org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
  at
  org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
 
 
 
 
  java.lang.ArrayIndexOutOfBoundsException: 90
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
  at
 
  org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
  at
 
  org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
  at java.io.InputStream.read(InputStream.java:101)
  at
  org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209

Re: breeze DGEMM slow in spark

2014-05-17 Thread Xiangrui Meng
You need to include breeze-natives or netlib:all to load the native
libraries. Check the log messages to ensure native libraries are used,
especially on the worker nodes. The easiest way to use OpenBLAS is
copying the shared library to /usr/lib/libblas.so.3 and
/usr/lib/liblapack.so.3. -Xiangrui

On Sat, May 17, 2014 at 8:02 PM, wxhsdp wxh...@gmail.com wrote:
 I think maybe it's related to m1.large, because I also tested on my laptop,
 and the two cases cost nearly
 the same amount of time.

 my laptop:
 model name  : Intel(R) Core(TM) i5-3380M CPU @ 2.90GHz
 cpu MHz : 2893.549

 os:
 Linux ubuntu 3.11.0-12-generic #19-Ubuntu SMP Wed Oct 9 16:20:46 UTC 2013
 x86_64 x86_64 x86_64 GNU/Linux




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/breeze-DGEMM-slow-in-spark-tp5950p5971.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: reading large XML files

2014-05-20 Thread Xiangrui Meng
Try sc.wholeTextFiles(). It reads the entire file into a string
record. -Xiangrui
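
For illustration, a minimal sketch of this approach (assuming a spark-shell SparkContext `sc`; the HDFS path and the parsing step are placeholders, not part of the original reply):

// Each record is (filePath, fileContent), so a whole GraphML document stays in
// one record and cannot be split across partition boundaries.
val files = sc.wholeTextFiles("hdfs:///data/graphml/")
val parsed = files.map { case (path, xml) =>
  // parse the full document here, e.g. with scala.xml.XML.loadString(xml)
  (path, xml.length)
}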

On Tue, May 20, 2014 at 8:25 AM, Nathan Kronenfeld
nkronenf...@oculusinfo.com wrote:
 We are trying to read some large GraphML files to use in spark.

 Is there an easy way to read XML-based files like this that accounts for
 partition boundaries and the like?

  Thanks,
  Nathan


 --
 Nathan Kronenfeld
 Senior Visualization Developer
 Oculus Info Inc
 2 Berkeley Street, Suite 600,
 Toronto, Ontario M5A 4J5
 Phone:  +1-416-203-3003 x 238
 Email:  nkronenf...@oculusinfo.com


Re: Job Processing Large Data Set Got Stuck

2014-05-21 Thread Xiangrui Meng
Many OutOfMemoryErrors in the log. Is your data distributed evenly? -Xiangrui

On Wed, May 21, 2014 at 11:23 AM, yxzhao yxz...@ualr.edu wrote:
 I ran the PageRank example processing a large data set, 5GB in size, using 48
 machines. The job got stuck at the time point 14/05/20 21:32:17, as the
 attached log shows. It was stuck there for more than 10 hours, and then I
 finally killed it. But I did not find any information explaining why it was
 stuck. Any suggestions? Thanks.


 Spark_OK_48_pagerank.log
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n6185/Spark_OK_48_pagerank.log



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Job-Processing-Large-Data-Set-Got-Stuck-tp6185.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Job Processing Large Data Set Got Stuck

2014-05-21 Thread Xiangrui Meng
If the RDD is cached, you can check its storage information in the
Storage tab of the Web UI.

On Wed, May 21, 2014 at 12:31 PM, yxzhao yxz...@ualr.edu wrote:
 Thanks Xiangrui, How to check and make sure the data is distributed
 evenly? Thanks again.
 On Wed, May 21, 2014 at 2:17 PM, Xiangrui Meng [via Apache Spark User
 List] [hidden email] wrote:

 Many OutOfMemoryErrors in the log. Is your data distributed evenly?
 -Xiangrui

 On Wed, May 21, 2014 at 11:23 AM, yxzhao [hidden email] wrote:

 I run the pagerank example processing a large data set, 5GB in size,
 using
 48
 machines. The job got stuck at the time point: 14/05/20 21:32:17, as the
 attached log shows. It was stuck there for more than 10 hours and then I
 killed it at last. But I did not find any information explaining why it
 was
 stuck. Any suggestions? Thanks.


 Spark_OK_48_pagerank.log


 http://apache-spark-user-list.1001560.n3.nabble.com/file/n6185/Spark_OK_48_pagerank.log



 --
 View this message in context:

 http://apache-spark-user-list.1001560.n3.nabble.com/Job-Processing-Large-Data-Set-Got-Stuck-tp6185.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


 

 
 View this message in context: Re: Job Processing Large Data Set Got Stuck

 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Inconsistent RDD Sample size

2014-05-21 Thread Xiangrui Meng
It doesn't guarantee the exact sample size. If you fix the random
seed, it would return the same result every time. -Xiangrui
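
A small sketch of the fixed-seed variant (the seed value 42 is arbitrary):

val fraction = 1.toDouble / verts.count.toDouble
// Same seed => same pseudorandom choices => identical sample on every run,
// though the sample size itself is still not guaranteed to be exactly 1.
val s1 = verts.sample(false, fraction, 42).count
val s2 = verts.sample(false, fraction, 42).count
// s1 == s2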

On Wed, May 21, 2014 at 2:05 PM, glxc r.ryan.mcc...@gmail.com wrote:
 I have a graph and am trying to take a random sample of vertices without
 replacement, using the RDD.sample() method

 verts are the vertices in the graph

  val verts = graph.vertices

 and executing this multiple times in a row

  verts.sample(false, 1.toDouble/v1.count.toDouble,
 System.currentTimeMillis).count

 yields different results roughly each time (albeit +/- a small % of the
 target)

 why does this happen? I looked at PartitionwiseSampledRDD but can't figure it
 out

 Also, is there another method/technique to yield the same result each time?
 My understanding is that grabbing random indices may not be the best use of
 the RDD model



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Inconsistent-RDD-Sample-size-tp6197.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Error while launching ec2 spark cluster with HVM (r3.large)

2014-05-22 Thread Xiangrui Meng
Was the error message the same as you posted when you used `root` as
the user id? Could you try this:

1) Do not specify user id. (Default would be `root`.)
2) If it fails in the middle, try `spark-ec2  --resume launch
cluster` to continue launching the cluster.

Best,
Xiangrui

On Thu, May 22, 2014 at 12:44 PM, adparker adpar...@gmail.com wrote:
 I had this problem too and fixed it by setting the wait time-out to a larger
 value: --wait

 For example, in standalone mode with default values, a timeout of 480
 seconds worked for me:

 $ cd spark-0.9.1/ec2
 $ ./spark-ec2 --key-pair= --identity-file= --instance-type=r3.large
 --wait=480  launch 




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-launching-ec2-spark-cluster-with-HVM-r3-large-tp5862p6276.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: pyspark MLlib examples don't work with Spark 1.0.0

2014-05-31 Thread Xiangrui Meng
The documentation you looked at is not official, though it is from
@pwendell's website. It was for the Spark SQL release. Please find the
official documentation here:

http://spark.apache.org/docs/latest/mllib-linear-methods.html#linear-support-vector-machine-svm

It contains a working example showing how to construct LabeledPoint
and use it for training.

Best,
Xiangrui

On Fri, May 30, 2014 at 5:10 AM, jamborta jambo...@gmail.com wrote:
 Thanks for the reply. I am definitely running 1.0.0; I set it up manually.

 To answer my question, I found out from the examples that it would need a
 new data type called LabeledPoint instead of a numpy array.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-MLlib-examples-don-t-work-with-Spark-1-0-0-tp6546p6579.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Create/shutdown objects before/after RDD use (or: Non-serializable classes)

2014-05-31 Thread Xiangrui Meng
Hi Tobias,

One hack you can try is:

rdd.mapPartitions(iter => {
  val x = new X()
  iter.map(row => x.doSomethingWith(row)) ++ { x.shutdown(); Iterator.empty }
})

Best,
Xiangrui

On Thu, May 29, 2014 at 11:38 PM, Tobias Pfeiffer t...@preferred.jp wrote:
 Hi,

 I want to use an object x in my RDD processing as follows:

 val x = new X()
 rdd.map(row => x.doSomethingWith(row))
 println(rdd.count())
 x.shutdown()

 Now the problem is that X is non-serializable, so while this works
 locally, it does not work in a cluster setup. I thought I could do

 rdd.mapPartitions(iter => {
   val x = new X()
   val result = iter.map(row => x.doSomethingWith(row))
   x.shutdown()
   result
 })

 to create an instance of X locally, but obviously x.shutdown() is
 called before the first row is processed.

 How can I specify these node-local setup/teardown functions or how do
 I deal in general with non-serializable classes?

 Thanks
 Tobias


Re: Using String Dataset for Logistic Regression

2014-06-03 Thread Xiangrui Meng
Yes. MLlib 1.0 supports sparse input data for linear methods. -Xiangrui

On Mon, Jun 2, 2014 at 11:36 PM, praveshjain1991
praveshjain1...@gmail.com wrote:
 I am not sure. I have just been using some numerical datasets.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Using-String-Dataset-for-Logistic-Regression-tp5523p6784.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Using MLLib in Scala

2014-06-03 Thread Xiangrui Meng
Hi Suela,

(Please subscribe our user mailing list and send your questions there
in the future.) For your case, each file contains a column of numbers.
So you can use `sc.textFile` to read them first, zip them together,
and then create labeled points:

val xx = sc.textFile("/path/to/ex2x.dat").map(x => Vectors.dense(x.toDouble))
val yy = sc.textFile("/path/to/ex2y.dat").map(_.toDouble)
val examples = yy.zip(xx).map { case (y, x) => LabeledPoint(y, x) }

Best,
Xiangrui

On Thu, May 29, 2014 at 2:35 AM, Suela Haxhi suelaha...@gmail.com wrote:

 Hello Xiangrui ,
 my name is Suela Haxhi. Let me ask you for a little help. I am having some difficulty
 loading files into MLlib, namely:
 Binary Classification ;
 Linear Regression ;
 

 E.g., the file mllib/data/sample_svm_data.txt contains the
 following data:
 1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0
 2.228387042742021 2.228387042742023 0 0 0 0 0 0
 0 2.857738033247042 0 0 2.619965104088255 0 2.004684436494304
 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0

 etc  ..

 I don't understand what the inputs and outputs are.
 The problem comes when I want to load another type of dataset. E.g., I want
 to perform binary classification on the presence of a disease.

 For example, Professor Andrew Ng, in his machine learning course,
 explains:

 Download ex2Data.zip, and extract the files from the zip file. The files
 contain some example measurements of various heights for boys between the
 ages of two and eight. The y-values are the heights measured in meters, and
 the x-values are the ages of the boys corresponding to the heights. Each
 height and age tuple constitutes one training example $(x^{(i)}, y^{(i)})$
 in our dataset. There are $m = 50$ training examples, and you will
 use them to develop a linear regression model.
 In this problem, you'll implement linear regression using gradient descent.
 In Matlab/Octave, you can load the training set using the commands
 x = load('ex2x.dat');
 y = load('ex2y.dat');



 But, in MLlib, I can't figure out what these data mean
 (mllib/data/sample_svm_data.txt).
 And I don't know how to load another type of dataset using the following
 code:

 Binary Classification
 import org.apache.spark.SparkContext
 import org.apache.spark.mllib.classification.SVMWithSGD
 import org.apache.spark.mllib.regression.LabeledPoint

 // Load and parse the data file

 // Run training algorithm to build the model

 // Evaluate model on training examples and compute the training error



 Can you help me please? Thank you in advance.

 Best Regards
 Suela Haxhi


Re: How to stop a running SparkContext in the proper way?

2014-06-03 Thread Xiangrui Meng
Did you try sc.stop()?

On Tue, Jun 3, 2014 at 9:54 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote:
 Hi,

 I want to know how I can stop a running SparkContext in a proper way so that
 the next time I start a new SparkContext, the web UI can be launched on the
 same port 4040. Now when I quit the job using Ctrl+Z, the new SparkContexts are
 launched on new ports.

 I have the same problem with the IPython notebook. It is launched on a different
 port when I start the notebook a second time after closing the first one. I am
 starting IPython using the command

 IPYTHON_OPTS="notebook --ip  --pylab inline" ./bin/pyspark

 Thanks & Regards,
 Meethu M


Re: IllegalArgumentException on calling KMeans.train()

2014-06-04 Thread Xiangrui Meng
Could you check whether the vectors have the same size? -Xiangrui
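
One quick way to check this from the shell, assuming the same vectors are available as a Scala RDD[Vector] named docVectors (the name follows the code quoted below):

// If this prints more than one value, the vectors do not all have the same
// dimension, which is what makes the `require` inside fastSquaredDistance fail.
docVectors.map(_.size).distinct().collect().foreach(println)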

On Wed, Jun 4, 2014 at 1:43 AM, bluejoe2008 bluejoe2...@gmail.com wrote:
 what does this exception mean?

 14/06/04 16:35:15 ERROR executor.Executor: Exception in task ID 6
 java.lang.IllegalArgumentException: requirement failed
 at scala.Predef$.require(Predef.scala:221)
 at
 org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:271)
 at
 org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:398)
 at
 org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:372)
 at
 org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:366)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:366)
 at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:389)
 at
 org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:269)
 at
 org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:268)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.Range.foreach(Range.scala:141)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at
 org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:268)
 at
 org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:267)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
 at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
 at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:619)

 my spark version: 1.0.0
 Java: 1.7
 my codes:

 JavaRDD<Vector> docVectors = generateDocVector(...);
 int numClusters = 20;
 int numIterations = 20;
 KMeansModel clusters = KMeans.train(docVectors.rdd(), numClusters,
 numIterations);

 Another strange thing is that the mapPartitionsWithIndex() method call in
 generateDocVector() is invoked 3 times...

 2014-06-04
 
 bluejoe2008


Re: Logistic Regression MLLib Slow

2014-06-04 Thread Xiangrui Meng
80M by 4 should be about 2.5GB uncompressed. 10 iterations shouldn't
take that long, even on a single executor. Besides what Matei
suggested, could you also verify the executor memory in
http://localhost:4040 in the Executors tab. It is very likely the
executors do not have enough memory. In that case, caching may be
slower than reading directly from disk. -Xiangrui

On Wed, Jun 4, 2014 at 7:06 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Ah, is the file gzipped by any chance? We can’t decompress gzipped files in
 parallel so they get processed by a single task.

 It may also be worth looking at the application UI (http://localhost:4040)
 to see 1) whether all the data fits in memory in the Storage tab (maybe it
 somehow becomes larger, though it seems unlikely that it would exceed 20 GB)
 and 2) how many parallel tasks run in each iteration.

 Matei

 On Jun 4, 2014, at 6:56 PM, Srikrishna S srikrishna...@gmail.com wrote:

 I am using the MLlib one (LogisticRegressionWithSGD) with PySpark. I am
 running only 10 iterations.

 The MLLib version of logistic regression doesn't seem to use all the cores
 on my machine.

 Regards,
 Krishna



 On Wed, Jun 4, 2014 at 6:47 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Are you using the logistic_regression.py in examples/src/main/python or
 examples/src/main/python/mllib? The first one is an example of writing
 logistic regression by hand and won’t be as efficient as the MLlib one. I
 suggest trying the MLlib one.

 You may also want to check how many iterations it runs — by default I
 think it runs 100, which may be more than you need.

 Matei

 On Jun 4, 2014, at 5:47 PM, Srikrishna S srikrishna...@gmail.com wrote:

  Hi All.,
 
  I am new to Spark and I am trying to run LogisticRegression (with SGD)
  using MLLib on a beefy single machine with about 128GB RAM. The dataset has
  about 80M rows with only 4 features so it barely occupies 2Gb on disk.
 
  I am running the code using all 8 cores with 20G memory using
  spark-submit --executor-memory 20G --master local[8]
  logistic_regression.py
 
  It seems to take about 3.5 hours without caching and over 5 hours with
  caching.
 
  What is the recommended use for Spark on a beefy single machine?
 
  Any suggestions will help!
 
  Regards,
  Krishna
 
 
  Code sample:
 
  -
  # Dataset
  d = sys.argv[1]
  data = sc.textFile(d)
 
  # Load and parse the data
  #
  --
  def parsePoint(line):
  values = [float(x) for x in line.split(',')]
  return LabeledPoint(values[0], values[1:])
  _parsedData = data.map(parsePoint)
  parsedData = _parsedData.cache()
  results = {}
 
  # Spark
  #
  --
  start_time = time.time()
  # Build the gl_model
  niters = 10
  spark_model = LogisticRegressionWithSGD.train(parsedData,
  iterations=niters)
 
  # Evaluate the gl_model on training data
  labelsAndPreds = parsedData.map(lambda p: (p.label,
  spark_model.predict(p.features)))
  trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() /
  float(parsedData.count())
 





Re: Logistic Regression MLLib Slow

2014-06-04 Thread Xiangrui Meng
Hi Krishna,

Specifying executor memory in local mode has no effect, because all of
the threads run inside the same JVM. You can either try
--driver-memory 60g or start a standalone server.

Best,
Xiangrui

On Wed, Jun 4, 2014 at 7:28 PM, Xiangrui Meng men...@gmail.com wrote:
 80M by 4 should be about 2.5GB uncompressed. 10 iterations shouldn't
 take that long, even on a single executor. Besides what Matei
 suggested, could you also verify the executor memory in
 http://localhost:4040 in the Executors tab. It is very likely the
 executors do not have enough memory. In that case, caching may be
 slower than reading directly from disk. -Xiangrui

 On Wed, Jun 4, 2014 at 7:06 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Ah, is the file gzipped by any chance? We can’t decompress gzipped files in
 parallel so they get processed by a single task.

 It may also be worth looking at the application UI (http://localhost:4040)
 to see 1) whether all the data fits in memory in the Storage tab (maybe it
 somehow becomes larger, though it seems unlikely that it would exceed 20 GB)
 and 2) how many parallel tasks run in each iteration.

 Matei

 On Jun 4, 2014, at 6:56 PM, Srikrishna S srikrishna...@gmail.com wrote:

 I am using the MLlib one (LogisticRegressionWithSGD) with PySpark. I am
 running only 10 iterations.

 The MLLib version of logistic regression doesn't seem to use all the cores
 on my machine.

 Regards,
 Krishna



 On Wed, Jun 4, 2014 at 6:47 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Are you using the logistic_regression.py in examples/src/main/python or
 examples/src/main/python/mllib? The first one is an example of writing
 logistic regression by hand and won’t be as efficient as the MLlib one. I
 suggest trying the MLlib one.

 You may also want to check how many iterations it runs — by default I
 think it runs 100, which may be more than you need.

 Matei

 On Jun 4, 2014, at 5:47 PM, Srikrishna S srikrishna...@gmail.com wrote:

  Hi All.,
 
  I am new to Spark and I am trying to run LogisticRegression (with SGD)
  using MLLib on a beefy single machine with about 128GB RAM. The dataset 
  has
  about 80M rows with only 4 features so it barely occupies 2Gb on disk.
 
  I am running the code using all 8 cores with 20G memory using
  spark-submit --executor-memory 20G --master local[8]
  logistic_regression.py
 
  It seems to take about 3.5 hours without caching and over 5 hours with
  caching.
 
  What is the recommended use for Spark on a beefy single machine?
 
  Any suggestions will help!
 
  Regards,
  Krishna
 
 
  Code sample:
 
  -
  # Dataset
  d = sys.argv[1]
  data = sc.textFile(d)
 
  # Load and parse the data
  #
  --
  def parsePoint(line):
  values = [float(x) for x in line.split(',')]
  return LabeledPoint(values[0], values[1:])
  _parsedData = data.map(parsePoint)
  parsedData = _parsedData.cache()
  results = {}
 
  # Spark
  #
  --
  start_time = time.time()
  # Build the gl_model
  niters = 10
  spark_model = LogisticRegressionWithSGD.train(parsedData,
  iterations=niters)
 
  # Evaluate the gl_model on training data
  labelsAndPreds = parsedData.map(lambda p: (p.label,
  spark_model.predict(p.features)))
  trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() /
  float(parsedData.count())
 





Re: Native library can not be loaded when using Mllib PCA

2014-06-05 Thread Xiangrui Meng
For standalone and yarn mode, you need to install native libraries on all 
nodes. The best solution is installing them to /usr/lib/libblas.so.3 and 
/usr/lib/liblapack.so.3. If your matrix is sparse, the native libraries cannot 
help because they are for dense linear algebra. You can create an RDD of sparse 
rows and try k-means directly; it supports sparse input. -Xiangrui

Sent from my iPad
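
A hedged sketch of the sparse-input route (the vector size, indices, values, and k are made-up numbers for illustration; `sc` is a spark-shell SparkContext):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.clustering.KMeans

// Build sparse rows directly instead of densifying them through PCA first.
val rows = sc.parallelize(Seq(
  Vectors.sparse(2487, Array(3, 17, 100), Array(1.0, 2.0, 0.5)),
  Vectors.sparse(2487, Array(5, 17), Array(4.0, 1.0))
))
val model = KMeans.train(rows, 20, 50)  // k = 20 clusters, 50 iterations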

 On Jun 5, 2014, at 2:36 AM, yangliuyu yangli...@163.com wrote:
 
 Hi,
 
 We're using Mllib (1.0.0 release version) on a k-means clustering problem.
  We want to reduce the matrix column size before sending the points to the
  k-means solver.
 
 It works on my mac with the local mode: spark-test-run-assembly-1.0.jar
 contains my application code, com.github.fommil, netlib code and
  netlib-native*.so files (including jnilib and dll files)
 
 spark-submit --class test.TestMllibPCA --master local[4] --executor-memory
 3g --driver-memory 3g --driver-class-path
 /data/user/dump/spark-test-run-assembly-1.0.jar
 /data/user/dump/spark-test-run-assembly-1.0.jar
 /data/user/dump/user_fav_2014_04_09.csv.head1w 
 
  But if --driver-class-path is removed, the warning messages appear:
 14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from:
 com.github.fommil.netlib.NativeSystemLAPACK
 14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from:
 com.github.fommil.netlib.NativeRefLAPACK
 
  Alternatively, setting SPARK_CLASSPATH=/data/user/dump/spark-test-run-assembly-1.0.jar
  also solves the problem.
 
  The matrix contains sparse data with 6778 rows and 2487 columns, and the time
  to compute the PCA is 10s and 47s respectively, which suggests that the
  native library works well.
 
  Then I wanted to test it on a Spark standalone cluster (on CentOS), but it
  failed again.
  After changing the JDK logging level to FINEST, I got the message:
 14/06/05 16:19:15 INFO JniLoader: JNI LIB =
 netlib-native_system-linux-x86_64.so
 14/06/05 16:19:15 INFO JniLoader: extracting
 jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_system-linux-x86_64.so
 to /tmp/jniloader6648403281987654682netlib-native_system-linux-x86_64.so
 14/06/05 16:19:15 WARN LAPACK: Failed to load implementation from:
 com.github.fommil.netlib.NativeSystemLAPACK
 14/06/05 16:19:15 INFO JniLoader: JNI LIB =
 netlib-native_ref-linux-x86_64.so
 14/06/05 16:19:15 INFO JniLoader: extracting
 jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_ref-linux-x86_64.so
 to /tmp/jniloader2298588627398263902netlib-native_ref-linux-x86_64.so
 14/06/05 16:19:16 WARN LAPACK: Failed to load implementation from:
 com.github.fommil.netlib.NativeRefLAPACK
 14/06/05 16:19:16 INFO LAPACK: Implementation provided by class
 com.github.fommil.netlib.F2jLAPACK
 
  The libgfortran, atlas, blas, lapack and arpack libraries are all installed, and all of
  the .so files are located under /usr/lib64; spark.executor.extraLibraryPath
  is set to /usr/lib64 in conf/spark-defaults.conf, but none of this works. I
  tried adding --jars /data/user/dump/spark-test-run-assembly-1.0.jar but with
  no luck.
 
 What should I try next?
 
  Does the native library need to be visible to both the driver and the executors? In
  local mode this seems to be a classpath problem, but for standalone
  and YARN mode it gets more complex. A detailed document would be really helpful.
 
 Thanks.
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Native-library-can-not-be-loaded-when-using-Mllib-PCA-tp7042.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to process multiple classification with SVM in MLlib

2014-06-07 Thread Xiangrui Meng
At this time, you need to do one-vs-all manually for multiclass
training. For your second question, if the algorithm is implemented in
Java/Scala/Python and designed for a single machine, you can broadcast
the dataset to each worker and train models on the workers. If the algorithm
is implemented in a different language, you may need to use pipe to train
the models outside the JVM (similar to Hadoop Streaming). If the algorithm
is designed for a different parallel platform, then it may be hard to
use it in Spark. -Xiangrui
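
As a rough sketch of manual one-vs-all with SVMWithSGD (the class count, iteration count, and names here are illustrative assumptions):

import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// data: RDD[LabeledPoint] with labels 0.0, 1.0, ..., numClasses - 1
def trainOneVsAll(data: RDD[LabeledPoint], numClasses: Int, numIterations: Int = 100) =
  (0 until numClasses).map { k =>
    // Relabel: class k is the positive class, everything else is negative.
    val binary = data.map(p => LabeledPoint(if (p.label == k.toDouble) 1.0 else 0.0, p.features))
    k -> SVMWithSGD.train(binary, numIterations)
  }
// At prediction time, score a point with every model and pick the class with the largest margin.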

On Sat, Jun 7, 2014 at 7:15 AM, littlebird cxp...@163.com wrote:
 Hi All,
   As we know, in MLlib the SVM is used for binary classification. I wonder
 how to train an SVM model for multiclass classification in MLlib. In addition, how
 can I apply a machine learning algorithm in Spark if the algorithm isn't
 included in MLlib? Thank you.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-multiple-classification-with-SVM-in-MLlib-tp7174.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Classpath errors with Breeze

2014-06-08 Thread Xiangrui Meng
Hi dlaw,

You are using breeze-0.8.1, but the spark assembly jar depends on
breeze-0.7. If the spark assembly jar comes first on the classpath
but the method from DenseMatrix is only available in breeze-0.8.1, you
get a NoSuchMethodError. So,

a) If you don't need the features in breeze-0.8.1, do not include it
as a dependency.

or

b) Try an experimental feature by turning on
spark.files.userClassPathFirst in your Spark configuration.
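
For option (b), a minimal sketch of the configuration (treat it as experimental, as noted above; the app name is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("breeze-0.8.1-app")
  // Ask executors to prefer the user's jars (breeze 0.8.1) over the assembly's breeze 0.7.
  .set("spark.files.userClassPathFirst", "true")
val sc = new SparkContext(conf)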

Best,
Xiangrui

On Sun, Jun 8, 2014 at 10:08 PM, dlaw dieterich.law...@gmail.com wrote:
 Thanks for the quick response. No, I actually build my jar via 'sbt package'
 on EC2 on the master itself.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Classpath-errors-with-Breeze-tp7220p7225.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Classpath errors with Breeze

2014-06-08 Thread Xiangrui Meng
Hi Tobias,

Which file system and which encryption are you using?

Best,
Xiangrui

On Sun, Jun 8, 2014 at 10:16 PM, Xiangrui Meng men...@gmail.com wrote:
 Hi dlaw,

 You are using breeze-0.8.1, but the spark assembly jar depends on
 breeze-0.7. If the spark assembly jar comes first on the classpath
 but the method from DenseMatrix is only available in breeze-0.8.1, you
 get a NoSuchMethodError. So,

 a) If you don't need the features in breeze-0.8.1, do not include it
 as a dependency.

 or

 b) Try an experimental feature by turning on
 spark.files.userClassPathFirst in your Spark configuration.

 Best,
 Xiangrui

 On Sun, Jun 8, 2014 at 10:08 PM, dlaw dieterich.law...@gmail.com wrote:
 Thanks for the quick response. No, I actually build my jar via 'sbt package'
 on EC2 on the master itself.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Classpath-errors-with-Breeze-tp7220p7225.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to process multiple classification with SVM in MLlib

2014-06-09 Thread Xiangrui Meng
For broadcast data, please read
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
.
For one-vs-all, please read
https://en.wikipedia.org/wiki/Multiclass_classification .

-Xiangrui
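
A hedged sketch of the broadcast idea for the second question; `LocalTrainer` is a made-up stand-in for the user's single-machine algorithm (not an MLlib API), and `trainingData` is assumed to be an existing RDD[LabeledPoint]:

import org.apache.spark.mllib.regression.LabeledPoint

// Hypothetical stand-in for a single-machine trainer; it must be serializable.
object LocalTrainer extends Serializable {
  def train(data: Array[LabeledPoint], regParam: Double): Array[Double] =
    Array.fill(data.head.features.size)(0.0) // placeholder "model"
}

// Collect the (small enough) training set on the driver and broadcast it once.
val localData = trainingData.collect()
val broadcastData = sc.broadcast(localData)

// Train one model per task on the workers, e.g. one per parameter setting.
val models = sc.parallelize(Seq(0.01, 0.1, 1.0)).map { regParam =>
  regParam -> LocalTrainer.train(broadcastData.value, regParam)
}.collect()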

On Mon, Jun 9, 2014 at 7:24 AM, littlebird cxp...@163.com wrote:
 Thank you for your reply. I don't quite understand how to do one-vs-all
 manually for multiclass
 training. And for the second question: my algorithm is implemented in Java
 and designed for a single machine. How can I broadcast the dataset to each
 worker and train models on the workers? Thank you very much.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-multiple-classification-with-SVM-in-MLlib-tp7174p7251.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Not fully cached when there is enough memory

2014-06-11 Thread Xiangrui Meng
Could you try to click on that RDD and see the storage info per
partition? I tried continuously caching RDDs, so new ones kick old
ones out when there is not enough memory. I saw similar glitches but
the storage info per partition is correct. If you find a way to
reproduce this error, please create a JIRA. Thanks! -Xiangrui


Re: Convert text into tfidf vectors for Classification

2014-06-13 Thread Xiangrui Meng
You can create tf vectors and then use
RowMatrix.computeColumnSummaryStatistics to get df (numNonzeros). For
a tokenizer and stemmer, you can use scalanlp/chalk. Yes, it is worth
having a simple interface for it. -Xiangrui
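
A hedged sketch of that df/idf computation on top of an existing RDD of term-frequency vectors (`tf` is assumed; the log(N/df) weighting and the densifying map are illustrative simplifications):

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// tf: RDD[Vector], one term-frequency vector per document
val mat = new RowMatrix(tf)
val summary = mat.computeColumnSummaryStatistics()
val df = summary.numNonzeros.toArray          // document frequency per term
val numDocs = mat.numRows().toDouble
val idf = df.map(d => if (d > 0) math.log(numDocs / d) else 0.0)

// Reweight each tf vector; for large vocabularies you would keep this sparse.
val tfidf = tf.map { v =>
  Vectors.dense(v.toArray.zipWithIndex.map { case (x, i) => x * idf(i) })
}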

On Fri, Jun 13, 2014 at 1:21 AM, Stuti Awasthi stutiawas...@hcl.com wrote:
 Hi all,



 I wanted to perform text classification using Spark 1.0 Naïve Bayes. I was
 looking for a way to convert text into sparse vectors with a TF-IDF weighting
 scheme.

 I found that the MLI library supports this, but it is only compatible with Spark 0.8.



 What options are available to achieve text vectorization? Are there
 any pre-built APIs which can be used, or another way in which we can achieve
 this?

 Please suggest



 Thanks

 Stuti Awasthi





Re: MLlib-Missing Regularization Parameter and Intercept for Logistic Regression

2014-06-14 Thread Xiangrui Meng
1. 
examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala
contains example code that shows how to set regParam.

2. A static method with more than 3 parameters becomes hard to
remember and hard to maintain. Please use LogisticRegressionWithSGD's
default constructor and setters.

-Xiangrui
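
A short hedged sketch of point 2, following the pattern used in BinaryClassification.scala (the parameter values are arbitrary; `training` is assumed to be an RDD[LabeledPoint]):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.optimization.SquaredL2Updater

val lr = new LogisticRegressionWithSGD()
lr.optimizer
  .setNumIterations(100)
  .setStepSize(1.0)
  .setUpdater(new SquaredL2Updater()) // L2 regularization
  .setRegParam(0.1)                   // the regularization parameter
val model = lr.run(training)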


Re: MLlib-Missing Regularization Parameter and Intercept for Logistic Regression

2014-06-16 Thread Xiangrui Meng
Someone is working on weighted regularization. Stay tuned. -Xiangrui

On Mon, Jun 16, 2014 at 9:36 AM, FIXED-TERM Yi Congrui (CR/RTC1.3-NA)
fixed-term.congrui...@us.bosch.com wrote:
 Hi Xiangrui,

 Thank you for the reply! I have tried customizing 
 LogisticRegressionWithSGD.optimizer as in the example you mentioned, but the 
 source code reveals that the intercept is also penalized if one is included, 
 which is usually inappropriate. The developer should fix this problem.

 Best,

 Congrui

 -Original Message-
 From: Xiangrui Meng [mailto:men...@gmail.com]
 Sent: Friday, June 13, 2014 11:50 PM
 To: user@spark.apache.org
 Cc: user
 Subject: Re: MLlib-Missing Regularization Parameter and Intercept for 
 Logistic Regression

 1. 
 examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala
 contains example code that shows how to set regParam.

 2. A static method with more than 3 parameters becomes hard to
  remember and hard to maintain. Please use LogisticRegressionWithSGD's
 default constructor and setters.

 -Xiangrui


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi Makoto,

How many partitions did you set? If there are too many partitions,
please do a coalesce before calling ML algorithms.
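
For example, a hedged one-liner on top of the code quoted below (the target of 32 partitions is only an illustration; a few partitions per core is typical):

// Fewer, larger partitions mean fewer tasks per aggregate; cache the result
// since the optimizer iterates over it many times.
val coalesced = training.coalesce(32).cache()
val model = LogisticRegressionWithSGD.train(coalesced, numIterations = 1)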

Btw, could you try the tree branch in my repo?
https://github.com/mengxr/spark/tree/tree

I used tree aggregate in this branch. It should help with the scalability.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 12:22 PM, Makoto Yui yuin...@gmail.com wrote:
 Here is a follow-up to the previous evaluation.

 aggregate at GradientDescent.scala:178 never finishes at
 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178

 We confirmed, by -verbose:gc, that GC is not happening during the aggregate
 and the cumulative CPU time for the task is increasing little by little.

 LBFGS also does not work for large # of features (news20.random.1000)
 though it works fine for small # of features (news20.binary.1000).

 aggregate at LBFGS.scala:201 also never finishes at
 https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201

 ---
 [Evaluated code for LBFGS]

 import org.apache.spark.SparkContext
 import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.mllib.classification.LogisticRegressionModel
 import org.apache.spark.mllib.optimization._

 val data = MLUtils.loadLibSVMFile(sc,
 "hdfs://dm01:8020/dataset/news20-binary/news20.random.1000",
 multiclass=false)
 val numFeatures = data.take(1)(0).features.size

 val training = data.map(x => (x.label, 
 MLUtils.appendBias(x.features))).cache()

 // Run training algorithm to build the model
 val numCorrections = 10
 val convergenceTol = 1e-4
 val maxNumIterations = 20
 val regParam = 0.1
 val initialWeightsWithIntercept = Vectors.dense(new
 Array[Double](numFeatures + 1))

 val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
   training,
   new LogisticGradient(),
   new SquaredL2Updater(),
   numCorrections,
   convergenceTol,
   maxNumIterations,
   regParam,
   initialWeightsWithIntercept)
 ---


 Thanks,
 Makoto

 2014-06-17 21:32 GMT+09:00 Makoto Yui yuin...@gmail.com:
 Hello,

 I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on
 Hadoop 0.20.2-cdh3u6 but it does not work for a sparse dataset though
 the number of training examples used in the evaluation is just 1,000.

 It works fine for the dataset *news20.binary.1000* that has 178,560
 features. However, it does not work for *news20.random.1000* where # of
 features is large  (1,354,731 features) though we used a sparse vector
 through MLUtils.loadLibSVMFile().

 The execution seems to make no progress, while no error is reported in the
 spark-shell or in the stdout/stderr of the executors.

 We used 32 executors with each allocating 7GB (2GB is for RDD) for
 working memory.

 Any suggestions? Your help is really appreciated.

 ==
 Executed code
 ==
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

 //val training = MLUtils.loadLibSVMFile(sc,
 "hdfs://host:8020/dataset/news20-binary/news20.binary.1000",
 multiclass=false)
 val training = MLUtils.loadLibSVMFile(sc,
 "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
 multiclass=false)

 val numFeatures = training.take(1)(0).features.size
 //numFeatures: Int = 178560 for news20.binary.1000
 //numFeatures: Int = 1354731 for news20.random.1000
 val model = LogisticRegressionWithSGD.train(training, numIterations=1)

 ==
 The dataset used in the evaluation
 ==

 http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary

 $ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.binary.1000
 $ sort -R news20.binary > news20.random
 $ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.random.1000

 You can find the dataset in
 https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
 https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000


 Thanks,
 Makoto


Re: Contribution to Spark MLLib

2014-06-17 Thread Xiangrui Meng
Hi Jayati,

Thanks for asking! MLlib algorithms are all implemented in Scala. It
is easier for us to maintain if we have the implementations in one
place. For the roadmap, please visit
http://www.slideshare.net/xrmeng/m-llib-hadoopsummit to see features
planned for v1.1. Before contributing new algorithms, it would be
great if you can start with working on an existing JIRA.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 12:22 AM, Jayati tiwarijay...@gmail.com wrote:
 Hello,

 I wish to contribute some algorithms to the MLLib of Spark but at the same
 time wanted to make sure that I don't try something redundant.

 Will it be okay with you to let me know the set of algorithms which aren't
 there in your road map in the near future ?

 Also, can I use Java to write machine learning algorithms for Spark MLLib
 instead of Scala ?

 Regards,
 Jayati



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Contribution-to-Spark-MLLib-tp7716.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Execution stalls in LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi Bharath,

Thanks for posting the details! Which Spark version are you using?

Best,
Xiangrui

On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar reachb...@gmail.com wrote:
 Hi,

 (Apologies for the long mail, but it's necessary to provide sufficient
 details considering the number of issues faced.)

 I'm running into issues testing LogisticRegressionWithSGD on a two node cluster
 (each node with 24 cores and 16G available to slaves out of 24G on the
 system). Here's a description of the application:

 The model is being trained based on categorical features x, y, and (x,y).
 The categorical features are mapped to binary features by converting each
 distinct value in the category enum into a binary feature by itself (i.e
 presence of that value in a record implies corresponding feature = 1, else
 feature = 0. So, there'd be as many distinct features as enum values). The
 training vector is laid out as
 [x1,x2...xn,y1,y2...yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
 training data has only one combination (Xk,Yk) and a label appearing in the
 record. Thus, the corresponding labeledpoint sparse vector would only have 3
 values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector
 (though sparse) would be nearly 614000. The number of records is about 1.33
 million. The records have been coalesced into 20 partitions across two
 nodes. The input data has not been cached.
 (NOTE: I do realize the records & features may seem large for a two node
 setup, but given the memory & cpu, and the fact that I'm willing to give up
 some turnaround time, I don't see why tasks should inexplicably fail)

 Additional parameters include:

 spark.executor.memory = 14G
 spark.default.parallelism = 1
 spark.cores.max=20
 spark.storage.memoryFraction=0.8 //No cache space required
 (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't help
 either)

 The model training was initialized as : new LogisticRegressionWithSGD(1,
 maxIterations, 0.0, 0.05)

 However, after 4 iterations of gradient descent, the entire execution
 appeared to stall inexplicably. The corresponding executor details and
 details of the stalled stage (number 14) are as follows:

 Metric                            Min      25th     Median   75th     Max
 Result serialization time         12 ms    13 ms    14 ms    16 ms    18 ms
 Duration                          4 s      4 s      5 s      5 s      5 s
 Time spent fetching task results  0 ms     0 ms     0 ms     0 ms     0 ms
 Scheduler delay                   6 s      6 s      6 s      6 s      12 s


 Stage Id
 14 aggregate at GradientDescent.scala:178

 Task Index  Task ID  Status  Locality Level  Executor  Launch Time  Duration  GC Time  Result Ser Time  Errors

 0 600 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
 2014/06/17 10:32:27 1.1 h
 1 601 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
 2014/06/17 10:32:27 1.1 h
 2 602 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
 2014/06/17 10:32:27 1.1 h
 3 603 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
 2014/06/17 10:32:27 1.1 h
 4 604 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
 2014/06/17 10:32:27 1.1 h
 5 605 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
 2014/06/17 10:32:27 4 s 2 s 12 ms
 6 606 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
 2014/06/17 10:32:27 4 s 1 s 14 ms
 7 607 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
 2014/06/17 10:32:27 4 s 2 s 12 ms
 8 608 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
 2014/06/17 10:32:27 5 s 1 s 15 ms
 9 609 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
 2014/06/17 10:32:27 5 s 1 s 14 ms
 10 610 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
 2014/06/17 10:32:27 5 s 1 s 15 ms
 11 611 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
 2014/06/17 10:32:27 4 s 1 s 13 ms
 12 612 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
 2014/06/17 10:32:27 5 s 1 s 18 ms
 13 613 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
 2014/06/17 10:32:27 5 s 1 s 13 ms
 14 614 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
 2014/06/17 10:32:27 4 s 1 s 14 ms
 15 615 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
 2014/06/17 10:32:27 4 s 1 s 12 ms
 16 616 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
 2014/06/17 10:32:27 5 s 1 s 15 ms
 17 617 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
 2014/06/17 10:32:27 5 s 1 s 18 ms
 18 618 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
 2014/06/17 10:32:27 5 s 1 s 16 ms
 19 619 SUCCESS PROCESS_LOCAL 

Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi DB,

treeReduce (treeAggregate) is a feature I'm testing now. It is a
compromise between current reduce and butterfly allReduce. The former
runs in linear time on the number of partitions, the latter introduces
too many dependencies. treeAggregate with depth = 2 should run in
O(sqrt(n)) time, where n is the number of partitions. It would be
great if someone can help test its scalability.
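
For anyone who wants to try it, a toy sketch of the call shape (this assumes
the treeAggregate method from the tree branch; it is not in stock 1.0):

// Tree-aggregate a sum over an RDD[Double]. With depth = 2, partial results
// are combined in an intermediate round of tasks instead of every partition
// sending its result straight to the driver. `sc` is an existing SparkContext.
val nums = sc.parallelize(1 to 1000000).map(_.toDouble)
val total = nums.treeAggregate(0.0)(
  (sum, x) => sum + x,    // seqOp: fold values within a partition
  (s1, s2) => s1 + s2,    // combOp: merge partial sums up the tree
  2)                      // depth of the aggregation tree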

Best,
Xiangrui

On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui yuin...@gmail.com wrote:
 Hi Xiangrui,


 (2014/06/18 4:58), Xiangrui Meng wrote:

 How many partitions did you set? If there are too many partitions,
 please do a coalesce before calling ML algorithms.


 The training data news20.random.1000 is small and thus only 2 partitions
 are used by default.

 val training = MLUtils.loadLibSVMFile(sc,
 "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
 multiclass=false).

 We also tried 32 partitions as follows but the aggregate never finishes.

 val training = MLUtils.loadLibSVMFile(sc,
 "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
 multiclass=false, numFeatures = 1354731 , minPartitions = 32)


 Btw, could you try the tree branch in my repo?
 https://github.com/mengxr/spark/tree/tree

 I used tree aggregate in this branch. It should help with the scalability.


 Is treeAggregate itself available on Spark 1.0?

 I wonder.. Could I test your modification just by running the following code
 on REPL?

 ---
 val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
 .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
    seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
  features)) =>
  val l = gradient.compute(features, label, weights,
  Vectors.fromBreeze(grad))
  (grad, loss + l)
    },
    combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
  (grad2, loss2)) =>
 (grad1 += grad2, loss1 + loss2)
   }, 2)
 -

 Rebuilding Spark is quite something to do evaluation.

 Thanks,
 Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi Makoto,

Are you using Spark 1.0 or 0.9? Could you go to the executor tab of
the web UI and check the driver's memory?

treeAggregate is not part of 1.0.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng men...@gmail.com wrote:
 Hi DB,

 treeReduce (treeAggregate) is a feature I'm testing now. It is a
 compromise between current reduce and butterfly allReduce. The former
 runs in linear time on the number of partitions, the latter introduces
 too many dependencies. treeAggregate with depth = 2 should run in
 O(sqrt(n)) time, where n is the number of partitions. It would be
 great if someone can help test its scalability.

 Best,
 Xiangrui

 On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui yuin...@gmail.com wrote:
 Hi Xiangrui,


 (2014/06/18 4:58), Xiangrui Meng wrote:

 How many partitions did you set? If there are too many partitions,
 please do a coalesce before calling ML algorithms.


 The training data news20.random.1000 is small and thus only 2 partitions
 are used by the default.

 val training = MLUtils.loadLibSVMFile(sc,
 "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
 multiclass=false).

 We also tried 32 partitions as follows but the aggregate never finishes.

 val training = MLUtils.loadLibSVMFile(sc,
 "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
 multiclass=false, numFeatures = 1354731 , minPartitions = 32)


 Btw, could you try the tree branch in my repo?
 https://github.com/mengxr/spark/tree/tree

 I used tree aggregate in this branch. It should help with the scalability.


 Is treeAggregate itself available on Spark 1.0?

 I wonder.. Could I test your modification just by running the following code
 on REPL?

 ---
 val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
 .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
    seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
  features)) =>
  val l = gradient.compute(features, label, weights,
  Vectors.fromBreeze(grad))
  (grad, loss + l)
    },
    combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
  (grad2, loss2)) =>
 (grad1 += grad2, loss1 + loss2)
   }, 2)
 -

 Rebuilding Spark is quite something to do evaluation.

 Thanks,
 Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
DB, Yes, reduce and aggregate are linear.

Makoto, dense vectors are used in aggregation. If you have 32
partitions and each one sends a dense vector of size 1,354,731 to the
master, then the driver needs 300MB+. That may be the problem. Which
deploy mode are you using, standalone or local?
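
In round numbers (8 bytes per double, ignoring object and serialization
overhead), the estimate works out as:

val numFeatures    = 1354731
val numPartitions  = 32
val bytesPerVector = numFeatures * 8L                 // ~10.8 MB per dense gradient vector
val bytesAtDriver  = bytesPerVector * numPartitions   // ~347 MB arriving at the driver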

Debasish, there is an old PR for butterfly allreduce. However, it
doesn't seem to be the right way to go for Spark. I just sent out the
PR: https://github.com/apache/spark/pull/1110 . This is a WIP and it
needs more testing before we are confident to merge it. It would be
great if you can help test it.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 2:33 PM, Debasish Das debasish.da...@gmail.com wrote:
 Xiangrui,

 Could you point to the JIRA related to tree aggregate ? ...sounds like the
 allreduce idea...

 I would definitely like to try it on our dataset...

 Makoto,

 I did run pretty big sparse dataset (20M rows, 3M sparse features) and I got
 100 iterations of SGD running in 200 seconds...10 executors each with 16 GB
 memory...

 Although the best result on the same dataset came out of liblinear and
 BFGS-L1 out of box...so I did not tune the SGD further on learning rate and
 other heuristics...it was around 5% off...

 Thanks.
 Deb



 On Tue, Jun 17, 2014 at 2:09 PM, DB Tsai dbt...@stanford.edu wrote:

 Hi Xiangrui,

 Does it mean that mapPartition and then reduce shares the same
 behavior as aggregate operation which is O(n)?

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng men...@gmail.com wrote:
  Hi DB,
 
  treeReduce (treeAggregate) is a feature I'm testing now. It is a
  compromise between current reduce and butterfly allReduce. The former
  runs in linear time on the number of partitions, the latter introduces
  too many dependencies. treeAggregate with depth = 2 should run in
  O(sqrt(n)) time, where n is the number of partitions. It would be
  great if someone can help test its scalability.
 
  Best,
  Xiangrui
 
  On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui yuin...@gmail.com wrote:
  Hi Xiangrui,
 
 
  (2014/06/18 4:58), Xiangrui Meng wrote:
 
  How many partitions did you set? If there are too many partitions,
  please do a coalesce before calling ML algorithms.
 
 
  The training data news20.random.1000 is small and thus only 2
  partitions
  are used by the default.
 
  val training = MLUtils.loadLibSVMFile(sc,
  "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
  multiclass=false).
 
  We also tried 32 partitions as follows but the aggregate never
  finishes.
 
  val training = MLUtils.loadLibSVMFile(sc,
  "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
  multiclass=false, numFeatures = 1354731 , minPartitions = 32)
 
 
  Btw, could you try the tree branch in my repo?
  https://github.com/mengxr/spark/tree/tree
 
  I used tree aggregate in this branch. It should help with the
  scalability.
 
 
  Is treeAggregate itself available on Spark 1.0?
 
  I wonder.. Could I test your modification just by running the following
  code
  on REPL?
 
  ---
  val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 +
  i)
  .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
   seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
  features)) =>
  val l = gradient.compute(features, label, weights,
  Vectors.fromBreeze(grad))
  (grad, loss + l)
   },
   combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
  (grad2, loss2)) =>
  (grad1 += grad2, loss1 + loss2)
}, 2)
  -
 
  Rebuilding Spark is quite something to do evaluation.
 
  Thanks,
  Makoto




Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Makoto, please use --driver-memory 8G when you launch spark-shell. -Xiangrui

On Tue, Jun 17, 2014 at 4:49 PM, Xiangrui Meng men...@gmail.com wrote:
 DB, Yes, reduce and aggregate are linear.

 Makoto, dense vectors are used in aggregation. If you have 32
 partitions and each one sends a dense vector of size 1,354,731 to the
 master, then the driver needs 300MB+. That may be the problem. Which
 deploy mode are you using, standalone or local?

 Debasish, there is an old PR for butterfly allreduce. However, it
 doesn't seem to be the right way to go for Spark. I just sent out the
 PR: https://github.com/apache/spark/pull/1110 . This is a WIP and it
 needs more testing before we are confident to merge it. It would be
 great if you can help test it.

 Best,
 Xiangrui

 On Tue, Jun 17, 2014 at 2:33 PM, Debasish Das debasish.da...@gmail.com 
 wrote:
 Xiangrui,

 Could you point to the JIRA related to tree aggregate ? ...sounds like the
 allreduce idea...

 I would definitely like to try it on our dataset...

 Makoto,

 I did run pretty big sparse dataset (20M rows, 3M sparse features) and I got
 100 iterations of SGD running in 200 seconds...10 executors each with 16 GB
 memory...

 Although the best result on the same dataset came out of liblinear and
 BFGS-L1 out of box...so I did not tune the SGD further on learning rate and
  other heuristics...it was around 5% off...

 Thanks.
 Deb



 On Tue, Jun 17, 2014 at 2:09 PM, DB Tsai dbt...@stanford.edu wrote:

 Hi Xiangrui,

 Does it mean that mapPartition and then reduce shares the same
 behavior as aggregate operation which is O(n)?

 Sincerely,

 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai


 On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng men...@gmail.com wrote:
  Hi DB,
 
  treeReduce (treeAggregate) is a feature I'm testing now. It is a
  compromise between current reduce and butterfly allReduce. The former
  runs in linear time on the number of partitions, the latter introduces
  too many dependencies. treeAggregate with depth = 2 should run in
  O(sqrt(n)) time, where n is the number of partitions. It would be
  great if someone can help test its scalability.
 
  Best,
  Xiangrui
 
  On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui yuin...@gmail.com wrote:
  Hi Xiangrui,
 
 
  (2014/06/18 4:58), Xiangrui Meng wrote:
 
  How many partitions did you set? If there are too many partitions,
  please do a coalesce before calling ML algorithms.
 
 
  The training data news20.random.1000 is small and thus only 2
  partitions
  are used by the default.
 
  val training = MLUtils.loadLibSVMFile(sc,
  "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
  multiclass=false).
 
  We also tried 32 partitions as follows but the aggregate never
  finishes.
 
  val training = MLUtils.loadLibSVMFile(sc,
  "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
  multiclass=false, numFeatures = 1354731 , minPartitions = 32)
 
 
  Btw, could you try the tree branch in my repo?
  https://github.com/mengxr/spark/tree/tree
 
  I used tree aggregate in this branch. It should help with the
  scalability.
 
 
  Is treeAggregate itself available on Spark 1.0?
 
  I wonder.. Could I test your modification just by running the following
  code
  on REPL?
 
  ---
  val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 +
  i)
  .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
   seqOp = (c, v) => (c, v) match { case ((grad, loss), (label,
  features)) =>
  val l = gradient.compute(features, label, weights,
  Vectors.fromBreeze(grad))
  (grad, loss + l)
   },
   combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1),
  (grad2, loss2)) =>
  (grad1 += grad2, loss1 + loss2)
}, 2)
  -
 
  Rebuilding Spark is quite something to do evaluation.
 
  Thanks,
  Makoto




Re: Execution stalls in LogisticRegressionWithSGD

2014-06-18 Thread Xiangrui Meng
Hi Bharath,

This is related to SPARK-1112, which we already found the root cause.
I will let you know when this is fixed.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar reachb...@gmail.com wrote:
 Couple more points:
 1)The inexplicable stalling of execution with large feature sets appears
 similar to that reported with the news-20 dataset:
 http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E

 2) The NPE when trying to call mapToPair to convert an RDD<Long, Long, Integer,
 Integer> into a JavaPairRDD<Tuple2<Long,Long>, Tuple2<Integer,Integer>> is
 unrelated to mllib.

 Thanks,
 Bharath



 On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:

 Hi  Xiangrui ,

 I'm using 1.0.0.

 Thanks,
 Bharath

 On 18-Jun-2014 1:43 am, Xiangrui Meng men...@gmail.com wrote:

 Hi Bharath,

 Thanks for posting the details! Which Spark version are you using?

 Best,
 Xiangrui

 On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Hi,
 
  (Apologies for the long mail, but it's necessary to provide sufficient
  details considering the number of issues faced.)
 
  I'm running into issues testing LogisticRegressionWithSGD on a two node
  cluster
  (each node with 24 cores and 16G available to slaves out of 24G on the
  system). Here's a description of the application:
 
  The model is being trained based on categorical features x, y, and
  (x,y).
  The categorical features are mapped to binary features by converting
  each
  distinct value in the category enum into a binary feature by itself
  (i.e
  presence of that value in a record implies corresponding feature = 1,
  else
  feature = 0. So, there'd be as many distinct features as enum values) .
  The
  training vector is laid out as
  [x1,x2...xn,y1,y2...yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
  training data has only one combination (Xk,Yk) and a label appearing in
  the
  record. Thus, the corresponding labeledpoint sparse vector would only
  have 3
  values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector
  (though sparse) would be nearly 614000. The number of records is about
  1.33
  million. The records have been coalesced into 20 partitions across two
  nodes. The input data has not been cached.
  (NOTE: I do realize the records & features may seem large for a two
  node
  setup, but given the memory & cpu, and the fact that I'm willing to
  give up
  some turnaround time, I don't see why tasks should inexplicably fail)
 
  Additional parameters include:
 
  spark.executor.memory = 14G
  spark.default.parallelism = 1
  spark.cores.max=20
  spark.storage.memoryFraction=0.8 //No cache space required
  (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't
  help
  either)
 
  The model training was initialized as : new
  LogisticRegressionWithSGD(1,
  maxIterations, 0.0, 0.05)
 
  However, after 4 iterations of gradient descent, the entire execution
  appeared to stall inexplicably. The corresponding executor details and
  details of the stalled stage (number 14) are as follows:
 
  Metric                            Min      25th     Median   75th     Max
  Result serialization time         12 ms    13 ms    14 ms    16 ms    18 ms
  Duration                          4 s      4 s      5 s      5 s      5 s
  Time spent fetching task results  0 ms     0 ms     0 ms     0 ms     0 ms
  Scheduler delay                   6 s      6 s      6 s      6 s      12 s
 
 
  Stage Id
  14 aggregate at GradientDescent.scala:178
 
  Task Index  Task ID  Status  Locality Level  Executor  Launch Time  Duration  GC Time  Result Ser Time  Errors
 
  0 600 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  1 601 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  2 602 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  3 603 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  4 604 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 1.1 h
  5 605 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 2 s 12 ms
  6 606 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 1 s 14 ms
  7 607 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 4 s 2 s 12 ms
  8 608 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 15 ms
  9 609 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 14 ms
  10 610 SUCCESS PROCESS_LOCAL
  serious.dataone.foo.bar.com
  2014/06/17 10:32:27 5 s 1 s 15 ms
  11 611 SUCCESS PROCESS_LOCAL

Re: Contribution to Spark MLLib

2014-06-19 Thread Xiangrui Meng
Denis, I think it is fine to have PLSA in MLlib. But I'm not familiar
with the modification you mentioned since the paper is new. We may
need to spend more time to learn the trade-offs. Feel free to create a
JIRA for PLSA and we can move our discussion there. It would be great
if you could share your current implementation, so it is easy for
developers to join the discussion.

Jayati, it is certainly NOT mandatory. But if you want to contribute
something new, please create a JIRA first.

Best,
Xiangrui


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-19 Thread Xiangrui Meng
It is because the frame size is not set correctly in the executor backend; see 
SPARK-1112. We are going to fix it in v1.0.1. Did you try the treeAggregate?

 On Jun 19, 2014, at 2:01 AM, Makoto Yui yuin...@gmail.com wrote:
 
 Xiangrui and Debasish,
 
 (2014/06/18 6:33), Debasish Das wrote:
 I did run pretty big sparse dataset (20M rows, 3M sparse features) and I
 got 100 iterations of SGD running in 200 seconds...10 executors each
 with 16 GB memory...
 
 I could figure out what the problem is. spark.akka.frameSize was too large. 
 By setting spark.akka.frameSize=10, it worked for the news20 dataset.
 
 The execution was slow for the larger KDD Cup 2012, Track 2 dataset (235M+ 
 records of 16.7M+ (2^24) sparse features in about 33.6GB) due to the 
 sequential aggregation of dense vectors on a single driver node.
 
 It took about 7.6m for aggregation for an iteration.
 
 Thanks,
 Makoto


Re: Anything like grid search available for mlbase?

2014-06-20 Thread Xiangrui Meng
This is a planned feature for v1.1. I'm going to work on it after v1.0.1 
release. -Xiangrui

 On Jun 20, 2014, at 6:46 AM, Charles Earl charles.ce...@gmail.com wrote:
 
 Looking for something like scikit's grid search module.
 C


Re: Performance problems on SQL JOIN

2014-06-20 Thread Xiangrui Meng
Your data source is S3 and data is used twice. m1.large does not have very good 
network performance. Please try file.count() and see how fast it goes. -Xiangrui
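
One quick way to check both points at once, sketched against the code quoted
below (the S3 path is a placeholder):

// Read from S3 once, keep the parsed rows in memory, and time the first
// materialization (which includes the S3 read). rooms2/rooms3 and the join
// then reuse the cached data instead of re-reading the source.
val file = sc.textFile("s3n://bucket/path/bookings.csv")   // placeholder path
val data = file.map(_.split('|')).cache()

val t0   = System.nanoTime()
val rows = data.count()                                    // forces the read and the cache
println(s"rows: $rows, seconds: ${(System.nanoTime() - t0) / 1e9}")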

 On Jun 20, 2014, at 8:16 AM, mathias math...@socialsignificance.co.uk wrote:
 
 Hi there,
 
 We're trying out Spark and are experiencing some performance issues using
 Spark SQL.
  Can anyone tell us if our results are normal?
 
 We are using the Amazon EC2 scripts to create a cluster with 3
 workers/executors (m1.large).
 Tried both spark 1.0.0 as well as the git master; the Scala as well as the
 Python shells.
 
 Running the following code takes about 5 minutes, which seems a long time
 for this query.
 
  val file = sc.textFile("s3n:// ... .csv");
  val data = file.map(x => x.split('|')); // 300k rows
  
  case class BookingInfo(num_rooms: String, hotelId: String, toDate: String,
  ...);
  val rooms2 = data.filter(x => x(0) == "2").map(x => BookingInfo(x(0), x(1),
  ... , x(9))); // 50k rows
  val rooms3 = data.filter(x => x(0) == "3").map(x => BookingInfo(x(0), x(1),
  ... , x(9))); // 30k rows
  
  rooms2.registerAsTable("rooms2");
  cacheTable("rooms2");
  rooms3.registerAsTable("rooms3");
  cacheTable("rooms3");
  
  sql("SELECT * FROM rooms2 LEFT JOIN rooms3 ON rooms2.hotelId =
  rooms3.hotelId AND rooms2.toDate = rooms3.toDate").count();
 
 
 Are we doing something wrong here?
 Thanks!
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Performance-problems-on-SQL-JOIN-tp8001.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Improving Spark multithreaded performance?

2014-06-27 Thread Xiangrui Meng
Hi Kyle,

A few questions:

1) Did you use `setIntercept(true)`?
2) How many features?

I'm a little worried about driver's load because the final aggregation
and weights update happen on the driver. Did you check driver's memory
usage as well?

Best,
Xiangrui

On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:
 As far as I can tell there is no data to broadcast (unless there is
 something internal to mllib that needs to be broadcast). I've coalesced the
 input RDDs to keep the number of partitions limited. When running, I've
 tried to get up to 500 concurrent stages, and I've coalesced the RDDs down
 to 2 partitions, so about 1000 tasks.
 Despite having over 500 threads in the threadpool working on mllib tasks,
 the total CPU usage never really goes above 150%.
 I've tried increasing 'spark.akka.threads' but that doesn't seem to do
 anything.

 My one thought is that, because I'm using MLUtils.kFold to
 generate the RDDs, I have many tasks working off RDDs that
 are permutations of the original RDDs, and maybe that is creating some sort of
 dependency bottleneck.

 Kyle


 On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson ilike...@gmail.com wrote:

 I don't have specific solutions for you, but the general things to try
 are:

 - Decrease task size by broadcasting any non-trivial objects.
 - Increase duration of tasks by making them less fine-grained.

 How many tasks are you sending? I've seen in the past something like 25
 seconds for ~10k total medium-sized tasks.


 On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu
 wrote:

 I'm working to set up a calculation that involves calling mllib's
 SVMWithSGD.train several thousand times on different permutations of the
 data. I'm trying to run the separate jobs using a threadpool to dispatch the
 different requests to a spark context connected a Mesos's cluster, using
 course scheduling, and a max of 2000 cores on Spark 1.0.
 Total utilization of the system is terrible. Most of the 'aggregate at
 GradientDescent.scala:178' stages(where mllib spends most of its time) take
 about 3 seconds, but have ~25 seconds of scheduler delay time.
 What kind of things can I do to improve this?

 Kyle





Re: TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

2014-06-27 Thread Xiangrui Meng
Try to use --executor-memory 12g with spark-submit. Or you can set it
in conf/spark-defaults.conf and rsync it to all workers and then
restart. -Xiangrui

On Fri, Jun 27, 2014 at 1:05 PM, Peng Cheng pc...@uow.edu.au wrote:
 I give up, communication must be blocked by the complex EC2 network topology
 (though the error information indeed need some improvement). It doesn't make
 sense to run a client thousands miles away to communicate frequently with
 workers. I have moved everything to EC2 now.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tp8247p8444.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: TaskNotSerializable when invoking KMeans.run

2014-06-30 Thread Xiangrui Meng
Could you post the code snippet and the error stack trace? -Xiangrui

On Mon, Jun 30, 2014 at 7:03 AM, Daniel Micol dmi...@gmail.com wrote:
 Hello,

 I’m trying to use KMeans with MLLib but am getting a TaskNotSerializable
 error. I’m using Spark 0.9.1 and invoking the KMeans.run method with k = 2
 and numPartitions = 200. Has anyone seen this error before and know what
 could be the reason for this?

 Thanks,

 Daniel


Re: Spark 1.0 and Logistic Regression Python Example

2014-06-30 Thread Xiangrui Meng
You were using an old version of numpy, 1.4? I think this is fixed in
the latest master. Try to replace vec.dot(target) by numpy.dot(vec,
target), or use the latest master. -Xiangrui

On Mon, Jun 30, 2014 at 2:04 PM, Sam Jacobs sam.jac...@us.abb.com wrote:
 Hi,


 I modified the example code for logistic regression to compute the error in
 classification. Please see below. However the code is failing when it makes
 a call to:


 labelsAndPreds.filter(lambda (v, p): v != p).count()


 with the error message (something related to numpy or dot product):


 File /opt/spark-1.0.0-bin-hadoop2/python/pyspark/mllib/classification.py,
 line 65, in predict

 margin = _dot(x, self._coeff) + self._intercept

   File /opt/spark-1.0.0-bin-hadoop2/python/pyspark/mllib/_common.py, line
 443, in _dot

 return vec.dot(target)

 AttributeError: 'numpy.ndarray' object has no attribute 'dot'


 FYI, I am running the code using spark-submit i.e.


 ./bin/spark-submit examples/src/main/python/mllib/logistic_regression2.py



 The code is posted below if it will be useful in any way:


 from math import exp

 import sys
 import time

 from pyspark import SparkContext

 from pyspark.mllib.classification import LogisticRegressionWithSGD
 from pyspark.mllib.regression import LabeledPoint
 from numpy import array


 # Load and parse the data
 def parsePoint(line):
 values = [float(x) for x in line.split(',')]
 if values[0] == -1:   # Convert -1 labels to 0 for MLlib
 values[0] = 0
 return LabeledPoint(values[0], values[1:])

 sc = SparkContext(appName="PythonLR")
 # start timing
 start = time.time()
 #start = time.clock()

 data = sc.textFile(sWAMSpark_train.csv)
 parsedData = data.map(parsePoint)

 # Build the model
 model = LogisticRegressionWithSGD.train(parsedData)

 #load test data

 testdata = sc.textFile(sWSpark_test.csv)
 parsedTestData = testdata.map(parsePoint)

 # Evaluating the model on test data
 labelsAndPreds = parsedTestData.map(lambda p: (p.label,
 model.predict(p.features)))
 trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() /
 float(parsedData.count())
 print("Training Error = " + str(trainErr))
 end = time.time()
 print("Time is = " + str(end - start))









Re: SparkKMeans.scala from examples will show: NoClassDefFoundError: breeze/linalg/Vector

2014-07-01 Thread Xiangrui Meng
You can use either bin/run-example or bin/spark-submit to run example
code. scalac -d classes/ SparkKMeans.scala doesn't recognize the Spark
classpath. There are examples in the official doc:
http://spark.apache.org/docs/latest/quick-start.html#where-to-go-from-here
-Xiangrui

On Tue, Jul 1, 2014 at 4:39 AM, Wanda Hawk wanda_haw...@yahoo.com wrote:
 Hello,

 I have installed spark-1.0.0 with scala2.10.3. I have built spark with
 sbt/sbt assembly and added
 /home/wanda/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar
 to my CLASSPATH variable.
 Then I went here
 ../spark-1.0.0/examples/src/main/scala/org/apache/spark/examples created a
 new directory classes and compiled SparkKMeans.scala with scalac -d
 classes/ SparkKMeans.scala
 Then I navigated to classes (I commented this line in the scala file :
 package org.apache.spark.examples ) and tried to run it with java -cp .
 SparkKMeans and I get the following error:
 Exception in thread main java.lang.NoClassDefFoundError:
 breeze/linalg/Vector
 at java.lang.Class.getDeclaredMethods0(Native Method)
 at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
 at java.lang.Class.getMethod0(Class.java:2774)
 at java.lang.Class.getMethod(Class.java:1663)
 at
 sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
 at
 sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
 Caused by: java.lang.ClassNotFoundException: breeze.linalg.Vector
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 6 more
 
 The jar under
 /home/wanda/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar
 contains the breeze/linalg/Vector* path. I even tried to unpack it and put
 it in CLASSPATH but it does not seem to get picked up.


 I am currently running java 1.8
 java version 1.8.0_05
 Java(TM) SE Runtime Environment (build 1.8.0_05-b13)
 Java HotSpot(TM) 64-Bit Server VM (build 25.5-b02, mixed mode)

 What am I doing wrong?



Re: Questions about disk IOs

2014-07-01 Thread Xiangrui Meng
Try to reduce the number of partitions to match the number of cores. We
will add treeAggregate to reduce the communication cost.
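
Roughly, something like this (a sketch; `points` is the input RDD from the
message quoted below, and the core count is taken from the setup described
there):

// Shrink to roughly one partition per available core before training,
// so each aggregation round has fewer, larger tasks.
val totalCores    = 13 * 16                     // 13 machines x 16 cores, per the setup below
val repartitioned = points.coalesce(totalCores)
repartitioned.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)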

PR: https://github.com/apache/spark/pull/1110

-Xiangrui

On Tue, Jul 1, 2014 at 12:55 AM, Charles Li littlee1...@gmail.com wrote:
 Hi Spark,

 I am running LBFGS on our user data. The data size with Kryo serialisation is 
 about 210G. The weight size is around 1,300,000. I am quite confused that the 
 performance is very close whether the data is cached or not.

 The program is simple:
 points = sc.hadoopFile(int, SequenceFileInputFormat.class …..)
 points.persist(StorageLevel.MEMORY_AND_DISK_SER()) // comment it if not cached
 gradient = new LogisticGradient();
 updater = new SquaredL2Updater();
 initWeight = Vectors.sparse(size, new int[]{}, new double[]{})
 result = LBFGS.runLBFGS(points.rdd(), gradient, updater, numCorrections, 
 convergeTol, maxIter, regParam, initWeight);

 I have 13 machines with 16 cpus, 48G RAM each. Spark is running on its 
 cluster mode. Below are some arguments I am using:
 —executor-memory 10G
 —num-executors 50
 —executor-cores 2

 Storage Using:
 When caching:
 Cached Partitions 951
 Fraction Cached 100%
 Size in Memory 215.7GB
 Size in Tachyon 0.0B
 Size on Disk 1029.7MB

 The time cost by every aggregate is around 5 minutes with cache enabled. Lots 
 of disk IOs can be seen on the hadoop node. I have the same result with cache 
 disabled.

 Should data points caching improve the performance? Should caching decrease 
 the disk IO?

 Thanks in advance.


Re: why is toBreeze private everywhere in mllib?

2014-07-01 Thread Xiangrui Meng
We were not ready to expose it as a public API in v1.0. Both breeze
and MLlib are in rapid development. It would be possible to expose it
as a developer API in v1.1. For now, it should be easy to define a
toBreeze method in your own project. -Xiangrui
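
For example, a small enrich-my-library wrapper along these lines would do
(a sketch, not the MLlib-internal conversion itself):

import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}

object BreezeConversions {
  // Dense values are wrapped directly; sparse vectors keep their
  // indices/values arrays.
  implicit class RichVector(v: Vector) {
    def toBreeze: BV[Double] = v match {
      case d: DenseVector  => new BDV[Double](d.values)
      case s: SparseVector => new BSV[Double](s.indices, s.values, s.size)
    }
  }
}

// usage: import BreezeConversions._ and then call myVector.toBreeze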

On Tue, Jul 1, 2014 at 12:17 PM, Koert Kuipers ko...@tresata.com wrote:
 its kind of handy to be able to convert stuff to breeze... is there some
 other way i am supposed to access that functionality?


Re: One question about RDD.zip function when trying Naive Bayes

2014-07-02 Thread Xiangrui Meng
This is due to a bug in sampling, which was fixed in 1.0.1 and latest
master. See https://github.com/apache/spark/pull/1234 . -Xiangrui

On Wed, Jul 2, 2014 at 8:23 PM, x wasedax...@gmail.com wrote:
 Hello,

 I am a newbie to Spark MLlib and ran into a curious case when following the
 instruction at the page below.

 http://spark.apache.org/docs/latest/mllib-naive-bayes.html

 I ran a test program on my local machine using some data.

 val spConfig = (new
 SparkConf).setMaster("local").setAppName("SparkNaiveBayes")
 val sc = new SparkContext(spConfig)

 The test data was as follows and there were three labeled categories I
 wanted to predict.

  1  LabeledPoint(0.0, [4.9,3.0,1.4,0.2])
  2  LabeledPoint(0.0, [4.6,3.4,1.4,0.3])
  3  LabeledPoint(0.0, [5.7,4.4,1.5,0.4])
  4  LabeledPoint(0.0, [5.2,3.4,1.4,0.2])
  5  LabeledPoint(0.0, [4.7,3.2,1.6,0.2])
  6  LabeledPoint(0.0, [4.8,3.1,1.6,0.2])
  7  LabeledPoint(0.0, [5.1,3.8,1.9,0.4])
  8  LabeledPoint(0.0, [4.8,3.0,1.4,0.3])
  9  LabeledPoint(0.0, [5.0,3.3,1.4,0.2])
 10  LabeledPoint(1.0, [6.6,2.9,4.6,1.3])
 11  LabeledPoint(1.0, [5.2,2.7,3.9,1.4])
 12  LabeledPoint(1.0, [5.6,2.5,3.9,1.1])
 13  LabeledPoint(1.0, [6.4,2.9,4.3,1.3])
 14  LabeledPoint(1.0, [6.6,3.0,4.4,1.4])
 15  LabeledPoint(1.0, [6.0,2.7,5.1,1.6])
 16  LabeledPoint(1.0, [5.5,2.6,4.4,1.2])
 17  LabeledPoint(1.0, [5.8,2.6,4.0,1.2])
 18  LabeledPoint(1.0, [5.7,2.9,4.2,1.3])
 19  LabeledPoint(1.0, [5.7,2.8,4.1,1.3])
 20  LabeledPoint(2.0, [6.3,2.9,5.6,1.8])
 21  LabeledPoint(2.0, [6.5,3.0,5.8,2.2])
 22  LabeledPoint(2.0, [6.5,3.0,5.5,1.8])
 23  LabeledPoint(2.0, [6.7,3.3,5.7,2.1])
 24  LabeledPoint(2.0, [7.4,2.8,6.1,1.9])
 25  LabeledPoint(2.0, [6.3,3.4,5.6,2.4])
 26  LabeledPoint(2.0, [6.0,3.0,4.8,1.8])
 27  LabeledPoint(2.0, [6.8,3.2,5.9,2.3])

 The predicted result via NaiveBayes is below. Compared to the test data, only
 two predicted results (#11 and #15) were different.

  1  0.0
  2  0.0
  3  0.0
  4  0.0
  5  0.0
  6  0.0
  7  0.0
  8  0.0
  9  0.0
 10  1.0
 11  2.0
 12  1.0
 13  1.0
 14  1.0
 15  2.0
 16  1.0
 17  1.0
 18  1.0
 19  1.0
 20  2.0
 21  2.0
 22  2.0
 23  2.0
 24  2.0
 25  2.0
 26  2.0
 27  2.0

 After grouping test RDD and predicted RDD via zip I got this.

  1  (0.0,0.0)
  2  (0.0,0.0)
  3  (0.0,0.0)
  4  (0.0,0.0)
  5  (0.0,0.0)
  6  (0.0,0.0)
  7  (0.0,0.0)
  8  (0.0,0.0)
  9  (0.0,1.0)
 10  (0.0,1.0)
 11  (0.0,1.0)
 12  (1.0,1.0)
 13  (1.0,1.0)
 14  (2.0,1.0)
 15  (1.0,1.0)
 16  (1.0,2.0)
 17  (1.0,2.0)
 18  (1.0,2.0)
 19  (1.0,2.0)
 20  (2.0,2.0)
 21  (2.0,2.0)
 22  (2.0,2.0)
 23  (2.0,2.0)
 24  (2.0,2.0)
 25  (2.0,2.0)

 I expected there to be 27 pairs but I saw that two results were lost.
 Could someone please point out what I am missing here?

 Regards,
 xj


Re: MLLib : Math on Vector and Matrix

2014-07-03 Thread Xiangrui Meng
Hi Thunder,

Please understand that both MLlib and breeze are in active
development. Before v1.0, we used jblas but in the public APIs we only
exposed Array[Double]. In v1.0, we introduced Vector that supports
both dense and sparse data and switched the backend to
breeze/netlib-java (except ALS). We only used a few breeze methods in
our implementation and we benchmarked them one by one. It was hard to
foresee problems caused by including breeze at that time, for example,
https://issues.apache.org/jira/browse/SPARK-1520. Being conservative
in v1.0 was not a bad choice. We should benchmark breeze v0.8.1 for
v1.1 and consider making toBreeze a developer API. For now, if you are
migrating code from v0.9, you can use `Vector.toArray` to get the
value array. Sorry for the inconvenience!

Best,
Xiangrui

On Wed, Jul 2, 2014 at 2:42 PM, Dmitriy Lyubimov dlie...@gmail.com wrote:
 in my humble opinion Spark should've supported linalg a-la [1] before it
 even started dumping methodologies into mllib.

 [1] http://mahout.apache.org/users/sparkbindings/home.html


 On Wed, Jul 2, 2014 at 2:16 PM, Thunder Stumpges
 thunder.stump...@gmail.com wrote:

 Thanks. I always hate having to do stuff like this. It seems like they
 went a bit overboard with all the private[mllib] declarations... possibly
 all in the name of thou shalt not change your public API. If you don't
 make your public API usable, we end up having to work around it anyway...

 Oh well.

 Thunder



 On Wed, Jul 2, 2014 at 2:05 PM, Koert Kuipers ko...@tresata.com wrote:

 i did the second option: re-implemented .toBreeze as .breeze using pimp
 classes


 On Wed, Jul 2, 2014 at 5:00 PM, Thunder Stumpges
 thunder.stump...@gmail.com wrote:

 I am upgrading from Spark 0.9.0 to 1.0 and I had a pretty good amount of
 code working with internals of MLLib. One of the big changes was the move
 from the old jblas.Matrix to the Vector/Matrix classes included in MLLib.

 However I don't see how we're supposed to use them for ANYTHING other
 than a container for passing data to the included APIs... how do we do any
 math on them? Looking at the internal code, there are quite a number of
 private[mllib] declarations including access to the Breeze representations
 of the classes.

 Was there a good reason this was not exposed? I could see maybe not
 wanting to expose the 'toBreeze' function which would tie it to the breeze
 implementation, however it would be nice to have the various mathematics
 wrapped at least.

 Right now I see no way to code any vector/matrix math without moving my
 code namespaces into org.apache.spark.mllib or duplicating the code in
 'toBreeze' in my own util functions. Not very appealing.

 What are others doing?
 thanks,
 Thunder






Re: MLLib : Math on Vector and Matrix

2014-07-03 Thread Xiangrui Meng
Hi Dmitriy,

It is sweet to have the bindings, but it is very easy to downgrade the
performance with them. The BLAS/LAPACK APIs have been there for more
than 20 years and they are still the top choice for high-performance
linear algebra. I'm thinking about whether it is possible to make the
evaluation lazy in bindings. For example,

y += a * x

can be translated to an AXPY call instead of creating a temporary
vector for a*x. There was some work in C++ but none achieved good
performance. I'm not sure whether this is a good direction to explore.
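
As a concrete illustration, the in-place form via the netlib-java interface
that breeze/MLlib already pull in (a sketch):

import com.github.fommil.netlib.BLAS

// y += a * x done in place by daxpy; no temporary vector for a * x is created.
val blas = BLAS.getInstance()
val a = 0.5
val x = Array(1.0, 2.0, 3.0)
val y = Array(4.0, 5.0, 6.0)
blas.daxpy(x.length, a, x, 1, y, 1)   // y is now (4.5, 6.0, 7.5)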

Best,
Xiangrui


Re: Execution stalls in LogisticRegressionWithSGD

2014-07-06 Thread Xiangrui Meng
 at
 /tmp/spark-local-20140704062240-6a65
 14/07/04 06:22:40 INFO MemoryStore: MemoryStore started with capacity 6.7
 GB.
 14/07/04 06:22:40 INFO ConnectionManager: Bound socket to port 46901 with id
 = ConnectionManagerId(slave1,46901)
 14/07/04 06:22:40 INFO BlockManagerMaster: Trying to register BlockManager
 14/07/04 06:22:40 INFO BlockManagerMaster: Registered BlockManager
 14/07/04 06:22:40 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-9eba78f9-8ae9-477c-9338-7222ae6fe306
 14/07/04 06:22:40 INFO HttpServer: Starting HTTP Server
 14/07/04 06:22:42 INFO CoarseGrainedExecutorBackend: Got assigned task 0
 14/07/04 06:22:42 INFO Executor: Running task ID 0
 14/07/04 06:22:42 INFO CoarseGrainedExecutorBackend: Got assigned task 2
 14/07/04 06:22:42 INFO Executor: Running task ID 2
 ...



 On Fri, Jul 4, 2014 at 5:52 AM, Xiangrui Meng men...@gmail.com wrote:

 The feature dimension is small. You don't need a big akka.frameSize.
 The default one (10M) should be sufficient. Did you cache the data
 before calling LRWithSGD? -Xiangrui

 On Thu, Jul 3, 2014 at 10:02 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  I tried another run after setting the driver memory to 8G (and
  spark.akka.frameSize = 500 on the executors and the driver). In
  addition, I
  also tried to reduce the amount of data that a single task processes, by
  increasing the number of partitions (of the labeled points) to 120
  (instead
  of 2 used earlier), and then setting max cores to 2. That made no
  difference
  since, at the end of 120 tasks, the familiar error message appeared on a
  slave:
 
  snipped earlier logs
  14/07/03 16:18:48 INFO CoarseGrainedExecutorBackend: Got assigned task
  1436
  14/07/03 16:18:48 INFO Executor: Running task ID 1436
  14/07/03 16:18:53 INFO HadoopRDD: Input split:
  file:~//2014-05-24-02/part-r-00014:0+2215337
  14/07/03 16:18:54 INFO HadoopRDD: Input split:
  file:~//2014-05-24-02/part-r-00014:2215337+2215338
  14/07/03 16:18:54 INFO HadoopRDD: Input split:
  file:~//2014-05-24-02/part-r-3:0+2196429
  14/07/03 16:18:54 INFO HadoopRDD: Input split:
  file:~//2014-05-24-02/part-r-3:2196429+2196430
  14/07/03 16:18:54 INFO HadoopRDD: Input split:
  file:~//2014-05-24-02/part-r-00010:0+2186751
  14/07/03 16:18:54 INFO HadoopRDD: Input split:
  file:~//2014-05-24-02/part-r-00010:2186751+2186751
  14/07/03 16:18:54 INFO Executor: Serialized size of result for 1436 is
  5958822
  14/07/03 16:18:54 INFO Executor: Sending result for 1436 directly to
  driver
  14/07/03 16:18:54 INFO Executor: Finished task ID 1436
  14/07/03 16:18:54 INFO CoarseGrainedExecutorBackend: Got assigned task
  1438
  14/07/03 16:18:54 INFO Executor: Running task ID 1438
  14/07/03 16:19:00 INFO HadoopRDD: Input split:
  file:~//2014-05-24-02/part-r-4:0+2209615
  14/07/03 16:19:00 INFO HadoopRDD: Input split:
  file:~//2014-05-24-02/part-r-4:2209615+2209616
  14/07/03 16:19:00 INFO HadoopRDD: Input split:
  file:~//2014-05-24-02/part-r-00011:0+2202240
  14/07/03 16:19:00 INFO HadoopRDD: Input split:
  file:~//2014-05-24-02/part-r-00011:2202240+2202240
  14/07/03 16:19:00 INFO HadoopRDD: Input split:
  file:~//2014-05-24-02/part-r-9:0+2194423
  14/07/03 16:19:00 INFO HadoopRDD: Input split:
  file:~//2014-05-24-02/part-r-9:2194423+2194424
  14/07/03 16:19:00 INFO Executor: Serialized size of result for 1438 is
  5958822
  14/07/03 16:19:00 INFO Executor: Sending result for 1438 directly to
  driver
  14/07/03 16:19:00 INFO Executor: Finished task ID 1438
  14/07/03 16:19:14 ERROR CoarseGrainedExecutorBackend: Driver
  Disassociated
  [akka.tcp://sparkExecutor@slave1:51099] -
  [akka.tcp://spark@master:58272]
  disassociated! Shutting down.
 
 
  The corresponding master logs were:
 
  4/07/03 16:02:14 INFO Master: Registering app LogRegExp
  14/07/03 16:02:14 INFO Master: Registered app LogRegExp with ID
  app-20140703160214-0028
  14/07/03 16:02:14 INFO Master: Launching executor
  app-20140703160214-0028/1
  on worker worker-20140630124441-slave1-40182
  14/07/03 16:19:15 INFO Master: Removing executor
  app-20140703160214-0028/1
  because it is EXITED
  14/07/03 16:19:15 INFO Master: Launching executor
  app-20140703160214-0028/2
  on worker worker-20140630124441-slave1-40182
  14/07/03 16:19:15 INFO Master: Removing executor
  app-20140703160214-0028/0
  because it is EXITED
  14/07/03 16:19:15 INFO Master: Launching executor
  app-20140703160214-0028/3
  on worker worker-20140630102913-slave2-44735
  14/07/03 16:19:18 INFO Master: Removing executor
  app-20140703160214-0028/2
  because it is EXITED
  14/07/03 16:19:18 INFO Master: Launching executor
  app-20140703160214-0028/4
  on worker worker-20140630124441-slave1-40182
  14/07/03 16:19:18 INFO Master: Removing executor
  app-20140703160214-0028/3
  because it is EXITED
  14/07/03 16:19:18 INFO Master: Launching executor
  app-20140703160214-0028/5
  on worker worker-20140630102913-slave2-44735
  14/07/03 16:19:20 INFO Master

Re: Execution stalls in LogisticRegressionWithSGD

2014-07-07 Thread Xiangrui Meng
This seems to me to be a setup issue. I just tested news20.binary (1355191
features) on a 2-node EC2 cluster and it worked well. I added one line
to conf/spark-env.sh:

export SPARK_JAVA_OPTS="-Dspark.akka.frameSize=20"

and launched spark-shell with --driver-memory 20g. Could you re-try
with an EC2 setup? If it still doesn't work, please attach all your
code and logs.

Best,
Xiangrui

On Sun, Jul 6, 2014 at 1:35 AM, Bharath Ravi Kumar reachb...@gmail.com wrote:
 Hi Xiangrui,

 1) Yes, I used the same build (compiled locally from source) to the host
 that has (master, slave1) and the second host with slave2.

 2) The execution was successful when run in local mode with reduced number
 of partitions. Does this imply issues communicating/coordinating across
 processes (i.e. driver, master and workers)?

 Thanks,
 Bharath



 On Sun, Jul 6, 2014 at 11:37 AM, Xiangrui Meng men...@gmail.com wrote:

 Hi Bharath,

 1) Did you sync the spark jar and conf to the worker nodes after build?
 2) Since the dataset is not large, could you try local mode first
  using `spark-submit --driver-memory 12g --master local[*]`?
 3) Try to use less number of partitions, say 5.

 If the problem is still there, please attach the full master/worker log
 files.

 Best,
 Xiangrui

 On Fri, Jul 4, 2014 at 12:16 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Xiangrui,
 
  Leaving the frameSize unspecified led to an error message (and failure)
  stating that the task size (~11M) was larger. I hence set it to an
  arbitrarily large value (I realize 500 was unrealistic & unnecessary in
  this case). I've now set the size to 20M and repeated the runs. The
  earlier
  runs were on an uncached RDD. Caching the RDD (and setting
  spark.storage.memoryFraction=0.5) resulted in marginal speed up of
  execution, but the end result remained the same. The cached RDD size is
  as
  follows:
 
  RDD Name  Storage Level                      Cached Partitions  Fraction Cached  Size in Memory  Size in Tachyon  Size on Disk
  1084      Memory Deserialized 1x Replicated  80                 100%             165.9 MB        0.0 B            0.0 B
 
 
 
  The corresponding master logs were:
 
  14/07/04 06:29:34 INFO Master: Removing executor
  app-20140704062238-0033/1
  because it is EXITED
  14/07/04 06:29:34 INFO Master: Launching executor
  app-20140704062238-0033/2
  on worker worker-20140630124441-slave1-40182
  14/07/04 06:29:34 INFO Master: Removing executor
  app-20140704062238-0033/0
  because it is EXITED
  14/07/04 06:29:34 INFO Master: Launching executor
  app-20140704062238-0033/3
  on worker worker-20140630102913-slave2-44735
  14/07/04 06:29:37 INFO Master: Removing executor
  app-20140704062238-0033/2
  because it is EXITED
  14/07/04 06:29:37 INFO Master: Launching executor
  app-20140704062238-0033/4
  on worker worker-20140630124441-slave1-40182
  14/07/04 06:29:37 INFO Master: Removing executor
  app-20140704062238-0033/3
  because it is EXITED
  14/07/04 06:29:37 INFO Master: Launching executor
  app-20140704062238-0033/5
  on worker worker-20140630102913-slave2-44735
  14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
  disassociated, removing it.
  14/07/04 06:29:39 INFO Master: Removing app app-20140704062238-0033
  14/07/04 06:29:39 INFO LocalActorRef: Message
  [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
  from
  Actor[akka://sparkMaster/deadLetters] to
 
  Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.3.1.135%3A33061-123#1986674260]
  was not delivered. [39] dead letters encountered. This logging can be
  turned
  off or adjusted with configuration settings 'akka.log-dead-letters' and
  'akka.log-dead-letters-during-shutdown'.
  14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
  disassociated, removing it.
  14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
  disassociated, removing it.
  14/07/04 06:29:39 ERROR EndpointWriter: AssociationError
  [akka.tcp://sparkMaster@master:7077] - [akka.tcp://spark@slave2:45172]:
  Error [Association failed with [akka.tcp://spark@slave2:45172]] [
  akka.remote.EndpointAssociationException: Association failed with
  [akka.tcp://spark@slave2:45172]
  Caused by:
  akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
  Connection refused: slave2/10.3.1.135:45172
  ]
  14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
  disassociated, removing it.
  14/07/04 06:29:39 ERROR EndpointWriter: AssociationError
  [akka.tcp://sparkMaster@master:7077] - [akka.tcp://spark@slave2:45172]:
  Error [Association failed with [akka.tcp://spark@slave2:45172]] [
  akka.remote.EndpointAssociationException: Association failed with
  [akka.tcp://spark@slave2:45172]
  Caused by:
  akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
  Connection refused: slave2/10.3.1.135:45172
  ]
  14/07/04 06:29:39 ERROR EndpointWriter

Re: Dense to sparse vector converter

2014-07-07 Thread Xiangrui Meng
No, but it should be easy to add one. -Xiangrui
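
For reference, a do-it-yourself sketch that keeps only the non-zero entries:

import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}

// Convert a DenseVector to a sparse representation by dropping exact zeros.
def toSparse(dv: DenseVector): Vector = {
  val nonZeros = dv.values.zipWithIndex.filter { case (value, _) => value != 0.0 }
  Vectors.sparse(dv.size, nonZeros.map(_._2), nonZeros.map(_._1))
}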

On Mon, Jul 7, 2014 at 12:37 AM, Ulanov, Alexander
alexander.ula...@hp.com wrote:
 Hi,



 Is there a method in Spark/MLlib to convert DenseVector to SparseVector?



 Best regards, Alexander


Re: How to incorporate the new data in the MLlib-NaiveBayes model along with predicting?

2014-07-08 Thread Xiangrui Meng
Hi Rahul,

We plan to add online model updates with Spark Streaming, perhaps in
v1.1, starting with linear methods. Please open a JIRA for Naive
Bayes. For Naive Bayes, we need to update the priors and conditional
probabilities, which means we should also remember the number of
observations for the updates.
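
To make the bookkeeping concrete, a plain-Scala sketch of the counts such an
update would have to carry (an illustration only, not the MLlib
implementation; multinomial model with Laplace smoothing):

class NaiveBayesCounts(numClasses: Int, numFeatures: Int, lambda: Double = 1.0) {
  private val classCounts = Array.fill(numClasses)(0L)                // observations per class
  private val featureSums = Array.fill(numClasses, numFeatures)(0.0)  // summed feature counts per class

  // Fold in one new labeled example; priors/conditionals are recomputed from the counts.
  def add(label: Int, features: Array[Double]): Unit = {
    classCounts(label) += 1
    var j = 0
    while (j < numFeatures) { featureSums(label)(j) += features(j); j += 1 }
  }

  def logPrior(c: Int): Double =
    math.log((classCounts(c) + lambda) / (classCounts.sum + numClasses * lambda))

  def logConditional(c: Int, j: Int): Double =
    math.log((featureSums(c)(j) + lambda) / (featureSums(c).sum + numFeatures * lambda))
}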

Best,
Xiangrui

On Tue, Jul 8, 2014 at 7:35 AM, Rahul Bhojwani
rahulbhojwani2...@gmail.com wrote:
 Hi,

 I am using the MLlib Naive Bayes for a text classification problem. I have
 a very small amount of training data. New data will then be coming in
 continuously and I need to classify it as either A or B. I am training the
 MLlib Naive Bayes model using the training data, but when new data comes
 in, I want to predict its class and then also incorporate it into the
 model for future predictions of new data (I think that is obvious).

 So I am not able to figure out what is the way to do that using MLlib Naive
 Bayes. Is it that I have to train the model on the whole data every time new
 data comes in??

 Thanks in Advance!
 --
 Rahul K Bhojwani
 3rd Year B.Tech
 Computer Science and Engineering
 National Institute of Technology, Karnataka


Re: Is MLlib NaiveBayes implementation for Spark 0.9.1 correct?

2014-07-08 Thread Xiangrui Meng
Well, I believe this is a correct implementation, but please let us
know if you run into problems. The NaiveBayes implementation in MLlib
v1.0 supports sparse data, which is usually the case for text
classification. I would recommend upgrading to v1.0. -Xiangrui
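
For reference, a minimal sketch of training on sparse vectors with the v1.0 Scala API (the dimension, indices and values below are made up, and an existing SparkContext sc is assumed):

import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Two toy documents in a 10,000-dimensional bag-of-words space,
// stored sparsely as (index, count) pairs.
val training = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.sparse(10000, Array(1, 42, 907), Array(2.0, 1.0, 3.0))),
  LabeledPoint(0.0, Vectors.sparse(10000, Array(7, 42, 5001), Array(1.0, 1.0, 1.0)))
))

val model = NaiveBayes.train(training, 1.0)   // smoothing parameter lambda = 1.0
model.predict(Vectors.sparse(10000, Array(42, 907), Array(1.0, 1.0)))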

On Tue, Jul 8, 2014 at 7:20 AM, Rahul Bhojwani
rahulbhojwani2...@gmail.com wrote:
 Hi,

 I wanted to use Naive Bayes for a text classification problem. I am using
 Spark 0.9.1.
 I was just curious to ask: is the Naive Bayes implementation in Spark
 0.9.1 correct? Or are there any bugs in the Spark 0.9.1 implementation which
 are taken care of in Spark 1.0? My question is specifically about the MLlib
 Naive Bayes implementation. Also, I am using Python (in case that makes it
 easier to answer).


 Thanks
 --
 Rahul K Bhojwani
 3rd Year B.Tech
 Computer Science and Engineering
 National Institute of Technology, Karnataka


Re: got java.lang.AssertionError when run sbt/sbt compile

2014-07-08 Thread Xiangrui Meng
try sbt/sbt clean first

On Tue, Jul 8, 2014 at 8:25 AM, bai阿蒙 smallmonkey...@hotmail.com wrote:
 Hi guys,
 when I try to compile the latest source with sbt/sbt compile, I get an error.
 Can anyone help me?

 The following is the detail; it may be caused by TestSQLContext.scala
 [error]
 [error]  while compiling:
 /disk3/spark/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
 [error] during phase: jvm
 [error]  library version: version 2.10.4
 [error] compiler version: version 2.10.4
 [error]   reconstructed args: -Xmax-classfile-name 120 -classpath
 

Re: Error and doubts in using Mllib Naive bayes for text clasification

2014-07-08 Thread Xiangrui Meng
1) The feature dimension should be a fixed number before you run
NaiveBayes. If you use bag of words, you need to handle the
word-to-index dictionary yourself. You can either ignore the words
that never appear in training (because they have no effect on
prediction), or use hashing to randomly project words into a fixed-size
feature space (collisions may happen); a minimal sketch of the hashing
approach is included after 3) below.

3) Yes, we saved the log conditional probabilities. So to compute the
likelihood, we only need summation.
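
A minimal sketch of the hashing idea from 1) (illustrative code only, not an MLlib utility; the numFeatures default is an arbitrary choice):

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Map each token to a bucket in a fixed-size feature space via its hash code.
// Collisions are possible, but the dimension no longer depends on the vocabulary.
def hashingFeatures(tokens: Seq[String], numFeatures: Int = 1 << 16): Vector = {
  val counts = scala.collection.mutable.HashMap.empty[Int, Double]
  tokens.foreach { t =>
    val idx = ((t.hashCode % numFeatures) + numFeatures) % numFeatures
    counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
  }
  val (indices, values) = counts.toSeq.sortBy(_._1).unzip
  Vectors.sparse(numFeatures, indices.toArray, values.toArray)
}

hashingFeatures(Seq("spark", "naive", "bayes", "spark"))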

Best,
Xiangrui

On Tue, Jul 8, 2014 at 12:01 AM, Rahul Bhojwani
rahulbhojwani2...@gmail.com wrote:
 I am really sorry. It's actually my mistake. My problem 2 is wrong because
 using a single feature does not make sense. Sorry for the inconvenience.
 But I will still be waiting for solutions to problems 1 and 3.

 Thanks,


 On Tue, Jul 8, 2014 at 12:14 PM, Rahul Bhojwani
 rahulbhojwani2...@gmail.com wrote:

 Hello,

  I am a novice. I want to classify text into two classes. For this
  purpose I want to use a Naive Bayes model. I am using Python for it.

 Here are the problems I am facing:

 Problem 1: I wanted to use all words as features for the bag of words
 model. Which means my features will be count of individual words. In this
 case whenever a new word comes in the test data (which was never present in
 the train data) I need to increase the size of the feature vector to
  incorporate that word as well. Correct me if I am wrong. Can I do that with
  the present MLlib NaiveBayes? Or what is the way in which I can incorporate
  this?

  Problem 2: As I was not able to proceed with all words, I did some
  pre-processing and figured out a few features from the text. But using this
  also gives errors.
  Right now I am testing with only one feature from the text, namely the count
  of positive words. I am submitting the code below, along with the error:


 #Code

 import tokenizer
 import gettingWordLists as gl
 from pyspark.mllib.classification import NaiveBayes
 from numpy import array
 from pyspark import SparkContext, SparkConf

  conf = (SparkConf().setMaster("local[6]").setAppName("My app")
          .set("spark.executor.memory", "1g"))

 sc=SparkContext(conf = conf)

 # Getting the positive dict:
 pos_list = []
 pos_list = gl.getPositiveList()
 tok = tokenizer.Tokenizer(preserve_case=False)


 train_data  = []

  with open("training_file.csv", "r") as train_file:
      for line in train_file:
          tokens = line.split(",")
          msg = tokens[0]
          sentiment = tokens[1]
          count = 0
          tokens = set(tok.tokenize(msg))
          for i in tokens:
              if i.encode('utf-8') in pos_list:
                  count += 1
          if sentiment.__contains__('NEG'):
              label = 0.0
          else:
              label = 1.0
          feature = []
          feature.append(label)
          feature.append(float(count))
          train_data.append(feature)


  model = NaiveBayes.train(sc.parallelize(array(train_data)))
  print model.pi
  print model.theta
  print "\n\n\n\n\n", model.predict(array([5.0]))

 ##
 This is the output:

 [-2.24512292 -0.11195389]
 [[ 0.]
  [ 0.]]





  Traceback (most recent call last):
    File "naive_bayes_analyser.py", line 77, in <module>
      print "\n\n\n\n\n", model.predict(array([5.0]))
    File "F:\spark-0.9.1\spark-0.9.1\python\pyspark\mllib\classification.py", line 101, in predict
      return numpy.argmax(self.pi + dot(x, self.theta))
  ValueError: matrices are not aligned

 ##

  Problem 3: As you can see, the output for model.pi is negative, i.e. the prior
  probabilities are negative. Can someone explain that as well? Is it the log of
  the probability?



 Thanks,
 --
 Rahul K Bhojwani
 3rd Year B.Tech
 Computer Science and Engineering
 National Institute of Technology, Karnataka




 --
 Rahul K Bhojwani
 3rd Year B.Tech
 Computer Science and Engineering
 National Institute of Technology, Karnataka


Re: Help for the large number of the input data files

2014-07-08 Thread Xiangrui Meng
You can either use sc.wholeTextFiles and then a flatMap to reduce the
number of partitions, or give more memory to the driver process by
using --driver-memory 20g and then call RDD.repartition(small number)
after you load the data in. -Xiangrui
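
A rough sketch of both options (the paths and partition counts are placeholders, and an existing SparkContext sc is assumed):

// Option 1: read whole files as (path, content) pairs, then split the records
// yourself, so the partitioning is no longer one partition per small file.
val records = sc.wholeTextFiles("hdfs:///logs/post-processed/*")
  .flatMap { case (_, content) => content.split("\n") }

// Option 2: load as usual, then collapse the many small partitions.
val collapsed = sc.textFile("hdfs:///logs/post-processed/*").repartition(200)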

On Mon, Jul 7, 2014 at 7:38 PM, innowireless TaeYun Kim
taeyun@innowireless.co.kr wrote:
 Hi,



 A help for the implementation best practice is needed.

 The operating environment is as follows:



 - Log data file arrives irregularly.

 - The size of a log data file is from 3.9KB to 8.5MB. The average is about
 1MB.

 - The number of records of a data file is from 13 lines to 22000 lines. The
 average is about 2700 lines.

 - Data file must be post-processed before aggregation.

 - Post-processing algorithm can be changed.

 - Post-processed files are managed separately from the original data files, since
 the post-processing algorithm might be changed.

 - Daily aggregation is performed. All post-processed data files must be
 filtered record-by-record and aggregations (average, max, min, …) calculated.

 - Since aggregation is fine-grained, the number of records after the
 aggregation is not so small. It can be about half of the number of the
 original records.

 - At some point, the number of post-processed files can be about 200,000.

 - A data file should be able to be deleted individually.



 In a test, I tried to process 160,000 post-processed files with Spark, starting
 with sc.textFile() on a glob path; it failed with an OutOfMemory exception in
 the driver process.



 What is the best practice to handle this kind of data?

 Should I use HBase instead of plain files to save post-processed data?



 Thank you.




Re: Execution stalls in LogisticRegressionWithSGD

2014-07-09 Thread Xiangrui Meng
We have the maven-enforcer-plugin defined in the pom. I don't know why it
didn't work for you. Could you try rebuilding with maven2 and confirm
that there is no error message? If that is the case, please create a
JIRA for it. Thanks! -Xiangrui

On Wed, Jul 9, 2014 at 3:53 AM, Bharath Ravi Kumar reachb...@gmail.com wrote:
 Xiangrui,

 Thanks for all the help in resolving this issue. The cause turned out to
 be the build environment rather than the runtime configuration. The build process
 had picked up maven2 while building spark. Using binaries that were rebuilt
 using m3, the entire processing went through fine. While I'm aware that the
 build instruction page specifies m3 as the min requirement, declaratively
 preventing accidental m2 usage (e.g. through something like the maven
 enforcer plugin?) might help other developers avoid such issues.

 -Bharath



 On Mon, Jul 7, 2014 at 9:43 PM, Xiangrui Meng men...@gmail.com wrote:

 It seems to me a setup issue. I just tested news20.binary (1355191
 features) on a 2-node EC2 cluster and it worked well. I added one line
 to conf/spark-env.sh:

  export SPARK_JAVA_OPTS="-Dspark.akka.frameSize=20"

 and launched spark-shell with --driver-memory 20g. Could you re-try
 with an EC2 setup? If it still doesn't work, please attach all your
 code and logs.

 Best,
 Xiangrui

 On Sun, Jul 6, 2014 at 1:35 AM, Bharath Ravi Kumar reachb...@gmail.com
 wrote:
  Hi Xiangrui,
 
  1) Yes, I used the same build (compiled locally from source) on the host
  that has (master, slave1) and on the second host with slave2.
 
  2) The execution was successful when run in local mode with reduced
  number
  of partitions. Does this imply issues communicating/coordinating across
  processes (i.e. driver, master and workers)?
 
  Thanks,
  Bharath
 
 
 
  On Sun, Jul 6, 2014 at 11:37 AM, Xiangrui Meng men...@gmail.com wrote:
 
  Hi Bharath,
 
  1) Did you sync the spark jar and conf to the worker nodes after build?
  2) Since the dataset is not large, could you try local mode first
  using `spark-submit --driver-memory 12g --master local[*]`?
  3) Try to use less number of partitions, say 5.
 
  If the problem is still there, please attach the full master/worker log
  files.
 
  Best,
  Xiangrui
 
  On Fri, Jul 4, 2014 at 12:16 AM, Bharath Ravi Kumar
  reachb...@gmail.com
  wrote:
   Xiangrui,
  
   Leaving the frameSize unspecified led to an error message (and failure)
   stating that the task size (~11M) was larger than the frame size. I hence
   set it to an arbitrarily large value (I realize 500 was unrealistic and
   unnecessary in this case). I've now set the size to 20M and repeated the
   runs. The earlier runs were on an uncached RDD. Caching the RDD (and
   setting spark.storage.memoryFraction=0.5) resulted in a marginal speed-up
   of execution, but the end result remained the same. The cached RDD size is
   as follows:
  
   RDD Name:           1084
   Storage Level:      Memory Deserialized 1x Replicated
   Cached Partitions:  80
   Fraction Cached:    100%
   Size in Memory:     165.9 MB
   Size in Tachyon:    0.0 B
   Size on Disk:       0.0 B
  
  
  
   The corresponding master logs were:
  
   14/07/04 06:29:34 INFO Master: Removing executor
   app-20140704062238-0033/1
   because it is EXITED
   14/07/04 06:29:34 INFO Master: Launching executor
   app-20140704062238-0033/2
   on worker worker-20140630124441-slave1-40182
   14/07/04 06:29:34 INFO Master: Removing executor
   app-20140704062238-0033/0
   because it is EXITED
   14/07/04 06:29:34 INFO Master: Launching executor
   app-20140704062238-0033/3
   on worker worker-20140630102913-slave2-44735
   14/07/04 06:29:37 INFO Master: Removing executor
   app-20140704062238-0033/2
   because it is EXITED
   14/07/04 06:29:37 INFO Master: Launching executor
   app-20140704062238-0033/4
   on worker worker-20140630124441-slave1-40182
   14/07/04 06:29:37 INFO Master: Removing executor
   app-20140704062238-0033/3
   because it is EXITED
   14/07/04 06:29:37 INFO Master: Launching executor
   app-20140704062238-0033/5
   on worker worker-20140630102913-slave2-44735
   14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
   disassociated, removing it.
   14/07/04 06:29:39 INFO Master: Removing app app-20140704062238-0033
   14/07/04 06:29:39 INFO LocalActorRef: Message
   [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
   from
   Actor[akka://sparkMaster/deadLetters] to
  
  
   Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.3.1.135%3A33061-123#1986674260]
   was not delivered. [39] dead letters encountered. This logging can be
   turned
   off or adjusted with configuration settings 'akka.log-dead-letters'
   and
   'akka.log-dead-letters-during-shutdown'.
   14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
   disassociated, removing it.
   14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2

Re: KMeans code is rubbish

2014-07-10 Thread Xiangrui Meng
SparkKMeans is a naive implementation. Please use
mllib.clustering.KMeans in practice. I created a JIRA for this:
https://issues.apache.org/jira/browse/SPARK-2434 -Xiangrui
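
For comparison, a minimal run of the MLlib implementation on the same twelve points (assuming an existing SparkContext sc; the iteration and run counts are arbitrary choices):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val points = sc.parallelize(Seq(
  Vectors.dense(2.0, 1.0), Vectors.dense(1.0, 2.0), Vectors.dense(3.0, 2.0),
  Vectors.dense(2.0, 3.0), Vectors.dense(4.0, 1.0), Vectors.dense(5.0, 1.0),
  Vectors.dense(6.0, 1.0), Vectors.dense(4.0, 2.0), Vectors.dense(6.0, 2.0),
  Vectors.dense(4.0, 3.0), Vectors.dense(5.0, 3.0), Vectors.dense(6.0, 3.0)
))

// k = 2, up to 20 iterations, 5 random restarts; keeping the best of several
// runs makes the result far less sensitive to an unlucky initialization.
val model = KMeans.train(points, 2, 20, 5)
model.clusterCenters.foreach(println)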

On Thu, Jul 10, 2014 at 2:45 AM, Tathagata Das
tathagata.das1...@gmail.com wrote:
 I ran the SparkKMeans example (not the mllib KMeans that Sean ran) with your
 dataset as well, and I got the expected answer. And I believe that even though
 initialization is done using sampling, the example actually sets the seed to
 a constant 42, so the result should always be the same no matter how many
 times you run it. So I am not really sure what's going on here.

 Can you tell us more about which version of Spark you are running? Which
 Java version?


 ==

 [tdas @ Xion spark2] cat input
 2 1
 1 2
 3 2
 2 3
 4 1
 5 1
 6 1
 4 2
 6 2
 4 3
 5 3
 6 3
 [tdas @ Xion spark2] ./bin/run-example SparkKMeans input 2 0.001
 2014-07-10 02:45:06.764 java[45244:d17] Unable to load realm info from
 SCDynamicStore
 14/07/10 02:45:07 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 14/07/10 02:45:07 WARN LoadSnappy: Snappy native library not loaded
 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeSystemBLAS
 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeRefBLAS
 Finished iteration (delta = 3.0)
 Finished iteration (delta = 0.0)
 Final centers:
 DenseVector(5.0, 2.0)
 DenseVector(2.0, 2.0)



 On Thu, Jul 10, 2014 at 2:17 AM, Wanda Hawk wanda_haw...@yahoo.com wrote:

 so this is what I am running:
 ./bin/run-example SparkKMeans ~/Documents/2dim2.txt 2 0.001

 And this is the input file:
 ┌───[spark2013@SparkOne]──[~/spark-1.0.0].$
 └───#!cat ~/Documents/2dim2.txt
 2 1
 1 2
 3 2
 2 3
 4 1
 5 1
 6 1
 4 2
 6 2
 4 3
 5 3
 6 3
 

 This is the final output from spark:
 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
 Getting 2 non-empty blocks out of 2 blocks
 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
 Started 0 remote fetches in 0 ms
 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
 maxBytesInFlight: 50331648, targetRequestSize: 10066329
 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
 Getting 2 non-empty blocks out of 2 blocks
 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
 Started 0 remote fetches in 0 ms
 14/07/10 20:05:12 INFO Executor: Serialized size of result for 14 is 1433
 14/07/10 20:05:12 INFO Executor: Sending result for 14 directly to driver
 14/07/10 20:05:12 INFO Executor: Finished task ID 14
 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 0)
 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 14 in 5 ms on
 localhost (progress: 1/2)
 14/07/10 20:05:12 INFO Executor: Serialized size of result for 15 is 1433
 14/07/10 20:05:12 INFO Executor: Sending result for 15 directly to driver
 14/07/10 20:05:12 INFO Executor: Finished task ID 15
 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 1)
 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 15 in 7 ms on
 localhost (progress: 2/2)
 14/07/10 20:05:12 INFO DAGScheduler: Stage 6 (collectAsMap at
 SparkKMeans.scala:75) finished in 0.008 s
 14/07/10 20:05:12 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks
 have all completed, from pool
 14/07/10 20:05:12 INFO SparkContext: Job finished: collectAsMap at
 SparkKMeans.scala:75, took 0.02472681 s
 Finished iteration (delta = 0.0)
 Final centers:
 DenseVector(2.8571428571428568, 2.0)
 DenseVector(5.6005, 2.0)
 




 On Thursday, July 10, 2014 12:02 PM, Bertrand Dechoux decho...@gmail.com
 wrote:


 A picture is worth a thousand... Well, a picture with this dataset, what
 you are expecting and what you get, would help answering your initial
 question.

 Bertrand


 On Thu, Jul 10, 2014 at 10:44 AM, Wanda Hawk wanda_haw...@yahoo.com
 wrote:

 Can someone please run the standard kMeans code on this input with 2
 centers ?:
 2 1
 1 2
 3 2
 2 3
 4 1
 5 1
 6 1
 4 2
 6 2
 4 3
 5 3
 6 3

 The obvious result should be (2,2) and (5,2) ... (you can draw them if you
 don't believe me ...)

 Thanks,
 Wanda







Re: Terminal freeze during SVM

2014-07-10 Thread Xiangrui Meng
news20.binary's feature dimension is 1.35M, so the serialized task
size is above the default limit of 10M. You need to set
spark.akka.frameSize to, e.g., 20. Due to a bug (SPARK-1112), this
parameter is not passed to executors automatically, which causes Spark
to freeze. This was fixed in the latest master and v1.0.1-rc2. If you
rebuild Spark, remember to sync the assembly jar to the workers. -Xiangrui
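
Once you are on a build with that fix, one way to set the frame size programmatically before the context is created (a sketch; the value 20 MB follows the suggestion above, adjust it to your task size):

import org.apache.spark.{SparkConf, SparkContext}

// spark.akka.frameSize is given in MB; 20 leaves headroom for the ~11 MB
// serialized tasks produced by a 1.35M-dimensional weight vector.
val conf = new SparkConf()
  .setAppName("svm-news20")
  .set("spark.akka.frameSize", "20")
val sc = new SparkContext(conf)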

On Thu, Jul 10, 2014 at 7:56 AM, AlexanderRiggers
alexander.rigg...@gmail.com wrote:
 Tried the newest branch, but still get stuck on the same task: (kill) runJob
 at SlidingRDD.scala:74





 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Terminal-freeze-during-SVM-Broken-pipe-tp9022p9304.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to RDD.take(middle 10 elements)

2014-07-10 Thread Xiangrui Meng
This is expensive but doable:

rdd.zipWithIndex().filter { case (_, idx) => idx >= 10 && idx < 20 }.collect()
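
If you only want the elements themselves, you can drop the index afterwards (a small follow-up to the snippet above):

val middle = rdd.zipWithIndex()
  .filter { case (_, idx) => idx >= 10 && idx < 20 }
  .map { case (elem, _) => elem }   // keep just the elements
  .collect()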

-Xiangrui

On Thu, Jul 10, 2014 at 12:53 PM, Nick Chammas
nicholas.cham...@gmail.com wrote:
 Interesting question on Stack Overflow:
 http://stackoverflow.com/q/24677180/877069

 Basically, is there a way to take() elements of an RDD at an arbitrary
 index?

 Nick


 
 View this message in context: How to RDD.take(middle 10 elements)
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: ML classifier and data format for dataset with variable number of features

2014-07-11 Thread Xiangrui Meng
You can load the dataset as an RDD of JSON objects and use a flatMap to
extract feature vectors at the object level. Then you can filter the
training examples you want for binary classification. If you want to
try multiclass, check out DB's PR at
https://github.com/apache/spark/pull/1379
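
A rough sketch of that pipeline (the JSON layout and the parseImage helper are hypothetical placeholders, an existing SparkContext sc is assumed, and any JSON library on the classpath would do for the parsing):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Hypothetical record: one image with a label and one fixed-length feature
// array per detected object.
case class ImageRecord(label: Double, objectFeatures: Seq[Array[Double]])

// Placeholder parser; replace with your JSON library of choice.
def parseImage(json: String): ImageRecord = ???

val images = sc.textFile("hdfs:///images/*.json").map(parseImage)

// flatMap at the object level: each object becomes its own labeled example
// with a fixed-length feature vector, which is what the MLlib classifiers expect.
val examples = images.flatMap { img =>
  img.objectFeatures.map(f => LabeledPoint(img.label, Vectors.dense(f)))
}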

Best,
Xiangrui

On Fri, Jul 11, 2014 at 5:12 PM, SK skrishna...@gmail.com wrote:
 Hi,

 I need to perform binary classification on an image dataset. Each image is a
 data point described by a Json object. The feature set for each image is a
 set of feature vectors, each feature vector corresponding to a distinct
 object in the image. For example, if an image has 5 objects, its feature set
 will have 5 feature vectors, whereas an image that has 3 objects will have a
 feature set consisting of 3 feature vectors. So  the number of feature
 vectors  may be different for different images, although  each feature
 vector has the same number of attributes. The classification depends on the
 features of the individual objects, so I cannot aggregate them all into a
 flat vector.

 I have looked through the Mllib examples and it appears that the libSVM data
 format and the LabeledData format that Mllib uses, require  all the points
 to have the same number of features and they read in a flat feature vector.
 I would like to know if any of the MLlib supervised learning classifiers can
 be used with the JSON data format and whether they can be used to classify
 points with a different number of features as described above.

 thanks




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/ML-classifier-and-data-format-for-dataset-with-variable-number-of-features-tp9486.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: mapPartitionsWithIndex

2014-07-14 Thread Xiangrui Meng
You should return an iterator in mapPartitionsWIthIndex. This is from
the programming guide
(http://spark.apache.org/docs/latest/programming-guide.html):

mapPartitionsWithIndex(func): Similar to mapPartitions, but also
provides func with an integer value representing the index of the
partition, so func must be of type (Int, Iterator<T>) => Iterator<U>
when running on an RDD of type T.

For your case, try something similar to the following:

val keyval = dRDD.mapPartitionsWithIndex { (ind, iter) =>
  iter.map(x => process(ind, x.trim().split(' ').map(_.toDouble), q, m, r))
}
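
And a self-contained toy example of the same pattern (assuming an existing SparkContext sc), returning one (partitionIndex, recordCount) pair per partition:

val data = sc.parallelize(1 to 100, 4)
val countsPerPartition = data.mapPartitionsWithIndex { (ind, iter) =>
  Iterator((ind, iter.size))   // the function must return an Iterator
}
countsPerPartition.collect()   // e.g. Array((0,25), (1,25), (2,25), (3,25))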

-Xiangrui

On Sun, Jul 13, 2014 at 11:26 PM, Madhura das.madhur...@gmail.com wrote:
 I have a text file consisting of a large number of random floating-point values
 separated by spaces. I am loading this file into an RDD in Scala.

 I have heard of mapPartitionsWithIndex but I haven't been able to implement
 it. For each partition I want to call a method (process in this case) to
 which I want to pass the partition and its respective index as parameters.

 My method returns a pair of values.
 This is what I have done.

 val dRDD = sc.textFile(hdfs://master:54310/Data/input*)
 var ind:Int=0
 val keyval = dRDD.mapPartitionsWithIndex((ind, x) => process(ind, x, ...))
 val res=keyval.collect()

 We are not able to access res(0)._1 and res(0)._2

 The error log is as follows.

 [error] SimpleApp.scala:420: value trim is not a member of Iterator[String]
 [error] Error occurred in an application involving default arguments.
 [error] val keyval=dRDD.mapPartitionsWithIndex( (ind,x) =
 process(ind,x.trim().split(' ').map(_.toDouble),q,m,r))
 [error]
 ^
 [error] SimpleApp.scala:425: value mkString is not a member of
 Array[Nothing]
 [error]   println(res.mkString())
 [error]   ^
 [error] /SimpleApp.scala:427: value _1 is not a member of Nothing
 [error]   var final= res(0)._1
 [error] ^
 [error] /home/madhura/DTWspark/src/main/scala/SimpleApp.scala:428: value _2
 is not a member of Nothing
 [error]   var final1 = res(0)._2 - m +1
 [error]  ^




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitionsWithIndex-tp9590.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Error when testing with large sparse svm

2014-07-14 Thread Xiangrui Meng
You need to set a larger `spark.akka.frameSize`, e.g., 128, for the
serialized weight vector. There is a JIRA about switching
automatically between sending through Akka and using a broadcast:
https://issues.apache.org/jira/browse/SPARK-2361 . -Xiangrui

On Mon, Jul 14, 2014 at 12:15 AM, crater cq...@ucmerced.edu wrote:
 Hi,

 I encountered an error when testing SVM (the example one) on very large sparse
 data. The dataset I ran on was a toy dataset with only ten examples but
 13-million-dimensional sparse vectors with a few thousand non-zero entries.

 The error is shown below. I am wondering: is this a bug, or am I missing
 something?

 14/07/13 23:59:44 INFO SecurityManager: Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 14/07/13 23:59:44 INFO SecurityManager: Changing view acls to: chengjie
 14/07/13 23:59:44 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(chengjie)
 14/07/13 23:59:45 INFO Slf4jLogger: Slf4jLogger started
 14/07/13 23:59:45 INFO Remoting: Starting remoting
 14/07/13 23:59:45 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://spark@master:53173]
 14/07/13 23:59:45 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://spark@master:53173]
 14/07/13 23:59:45 INFO SparkEnv: Registering MapOutputTracker
 14/07/13 23:59:45 INFO SparkEnv: Registering BlockManagerMaster
 14/07/13 23:59:45 INFO DiskBlockManager: Created local directory at
 /tmp/spark-local-20140713235945-c78f
 14/07/13 23:59:45 INFO MemoryStore: MemoryStore started with capacity 14.4
 GB.
 14/07/13 23:59:45 INFO ConnectionManager: Bound socket to port 37674 with id
 = ConnectionManagerId(master,37674)
 14/07/13 23:59:45 INFO BlockManagerMaster: Trying to register BlockManager
 14/07/13 23:59:45 INFO BlockManagerInfo: Registering block manager
 master:37674 with 14.4 GB RAM
 14/07/13 23:59:45 INFO BlockManagerMaster: Registered BlockManager
 14/07/13 23:59:45 INFO HttpServer: Starting HTTP Server
 14/07/13 23:59:45 INFO HttpBroadcast: Broadcast server started at
 http://10.10.255.128:41838
 14/07/13 23:59:45 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-ac459d4b-a3c4-4577-bad4-576ac427d0bf
 14/07/13 23:59:45 INFO HttpServer: Starting HTTP Server
 14/07/13 23:59:51 INFO SparkUI: Started SparkUI at http://master:4040
 14/07/13 23:59:51 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 14/07/13 23:59:52 INFO EventLoggingListener: Logging events to
 /tmp/spark-events/binaryclassification-with-params(hdfs---master-9001-splice.small,1,1.0,svm,l1,0.1)-1405317591776
 14/07/13 23:59:52 INFO SparkContext: Added JAR
 file:/home/chengjie/spark-1.0.1/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.3.0.jar
 at http://10.10.255.128:54689/jars/spark-examples-1.0.1-hadoop2.3.0.jar with
 timestamp 1405317592653
 14/07/13 23:59:52 INFO AppClient$ClientActor: Connecting to master
 spark://master:7077...
 14/07/14 00:00:08 WARN TaskSchedulerImpl: Initial job has not accepted any
 resources; check your cluster UI to ensure that workers are registered and
 have sufficient memory
 14/07/14 00:00:23 WARN TaskSchedulerImpl: Initial job has not accepted any
 resources; check your cluster UI to ensure that workers are registered and
 have sufficient memory
 14/07/14 00:00:38 WARN TaskSchedulerImpl: Initial job has not accepted any
 resources; check your cluster UI to ensure that workers are registered and
 have sufficient memory
 14/07/14 00:00:53 WARN TaskSchedulerImpl: Initial job has not accepted any
 resources; check your cluster UI to ensure that workers are registered and
 have sufficient memory
 Training: 10
 14/07/14 00:01:09 WARN BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeSystemBLAS
 14/07/14 00:01:09 WARN BLAS: Failed to load implementation from:
 com.github.fommil.netlib.NativeRefBLAS
 *Exception in thread main org.apache.spark.SparkException: Job aborted due
 to stage failure: Serialized task 20:0 was 94453098 bytes which exceeds
 spark.akka.frameSize (10485760 bytes). Consider using broadcast variables
 for large values.*
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
 at
 

Re: Error when testing with large sparse svm

2014-07-14 Thread Xiangrui Meng
Is it on a standalone server? There are several settings worth checking:

1) the number of partitions, which should match the number of cores
2) driver memory (you can see it in the executor tab of the Spark
WebUI and set it with --driver-memory 10g)
3) the version of Spark you were running

Best,
Xiangrui

On Mon, Jul 14, 2014 at 12:14 PM, Srikrishna S srikrishna...@gmail.com wrote:
 That is exactly the same error that I got. I am still having no success.

 Regards,
 Krishna

 On Mon, Jul 14, 2014 at 11:50 AM, crater cq...@ucmerced.edu wrote:
 Hi Krishna,

 Thanks for your help. Are you able to get your 29M data running yet? I fixed
 the previous problem by setting a larger spark.akka.frameSize, but now I get
 some other errors, shown below. Did you get these errors before?


 14/07/14 11:32:20 ERROR TaskSchedulerImpl: Lost executor 1 on node7: remote
 Akka client disassociated
 14/07/14 11:32:20 WARN TaskSetManager: Lost TID 20 (task 13.0:0)
 14/07/14 11:32:21 ERROR TaskSchedulerImpl: Lost executor 3 on node8: remote
 Akka client disassociated
 14/07/14 11:32:21 WARN TaskSetManager: Lost TID 21 (task 13.0:1)
 14/07/14 11:32:23 ERROR TaskSchedulerImpl: Lost executor 6 on node3: remote
 Akka client disassociated
 14/07/14 11:32:23 WARN TaskSetManager: Lost TID 22 (task 13.0:0)
 14/07/14 11:32:25 ERROR TaskSchedulerImpl: Lost executor 0 on node4: remote
 Akka client disassociated
 14/07/14 11:32:25 WARN TaskSetManager: Lost TID 23 (task 13.0:1)
 14/07/14 11:32:26 ERROR TaskSchedulerImpl: Lost executor 5 on node1: remote
 Akka client disassociated
 14/07/14 11:32:26 WARN TaskSetManager: Lost TID 24 (task 13.0:0)
 14/07/14 11:32:28 ERROR TaskSchedulerImpl: Lost executor 7 on node6: remote
 Akka client disassociated
 14/07/14 11:32:28 WARN TaskSetManager: Lost TID 26 (task 13.0:0)
 14/07/14 11:32:28 ERROR TaskSetManager: Task 13.0:0 failed 4 times; aborting
 job
 Exception in thread main org.apache.spark.SparkException: Job aborted due
 to stage failure: Task 13.0:0 failed 4 times, most recent failure: TID 26 on
 host node6 failed for unknown reason
 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at 
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at 
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9623.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: ALS on EC2

2014-07-15 Thread Xiangrui Meng
Could you share the code of RecommendationALS and the complete
spark-submit command line options? Thanks! -Xiangrui

On Mon, Jul 14, 2014 at 11:23 PM, Srikrishna S srikrishna...@gmail.com wrote:
 Using properties file: null
 Main class:
 RecommendationALS
 Arguments:
 _train.csv
 _validation.csv
 _test.csv
 System properties:
 SPARK_SUBMIT - true
 spark.app.name - RecommendationALS
 spark.jars - 
 file:/root/projects/spark-recommendation-benchmark/benchmark_mf/target/scala-2.10/recommendation-benchmark_2.10-1.0.jar
 spark.master - local[8]
 Classpath elements:
 file:/root/projects/spark-recommendation-benchmark/benchmark_mf/target/scala-2.10/recommendation-benchmark_2.10-1.0.jar


 14/07/15 05:57:41 INFO Slf4jLogger: Slf4jLogger started
 14/07/15 05:57:41 INFO Remoting: Starting remoting
 14/07/15 05:57:41 INFO Remoting: Remoting started; listening on
 addresses :[akka.tcp://sp...@ip-172-31-19-62.us-west-2.compute.internal:57349]
 14/07/15 05:57:41 INFO Remoting: Remoting now listens on addresses:
 [akka.tcp://sp...@ip-172-31-19-62.us-west-2.compute.internal:57349]
 14/07/15 05:57:42 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where
 applicable
 --args is deprecated. Use --arg instead.
 14/07/15 05:57:43 INFO RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
 14/07/15 05:57:44 INFO Client: Retrying connect to server:
 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is
 RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1
 SECONDS)
 14/07/15 05:57:45 INFO Client: Retrying connect to server:
 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is
 RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1
 SECONDS)
 14

  and it continues trying and sleeping.

 Any help?

