Re: possible bug in Spark's ALS implementation...
Hi Michael, I can help check the current implementation. Would you please go to https://spark-project.atlassian.net/browse/SPARK and create a ticket about this issue with component MLlib? Thanks! Best, Xiangrui

On Tue, Mar 11, 2014 at 3:18 PM, Michael Allman m...@allman.ms wrote: Hi, I'm implementing a recommender based on the algorithm described in http://www2.research.att.com/~yifanhu/PUB/cf.pdf. This algorithm forms the basis of Spark's ALS implementation for data sets with implicit features. The data set I'm working with is proprietary and I cannot share it; however, I can say that it's based on the same kind of data as in the paper: relative viewing time of videos. (Specifically, the rating for each video is defined as total viewing time across all visitors divided by video duration.)

I'm seeing counterintuitive, sometimes nonsensical recommendations. For comparison, I've run the training data through Oryx's in-VM implementation of implicit ALS with the same parameters. Oryx uses the same algorithm. (Source in this file: https://github.com/cloudera/oryx/blob/master/als-common/src/main/java/com/cloudera/oryx/als/common/factorizer/als/AlternatingLeastSquares.java) The recommendations made by the two systems are very different from one another, more so than I think could be explained by differences in initial state. The recommendations made by the Oryx models look much better, especially as I increase the number of latent factors and the number of iterations. The Spark models' recommendations don't improve with increases in either latent factors or iterations; sometimes they get worse.

Because of the (understandably) highly optimized and terse style of Spark's ALS implementation, I've had a very hard time following it well enough to debug the issue definitively. However, I have found a section of code that looks incorrect. As described in the paper, part of the implicit ALS algorithm involves computing the matrix product YtCuY (equation 4 in the paper).
To optimize this computation, this expression is rewritten as YtY + Yt(Cu - I)Y. I believe that's what should be happening here: https://github.com/apache/incubator-spark/blob/v0.9.0-incubating/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L376 However, it looks like this code is in fact computing YtY + YtY(Cu - I), which is the same as YtYCu. If so, that's a bug. Can someone familiar with this code evaluate my claim? Cheers, Michael
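The identity Michael cites is easy to sanity-check numerically. Below is a small NumPy sketch (not Spark's code; the dimensions and confidence weights are made up) verifying that YtY + Yt(Cu - I)Y equals YtCuY, and that the (Cu - I) term only involves the items the user actually rated, since c_ui - 1 = 0 everywhere else:

```python
import numpy as np

rng = np.random.default_rng(0)
n, f = 6, 3                          # n items, f latent factors (made-up sizes)
Y = rng.normal(size=(n, f))          # item-factor matrix
I = np.eye(n)

# Cu is diagonal with confidence c_ui = 1 + alpha * r_ui; leave most
# ratings at zero so most diagonal entries are exactly 1.
r = np.zeros(n)
r[[1, 4]] = [2.0, 5.0]               # this user rated only items 1 and 4
Cu = np.diag(1.0 + 40.0 * r)

# The rewrite from the paper: Yt Cu Y = YtY + Yt (Cu - I) Y
YtY = Y.T @ Y
lhs = Y.T @ Cu @ Y
rhs = YtY + Y.T @ (Cu - I) @ Y
assert np.allclose(lhs, rhs)

# Because c_ui - 1 = 0 for unrated items, the second term reduces to a
# sum of outer products over rated items only -- the point of the rewrite.
acc = sum((Cu[i, i] - 1.0) * np.outer(Y[i], Y[i]) for i in [1, 4])
assert np.allclose(lhs, YtY + acc)
```

Note that the precomputed YtY is shared across all users, while the (Cu - I) correction is cheap because it touches only each user's observed ratings.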
Re: possible bug in Spark's ALS implementation...
The factor matrix Y is used twice in the implicit ALS computation: once to compute the global Y^T Y, and again to compute the local Y_i^T C_i Y_i. -Xiangrui

On Sun, Mar 16, 2014 at 1:18 PM, Matei Zaharia matei.zaha...@gmail.com wrote: On Mar 14, 2014, at 5:52 PM, Michael Allman m...@allman.ms wrote: I also found that the product and user RDDs were being rebuilt many times over in my tests, even for tiny data sets. By persisting the RDD returned from updateFeatures() I was able to avoid a raft of duplicate computations. Is there a reason not to do this?

This sounds like a good thing to add, though I'd like to understand why these are being recomputed (it seemed that the code would only use each one once). Do you have any sense of why that is? Matei
Re: possible bug in Spark's ALS implementation...
Hi Michael, I made a couple of changes to implicit ALS. One gives faster construction of YtY (https://github.com/apache/spark/pull/161), which was merged into master. The other caches intermediate matrix factors properly (https://github.com/apache/spark/pull/165). They should give you the same result as before, but faster (~2x in my local tests). If you have time to try the improved version, please let me know the speed-up on your data. Thanks! Best, Xiangrui

On Mon, Mar 17, 2014 at 5:07 PM, Michael Allman m...@allman.ms wrote: I've created https://spark-project.atlassian.net/browse/SPARK-1263 to address the issue of the factor matrix recomputation. I'm planning to submit a related pull request shortly. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/possible-bug-in-Spark-s-ALS-implementation-tp2567p2785.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: possible bug in Spark's ALS implementation...
Sorry, the link was wrong. It should be https://github.com/apache/spark/pull/131. -Xiangrui

On Tue, Mar 18, 2014 at 10:20 AM, Michael Allman m...@allman.ms wrote: Hi Xiangrui, I don't see how https://github.com/apache/spark/pull/161 relates to ALS. Can you explain? Also, thanks for addressing the issue with factor matrix persistence in PR 165. I was probably not going to get to that for a while. I will try to test your changes today for speed improvements. Cheers, Michael
Re: Feed KMeans algorithm with a row major matrix
Hi Jaonary, With the current implementation, you need to call Array.slice to make each row an Array[Double] and cache the resulting RDD. There is a plan to support block-wise input data; I will keep you informed. Best, Xiangrui

On Tue, Mar 18, 2014 at 2:46 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear All, I'm trying to cluster data from native library code with Spark KMeans||. In my native library the data are represented as a matrix (rows = number of data points, cols = dimension). For efficiency reasons, they are copied into a one-dimensional Scala array in row-major order, so after the computation I have an RDD[Array[Double]], but each array represents a set of data points instead of a single point. I need to transform these arrays into Array[Array[Double]] before running the KMeans|| algorithm. How can I do this efficiently? Best regards,
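For illustration, here is a plain-Python sketch of the slicing Xiangrui describes (the helper name `to_rows` and the sizes are made up; in Scala this would be Array.slice or grouped(dim) inside a map over the RDD):

```python
def to_rows(flat, dim):
    """Split one row-major flat array into rows of length dim."""
    assert len(flat) % dim == 0, "length must be a multiple of dim"
    return [flat[i:i + dim] for i in range(0, len(flat), dim)]

# One native-library buffer holding 3 points of dimension 2:
flat = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
rows = to_rows(flat, 2)   # [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
```

On an RDD[Array[Double]] the same idea would be a flatMap over the arrays, producing one record per data point, which can then be cached and fed to KMeans.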
Re: possible bug in Spark's ALS implementation...
Glad to hear about the speed-up. I hope we can improve the implementation further in the future. -Xiangrui On Tue, Mar 18, 2014 at 1:55 PM, Michael Allman m...@allman.ms wrote: I just ran a runtime performance comparison between 0.9.0-incubating and your als branch. I saw a 1.5x improvement in performance.
Re: Kmeans example reduceByKey slow
Hi Tsai, Could you share more information about the machines you used and the training parameters (runs, k, and iterations)? It would help in diagnosing your issue. Thanks! Best, Xiangrui

On Sun, Mar 23, 2014 at 3:15 AM, Tsai Li Ming mailingl...@ltsai.com wrote: Hi, At the reduceByKey stage, it takes a few minutes before the tasks start working. I have set -Dspark.default.parallelism=127, one per core (n-1). CPU/network/IO are idle across all nodes while this is happening, and there is nothing in particular in the master log file. From the spark-shell:

14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:124 as TID 538 on executor 2: XXX (PROCESS_LOCAL)
14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:124 as 38765155 bytes in 193 ms
14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:125 as TID 539 on executor 1: XXX (PROCESS_LOCAL)
14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:125 as 38765155 bytes in 96 ms
14/03/23 18:13:50 INFO TaskSetManager: Starting task 3.0:126 as TID 540 on executor 0: XXX (PROCESS_LOCAL)
14/03/23 18:13:50 INFO TaskSetManager: Serialized task 3.0:126 as 38765155 bytes in 100 ms

But it stops there for a significant time before any movement. In the stage detail of the UI, I can see that there are 127 tasks running, but the duration of each is at least a few minutes. I'm working off local storage (not HDFS) and the k-means data is about 6.5GB (50M rows). Is this normal behaviour? Thanks!
Re: Kmeans example reduceByKey slow
K = 50 is certainly a large number for k-means. If there is no particular reason to have 50 clusters, could you try to reduce it to, e.g., 100 or 1000? Also, the example code is not for large-scale problems. You should use the KMeans algorithm in mllib clustering for your problem. -Xiangrui

On Sun, Mar 23, 2014 at 11:53 PM, Tsai Li Ming mailingl...@ltsai.com wrote: Hi, This is on a 4-node cluster, each with 32 cores/256GB RAM. Spark (0.9.0) is deployed in standalone mode. Each worker is configured with 192GB, and Spark executor memory is also 192GB. This is on the first iteration, with K=50. Here's the code I use: http://pastebin.com/2yXL3y8i , which is a copy-and-paste of the example. Thanks!
Re: Kmeans example reduceByKey slow
The number of rows doesn't matter much as long as you have enough workers to distribute the work. K-means has complexity O(n * d * k), where n is the number of points, d is the dimension, and k is the number of clusters. If you use the KMeans implementation from MLlib, the initialization stage is done on the master, so a large k would slow down the initialization stage. If your data is sparse, the latest change to KMeans will help with the speed, depending on how sparse your data is. -Xiangrui

On Mon, Mar 24, 2014 at 12:44 AM, Tsai Li Ming mailingl...@ltsai.com wrote: Thanks, let me try with a smaller K. Does the size of the input data matter for the example? Currently I have 50M rows. What is a reasonable size to demonstrate the capability of Spark?
Re: Kmeans example reduceByKey slow
Sorry, I meant the master branch of https://github.com/apache/spark. -Xiangrui

On Mon, Mar 24, 2014 at 6:27 PM, Tsai Li Ming mailingl...@ltsai.com wrote: Thanks again. "If you use the KMeans implementation from MLlib, the initialization stage is done on master" -- is the master here the app/driver/spark-shell? Thanks!
Re: Issue with zip and partitions
From the API docs: "Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the *same number of partitions* and the *same number of elements in each partition* (e.g. one was made through a map on the other)." Basically, one RDD should be a mapped RDD of the other, or both RDDs should be mapped RDDs of the same RDD. Btw, your message says "Dell - Internal Use - Confidential"... Best, Xiangrui

On Tue, Apr 1, 2014 at 7:27 PM, patrick_nico...@dell.com wrote: Dell - Internal Use - Confidential. I got an exception "can't zip RDDs with unequal numbers of partitions" when I apply any action (reduce, collect) to a dataset created by zipping two datasets of 10 million entries each. The problem occurs independently of the number of partitions, including when I let Spark create the partitions. Interestingly enough, I do not have the problem zipping datasets of 1 and 2.5 million entries. A similar problem was reported on this board with 0.8, but I don't remember if it was fixed. Any idea? Any workaround? I'd appreciate it.
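The contract zip enforces can be sketched in plain Python, with each RDD modeled as a list of partitions (a hypothetical helper; Spark performs the real check per-partition at runtime):

```python
def zip_partitioned(a_parts, b_parts):
    """Mimic RDD.zip's contract: the two sides must have the same number
    of partitions and the same number of elements in each partition,
    otherwise fail the way Spark does."""
    if len(a_parts) != len(b_parts):
        raise ValueError("can't zip RDDs with unequal numbers of partitions")
    out = []
    for pa, pb in zip(a_parts, b_parts):
        if len(pa) != len(pb):
            raise ValueError("partitions have unequal numbers of elements")
        out.append(list(zip(pa, pb)))
    return out

# Works: b was "made through a map on" a, so the partitioning lines up.
a = [[1, 2], [3]]
b = [[x * 10 for x in p] for p in a]
zipped = zip_partitioned(a, b)   # [[(1, 10), (2, 20)], [(3, 30)]]
```

Any operation that repartitions one side (a filter, a shuffle, a different slice count in parallelize) breaks the alignment and triggers the exception above.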
Re: ui broken in latest 1.0.0
That commit did work for me. Could you confirm the following: 1) After you called cache(), did you make any actions like count() or reduce()? If you don't materialize the RDD, it won't show up in the storage tab. 2) Did you run ./make-distribution.sh after you switched to the current master? Xiangrui On Tue, Apr 8, 2014 at 9:33 AM, Koert Kuipers ko...@tresata.com wrote: i tried again with latest master, which includes commit below, but ui page still shows nothing on storage tab. koert commit ada310a9d3d5419e101b24d9b41398f609da1ad3 Author: Andrew Or andrewo...@gmail.com Date: Mon Mar 31 23:01:14 2014 -0700 [Hot Fix #42] Persisted RDD disappears on storage page if re-used If a previously persisted RDD is re-used, its information disappears from the Storage page. This is because the tasks associated with re-using the RDD do not report the RDD's blocks as updated (which is correct). On stage submit, however, we overwrite any existing Author: Andrew Or andrewo...@gmail.com Closes #281 from andrewor14/ui-storage-fix and squashes the following commits: 408585a [Andrew Or] Fix storage UI bug On Mon, Apr 7, 2014 at 4:21 PM, Koert Kuipers ko...@tresata.com wrote: got it thanks On Mon, Apr 7, 2014 at 4:08 PM, Xiangrui Meng men...@gmail.com wrote: This is fixed in https://github.com/apache/spark/pull/281. Please try again with the latest master. -Xiangrui On Mon, Apr 7, 2014 at 1:06 PM, Koert Kuipers ko...@tresata.com wrote: i noticed that for spark 1.0.0-SNAPSHOT which i checked out a few days ago (apr 5) that the application detail ui no longer shows any RDDs on the storage tab, despite the fact that they are definitely cached. i am running spark in standalone mode.
Re: ui broken in latest 1.0.0
That commit fixed the exact problem you described. That is why I want to confirm that you switched to the master branch. bin/spark-shell doesn't detect code changes, so you need to run ./make-distribution.sh to re-compile Spark first. -Xiangrui

On Tue, Apr 8, 2014 at 9:57 AM, Koert Kuipers ko...@tresata.com wrote: sorry, i meant to say: note that for a cached rdd in the spark shell it all works fine. but something is going wrong with the SPARK-APPLICATION-UI in our applications that extensively cache and re-use RDDs

On Tue, Apr 8, 2014 at 12:55 PM, Koert Kuipers ko...@tresata.com wrote: note that for a cached rdd in the spark shell it all works fine. but something is going wrong with the spark-shell in our applications that extensively cache and re-use RDDs
Re: Error when compiling spark in IDEA and best practice to use IDE?
After sbt/sbt gen-idea, do not import it as an SBT project; choose "Open Project" and point it at the spark folder. -Xiangrui

On Tue, Apr 8, 2014 at 10:45 PM, Sean Owen so...@cloudera.com wrote: I let IntelliJ read the Maven build directly and that works fine. -- Sean Owen | Director, Data Science | London

On Wed, Apr 9, 2014 at 6:14 AM, Dong Mo monted...@gmail.com wrote: Dear list, SBT compiles fine, but when I do the following: sbt/sbt gen-idea, import the project as an SBT project into IDEA 13.1, and Make Project, these errors show up:

Error:(28, 8) object FileContext is not a member of package org.apache.hadoop.fs
Error:(31, 8) object Master is not a member of package org.apache.hadoop.mapred
Error:(34, 26) object yarn is not a member of package org.apache.hadoop
Error:(35, 26) object yarn is not a member of package org.apache.hadoop
Error:(36, 26) object yarn is not a member of package org.apache.hadoop
Error:(37, 26) object yarn is not a member of package org.apache.hadoop
Error:(38, 26) object yarn is not a member of package org.apache.hadoop
Error:(39, 26) object yarn is not a member of package org.apache.hadoop
Error:(40, 26) object yarn is not a member of package org.apache.hadoop
Error:(41, 26) object yarn is not a member of package org.apache.hadoop
Error:(49, 11) not found: type YarnClientImpl
Error:(48, 20) not found: type ClientArguments
Error:(51, 18) not found: type ClientArguments
Error:(54, 18) not found: type ClientArguments
Error:(56, 12) not found: type YarnRPC
Error:(56, 22) not found: value YarnRPC
Error:(57, 17) not found: type YarnConfiguration
Error:(57, 41) not found: type YarnConfiguration
Error:(58, 59) value getCredentials is not a member of org.apache.hadoop.security.UserGroupInformation
Error:(60, 34) not found: type ClientDistributedCacheManager
Error:(72, 5) not found: value init
Error:(73, 5) not found: value start
Error:(76, 24) value getNewApplication is not a member of org.apache.spark.Logging
Error:(137, 35) not found: type GetNewApplicationResponse
Error:(156, 65) not found: type ApplicationSubmissionContext
Error:(156, 49) not found: type ApplicationId
Error:(118, 31) not found: type ApplicationId
Error:(224, 69) not found: type LocalResource
Error:(307, 39) not found: type LocalResource
Error:(343, 38) not found: type ContainerLaunchContext
Re: SVD under spark/mllib/linalg
It was moved to mllib.linalg.distributed.RowMatrix. With RowMatrix, you can compute column summary statistics, the Gram matrix, covariance, SVD, and PCA. We will provide multiplication for distributed matrices, but not in v1.0. -Xiangrui

On Fri, Apr 11, 2014 at 9:12 PM, wxhsdp wxh...@gmail.com wrote: Hi, all, the code under https://github.com/apache/spark/tree/master/mllib/src/main/scala/org/apache/spark/mllib/linalg has changed. The previous matrix classes are all removed, like MatrixEntry and MatrixSVD; instead, Breeze matrix definitions appear. Do we move to Breeze linear algebra for linear-algebra algorithms? Another question: are there any optimized matrix multiplication codes in Spark? I only see the outer-product method in the removed SVD.scala:

// Compute A^T A, assuming rows are sparse enough to fit in memory
val rows = data.map(entry => (entry.i, (entry.j, entry.mval))).groupByKey()
val emits = rows.flatMap { case (rowind, cols) =>
  cols.flatMap { case (colind1, mval1) =>
    cols.map { case (colind2, mval2) =>
      ((colind1, colind2), mval1 * mval2)
    }
  }
}.reduceByKey(_ + _)

thank you!
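The outer-product trick in that removed snippet can be sketched in NumPy to see what it computes: each sparse row contributes v1*v2 to entry (j1, j2) of the Gram matrix, and summing over rows yields A^T A. (The helper name and toy data below are made up.)

```python
import numpy as np

def ata_from_rows(rows, d):
    """Accumulate A^T A as a sum of per-row outer products, the same
    row-at-a-time pattern the removed SVD.scala used on sparse rows."""
    G = np.zeros((d, d))
    for row in rows:                 # row: list of (col_index, value) pairs
        for j1, v1 in row:
            for j2, v2 in row:
                G[j1, j2] += v1 * v2
    return G

# Two sparse rows of a 2 x 3 matrix A:
rows = [[(0, 1.0), (2, 2.0)], [(1, 3.0)]]
A = np.zeros((2, 3))
for i, row in enumerate(rows):
    for j, v in row:
        A[i, j] = v
assert np.allclose(ata_from_rows(rows, 3), A.T @ A)
```

The work per row is quadratic in the number of nonzeros of that row, not in the full dimension, which is why the approach relies on rows being sparse.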
Re: checkpointing without streaming?
Checkpointing clears dependencies. You might need checkpointing to cut a long lineage in iterative algorithms. -Xiangrui On Mon, Apr 21, 2014 at 11:34 AM, Diana Carroll dcarr...@cloudera.com wrote: I'm trying to understand when I would want to checkpoint an RDD rather than just persist to disk. Every reference to checkpointing I can find relates to Spark Streaming, but the method is defined in the core Spark library, not Streaming. Does it exist solely for streaming, or are there circumstances unrelated to streaming in which I might want to checkpoint... and if so, what are they? Thanks, Diana
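"Cut a long lineage" can be illustrated with a toy object that tracks only its parent chain (nothing Spark-specific here; the class and its methods are invented for illustration):

```python
class Node:
    """Toy stand-in for an RDD that records only its lineage."""
    def __init__(self, parent=None):
        self.parent = parent

    def map(self):
        # every transformation adds one link to the lineage
        return Node(parent=self)

    def checkpoint(self):
        # once the data is materialized to stable storage,
        # the dependency chain can be dropped
        self.parent = None

    def lineage_depth(self):
        d, node = 0, self
        while node.parent is not None:
            d, node = d + 1, node.parent
        return d

rdd = Node()
for i in range(100):              # iterative algorithm: lineage grows each step
    rdd = rdd.map()
    if (i + 1) % 10 == 0:
        rdd.checkpoint()          # periodically cut the chain
assert rdd.lineage_depth() == 0
```

Persisting to disk keeps the data handy but retains the full lineage for fault recovery; checkpointing trades recomputability for a bounded chain, which matters after hundreds of iterations.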
Re: skip lines in spark
If the first partition doesn't have enough records, then it may not drop enough lines. Try rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1). It might trigger a job. Best, Xiangrui

On Wed, Apr 23, 2014 at 9:46 AM, DB Tsai dbt...@stanford.edu wrote: Hi Chengi, If you just want to skip the first n lines in an RDD, you can do

rddData.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String]) =>
  if (partitionIdx == 0) lines.drop(n) else lines
)

Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, Apr 23, 2014 at 9:18 AM, Chengi Liu chengi.liu...@gmail.com wrote: Hi, What is the easiest way to skip the first n lines in an RDD? I am not able to figure this one out. Thanks
Re: Spark hangs when i call parallelize + count on a ArrayListbyte[] having 40k elements
How big is each entry, and how much memory do you have on each executor? You generated all the data on the driver, and sc.parallelize(bytesList) will send the entire dataset to a single executor. You may run into I/O or memory issues. If the entries are generated, you should create a simple RDD with sc.parallelize(0 until 20, 20) and call mapPartitions to generate them in parallel. -Xiangrui

On Wed, Apr 23, 2014 at 9:23 AM, amit karmakar amit.codenam...@gmail.com wrote: Spark hangs after I perform the following operations:

ArrayList<byte[]> bytesList = new ArrayList<byte[]>();
/* add 40k entries to bytesList */
JavaRDD<byte[]> rdd = sparkContext.parallelize(bytesList);
System.out.println("Count=" + rdd.count());

If I add just one entry, it works. It also works if I change JavaRDD<byte[]> rdd = sparkContext.parallelize(bytesList) to JavaRDD<byte[]> rdd = sparkContext.parallelize(bytesList, 20). There is nothing in the logs that helps understand the reason. What could be the reason for this? Regards, Amit Kumar Karmakar
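The generate-on-executors idea can be mimicked in plain Python (no Spark here; the per-partition count and entry size are made up): ship only small partition ids, and have each partition produce its own byte arrays, instead of building 40k entries on the driver and serializing them out.

```python
import os

def gen(partition_ids):
    """Generate a partition's entries where the task runs. This is the
    body you would pass to mapPartitions in the pattern
    sc.parallelize(0 until 20, 20).mapPartitions(gen)."""
    for _ in partition_ids:
        for _ in range(2000):          # 2000 made-up entries per partition
            yield os.urandom(100)      # each a small byte array

# Simulate the 20 partitions sequentially:
data = [list(gen([pid])) for pid in range(20)]
total = sum(len(p) for p in data)      # 40000 entries, none shipped by a driver
```

The driver then serializes only 20 integers per task instead of the whole data set, which avoids both the single-destination transfer and the driver-side memory pressure.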
Re: skip lines in spark
Sorry, I didn't realize that zipWithIndex() is not in v0.9.1. It is in the master branch and will be included in v1.0. It first counts the number of records per partition and then assigns indices starting from 0. -Xiangrui

On Wed, Apr 23, 2014 at 9:56 AM, Chengi Liu chengi.liu...@gmail.com wrote: Also, zipWithIndex() is not valid... did you mean zipPartitions? On Wed, Apr 23, 2014 at 9:55 AM, Chengi Liu chengi.liu...@gmail.com wrote: Xiangrui, so is the full suggestion: val trigger = rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1) and then what DB Tsai recommended:

trigger.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String]) =>
  if (partitionIdx == 0) lines.drop(n) else lines
)

Is that the full operation? What happens if I have to drop so many records that the number exceeds partition 0? How do I handle that case?
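On the question of n exceeding partition 0: the per-partition counts that zipWithIndex computes are exactly what is needed. Once each partition's size is known, a prefix can be dropped from as many partitions as it takes. A plain-Python sketch, with partitions modeled as lists (helper name made up):

```python
def drop_first_n(partitions, n):
    """Drop the first n records globally, even when n spans several
    partitions: first count records per partition (the pass that
    zipWithIndex performs), then drop per-partition prefixes."""
    sizes = [len(p) for p in partitions]        # one counting pass
    out, remaining = [], n
    for p, size in zip(partitions, sizes):
        take_from = min(remaining, size)
        out.append(p[take_from:])
        remaining -= take_from
    return out

parts = [[0, 1, 2], [3, 4], [5, 6, 7]]
dropped = drop_first_n(parts, 4)    # [[], [4], [5, 6, 7]]
```

This is why the zipWithIndex-then-filter approach is safe for any n, while dropping from partition 0 alone is not.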
Re: Hadoop-streaming
A PipedRDD is an RDD[String]. If you know how to parse each result line into (key, value) pairs, then you can call reduceByKey after: piped.map(x => (key, value)).reduceByKey((v1, v2) => v). -Xiangrui

On Wed, Apr 23, 2014 at 2:09 AM, zhxfl 291221...@qq.com wrote: Hello, we know Hadoop streaming is used by Hadoop to run native programs. Hadoop streaming supports Map and Reduce logic. Reduce logic means Hadoop collects all values with the same key and gives the stream to the native application. Spark has PipedRDD too, but PipedRDD doesn't support Reduce logic, so it's difficult for us to port our application from Hadoop to Spark. Can anyone give me advice? Thanks!
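The map-then-reduceByKey pattern Xiangrui describes can be mimicked in plain Python over piped output lines (tab-separated key/value is an assumption about the native program's output; Spark would do the same combining across partitions):

```python
def reduce_by_key(lines, parse, combine):
    """Parse each output line of a piped command into (key, value) and
    combine values per key, i.e. piped.map(parse).reduceByKey(combine)."""
    acc = {}
    for line in lines:
        k, v = parse(line)
        acc[k] = combine(acc[k], v) if k in acc else v
    return acc

# e.g. the native program emits "key<TAB>count" lines:
lines = ["a\t1", "b\t2", "a\t3"]
parse = lambda s: (s.split("\t")[0], int(s.split("\t")[1]))
totals = reduce_by_key(lines, parse, lambda x, y: x + y)   # {'a': 4, 'b': 2}
```

So the "Reduce logic" does not need to live inside the piped program: pipe for the native map step, then let reduceByKey group and combine by key on the Spark side.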
Re: Spark mllib throwing error
Could you share the command you used and more of the error message? Also, is it an MLlib specific problem? -Xiangrui On Thu, Apr 24, 2014 at 11:49 AM, John King usedforprinting...@gmail.com wrote: ./spark-shell: line 153: 17654 Killed $FWDIR/bin/spark-class org.apache.spark.repl.Main $@ Any ideas?
Re: Trying to use pyspark mllib NaiveBayes
Is your Spark cluster running? Try to start with generating simple RDDs and counting. -Xiangrui On Thu, Apr 24, 2014 at 11:38 AM, John King usedforprinting...@gmail.com wrote: I receive this error: Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/home/ubuntu/spark-1.0.0-rc2/python/pyspark/mllib/classification.py", line 178, in train ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_) File "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 535, in __call__ File "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 368, in send_command File "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 361, in send_command File "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 317, in _get_connection File "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 324, in _create_connection File "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 431, in start py4j.protocol.Py4JNetworkError: An error occurred while trying to connect to the Java server
Re: Trying to use pyspark mllib NaiveBayes
I tried locally with the example described in the latest guide: http://54.82.157.211:4000/mllib-naive-bayes.html , and it worked fine. Do you mind sharing the code you used? -Xiangrui On Thu, Apr 24, 2014 at 1:57 PM, John King usedforprinting...@gmail.com wrote: Yes, I got it running for a large RDD (~7 million lines) and mapping. Just received this error when trying to classify. On Thu, Apr 24, 2014 at 4:32 PM, Xiangrui Meng men...@gmail.com wrote: Is your Spark cluster running? Try to start with generating simple RDDs and counting. -Xiangrui On Thu, Apr 24, 2014 at 11:38 AM, John King usedforprinting...@gmail.com wrote: I receive this error: Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/home/ubuntu/spark-1.0.0-rc2/python/pyspark/mllib/classification.py", line 178, in train ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_) File "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 535, in __call__ File "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 368, in send_command File "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 361, in send_command File "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 317, in _get_connection File "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 324, in _create_connection File "/home/ubuntu/spark-1.0.0-rc2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 431, in start py4j.protocol.Py4JNetworkError: An error occurred while trying to connect to the Java server
Re: Spark mllib throwing error
Do you mind sharing more code and error messages? The information you provided is too little to identify the problem. -Xiangrui On Thu, Apr 24, 2014 at 1:55 PM, John King usedforprinting...@gmail.com wrote: Last command was: val model = new NaiveBayes().run(points) On Thu, Apr 24, 2014 at 4:27 PM, Xiangrui Meng men...@gmail.com wrote: Could you share the command you used and more of the error message? Also, is it an MLlib specific problem? -Xiangrui On Thu, Apr 24, 2014 at 11:49 AM, John King usedforprinting...@gmail.com wrote: ./spark-shell: line 153: 17654 Killed $FWDIR/bin/spark-class org.apache.spark.repl.Main $@ Any ideas?
Re: Spark mllib throwing error
I don't see anything wrong with your code. Could you do points.count() to see how many training examples you have? Also, make sure you don't have negative feature values. The error message you sent did not say NaiveBayes went wrong, but that the Spark shell was killed. -Xiangrui On Thu, Apr 24, 2014 at 4:05 PM, John King usedforprinting...@gmail.com wrote: In the other thread I had an issue with Python. In this issue, I tried switching to Scala. The code is: import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.linalg.SparseVector; import org.apache.spark.mllib.classification.NaiveBayes; import scala.collection.mutable.ArrayBuffer def isEmpty(a: String): Boolean = a != null && !a.replaceAll("(?m)\\s+$", "").isEmpty() def parsePoint(a: String): LabeledPoint = { val values = a.split('\t') val feat = values(1).split(' ') val indices = ArrayBuffer.empty[Int] val featValues = ArrayBuffer.empty[Double] for (f <- feat) { val q = f.split(':') if (q.length == 2) { indices += (q(0).toInt) featValues += (q(1).toDouble) } } val vector = new SparseVector(2357815, indices.toArray, featValues.toArray) return LabeledPoint(values(0).toDouble, vector) } val data = sc.textFile("data.txt") val empty = data.filter(isEmpty) val points = empty.map(parsePoint) points.cache() val model = new NaiveBayes().run(points) On Thu, Apr 24, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote: Do you mind sharing more code and error messages? The information you provided is too little to identify the problem. -Xiangrui On Thu, Apr 24, 2014 at 1:55 PM, John King usedforprinting...@gmail.com wrote: Last command was: val model = new NaiveBayes().run(points) On Thu, Apr 24, 2014 at 4:27 PM, Xiangrui Meng men...@gmail.com wrote: Could you share the command you used and more of the error message? Also, is it an MLlib specific problem? 
-Xiangrui On Thu, Apr 24, 2014 at 11:49 AM, John King usedforprinting...@gmail.com wrote: ./spark-shell: line 153: 17654 Killed $FWDIR/bin/spark-class org.apache.spark.repl.Main $@ Any ideas?
Re: Spark mllib throwing error
I only see one risk: if your feature indices are not sorted, it might have undefined behavior. Other than that, I don't see anything suspicious. -Xiangrui On Thu, Apr 24, 2014 at 4:56 PM, John King usedforprinting...@gmail.com wrote: It just displayed this error and stopped on its own. Do the lines of code mentioned in the error have anything to do with it? On Thu, Apr 24, 2014 at 7:54 PM, Xiangrui Meng men...@gmail.com wrote: I don't see anything wrong with your code. Could you do points.count() to see how many training examples you have? Also, make sure you don't have negative feature values. The error message you sent did not say NaiveBayes went wrong, but that the Spark shell was killed. -Xiangrui On Thu, Apr 24, 2014 at 4:05 PM, John King usedforprinting...@gmail.com wrote: In the other thread I had an issue with Python. In this issue, I tried switching to Scala. The code is: import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.linalg.SparseVector; import org.apache.spark.mllib.classification.NaiveBayes; import scala.collection.mutable.ArrayBuffer def isEmpty(a: String): Boolean = a != null && !a.replaceAll("(?m)\\s+$", "").isEmpty() def parsePoint(a: String): LabeledPoint = { val values = a.split('\t') val feat = values(1).split(' ') val indices = ArrayBuffer.empty[Int] val featValues = ArrayBuffer.empty[Double] for (f <- feat) { val q = f.split(':') if (q.length == 2) { indices += (q(0).toInt) featValues += (q(1).toDouble) } } val vector = new SparseVector(2357815, indices.toArray, featValues.toArray) return LabeledPoint(values(0).toDouble, vector) } val data = sc.textFile("data.txt") val empty = data.filter(isEmpty) val points = empty.map(parsePoint) points.cache() val model = new NaiveBayes().run(points) On Thu, Apr 24, 2014 at 6:57 PM, Xiangrui Meng men...@gmail.com wrote: Do you mind sharing more code and error messages? The information you provided is too little to identify the problem. 
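Since unsorted indices are the one risk flagged above, here is a minimal Python sketch of the feature-parsing step with the (index, value) pairs sorted by index before the arrays are built, as MLlib's SparseVector expects indices in increasing order (the `idx:val` field format is taken from the thread; the function name is illustrative):

```python
# Parse a space-separated list of "index:value" features into parallel
# index and value arrays, sorting by index first so downstream sparse-vector
# constructors that require sorted indices behave correctly.
def parse_features(field):
    pairs = []
    for f in field.split(" "):
        q = f.split(":")
        if len(q) == 2:  # skip malformed tokens, as the original parsePoint does
            pairs.append((int(q[0]), float(q[1])))
    pairs.sort(key=lambda p: p[0])  # guarantee sorted indices
    indices = [i for i, _ in pairs]
    values = [v for _, v in pairs]
    return indices, values

indices, values = parse_features("7:1.5 2:0.5 11:3.0")
print(indices, values)  # [2, 7, 11] [0.5, 1.5, 3.0]
```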
-Xiangrui On Thu, Apr 24, 2014 at 1:55 PM, John King usedforprinting...@gmail.com wrote: Last command was: val model = new NaiveBayes().run(points) On Thu, Apr 24, 2014 at 4:27 PM, Xiangrui Meng men...@gmail.com wrote: Could you share the command you used and more of the error message? Also, is it an MLlib specific problem? -Xiangrui On Thu, Apr 24, 2014 at 11:49 AM, John King usedforprinting...@gmail.com wrote: ./spark-shell: line 153: 17654 Killed $FWDIR/bin/spark-class org.apache.spark.repl.Main $@ Any ideas?
Re: Running out of memory Naive Bayes
How many labels does your dataset have? -Xiangrui On Sat, Apr 26, 2014 at 6:03 PM, DB Tsai dbt...@stanford.edu wrote: Which version of mllib are you using? For Spark 1.0, mllib will support sparse feature vectors, which will improve performance a lot when computing the distance between points and centroids. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Sat, Apr 26, 2014 at 5:49 AM, John King usedforprinting...@gmail.com wrote: I'm just wondering, are the SparkVector calculations really taking the sparsity into account or just converting to dense? On Fri, Apr 25, 2014 at 10:06 PM, John King usedforprinting...@gmail.com wrote: I've been trying to use the Naive Bayes classifier. Each example in the dataset has about 2 million features, only about 20-50 of which are non-zero, so the vectors are very sparse. I keep running out of memory though, even for about 1000 examples on 30GB RAM, while the entire dataset is 4 million examples. And I would also like to note that I'm using the sparse vector class.
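As a rough illustration of why the number of labels matters here, a back-of-envelope Python sketch, under the assumption (not confirmed in the thread) that training aggregates one dense per-label sum vector over all features, regardless of how sparse each individual example is:

```python
# Back-of-envelope memory estimate: with d features and L labels, a dense
# per-label aggregation needs roughly L * d * 8 bytes of doubles.
d = 2_000_000          # features per example, from the thread
bytes_per_double = 8

per_label_mb = d * bytes_per_double / 2**20
print(round(per_label_mb, 1))  # ~15.3 MB of doubles per label

# With many labels (or multiple per-task copies of the aggregate),
# this multiplies quickly even though each example is sparse.
for labels in (2, 100, 1000):
    total_gb = labels * d * bytes_per_double / 2**30
    print(labels, "labels ->", round(total_gb, 2), "GB")
```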
Re: running SparkALS
Hi Diana, SparkALS is an example implementation of ALS. It doesn't call the ALS algorithm implemented in MLlib. M, U, and F are used to generate synthetic data. I'm updating the examples. In the meantime, you can take a look at the updated MLlib guide: http://50.17.120.186:4000/mllib-collaborative-filtering.html and try the example code there. Thanks, Xiangrui On Mon, Apr 28, 2014 at 10:30 AM, Diana Carroll dcarr...@cloudera.com wrote: Hi everyone. I'm trying to run some of the Spark example code, and most of it appears to be undocumented (unless I'm missing something). Can someone help me out? I'm particularly interested in running SparkALS, which wants parameters: M U F iter slices What are these variables? They appear to be integers and the default values are 100, 500 and 10 respectively but beyond that...huh? Thanks! Diana
Re: Turn BLAS on MacOSX
Those are warning messages instead of errors. You need to add netlib-java:all to use native BLAS/LAPACK. But it won't work if you include netlib-java:all in an assembly jar. It has to be a separate jar when you submit your job. For SGD, we only use level-1 BLAS, so I don't think native code is called. -Xiangrui On Sun, May 11, 2014 at 9:32 AM, DB Tsai dbt...@stanford.edu wrote: Hi Debasish, In https://github.com/apache/spark/blob/master/docs/mllib-guide.md Dependencies section, the document talks about the native blas dependencies issue. For netlib which breeze uses internally, if the native library isn't found, the jblas implementation will be used. Here is more detail about how to install native library in different platform. https://github.com/fommil/netlib-java/blob/master/README.md#machine-optimised-system-libraries Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai On Wed, May 7, 2014 at 10:52 AM, Debasish Das debasish.da...@gmail.com wrote: Hi, How do I load native BLAS libraries on Mac ? I am getting the following errors while running LR and SVM with SGD: 14/05/07 10:48:13 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 14/05/07 10:48:13 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS centos it was fine...but on mac I am getting these warnings.. Also when it fails to run native blas does it use java code for BLAS operations ? May be after addition of breeze, we should add these details on a page as well so that users are aware of it before they report any performance results.. Thanks. Deb
Re: Accuracy in mllib BinaryClassificationMetrics
Hi Deb, feel free to add accuracy along with precision and recall. -Xiangrui On Mon, May 12, 2014 at 1:26 PM, Debasish Das debasish.da...@gmail.com wrote: Hi, I see precision and recall but no accuracy in mllib.evaluation.binary. Is it already under development or it needs to be added ? Thanks. Deb
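As a sketch of what such an addition would compute, here is accuracy derived from the same confusion counts that precision and recall use (plain Python; the function name and the example counts are illustrative, not MLlib's API):

```python
# Precision, recall, and accuracy from binary confusion counts:
# tp/fp/tn/fn = true/false positives and negatives.
def metrics(tp, fp, tn, fn):
    precision = tp / (tp + fp)             # of predicted positives, how many are right
    recall = tp / (tp + fn)                # of actual positives, how many were found
    accuracy = (tp + tn) / (tp + fp + tn + fn)  # of all predictions, how many are right
    return precision, recall, accuracy

p, r, a = metrics(tp=40, fp=10, tn=45, fn=5)
print(p, a)  # 0.8 0.85 (recall is 40/45, about 0.889)
```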
Re: Reading from .bz2 files with Spark
Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes the problem you described, but it does contain several fixes to the bzip2 format. -Xiangrui On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote: Hi all, Is anyone reading and writing to .bz2 files stored in HDFS from Spark with success? I'm finding the following results on a recent commit (756c96 from 24hr ago) and CDH 4.4.0: Works: val r = sc.textFile("/user/aa/myfile.bz2").count Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String) => s + "|").count Specifically, I'm getting an exception coming out of the bzip2 libraries (see below stacktraces), which is unusual because I'm able to read from that file without an issue using the same libraries via Pig. It was originally created from Pig as well. Digging a little deeper I found this line in the .bz2 decompressor's javadoc for CBZip2InputStream: "Instances of this class are not threadsafe." [source] My current working theory is that Spark has a much higher level of parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds exceptions much more frequently (as in can't finish a run over a little 2M row file) vs hardly at all in other libraries. The only other reference I could find to the issue was in presto-users, but the recommendation to leave .bz2 for .lzo doesn't help if I actually do want the higher compression levels of .bz2. Would love to hear if I have some kind of configuration issue or if there's a bug in .bz2 handling that's fixed in later versions of CDH, or generally any other thoughts on the issue. Thanks! 
Andrew Below are examples of some exceptions I'm getting: 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException java.lang.ArrayIndexOutOfBoundsException: 65535 at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397) at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43) java.lang.ArrayIndexOutOfBoundsException: 90 at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333) at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397) at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426) at java.io.InputStream.read(InputStream.java:101) at 
org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
Re: Distribute jar dependencies via sc.AddJar(fileName)
I don't know whether this would fix the problem. In v0.9, you need `yarn-standalone` instead of `yarn-cluster`. See https://github.com/apache/spark/commit/328c73d037c17440c2a91a6c88b4258fbefa0c08 On Tue, May 13, 2014 at 11:36 PM, Xiangrui Meng men...@gmail.com wrote: Does v0.9 support yarn-cluster mode? I checked SparkContext.scala in v0.9.1 and didn't see special handling of `yarn-cluster`. -Xiangrui On Mon, May 12, 2014 at 11:14 AM, DB Tsai dbt...@stanford.edu wrote: We're deploying Spark in yarn-cluster mode (Spark 0.9), and we add jar dependencies in command line with --addJars option. However, those external jars are only available in the driver (application running in hadoop), and not available in the executors (workers). After doing some research, we realize that we've to push those jars to executors in driver via sc.AddJar(fileName). Although in the driver's log (see the following), the jar is successfully added in the http server in the driver, and I confirm that it's downloadable from any machine in the network, I still get `java.lang.NoClassDefFoundError` in the executors. 14/05/09 14:51:41 INFO spark.SparkContext: Added JAR analyticshadoop-eba5cdce1.jar at http://10.0.0.56:42522/jars/analyticshadoop-eba5cdce1.jar with timestamp 1399672301568 Then I check the log in the executors, and I don't find anything `Fetching file with timestamp timestamp`, which implies something is wrong; the executors are not downloading the external jars. Any suggestion what we can look at? After digging into how spark distributes external jars, I wonder the scalability of this approach. What if there are thousands of nodes downloading the jar from single http server in the driver? Why don't we push the jars into HDFS distributed cache by default instead of distributing them via http server? Thanks. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai
Re: spark on yarn-standalone, throws StackOverflowError and fails sometimes and succeeds for the rest
Could you try `println(result.toDebugString())` right after `val result = ...` and attach the result? -Xiangrui On Fri, May 9, 2014 at 8:20 AM, phoenix bai mingzhi...@gmail.com wrote: after a couple of tests, I find that, if I use: val result = model.predict(prdctpairs) result.map(x => x.user + "," + x.product + "," + x.rating).saveAsTextFile(output) it always fails with the above error and the exception seems iterative. but if I do: val result = model.predict(prdctpairs) result.cache() result.map(x => x.user + "," + x.product + "," + x.rating).saveAsTextFile(output) it succeeds. could anyone help explain why the cache() is necessary? thanks On Fri, May 9, 2014 at 6:45 PM, phoenix bai mingzhi...@gmail.com wrote: Hi all, My spark code is running on yarn-standalone. the last three lines of the code are as below, val result = model.predict(prdctpairs) result.map(x => x.user + "," + x.product + "," + x.rating).saveAsTextFile(output) sc.stop() the same code sometimes runs successfully and gives the right result, while from time to time it throws a StackOverflowError and fails. and I don't have a clue how I should debug. 
below is the error, (the start and end portion to be exact): 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17] MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 44 to sp...@rxx43.mc10.site.net:43885 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17] MapOutputTrackerMaster: Size of output statuses for shuffle 44 is 148 bytes 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-35] MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 45 to sp...@rxx43.mc10.site.net:43885 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-35] MapOutputTrackerMaster: Size of output statuses for shuffle 45 is 453 bytes 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-20] MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 44 to sp...@rxx43.mc10.site.net:56767 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29] MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 45 to sp...@rxx43.mc10.site.net:56767 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29] MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 44 to sp...@rxx43.mc10.site.net:49879 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-29] MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 45 to sp...@rxx43.mc10.site.net:49879 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17] TaskSetManager: Starting task 946.0:17 as TID 146 on executor 6: rx15.mc10.site.net (PROCESS_LOCAL) 14-05-09 17:55:51 INFO [spark-akka.actor.default-dispatcher-17] TaskSetManager: Serialized task 946.0:17 as 6414 bytes in 0 ms 14-05-09 17:55:51 WARN [Result resolver thread-0] TaskSetManager: Lost TID 133 (task 946.0:4) 14-05-09 17:55:51 WARN [Result resolver thread-0] TaskSetManager: Loss was due to java.lang.StackOverflowError java.lang.StackOverflowError at java.lang.ClassLoader.defineClass1(Native Method) at 
java.lang.ClassLoader.defineClassCond(ClassLoader.java:631) at java.lang.ClassLoader.defineClass(ClassLoader.java:615) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) at java.net.URLClassLoader.defineClass(URLClassLoader.java:283) at java.net.URLClassLoader.access$000(URLClassLoader.java:58) at java.net.URLClassLoader$1.run(URLClassLoader.java:197) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631) at java.lang.ClassLoader.defineClass(ClassLoader.java:615) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:969) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at
Re: How to run the SVM and LogisticRegression
If you check out the master branch, there are some examples that can be used as templates under examples/src/main/scala/org/apache/spark/examples/mllib Best, Xiangrui On Wed, May 14, 2014 at 1:36 PM, yxzhao yxz...@ualr.edu wrote: Hello, I found the classification algorithms SVM and LogisticRegression implemented in the following directory. How do I run them? What should the command line be? Thanks. spark-0.9.0-incubating/mllib/src/main/scala/org/apache/spark/mllib/classification -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-run-the-SVM-and-LogisticRegression-tp5720.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Reading from .bz2 files with Spark
Hi Andrew, Could you try varying the minPartitions parameter? For example: val r = sc.textFile("/user/aa/myfile.bz2", 4).count val r = sc.textFile("/user/aa/myfile.bz2", 8).count Best, Xiangrui On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote: Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes the problem you described, but it does contain several fixes to the bzip2 format. -Xiangrui On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote: Hi all, Is anyone reading and writing to .bz2 files stored in HDFS from Spark with success? I'm finding the following results on a recent commit (756c96 from 24hr ago) and CDH 4.4.0: Works: val r = sc.textFile("/user/aa/myfile.bz2").count Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String) => s + "|").count Specifically, I'm getting an exception coming out of the bzip2 libraries (see below stacktraces), which is unusual because I'm able to read from that file without an issue using the same libraries via Pig. It was originally created from Pig as well. Digging a little deeper I found this line in the .bz2 decompressor's javadoc for CBZip2InputStream: "Instances of this class are not threadsafe." [source] My current working theory is that Spark has a much higher level of parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds exceptions much more frequently (as in can't finish a run over a little 2M row file) vs hardly at all in other libraries. The only other reference I could find to the issue was in presto-users, but the recommendation to leave .bz2 for .lzo doesn't help if I actually do want the higher compression levels of .bz2. Would love to hear if I have some kind of configuration issue or if there's a bug in .bz2 handling that's fixed in later versions of CDH, or generally any other thoughts on the issue. Thanks! 
Re: Reading from .bz2 files with Spark
Hi Andrew, I verified that this is due to thread safety. I changed SPARK_WORKER_CORES to 1 in spark-env.sh, so there is only 1 thread per worker. Then I can load the file without any problem with different values of minPartitions. I will submit a JIRA to both Spark and Hadoop. Best, Xiangrui On Thu, May 15, 2014 at 3:48 PM, Xiangrui Meng men...@gmail.com wrote: Hi Andrew, Could you try varying the minPartitions parameter? For example: val r = sc.textFile("/user/aa/myfile.bz2", 4).count val r = sc.textFile("/user/aa/myfile.bz2", 8).count Best, Xiangrui On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote: Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes the problem you described, but it does contain several fixes to the bzip2 format. -Xiangrui On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote: Hi all, Is anyone reading and writing to .bz2 files stored in HDFS from Spark with success? I'm finding the following results on a recent commit (756c96 from 24hr ago) and CDH 4.4.0: Works: val r = sc.textFile("/user/aa/myfile.bz2").count Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String) => s + "|").count Specifically, I'm getting an exception coming out of the bzip2 libraries (see below stacktraces), which is unusual because I'm able to read from that file without an issue using the same libraries via Pig. It was originally created from Pig as well. Digging a little deeper I found this line in the .bz2 decompressor's javadoc for CBZip2InputStream: "Instances of this class are not threadsafe." [source] My current working theory is that Spark has a much higher level of parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds exceptions much more frequently (as in can't finish a run over a little 2M row file) vs hardly at all in other libraries. 
The only other reference I could find to the issue was in presto-users, but the recommendation to leave .bz2 for .lzo doesn't help if I actually do want the higher compression levels of .bz2. Would love to hear if I have some kind of configuration issue or if there's a bug in .bz2 that's fixed in later versions of CDH, or generally any other thoughts on the issue. Thanks! Andrew Below are examples of some exceptions I'm getting:

14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 65535
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
    at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)

java.lang.ArrayIndexOutOfBoundsException: 90
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
    at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181
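For readers hitting the same issue, the workaround Xiangrui describes at the top of the thread can be sketched as a one-line change (file location per a standard Spark layout):

```shell
# In conf/spark-env.sh on each worker: cap each worker at one core so only one
# (non-threadsafe) CBZip2InputStream is active per worker JVM at a time.
export SPARK_WORKER_CORES=1
```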
Re: Reading from .bz2 files with Spark
Hi Andrew, This is the JIRA I created: https://issues.apache.org/jira/browse/MAPREDUCE-5893 . Hopefully someone wants to work on it. Best, Xiangrui On Fri, May 16, 2014 at 6:47 PM, Xiangrui Meng men...@gmail.com wrote: Hi Andrew, I could reproduce the bug with Hadoop 2.2.0. Some older versions of Hadoop do not support splittable compression, so you ended up with sequential reads. It is easy to reproduce the bug with the following setup: 1) Workers are configured with multiple cores. 2) BZip2 files are big enough, or minPartitions is large enough when you load the file via sc.textFile(), so that one worker has more than one task. Best, Xiangrui On Fri, May 16, 2014 at 4:06 PM, Andrew Ash and...@andrewash.com wrote: Hi Xiangrui, // FYI I'm getting your emails late due to the Apache mailing list outage I'm using CDH4.4.0, which I think uses the MapReduce v2 API. The .jars are named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar I'm also glad you were able to reproduce! Please paste a link to the Hadoop bug you file so I can follow along. Thanks! Andrew On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote: Which Hadoop version did you use? I'm not sure whether Hadoop v2 fixes the problem you described, but it does contain several fixes to the bzip2 format. -Xiangrui On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote: Hi all, Is anyone reading and writing .bz2 files stored in HDFS from Spark with success? I'm finding the following results on a recent commit (756c96 from 24hr ago) and CDH 4.4.0: Works: val r = sc.textFile("/user/aa/myfile.bz2").count Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String) => s + "|").count Specifically, I'm getting an exception coming out of the bzip2 libraries (see below stacktraces), which is unusual because I'm able to read from that file without an issue using the same libraries via Pig. It was originally created from Pig as well.
Re: Reading from .bz2 files with Spark
Hi Andrew, I could reproduce the bug with Hadoop 2.2.0. Some older versions of Hadoop do not support splittable compression, so you ended up with sequential reads. It is easy to reproduce the bug with the following setup: 1) Workers are configured with multiple cores. 2) BZip2 files are big enough, or minPartitions is large enough when you load the file via sc.textFile(), so that one worker has more than one task. Best, Xiangrui On Fri, May 16, 2014 at 4:06 PM, Andrew Ash and...@andrewash.com wrote: Hi Xiangrui, // FYI I'm getting your emails late due to the Apache mailing list outage I'm using CDH4.4.0, which I think uses the MapReduce v2 API. The .jars are named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar I'm also glad you were able to reproduce! Please paste a link to the Hadoop bug you file so I can follow along. Thanks! Andrew On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote: Which Hadoop version did you use? I'm not sure whether Hadoop v2 fixes the problem you described, but it does contain several fixes to the bzip2 format. -Xiangrui On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote: Hi all, Is anyone reading and writing .bz2 files stored in HDFS from Spark with success? I'm finding the following results on a recent commit (756c96 from 24hr ago) and CDH 4.4.0: Works: val r = sc.textFile("/user/aa/myfile.bz2").count Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String) => s + "|").count Specifically, I'm getting an exception coming out of the bzip2 libraries (see below stacktraces), which is unusual because I'm able to read from that file without an issue using the same libraries via Pig. It was originally created from Pig as well. Digging a little deeper I found this line in the .bz2 decompressor's javadoc for CBZip2InputStream: "Instances of this class are not threadsafe."
Re: breeze DGEMM slow in spark
You need to include breeze-natives or netlib:all to load the native libraries. Check the log messages to ensure the native libraries are used, especially on the worker nodes. The easiest way to use OpenBLAS is to copy the shared library to /usr/lib/libblas.so.3 and /usr/lib/liblapack.so.3. -Xiangrui On Sat, May 17, 2014 at 8:02 PM, wxhsdp wxh...@gmail.com wrote: I think maybe it's related to m1.large, because I also tested on my laptop, and the two cases cost nearly the same amount of time. My laptop: model name : Intel(R) Core(TM) i5-3380M CPU @ 2.90GHz cpu MHz : 2893.549 os: Linux ubuntu 3.11.0-12-generic #19-Ubuntu SMP Wed Oct 9 16:20:46 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/breeze-DGEMM-slow-in-spark-tp5950p5971.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
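A sketch of the OpenBLAS setup suggested above (the source path /opt/OpenBLAS is an assumption; use wherever your build installed the library, and repeat on every worker node):

```shell
# Make OpenBLAS the system BLAS/LAPACK so netlib-java's native loader finds it.
sudo cp /opt/OpenBLAS/lib/libopenblas.so /usr/lib/libblas.so.3
sudo cp /opt/OpenBLAS/lib/libopenblas.so /usr/lib/liblapack.so.3
```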
Re: reading large XML files
Try sc.wholeTextFiles(). It reads the entire file into a string record. -Xiangrui On Tue, May 20, 2014 at 8:25 AM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote: We are trying to read some large GraphML files to use in spark. Is there an easy way to read XML-based files like this that accounts for partition boundaries and the like? Thanks, Nathan -- Nathan Kronenfeld Senior Visualization Developer Oculus Info Inc 2 Berkeley Street, Suite 600, Toronto, Ontario M5A 4J5 Phone: +1-416-203-3003 x 238 Email: nkronenf...@oculusinfo.com
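A minimal sketch of the suggestion (the HDFS path is made up, and parsing with scala.xml is just one option):

```scala
import scala.xml.XML

// wholeTextFiles returns (path, fileContents) pairs, one record per file, so
// each GraphML document is parsed as a unit and is never split at a
// partition boundary the way sc.textFile's line-oriented records can be.
val files = sc.wholeTextFiles("hdfs:///data/graphs/")
val parsed = files.mapValues(content => XML.loadString(content))
```

Note that each file must fit in memory on a single executor, since it becomes one record.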
Re: Job Processing Large Data Set Got Stuck
Many OutOfMemoryErrors in the log. Is your data distributed evenly? -Xiangrui On Wed, May 21, 2014 at 11:23 AM, yxzhao yxz...@ualr.edu wrote: I run the pagerank example processing a large data set, 5GB in size, using 48 machines. The job got stuck at the time point: 14/05/20 21:32:17, as the attached log shows. It was stuck there for more than 10 hours and then I killed it at last. But I did not find any information explaining why it was stuck. Any suggestions? Thanks. Spark_OK_48_pagerank.log http://apache-spark-user-list.1001560.n3.nabble.com/file/n6185/Spark_OK_48_pagerank.log -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Job-Processing-Large-Data-Set-Got-Stuck-tp6185.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Job Processing Large Data Set Got Stuck
If the RDD is cached, you can check its storage information in the Storage tab of the Web UI. On Wed, May 21, 2014 at 12:31 PM, yxzhao yxz...@ualr.edu wrote: Thanks Xiangrui, How to check and make sure the data is distributed evenly? Thanks again. View this message in context: Re: Job Processing Large Data Set Got Stuck Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Inconsistent RDD Sample size
It doesn't guarantee the exact sample size. If you fix the random seed, it would return the same result every time. -Xiangrui On Wed, May 21, 2014 at 2:05 PM, glxc r.ryan.mcc...@gmail.com wrote: I have a graph and am trying to take a random sample of vertices without replacement, using the RDD.sample() method. verts are the vertices in the graph: val verts = graph.vertices Executing this multiple times in a row: verts.sample(false, 1.toDouble/v1.count.toDouble, System.currentTimeMillis).count yields different results roughly each time (albeit +/- a small % of the target). Why does this happen? I looked at PartitionwiseSampledRDD but can't figure it out. Also, is there another method/technique to yield the same result each time? My understanding is that grabbing random indices may not be the best use of the RDD model. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Inconsistent-RDD-Sample-size-tp6197.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
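The behavior is easy to reproduce without Spark: sample() flips a coin per element (which is what PartitionwiseSampledRDD does under the hood), so the sample size fluctuates around fraction * count, while fixing the seed makes the draw repeatable; for an exact size, takeSample(false, k, seed) returns exactly k elements as a local array. A standalone sketch of the coin-flip behavior:

```scala
import scala.util.Random

// Bernoulli sampling: keep each element independently with probability p,
// mirroring RDD.sample(withReplacement = false, p, seed).
def bernoulliSample(n: Int, p: Double, seed: Long): Seq[Int] = {
  val rng = new Random(seed)
  (0 until n).filter(_ => rng.nextDouble() < p)
}

val a = bernoulliSample(10000, 0.01, seed = 1L)
val b = bernoulliSample(10000, 0.01, seed = 1L)
val c = bernoulliSample(10000, 0.01, seed = 2L)
// a == b exactly (same seed), while c's size merely fluctuates around 100.
```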
Re: Error while launching ec2 spark cluster with HVM (r3.large)
Was the error message the same as you posted when you used `root` as the user id? Could you try this: 1) Do not specify user id. (Default would be `root`.) 2) If it fails in the middle, try `spark-ec2 --resume launch cluster` to continue launching the cluster. Best, Xiangrui On Thu, May 22, 2014 at 12:44 PM, adparker adpar...@gmail.com wrote: I had this problem too and fixed it by setting the wait time-out to a larger value: --wait For example, in stand alone mode with default values, a time out of 480 seconds worked for me: $ cd spark-0.9.1/ec2 $ ./spark-ec2 --key-pair= --identity-file= --instance-type=r3.large --wait=480 launch -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-launching-ec2-spark-cluster-with-HVM-r3-large-tp5862p6276.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
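Putting the two suggestions together (the cluster name and key names here are placeholders; flag spellings per the spark-ec2 script of that era):

```shell
# Launch without an explicit user id (defaults to root) and a longer wait.
./spark-ec2 --key-pair=mykey --identity-file=mykey.pem \
  --instance-type=r3.large --wait=480 launch mycluster

# If the launch dies partway through, resume it instead of starting over.
./spark-ec2 --key-pair=mykey --identity-file=mykey.pem --resume launch mycluster
```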
Re: pyspark MLlib examples don't work with Spark 1.0.0
The documentation you looked at is not official, though it is from @pwendell's website. It was for the Spark SQL release. Please find the official documentation here: http://spark.apache.org/docs/latest/mllib-linear-methods.html#linear-support-vector-machine-svm It contains a working example showing how to construct LabeledPoint and use it for training. Best, Xiangrui On Fri, May 30, 2014 at 5:10 AM, jamborta jambo...@gmail.com wrote: thanks for the reply. I am definitely running 1.0.0, I set it up manually. To answer my question, I found out from the examples that it would need a new data type called LabeledPoint instead of numpy array. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-MLlib-examples-don-t-work-with-Spark-1-0-0-tp6546p6579.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Create/shutdown objects before/after RDD use (or: Non-serializable classes)
Hi Tobias, One hack you can try is: rdd.mapPartitions { iter => val x = new X(); iter.map(row => x.doSomethingWith(row)) ++ { x.shutdown(); Iterator.empty } } Best, Xiangrui On Thu, May 29, 2014 at 11:38 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, I want to use an object x in my RDD processing as follows: val x = new X() rdd.map(row => x.doSomethingWith(row)) println(rdd.count()) x.shutdown() Now the problem is that X is non-serializable, so while this works locally, it does not work in a cluster setup. I thought I could do rdd.mapPartitions { iter => val x = new X(); val result = iter.map(row => x.doSomethingWith(row)); x.shutdown(); result } to create an instance of X locally, but obviously x.shutdown() is called before the first row is processed. How can I specify these node-local setup/teardown functions, or how do I deal in general with non-serializable classes? Thanks, Tobias
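Why the hack works, in plain Scala (no Spark needed): the right-hand side of Iterator.++ is passed by name and only evaluated once the left iterator is exhausted, so shutdown() runs after the last row has been mapped. A self-contained sketch with a stand-in X:

```scala
// Stand-in for the non-serializable X in the question.
class X {
  var shutDown = false
  def doSomethingWith(row: String): String = row.toUpperCase
  def shutdown(): Unit = { shutDown = true }
}

val x = new X()
val out = (Iterator("a", "b").map(x.doSomethingWith) ++ {
  x.shutdown() // evaluated lazily, only after "a" and "b" were processed
  Iterator.empty[String]
}).toList

// out == List("A", "B"), and x.shutDown is true only after the iterator
// was fully consumed by toList.
```

Inside mapPartitions the same ordering holds per partition, as long as downstream code consumes the whole iterator.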
Re: Using String Dataset for Logistic Regression
Yes. MLlib 1.0 supports sparse input data for linear methods. -Xiangrui On Mon, Jun 2, 2014 at 11:36 PM, praveshjain1991 praveshjain1...@gmail.com wrote: I am not sure. I have just been using some numerical datasets. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-String-Dataset-for-Logistic-Regression-tp5523p6784.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
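A hedged sketch of what sparse input looks like against the MLlib 1.0 API (the size, indices, and values are invented):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// A length-10 feature vector with nonzeros only at indices 1 and 3 — the
// representation the linear methods (e.g. LogisticRegressionWithSGD) accept.
val point = LabeledPoint(1.0, Vectors.sparse(10, Array(1, 3), Array(2.0, 5.5)))
```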
Re: Using MLLib in Scala
Hi Suela, (Please subscribe to our user mailing list and send your questions there in the future.) For your case, each file contains a column of numbers, so you can use sc.textFile to read them first, zip them together, and then create labeled points: val xx = sc.textFile("/path/to/ex2x.dat").map(x => Vectors.dense(x.toDouble)) val yy = sc.textFile("/path/to/ex2y.dat").map(_.toDouble) val examples = yy.zip(xx).map { case (y, x) => LabeledPoint(y, x) } Best, Xiangrui On Thu, May 29, 2014 at 2:35 AM, Suela Haxhi suelaha...@gmail.com wrote: Hello Xiangrui, my name is Suela Haxhi. Let me ask you for a little help. I have some difficulty loading files in MLlib, namely for binary classification and linear regression. E.g., the file mllib/data/sample_svm_data.txt contains the following data: 1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0 0 2.857738033247042 0 0 2.619965104088255 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0 etc. I don't understand what the input/output are. The problem comes when I want to load another type of dataset. E.g., I want to do binary classification on the presence of a disease. For example, the esteemed professor Andrew Ng, in his courses on machine learning, explains: Download ex2Data.zip, and extract the files from the zip file. The files contain some example measurements of various heights for boys between the ages of two and eight. The y-values are the heights measured in meters, and the x-values are the ages of the boys corresponding to the heights. Each height and age tuple constitutes one training example (x^{(i)}, y^{(i)}) in our dataset. There are m = 50 training examples, and you will use them to develop a linear regression model. In this problem, you'll implement linear regression using gradient descent.
In Matlab/Octave, you can load the training set using the commands: x = load('ex2x.dat'); y = load('ex2y.dat'); But in MLlib, I can't figure out what these data mean (mllib/data/sample_svm_data.txt), and I don't know how to load another type of data set using the following code: Binary Classification import org.apache.spark.SparkContext import org.apache.spark.mllib.classification.SVMWithSGD import org.apache.spark.mllib.regression.LabeledPoint // Load and parse the data file // Run training algorithm to build the model // Evaluate model on training examples and compute the training error Can you help me please? Thank you in advance. Best Regards, Suela Haxhi
Re: How to stop a running SparkContext in the proper way?
Did you try sc.stop()? On Tue, Jun 3, 2014 at 9:54 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi, I want to know how I can stop a running SparkContext in a proper way so that next time when I start a new SparkContext, the web UI can be launched on the same port 4040.Now when i quit the job using ctrl+z the new sc are launched in new ports. I have the same problem with ipython notebook.It is launched on a different port when I start the notebook second time after closing the first one.I am starting ipython using the command IPYTHON_OPTS=notebook --ip --pylab inline ./bin/pyspark Thanks Regards, Meethu M
Re: IllegalArgumentException on calling KMeans.train()
Could you check whether the vectors have the same size? -Xiangrui On Wed, Jun 4, 2014 at 1:43 AM, bluejoe2008 bluejoe2...@gmail.com wrote: What does this exception mean?

14/06/04 16:35:15 ERROR executor.Executor: Exception in task ID 6
java.lang.IllegalArgumentException: requirement failed
    at scala.Predef$.require(Predef.scala:221)
    at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:271)
    at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:398)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:372)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:366)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:366)
    at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:389)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:269)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:268)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:268)
    at org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:267)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:619)

My Spark version: 1.0.0, Java: 1.7. My code:

JavaRDD<Vector> docVectors = generateDocVector(...);
int numClusters = 20;
int numIterations = 20;
KMeansModel clusters = KMeans.train(docVectors.rdd(), numClusters, numIterations);

Another strange thing is that the mapPartitionsWithIndex() method call in generateDocVector() is invoked 3 times... 2014-06-04 bluejoe2008
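The require that fails is MLlib's fastSquaredDistance checking that the two vectors have equal length, so a quick diagnostic (sketched against the Scala API; docVectors stands for the RDD from the question) is to confirm generateDocVector emits a single vector size:

```scala
// Collect the distinct vector sizes; KMeans.train needs exactly one.
val sizes = docVectors.map(_.size).distinct().collect()
require(sizes.length == 1, s"found vectors of mixed sizes: ${sizes.mkString(", ")}")
```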
Re: Logistic Regression MLLib Slow
80M by 4 should be about 2.5GB uncompressed. 10 iterations shouldn't take that long, even on a single executor. Besides what Matei suggested, could you also verify the executor memory in http://localhost:4040 in the Executors tab? It is very likely the executors do not have enough memory. In that case, caching may be slower than reading directly from disk. -Xiangrui On Wed, Jun 4, 2014 at 7:06 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Ah, is the file gzipped by any chance? We can’t decompress gzipped files in parallel so they get processed by a single task. It may also be worth looking at the application UI (http://localhost:4040) to see 1) whether all the data fits in memory in the Storage tab (maybe it somehow becomes larger, though it seems unlikely that it would exceed 20 GB) and 2) how many parallel tasks run in each iteration. Matei On Jun 4, 2014, at 6:56 PM, Srikrishna S srikrishna...@gmail.com wrote: I am using the MLlib one (LogisticRegressionWithSGD) with PySpark. I am running only 10 iterations. The MLlib version of logistic regression doesn't seem to use all the cores on my machine. Regards, Krishna On Wed, Jun 4, 2014 at 6:47 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Are you using the logistic_regression.py in examples/src/main/python or examples/src/main/python/mllib? The first one is an example of writing logistic regression by hand and won’t be as efficient as the MLlib one. I suggest trying the MLlib one. You may also want to check how many iterations it runs — by default I think it runs 100, which may be more than you need. Matei On Jun 4, 2014, at 5:47 PM, Srikrishna S srikrishna...@gmail.com wrote: Hi All, I am new to Spark and I am trying to run LogisticRegression (with SGD) using MLlib on a beefy single machine with about 128GB RAM. The dataset has about 80M rows with only 4 features so it barely occupies 2 GB on disk.
I am running the code using all 8 cores with 20G memory using: spark-submit --executor-memory 20G --master local[8] logistic_regression.py It seems to take about 3.5 hours without caching and over 5 hours with caching. What is the recommended use for Spark on a beefy single machine? Any suggestions will help! Regards, Krishna Code sample:

# Dataset
d = sys.argv[1]
data = sc.textFile(d)

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(',')]
    return LabeledPoint(values[0], values[1:])

_parsedData = data.map(parsePoint)
parsedData = _parsedData.cache()
results = {}

# Spark
start_time = time.time()

# Build the gl_model
niters = 10
spark_model = LogisticRegressionWithSGD.train(parsedData, iterations=niters)

# Evaluate the gl_model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, spark_model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
Re: Logistic Regression MLLib Slow
Hi Krishna, Specifying executor memory in local mode has no effect, because all of the threads run inside the same JVM. You can either try --driver-memory 60g or start a standalone server. Best, Xiangrui
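The corrected invocation from the reply, sketched (60g follows the reply's suggestion; pick a value that fits the 128 GB machine):

```shell
# In local mode the "executors" are threads inside the driver JVM, so give
# the memory to the driver rather than to executors.
spark-submit --driver-memory 60g --master "local[8]" logistic_regression.py
```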
Re: Native library can not be loaded when using Mllib PCA
For standalone and YARN mode, you need to install the native libraries on all nodes. The best solution is to install them as /usr/lib/libblas.so.3 and /usr/lib/liblapack.so.3. If your matrix is sparse, the native libraries cannot help because they are for dense linear algebra. You can create an RDD of sparse rows and try k-means directly; it supports sparse input. -Xiangrui Sent from my iPad On Jun 5, 2014, at 2:36 AM, yangliuyu yangli...@163.com wrote: Hi, We're using MLlib (1.0.0 release version) on a k-means clustering problem. We want to reduce the matrix column size before sending the points to the k-means solver. It works on my Mac in local mode: spark-test-run-assembly-1.0.jar contains my application code, com.github.fommil netlib code, and netlib-native*.so files (including jnilib and dll files): spark-submit --class test.TestMllibPCA --master local[4] --executor-memory 3g --driver-memory 3g --driver-class-path /data/user/dump/spark-test-run-assembly-1.0.jar /data/user/dump/spark-test-run-assembly-1.0.jar /data/user/dump/user_fav_2014_04_09.csv.head1w But if --driver-class-path is removed, the warn messages appear: 14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK 14/06/05 16:36:20 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK Setting SPARK_CLASSPATH=/data/user/dump/spark-test-run-assembly-1.0.jar can also solve the problem. The matrix contains sparse data with rows: 6778, columns: 2487, and the PCA computation takes 10s and 47s respectively, which suggests the native library works well. Then I wanted to test it on a Spark standalone cluster (on CentOS), but it failed again.
After changing the JDK logging level to FINEST, I got these messages:

14/06/05 16:19:15 INFO JniLoader: JNI LIB = netlib-native_system-linux-x86_64.so
14/06/05 16:19:15 INFO JniLoader: extracting jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_system-linux-x86_64.so to /tmp/jniloader6648403281987654682netlib-native_system-linux-x86_64.so
14/06/05 16:19:15 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
14/06/05 16:19:15 INFO JniLoader: JNI LIB = netlib-native_ref-linux-x86_64.so
14/06/05 16:19:15 INFO JniLoader: extracting jar:file:/data/user/dump/spark-test-run-assembly-1.0.jar!/netlib-native_ref-linux-x86_64.so to /tmp/jniloader2298588627398263902netlib-native_ref-linux-x86_64.so
14/06/05 16:19:16 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
14/06/05 16:19:16 INFO LAPACK: Implementation provided by class com.github.fommil.netlib.F2jLAPACK

libgfortran, atlas, blas, lapack and arpack are all installed and all of the .so files are located under /usr/lib64; spark.executor.extraLibraryPath is set to /usr/lib64 in conf/spark-defaults.conf, but none of this works. I tried adding --jars /data/user/dump/spark-test-run-assembly-1.0.jar, but no luck. What should I try next? Does the native library need to be visible to both the driver and the executors? In local mode the problem seems to be a classpath problem, but for standalone and yarn mode it gets more complex. A detailed document would be really helpful. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Native-library-can-not-be-loaded-when-using-Mllib-PCA-tp7042.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
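Collecting the settings discussed in this thread in one place, one combination worth trying is to put the assembly jar on both driver and executor classpaths explicitly. This is a sketch only: the paths are the ones from this thread, not defaults, and the extraClassPath/extraLibraryPath property names should be verified against your Spark version's configuration docs:

```
# conf/spark-defaults.conf -- sketch, paths taken from this thread
spark.executor.extraLibraryPath  /usr/lib64
spark.executor.extraClassPath    /data/user/dump/spark-test-run-assembly-1.0.jar
spark.driver.extraClassPath      /data/user/dump/spark-test-run-assembly-1.0.jar
```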
Re: How to process multiple classification with SVM in MLlib
At this time, you need to do one-vs-all manually for multiclass training. For your second question, if the algorithm is implemented in Java/Scala/Python and designed for a single machine, you can broadcast the dataset to each worker and train models on the workers. If the algorithm is implemented in a different language, you may need to use pipe to train the models outside the JVM (similar to Hadoop Streaming). If the algorithm is designed for a different parallel platform, then it may be hard to use it in Spark. -Xiangrui On Sat, Jun 7, 2014 at 7:15 AM, littlebird cxp...@163.com wrote: Hi All, As we know, in MLlib the SVM is used for binary classification. I wonder how to train an SVM model for multiclass classification in MLlib. In addition, how can I apply a machine learning algorithm in Spark if the algorithm isn't included in MLlib? Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-multiple-classification-with-SVM-in-MLlib-tp7174.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Classpath errors with Breeze
Hi dlaw, You are using breeze-0.8.1, but the spark assembly jar depends on breeze-0.7. If the spark assembly jar comes first on the classpath but the method from DenseMatrix is only available in breeze-0.8.1, you get NoSuchMethod. So: a) If you don't need the features in breeze-0.8.1, do not include it as a dependency, or b) try an experimental feature by turning on spark.files.userClassPathFirst in your Spark configuration. Best, Xiangrui On Sun, Jun 8, 2014 at 10:08 PM, dlaw dieterich.law...@gmail.com wrote: Thanks for the quick response. No, I actually build my jar via 'sbt package' on EC2 on the master itself. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Classpath-errors-with-Breeze-tp7220p7225.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Classpath errors with Breeze
Hi Tobias, Which file system and which encryption are you using? Best, Xiangrui
Re: How to process multiple classification with SVM in MLlib
For broadcast data, please read http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables . For one-vs-all, please read https://en.wikipedia.org/wiki/Multiclass_classification . -Xiangrui On Mon, Jun 9, 2014 at 7:24 AM, littlebird cxp...@163.com wrote: Thank you for your reply. I don't quite understand how to do one-vs-all manually for multiclass training. And for the second question, my algorithm is implemented in Java and designed for a single machine. How can I broadcast the dataset to each worker and train models on the workers? Thank you very much. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-multiple-classification-with-SVM-in-MLlib-tp7174p7251.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
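The one-vs-all scheme itself is easy to sketch. Below is a minimal plain-Python illustration: a stand-in nearest-centroid scorer is used in place of MLlib's SVM (the scorer is hypothetical; only the one-vs-all wrapping is the point — train one binary model per class, then predict the class whose model gives the highest score):

```python
def train_binary(points):
    """Stand-in binary scorer: negative distance to the positive-class
    centroid. In practice this would be one SVMWithSGD model per class."""
    pos = [x for x, y in points if y == 1]
    centroid = [sum(c) / len(pos) for c in zip(*pos)]
    # higher score = closer to the positive centroid
    return lambda x: -sum((a - b) ** 2 for a, b in zip(x, centroid))

def train_one_vs_all(points, num_classes):
    """Relabel the data k times (class k -> 1, everything else -> 0)
    and train one binary model per class."""
    models = []
    for k in range(num_classes):
        relabeled = [(x, 1 if y == k else 0) for x, y in points]
        models.append(train_binary(relabeled))
    return models

def predict(models, x):
    scores = [m(x) for m in models]
    return scores.index(max(scores))  # class with the highest score

data = [([0.0, 0.0], 0), ([0.1, 0.0], 0),
        ([5.0, 5.0], 1), ([5.1, 4.9], 1),
        ([0.0, 5.0], 2), ([0.1, 5.1], 2)]
models = train_one_vs_all(data, 3)
print(predict(models, [0.05, 0.0]))  # -> 0
print(predict(models, [5.0, 5.0]))   # -> 1
print(predict(models, [0.0, 5.2]))   # -> 2
```

With MLlib, the same wrapping would relabel the RDD k times and train k binary models; prediction takes the argmax over the k model scores.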
Re: Not fully cached when there is enough memory
Could you click on that RDD and see the storage info per partition? I tried continuously caching RDDs, so new ones kick old ones out when there is not enough memory. I saw similar glitches, but the storage info per partition is correct. If you find a way to reproduce this error, please create a JIRA. Thanks! -Xiangrui
Re: Convert text into tfidf vectors for Classification
You can create tf vectors and then use RowMatrix.computeColumnSummaryStatistics to get df (numNonzeros). For a tokenizer and stemmer, you can use scalanlp/chalk. Yes, it is worth having a simple interface for this. -Xiangrui On Fri, Jun 13, 2014 at 1:21 AM, Stuti Awasthi stutiawas...@hcl.com wrote: Hi all, I wanted to perform text classification using Spark 1.0 Naïve Bayes. I was looking for a way to convert text into sparse vectors with TFIDF weighting. I found that the MLI library supports that, but it is compatible with Spark 0.8. What options are available to achieve text vectorization? Are there any pre-built APIs which can be used, or another way to achieve this? Please suggest. Thanks, Stuti Awasthi
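The weighting itself is simple to sketch. A minimal plain-Python TF-IDF, using the common log(N/df) idf form (treat the exact formula as an assumption; implementations differ on smoothing):

```python
import math

def tfidf(docs):
    """docs: list of token lists. Returns one {term: weight} dict per doc."""
    n = len(docs)
    # document frequency: number of docs containing each term
    df = {}
    for doc in docs:
        for term in set(doc):
            df[term] = df.get(term, 0) + 1
    vectors = []
    for doc in docs:
        vec = {}
        for term in doc:
            tf = doc.count(term)           # raw term frequency in this doc
            vec[term] = tf * math.log(n / df[term])
        vectors.append(vec)
    return vectors

docs = [["spark", "mllib", "spark"], ["naive", "bayes"], ["spark", "bayes"]]
vecs = tfidf(docs)
print(vecs[0]["spark"])  # tf=2, df=2 -> 2*log(3/2)
```

With the RowMatrix approach described above, the df vector would come from numNonzeros over the tf vectors instead of the explicit counting loop.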
Re: MLlib-Missing Regularization Parameter and Intercept for Logistic Regression
1. examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassification.scala contains example code that shows how to set regParam. 2. A static method with more than 3 parameters becomes hard to remember and hard to maintain. Please use LogisticRegressionWithSGD's default constructor and setters. -Xiangrui
Re: MLlib-Missing Regularization Parameter and Intercept for Logistic Regression
Someone is working on weighted regularization. Stay tuned. -Xiangrui On Mon, Jun 16, 2014 at 9:36 AM, FIXED-TERM Yi Congrui (CR/RTC1.3-NA) fixed-term.congrui...@us.bosch.com wrote: Hi Xiangrui, Thank you for the reply! I have tried customizing LogisticRegressionWithSGD.optimizer as in the example you mentioned, but the source code reveals that the intercept is also penalized if one is included, which is usually inappropriate. The developer should fix this problem. Best, Congrui
Re: news20-binary classification with LogisticRegressionWithSGD
Hi Makoto, How many partitions did you set? If there are too many partitions, please do a coalesce before calling ML algorithms. Btw, could you try the tree branch in my repo? https://github.com/mengxr/spark/tree/tree I used tree aggregate in this branch. It should help with the scalability. Best, Xiangrui On Tue, Jun 17, 2014 at 12:22 PM, Makoto Yui yuin...@gmail.com wrote: Here is a follow-up to the previous evaluation. "aggregate at GradientDescent.scala:178" never finishes at https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178 We confirmed, by -verbose:gc, that GC is not happening during the aggregate and the cumulative CPU time for the task is increasing little by little. LBFGS also does not work for a large # of features (news20.random.1000) though it works fine for a small # of features (news20.binary.1000). "aggregate at LBFGS.scala:201" also never finishes at https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201

---
[Evaluated code for LBFGS]

import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.optimization._

val data = MLUtils.loadLibSVMFile(sc,
  "hdfs://dm01:8020/dataset/news20-binary/news20.random.1000",
  multiclass = false)
val numFeatures = data.take(1)(0).features.size
val training = data.map(x => (x.label, MLUtils.appendBias(x.features))).cache()

// Run training algorithm to build the model
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))
val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
  training, new LogisticGradient(), new SquaredL2Updater(), numCorrections,
  convergenceTol, maxNumIterations, regParam, initialWeightsWithIntercept)
---

Thanks, Makoto 2014-06-17 21:32 GMT+09:00 Makoto Yui yuin...@gmail.com: Hello, I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on Hadoop 0.20.2-cdh3u6, but it does not work for a sparse dataset though the number of training examples used in the evaluation is just 1,000. It works fine for the dataset *news20.binary.1000* that has 178,560 features. However, it does not work for *news20.random.1000* where the # of features is large (1,354,731 features) though we used a sparse vector through MLUtils.loadLibSVMFile(). The execution seems to make no progress while no error is reported in the spark-shell as well as in the stdout/stderr of the executors. We used 32 executors, each allocating 7GB (2GB is for RDD) for working memory. Any suggestions? Your help is really appreciated.

== Executed code ==
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

//val training = MLUtils.loadLibSVMFile(sc,
//  "hdfs://host:8020/dataset/news20-binary/news20.binary.1000",
//  multiclass = false)
val training = MLUtils.loadLibSVMFile(sc,
  "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
  multiclass = false)
val numFeatures = training.take(1)(0).features.size
//numFeatures: Int = 178560 for news20.binary.1000
//numFeatures: Int = 1354731 for news20.random.1000
val model = LogisticRegressionWithSGD.train(training, numIterations = 1)

== The dataset used in the evaluation ==
http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary
$ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.binary.1000
$ sort -R news20.binary > news20.random
$ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.random.1000

You can find the dataset in
https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000

Thanks, Makoto
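What the "aggregate at GradientDescent.scala:178" step computes is a fold of a (gradient, loss) pair over the points of each partition (seqOp), followed by a merge of the per-partition results (combOp). A plain-Python sketch of that pattern, using a one-feature least-squares gradient for illustration (not MLlib's LogisticGradient):

```python
def aggregate(partitions, zero, seq_op, comb_op):
    """Mimics RDD.aggregate: fold each partition, then merge the results."""
    partials = []
    for part in partitions:
        acc = zero()
        for point in part:
            acc = seq_op(acc, point)   # runs on the executor owning `part`
        partials.append(acc)
    result = zero()
    for p in partials:
        result = comb_op(result, p)    # merge at the driver
    return result

# least-squares on one feature: loss = (w*x - y)^2, grad = 2*(w*x - y)*x
w = 0.5
partitions = [[(1.0, 1.0), (2.0, 2.0)], [(3.0, 3.0)]]  # (x, y) pairs
grad_sum, loss_sum = aggregate(
    partitions,
    zero=lambda: (0.0, 0.0),
    seq_op=lambda acc, p: (acc[0] + 2 * (w * p[0] - p[1]) * p[0],
                           acc[1] + (w * p[0] - p[1]) ** 2),
    comb_op=lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
print(grad_sum, loss_sum)  # -> -14.0 3.5
```

The hang discussed in this thread is about the real version of the final merge step, where each partial result is a dense gradient vector rather than a pair of floats.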
Re: Contribution to Spark MLLib
Hi Jayati, Thanks for asking! MLlib algorithms are all implemented in Scala. It makes it easier for us to maintain if the implementations are in one place. For the roadmap, please visit http://www.slideshare.net/xrmeng/m-llib-hadoopsummit to see the features planned for v1.1. Before contributing new algorithms, it would be great if you could start by working on an existing JIRA. Best, Xiangrui On Tue, Jun 17, 2014 at 12:22 AM, Jayati tiwarijay...@gmail.com wrote: Hello, I wish to contribute some algorithms to the MLlib of Spark but at the same time wanted to make sure that I don't try something redundant. Will it be okay with you to let me know the set of algorithms which aren't on your roadmap in the near future? Also, can I use Java to write machine learning algorithms for Spark MLlib instead of Scala? Regards, Jayati -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Contribution-to-Spark-MLLib-tp7716.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Execution stalls in LogisticRegressionWithSGD
Hi Bharath, Thanks for posting the details! Which Spark version are you using? Best, Xiangrui On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, (Apologies for the long mail, but it's necessary to provide sufficient details considering the number of issues faced.) I'm running into issues testing LogisticRegressionWithSGD on a two node cluster (each node with 24 cores and 16G available to slaves out of 24G on the system). Here's a description of the application: The model is being trained based on categorical features x, y, and (x,y). The categorical features are mapped to binary features by converting each distinct value in the category enum into a binary feature by itself (i.e., presence of that value in a record implies the corresponding feature = 1, else feature = 0. So there'd be as many distinct features as enum values). The training vector is laid out as [x1,x2...xn,y1,y2...yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the training data has only one combination (Xk,Yk) and a label appearing in the record. Thus, the corresponding labeledpoint sparse vector would only have 3 values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector (though sparse) would be nearly 614000. The number of records is about 1.33 million. The records have been coalesced into 20 partitions across two nodes. The input data has not been cached.
(NOTE: I do realize the record's features may seem large for a two node setup, but given the memory and cpu, and the fact that I'm willing to give up some turnaround time, I don't see why tasks should inexplicably fail.)

Additional parameters include:
spark.executor.memory = 14G
spark.default.parallelism = 1
spark.cores.max = 20
spark.storage.memoryFraction = 0.8 //No cache space required
(Trying to set spark.akka.frameSize to a larger number, say, 20, didn't help either.)

The model training was initialized as:
new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05)

However, after 4 iterations of gradient descent, the entire execution appeared to stall inexplicably. The corresponding executor details and details of the stalled stage (number 14) are as follows:

Metric                             Min     25th    Median  75th    Max
Result serialization time          12 ms   13 ms   14 ms   16 ms   18 ms
Duration                           4 s     4 s     5 s     5 s     5 s
Time spent fetching task results   0 ms    0 ms    0 ms    0 ms    0 ms
Scheduler delay                    6 s     6 s     6 s     6 s     12 s

Stage Id 14: aggregate at GradientDescent.scala:178
Task Index | Task ID | Status | Locality Level | Executor | Launch Time | Duration | GC Time | Result Ser Time | Errors
0   600  RUNNING  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
1   601  RUNNING  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
2   602  RUNNING  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
3   603  RUNNING  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
4   604  RUNNING  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
5   605  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s  2 s  12 ms
6   606  SUCCESS  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s  1 s  14 ms
7   607  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s  2 s  12 ms
8   608  SUCCESS  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s  1 s  15 ms
9   609  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s  1 s  14 ms
10  610  SUCCESS  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s  1 s  15 ms
11  611  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s  1 s  13 ms
12  612  SUCCESS  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s  1 s  18 ms
13  613  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s  1 s  13 ms
14  614  SUCCESS  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s  1 s  14 ms
15  615  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s  1 s  12 ms
16  616  SUCCESS  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s  1 s  15 ms
17  617  SUCCESS  PROCESS_LOCAL  casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s  1 s  18 ms
18  618  SUCCESS  PROCESS_LOCAL  serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s  1 s  16 ms
19  619  SUCCESS  PROCESS_LOCAL
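The feature layout described above (one-hot x, one-hot y, then the (x,y) cross) maps each record to exactly three active indices in the sparse vector. A small sketch of that encoding (the index scheme is assumed from the description, with the cross block laid out row-major after the two one-hot blocks):

```python
def encode(x, y, nx, ny):
    """One-hot x (indices 0..nx-1), one-hot y (nx..nx+ny-1),
    and the (x, y) cross term after both blocks.
    x and y are the enum indices of the two categorical values.
    Returns the three active indices of the sparse vector."""
    ix = x                      # x block
    iy = nx + y                 # y block
    ixy = nx + ny + x * ny + y  # cross block, row-major
    return [ix, iy, ixy]

# e.g. 3 distinct x values, 4 distinct y values -> vector length 3 + 4 + 12 = 19
print(encode(2, 1, nx=3, ny=4))  # -> [2, 4, 16]
```

With n distinct x values and n distinct y values this gives a vector of length 2n + n^2, which is how a modest number of enum values reaches the ~614000-dimensional vectors in the report.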
Re: news20-binary classification with LogisticRegressionWithSGD
Hi DB, treeReduce (treeAggregate) is a feature I'm testing now. It is a compromise between the current reduce and a butterfly allReduce. The former runs in linear time on the number of partitions, the latter introduces too many dependencies. treeAggregate with depth = 2 should run in O(sqrt(n)) time, where n is the number of partitions. It would be great if someone can help test its scalability. Best, Xiangrui On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui yuin...@gmail.com wrote: Hi Xiangrui, (2014/06/18 4:58), Xiangrui Meng wrote: How many partitions did you set? If there are too many partitions, please do a coalesce before calling ML algorithms. The training data news20.random.1000 is small and thus only 2 partitions are used by default. val training = MLUtils.loadLibSVMFile(sc, "hdfs://host:8020/dataset/news20-binary/news20.random.1000", multiclass=false). We also tried 32 partitions as follows, but the aggregate never finishes. val training = MLUtils.loadLibSVMFile(sc, "hdfs://host:8020/dataset/news20-binary/news20.random.1000", multiclass=false, numFeatures = 1354731, minPartitions = 32) Btw, could you try the tree branch in my repo? https://github.com/mengxr/spark/tree/tree I used tree aggregate in this branch. It should help with the scalability. Is treeAggregate itself available in Spark 1.0? I wonder.. Could I test your modification just by running the following code in the REPL?

---
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
  .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
    seqOp = (c, v) => (c, v) match {
      case ((grad, loss), (label, features)) =>
        val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad))
        (grad, loss + l)
    },
    combOp = (c1, c2) => (c1, c2) match {
      case ((grad1, loss1), (grad2, loss2)) =>
        (grad1 += grad2, loss1 + loss2)
    }, 2)
---

Rebuilding Spark is quite something to do for an evaluation. Thanks, Makoto
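The shape of treeAggregate with depth = 2 can be illustrated without Spark: group the n per-partition results into roughly sqrt(n) buckets, combine within each bucket first, and only ship the bucket results to the driver. A sketch of the idea only, not the actual implementation:

```python
import math

def tree_combine(partials, combine):
    """Two-level combine: ~sqrt(n) intermediate buckets, so the final
    merge handles O(sqrt(n)) objects instead of n."""
    n = len(partials)
    scale = int(math.ceil(math.sqrt(n)))
    buckets = [partials[i:i + scale] for i in range(0, n, scale)]
    # level 1: combine inside each bucket (would happen on executors)
    level1 = []
    for b in buckets:
        acc = b[0]
        for p in b[1:]:
            acc = combine(acc, p)
        level1.append(acc)
    # level 2: final combine (would happen at the driver)
    result = level1[0]
    for p in level1[1:]:
        result = combine(result, p)
    return result, len(level1)

partials = list(range(32))  # one partial result per partition
total, at_driver = tree_combine(partials, lambda a, b: a + b)
print(total, at_driver)  # -> 496 6
```

With 32 partitions the driver merges only 6 intermediate results instead of 32; the payoff grows when each partial is a multi-megabyte dense gradient vector.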
Re: news20-binary classification with LogisticRegressionWithSGD
Hi Makoto, Are you using Spark 1.0 or 0.9? Could you go to the executor tab of the web UI and check the driver's memory? treeAggregate is not part of 1.0. Best, Xiangrui
Re: news20-binary classification with LogisticRegressionWithSGD
DB, Yes, reduce and aggregate are linear. Makoto, dense vectors are used in aggregation. If you have 32 partitions, each sending a dense vector of size 1,354,731 to the master, then the driver needs 300M+. That may be the problem. Which deploy mode are you using, standalone or local? Debasish, there is an old PR for butterfly allreduce. However, it doesn't seem to be the right way to go for Spark. I just sent out the PR: https://github.com/apache/spark/pull/1110 . This is a WIP and it needs more testing before we are confident to merge it. It would be great if you can help test it. Best, Xiangrui On Tue, Jun 17, 2014 at 2:33 PM, Debasish Das debasish.da...@gmail.com wrote: Xiangrui, Could you point to the JIRA related to tree aggregate? ...sounds like the allreduce idea... I would definitely like to try it on our dataset... Makoto, I did run a pretty big sparse dataset (20M rows, 3M sparse features) and I got 100 iterations of SGD running in 200 seconds... 10 executors, each with 16 GB memory... Although the best result on the same dataset came out of liblinear and BFGS-L1 out of the box... so I did not tune the SGD further on learning rate and other heuristics... it was around 5% off... Thanks. Deb On Tue, Jun 17, 2014 at 2:09 PM, DB Tsai dbt...@stanford.edu wrote: Hi Xiangrui, Does it mean that mapPartition and then reduce shares the same behavior as the aggregate operation, which is O(n)? Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai
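Xiangrui's 300M+ estimate checks out with simple arithmetic (8 bytes per double, ignoring JVM object overhead):

```python
num_partitions = 32
vector_size = 1_354_731          # dense gradient vector per partition
bytes_per_double = 8

per_vector = vector_size * bytes_per_double   # one dense gradient, ~10.8 MB
at_driver = num_partitions * per_vector       # all partials arrive at the driver
print(per_vector, at_driver)  # -> 10837848 346811136  (~331 MiB)
```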
Re: news20-binary classification with LogisticRegressionWithSGD
Makoto, please use --driver-memory 8G when you launch spark-shell. -Xiangrui
Re: Execution stalls in LogisticRegressionWithSGD
Hi Bharath, This is related to SPARK-1112, which we already found the root cause. I will let you know when this is fixed. Best, Xiangrui On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Couple more points: 1)The inexplicable stalling of execution with large feature sets appears similar to that reported with the news-20 dataset: http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E 2) The NPE trying to call mapToPair convert an RDDLong, Long, Integer, Integer into a JavaPairRDDTuple2Long,Long, Tuple2Integer,Integer is unrelated to mllib. Thanks, Bharath On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Xiangrui , I'm using 1.0.0. Thanks, Bharath On 18-Jun-2014 1:43 am, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, Thanks for posting the details! Which Spark version are you using? Best, Xiangrui On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, (Apologies for the long mail, but it's necessary to provide sufficient details considering the number of issues faced.) I'm running into issues testing LogisticRegressionWithSGD a two node cluster (each node with 24 cores and 16G available to slaves out of 24G on the system). Here's a description of the application: The model is being trained based on categorical features x, y, and (x,y). The categorical features are mapped to binary features by converting each distinct value in the category enum into a binary feature by itself (i.e presence of that value in a record implies corresponding feature = 1, else feature = 0. So, there'd be as many distinct features as enum values) . The training vector is laid out as [x1,x2...xn,y1,y2yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the training data has only one combination (Xk,Yk) and a label appearing in the record. Thus, the corresponding labeledpoint sparse vector would only have 3 values Xk, Yk, (Xk,Yk) set for a record. 
The total length of the vector (though sparse) would be nearly 614000. The number of records is about 1.33 million. The records have been coalesced into 20 partitions across two nodes. The input data has not been cached. (NOTE: I do realize the record features may seem large for a two-node setup, but given the memory and cpu, and the fact that I'm willing to give up some turnaround time, I don't see why tasks should inexplicably fail.) Additional parameters include:

spark.executor.memory = 14G
spark.default.parallelism = 1
spark.cores.max = 20
spark.storage.memoryFraction = 0.8 // No cache space required

(Trying to set spark.akka.frameSize to a larger number, say, 20, didn't help either.) The model training was initialized as: new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05) However, after 4 iterations of gradient descent, the entire execution appeared to stall inexplicably. The corresponding executor details and details of the stalled stage (number 14) are as follows:

Metric                            Min    25th   Median  75th   Max
Result serialization time         12 ms  13 ms  14 ms   16 ms  18 ms
Duration                          4 s    4 s    5 s     5 s    5 s
Time spent fetching task results  0 ms   0 ms   0 ms    0 ms   0 ms
Scheduler delay                   6 s    6 s    6 s     6 s    12 s

Stage Id 14: aggregate at GradientDescent.scala:178

Task Index  Task ID  Status   Locality Level  Executor                     Launch Time          Duration  GC Time  Result Ser Time  Errors
0           600      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
1           601      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
2           602      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
3           603      RUNNING  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  1.1 h
4           604      RUNNING  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  1.1 h
5           605      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
6           606      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  4 s       1 s      14 ms
7           607      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  4 s       2 s      12 ms
8           608      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
9           609      SUCCESS  PROCESS_LOCAL   casual.dataone.foo.bar.com   2014/06/17 10:32:27  5 s       1 s      14 ms
10          610      SUCCESS  PROCESS_LOCAL   serious.dataone.foo.bar.com  2014/06/17 10:32:27  5 s       1 s      15 ms
11          611      SUCCESS  PROCESS_LOCAL
Re: Contribution to Spark MLLib
Denis, I think it is fine to have PLSA in MLlib. But I'm not familiar with the modification you mentioned, since the paper is new. We may need more time to learn the trade-offs. Feel free to create a JIRA for PLSA and we can move our discussion there. It would be great if you could share your current implementation, so it is easy for developers to join the discussion. Jayati, it is certainly NOT mandatory. But if you want to contribute something new, please create a JIRA first. Best, Xiangrui
Re: news20-binary classification with LogisticRegressionWithSGD
It is because the frame size is not set correctly in the executor backend; see SPARK-1112. We are going to fix it in v1.0.1. Did you try treeAggregate? On Jun 19, 2014, at 2:01 AM, Makoto Yui yuin...@gmail.com wrote: Xiangrui and Debasish, (2014/06/18 6:33), Debasish Das wrote: I did run a pretty big sparse dataset (20M rows, 3M sparse features) and I got 100 iterations of SGD running in 200 seconds... 10 executors each with 16 GB memory... I could figure out what the problem was: spark.akka.frameSize was too large. By setting spark.akka.frameSize=10, it worked for the news20 dataset. The execution was slow for the larger KDD Cup 2012, Track 2 dataset (235M+ records of 16.7M+ (2^24) sparse features, about 33.6GB) due to the sequential aggregation of dense vectors on a single driver node. It took about 7.6 minutes per iteration for the aggregation. Thanks, Makoto
Re: Anything like grid search available for mlbase?
This is a planned feature for v1.1. I'm going to work on it after v1.0.1 release. -Xiangrui On Jun 20, 2014, at 6:46 AM, Charles Earl charles.ce...@gmail.com wrote: Looking for something like scikit's grid search module. C
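Until that feature lands, grid search is just an exhaustive sweep over parameter combinations. A minimal sketch of the idea in plain Python (the helper and the toy scoring function are hypothetical, not an MLbase API):

```python
from itertools import product

def grid_search(score_fn, param_grid):
    # hypothetical helper: evaluate every combination in the grid and
    # return the best (score, params) pair, like scikit-learn's grid search
    names = sorted(param_grid)
    best = None
    for combo in product(*(param_grid[n] for n in names)):
        params = dict(zip(names, combo))
        score = score_fn(params)
        if best is None or score > best[0]:
            best = (score, params)
    return best

# toy objective that peaks at step=0.1, iters=200
best_score, best_params = grid_search(
    lambda p: -abs(p["step"] - 0.1) - abs(p["iters"] - 200) / 1000.0,
    {"step": [0.01, 0.1, 1.0], "iters": [100, 200]},
)
```

In practice `score_fn` would train a model with the given parameters and return a held-out validation score.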
Re: Performance problems on SQL JOIN
Your data source is S3 and the data is used twice. m1.large does not have very good network performance. Please try file.count() and see how fast it goes. -Xiangrui On Jun 20, 2014, at 8:16 AM, mathias math...@socialsignificance.co.uk wrote: Hi there, We're trying out Spark and are experiencing some performance issues using Spark SQL. Can anyone tell us if our results are normal? We are using the Amazon EC2 scripts to create a cluster with 3 workers/executors (m1.large). We tried both Spark 1.0.0 as well as the git master, and the Scala as well as the Python shells. Running the following code takes about 5 minutes, which seems a long time for this query.

val file = sc.textFile("s3n:// ... .csv");
val data = file.map(x => x.split('|')); // 300k rows
case class BookingInfo(num_rooms: String, hotelId: String, toDate: String, ...);
val rooms2 = data.filter(x => x(0) == "2").map(x => BookingInfo(x(0), x(1), ... , x(9))); // 50k rows
val rooms3 = data.filter(x => x(0) == "3").map(x => BookingInfo(x(0), x(1), ... , x(9))); // 30k rows
rooms2.registerAsTable("rooms2"); cacheTable("rooms2");
rooms3.registerAsTable("rooms3"); cacheTable("rooms3");
sql("SELECT * FROM rooms2 LEFT JOIN rooms3 ON rooms2.hotelId = rooms3.hotelId AND rooms2.toDate = rooms3.toDate").count();

Are we doing something wrong here? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-problems-on-SQL-JOIN-tp8001.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Improving Spark multithreaded performance?
Hi Kyle, A few questions: 1) Did you use `setIntercept(true)`? 2) How many features? I'm a little worried about the driver's load because the final aggregation and weights update happen on the driver. Did you check the driver's memory usage as well? Best, Xiangrui On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: As far as I can tell there is no data to broadcast (unless there is something internal to mllib that needs to be broadcast). I've coalesced the input RDDs to keep the number of partitions limited. When running, I've tried to get up to 500 concurrent stages, and I've coalesced the RDDs down to 2 partitions, so about 1000 tasks. Despite having over 500 threads in the threadpool working on mllib tasks, the total CPU usage never really goes above 150%. I've tried increasing 'spark.akka.threads' but that doesn't seem to do anything. My one thought is that, because I'm using MLUtils.kFold to generate the RDDs, I have many tasks working off RDDs that are permutations of the original RDDs, and maybe that is creating some sort of dependency bottleneck. Kyle On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson ilike...@gmail.com wrote: I don't have specific solutions for you, but the general things to try are: - Decrease task size by broadcasting any non-trivial objects. - Increase duration of tasks by making them less fine-grained. How many tasks are you sending? I've seen in the past something like 25 seconds for ~10k total medium-sized tasks. On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: I'm working to set up a calculation that involves calling mllib's SVMWithSGD.train several thousand times on different permutations of the data. I'm trying to run the separate jobs using a threadpool to dispatch the different requests to a spark context connected to a Mesos cluster, using coarse scheduling, and a max of 2000 cores on Spark 1.0. Total utilization of the system is terrible. 
Most of the 'aggregate at GradientDescent.scala:178' stages (where MLlib spends most of its time) take about 3 seconds, but have ~25 seconds of scheduler delay. What kinds of things can I do to improve this? Kyle
Re: TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
Try to use --executor-memory 12g with spark-submit. Or you can set it in conf/spark-defaults.conf and rsync it to all workers and then restart. -Xiangrui On Fri, Jun 27, 2014 at 1:05 PM, Peng Cheng pc...@uow.edu.au wrote: I give up; communication must be blocked by the complex EC2 network topology (though the error information indeed needs some improvement). It doesn't make sense to run a client thousands of miles away to communicate frequently with workers. I have moved everything to EC2 now. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tp8247p8444.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: TaskNotSerializable when invoking KMeans.run
Could you post the code snippet and the error stack trace? -Xiangrui On Mon, Jun 30, 2014 at 7:03 AM, Daniel Micol dmi...@gmail.com wrote: Hello, I’m trying to use KMeans with MLLib but am getting a TaskNotSerializable error. I’m using Spark 0.9.1 and invoking the KMeans.run method with k = 2 and numPartitions = 200. Has anyone seen this error before and know what could be the reason for this? Thanks, Daniel
Re: Spark 1.0 and Logistic Regression Python Example
You were using an old version of numpy, 1.4? I think this is fixed in the latest master. Try to replace vec.dot(target) with numpy.dot(vec, target), or use the latest master. -Xiangrui On Mon, Jun 30, 2014 at 2:04 PM, Sam Jacobs sam.jac...@us.abb.com wrote: Hi, I modified the example code for logistic regression to compute the error in classification. Please see below. However, the code is failing when it makes a call to: labelsAndPreds.filter(lambda (v, p): v != p).count() with the error message (something related to numpy or dot product):

File /opt/spark-1.0.0-bin-hadoop2/python/pyspark/mllib/classification.py, line 65, in predict
    margin = _dot(x, self._coeff) + self._intercept
File /opt/spark-1.0.0-bin-hadoop2/python/pyspark/mllib/_common.py, line 443, in _dot
    return vec.dot(target)
AttributeError: 'numpy.ndarray' object has no attribute 'dot'

FYI, I am running the code using spark-submit, i.e. ./bin/spark-submit examples/src/main/python/mllib/logistic_regression2.py The code is posted below if it will be useful in any way:

from math import exp
import sys
import time

from pyspark import SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(',')]
    if values[0] == -1:    # Convert -1 labels to 0 for MLlib
        values[0] = 0
    return LabeledPoint(values[0], values[1:])

sc = SparkContext(appName="PythonLR")

# start timing
start = time.time()
# start = time.clock()

data = sc.textFile("sWAMSpark_train.csv")
parsedData = data.map(parsePoint)

# Build the model
model = LogisticRegressionWithSGD.train(parsedData)

# load test data
testdata = sc.textFile("sWSpark_test.csv")
parsedTestData = testdata.map(parsePoint)

# Evaluate the model on test data
labelsAndPreds = parsedTestData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
end = time.time()
print("Time is = " + str(end - start))
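The suggested workaround, calling the module-level function instead of the array method, can be seen in isolation with plain NumPy (no Spark needed; the vectors below are made up for illustration). On modern NumPy both forms work, but numpy.dot(vec, target) also works on objects that lack a .dot method:

```python
import numpy as np

w = np.array([0.5, -1.0, 2.0])   # model coefficients (illustrative)
x = np.array([1.0, 2.0, 3.0])    # feature vector (illustrative)

# module-level dot: works even where the array object has no .dot method,
# which is what the old-numpy AttributeError above was about
margin = np.dot(x, w)            # 0.5*1 - 1*2 + 2*3 = 4.5
```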
Re: SparkKMeans.scala from examples will show: NoClassDefFoundError: breeze/linalg/Vector
You can use either bin/run-example or bin/spark-submit to run the example code. scalac -d classes/ SparkKMeans.scala doesn't recognize the Spark classpath. There are examples in the official doc: http://spark.apache.org/docs/latest/quick-start.html#where-to-go-from-here -Xiangrui On Tue, Jul 1, 2014 at 4:39 AM, Wanda Hawk wanda_haw...@yahoo.com wrote: Hello, I have installed spark-1.0.0 with scala 2.10.3. I built spark with sbt/sbt assembly and added /home/wanda/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar to my CLASSPATH variable. Then I went here ../spark-1.0.0/examples/src/main/scala/org/apache/spark/examples, created a new directory classes, and compiled SparkKMeans.scala with scalac -d classes/ SparkKMeans.scala. Then I navigated to classes (I commented this line in the scala file: package org.apache.spark.examples) and tried to run it with java -cp . SparkKMeans, and I get the following error:

Exception in thread main java.lang.NoClassDefFoundError: breeze/linalg/Vector
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
    at java.lang.Class.getMethod0(Class.java:2774)
    at java.lang.Class.getMethod(Class.java:1663)
    at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
    at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: breeze.linalg.Vector
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 6 more

The jar under /home/wanda/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.0.4.jar contains the breeze/linalg/Vector* path. I even tried to unpack it and put it in CLASSPATH, but it does not seem to pick it up. I am currently running java 1.8: java version 1.8.0_05 Java(TM) SE Runtime Environment (build 1.8.0_05-b13) Java HotSpot(TM) 64-Bit Server VM (build 25.5-b02, mixed mode) What am I doing wrong?
Re: Questions about disk IOs
Try to reduce the number of partitions to match the number of cores. We will add treeAggregate to reduce the communication cost. PR: https://github.com/apache/spark/pull/1110 -Xiangrui On Tue, Jul 1, 2014 at 12:55 AM, Charles Li littlee1...@gmail.com wrote: Hi Spark, I am running LBFGS on our user data. The data size with Kryo serialisation is about 210G. The weight size is around 1,300,000. I am quite confused that the performance is very close whether the data is cached or not. The program is simple:

points = sc.hadoopFile(int, SequenceFileInputFormat.class ...)
points.persist(StorageLevel.MEMORY_AND_DISK_SER()) // comment it out if not cached
gradient = new LogisticGradient();
updater = new SquaredL2Updater();
initWeight = Vectors.sparse(size, new int[]{}, new double[]{})
result = LBFGS.runLBFGS(points.rdd(), gradient, updater, numCorrections, convergeTol, maxIter, regParam, initWeight);

I have 13 machines with 16 cpus and 48G RAM each. Spark is running in its cluster mode. Below are some arguments I am using: --executor-memory 10G --num-executors 50 --executor-cores 2 Storage when caching: Cached Partitions 951, Fraction Cached 100%, Size in Memory 215.7GB, Size in Tachyon 0.0B, Size on Disk 1029.7MB. The time cost by every aggregate is around 5 minutes with cache enabled. Lots of disk IO can be seen on the hadoop nodes. I get the same result with cache disabled. Should caching the data points improve the performance? Should caching decrease the disk IO? Thanks in advance.
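The treeAggregate in that PR merges partition results in layers instead of pulling every partition's result straight to the driver. A rough sketch of the idea in plain Python (not the Spark implementation; `combine` plays the role of the aggregator's merge function):

```python
from functools import reduce

def tree_aggregate(parts, combine, depth=2):
    # merge partition results level by level so no single step
    # (and in Spark, no single node) has to combine all of them at once
    while depth > 1 and len(parts) > 1:
        scale = max(2, int(round(len(parts) ** (1.0 / depth))))
        parts = [reduce(combine, parts[i:i + scale])
                 for i in range(0, len(parts), scale)]
        depth -= 1
    return reduce(combine, parts)

# 100 "partition results" summed through an intermediate layer of 10
total = tree_aggregate(list(range(1, 101)), lambda a, b: a + b)
```

With a plain aggregate, the driver would receive and combine all 100 partial results itself; the tree variant bounds the fan-in at each level.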
Re: why is toBreeze private everywhere in mllib?
We were not ready to expose it as a public API in v1.0. Both breeze and MLlib are in rapid development. It would be possible to expose it as a developer API in v1.1. For now, it should be easy to define a toBreeze method in your own project. -Xiangrui On Tue, Jul 1, 2014 at 12:17 PM, Koert Kuipers ko...@tresata.com wrote: It's kind of handy to be able to convert stuff to breeze... is there some other way I am supposed to access that functionality?
Re: One question about RDD.zip function when trying Naive Bayes
This is due to a bug in sampling, which was fixed in 1.0.1 and the latest master. See https://github.com/apache/spark/pull/1234 . -Xiangrui On Wed, Jul 2, 2014 at 8:23 PM, x wasedax...@gmail.com wrote: Hello, I am a newbie to Spark MLlib and ran into a curious case when following the instructions on the page below. http://spark.apache.org/docs/latest/mllib-naive-bayes.html I ran a test program on my local machine using some data.

val spConfig = (new SparkConf).setMaster("local").setAppName("SparkNaiveBayes")
val sc = new SparkContext(spConfig)

The test data was as follows, and there were three labeled categories I wanted to predict.

1 LabeledPoint(0.0, [4.9,3.0,1.4,0.2])
2 LabeledPoint(0.0, [4.6,3.4,1.4,0.3])
3 LabeledPoint(0.0, [5.7,4.4,1.5,0.4])
4 LabeledPoint(0.0, [5.2,3.4,1.4,0.2])
5 LabeledPoint(0.0, [4.7,3.2,1.6,0.2])
6 LabeledPoint(0.0, [4.8,3.1,1.6,0.2])
7 LabeledPoint(0.0, [5.1,3.8,1.9,0.4])
8 LabeledPoint(0.0, [4.8,3.0,1.4,0.3])
9 LabeledPoint(0.0, [5.0,3.3,1.4,0.2])
10 LabeledPoint(1.0, [6.6,2.9,4.6,1.3])
11 LabeledPoint(1.0, [5.2,2.7,3.9,1.4])
12 LabeledPoint(1.0, [5.6,2.5,3.9,1.1])
13 LabeledPoint(1.0, [6.4,2.9,4.3,1.3])
14 LabeledPoint(1.0, [6.6,3.0,4.4,1.4])
15 LabeledPoint(1.0, [6.0,2.7,5.1,1.6])
16 LabeledPoint(1.0, [5.5,2.6,4.4,1.2])
17 LabeledPoint(1.0, [5.8,2.6,4.0,1.2])
18 LabeledPoint(1.0, [5.7,2.9,4.2,1.3])
19 LabeledPoint(1.0, [5.7,2.8,4.1,1.3])
20 LabeledPoint(2.0, [6.3,2.9,5.6,1.8])
21 LabeledPoint(2.0, [6.5,3.0,5.8,2.2])
22 LabeledPoint(2.0, [6.5,3.0,5.5,1.8])
23 LabeledPoint(2.0, [6.7,3.3,5.7,2.1])
24 LabeledPoint(2.0, [7.4,2.8,6.1,1.9])
25 LabeledPoint(2.0, [6.3,3.4,5.6,2.4])
26 LabeledPoint(2.0, [6.0,3.0,4.8,1.8])
27 LabeledPoint(2.0, [6.8,3.2,5.9,2.3])

The predicted result via NaiveBayes is below. Compared to the test data, only two predicted results (#11 and #15) were different. 
1 0.0 2 0.0 3 0.0 4 0.0 5 0.0 6 0.0 7 0.0 8 0.0 9 0.0 10 1.0 11 2.0 12 1.0 13 1.0 14 1.0 15 2.0 16 1.0 17 1.0 18 1.0 19 1.0 20 2.0 21 2.0 22 2.0 23 2.0 24 2.0 25 2.0 26 2.0 27 2.0 After grouping the test RDD and predicted RDD via zip I got this. 1 (0.0,0.0) 2 (0.0,0.0) 3 (0.0,0.0) 4 (0.0,0.0) 5 (0.0,0.0) 6 (0.0,0.0) 7 (0.0,0.0) 8 (0.0,0.0) 9 (0.0,1.0) 10 (0.0,1.0) 11 (0.0,1.0) 12 (1.0,1.0) 13 (1.0,1.0) 14 (2.0,1.0) 15 (1.0,1.0) 16 (1.0,2.0) 17 (1.0,2.0) 18 (1.0,2.0) 19 (1.0,2.0) 20 (2.0,2.0) 21 (2.0,2.0) 22 (2.0,2.0) 23 (2.0,2.0) 24 (2.0,2.0) 25 (2.0,2.0) I expected 27 pairs, but two results were lost. Could someone please point out what I missed here? Regards, xj
Re: MLLib : Math on Vector and Matrix
Hi Thunder, Please understand that both MLlib and breeze are in active development. Before v1.0, we used jblas, but in the public APIs we only exposed Array[Double]. In v1.0, we introduced Vector, which supports both dense and sparse data, and switched the backend to breeze/netlib-java (except ALS). We only used a few breeze methods in our implementation, and we benchmarked them one by one. It was hard to foresee problems caused by including breeze at that time, for example, https://issues.apache.org/jira/browse/SPARK-1520. Being conservative in v1.0 was not a bad choice. We should benchmark breeze v0.8.1 for v1.1 and consider making toBreeze a developer API. For now, if you are migrating code from v0.9, you can use `Vector.toArray` to get the value array. Sorry for the inconvenience! Best, Xiangrui On Wed, Jul 2, 2014 at 2:42 PM, Dmitriy Lyubimov dlie...@gmail.com wrote: In my humble opinion, Spark should've supported linalg a la [1] before it even started dumping methodologies into mllib. [1] http://mahout.apache.org/users/sparkbindings/home.html On Wed, Jul 2, 2014 at 2:16 PM, Thunder Stumpges thunder.stump...@gmail.com wrote: Thanks. I always hate having to do stuff like this. It seems like they went a bit overboard with all the private[mllib] declarations... possibly all in the name of "thou shalt not change your public API". If you don't make your public API usable, we end up having to work around it anyway... Oh well. Thunder On Wed, Jul 2, 2014 at 2:05 PM, Koert Kuipers ko...@tresata.com wrote: I did the second option: re-implemented .toBreeze as .breeze using pimp classes On Wed, Jul 2, 2014 at 5:00 PM, Thunder Stumpges thunder.stump...@gmail.com wrote: I am upgrading from Spark 0.9.0 to 1.0 and I had a pretty good amount of code working with the internals of MLlib. One of the big changes was the move from the old jblas.Matrix to the Vector/Matrix classes included in MLlib. 
However I don't see how we're supposed to use them for ANYTHING other than a container for passing data to the included APIs... how do we do any math on them? Looking at the internal code, there are quite a number of private[mllib] declarations including access to the Breeze representations of the classes. Was there a good reason this was not exposed? I could see maybe not wanting to expose the 'toBreeze' function which would tie it to the breeze implementation, however it would be nice to have the various mathematics wrapped at least. Right now I see no way to code any vector/matrix math without moving my code namespaces into org.apache.spark.mllib or duplicating the code in 'toBreeze' in my own util functions. Not very appealing. What are others doing? thanks, Thunder
Re: MLLib : Math on Vector and Matrix
Hi Dmitriy, It is sweet to have the bindings, but it is very easy to degrade performance with them. The BLAS/LAPACK APIs have been there for more than 20 years and they are still the top choice for high-performance linear algebra. I'm thinking about whether it is possible to make the evaluation lazy in the bindings. For example, y += a * x can be translated to an AXPY call instead of creating a temporary vector for a * x. There was some work on this in C++, but none achieved good performance. I'm not sure whether this is a good direction to explore. Best, Xiangrui
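The fused y += a * x idea can be sketched with a tiny deferred node in plain Python (purely illustrative of the lazy-evaluation point, not a proposed API): instead of materializing a * x as a temporary vector, the scale factor and its operand stay symbolic until a single in-place loop runs.

```python
class Scaled:
    # deferred representation of a * x; nothing is computed yet
    def __init__(self, a, x):
        self.a, self.x = a, x

def axpy_inplace(y, scaled):
    # one fused loop: y[i] += a * x[i], no temporary vector for a * x
    for i in range(len(y)):
        y[i] += scaled.a * scaled.x[i]

y = [1.0, 2.0]
axpy_inplace(y, Scaled(3.0, [10.0, 20.0]))
# y is now [31.0, 62.0]
```

This is exactly the contract of BLAS AXPY; the hard part the message alludes to is making the lazy rewrite happen automatically behind operator overloading without losing performance elsewhere.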
Re: Execution stalls in LogisticRegressionWithSGD
at /tmp/spark-local-20140704062240-6a65 14/07/04 06:22:40 INFO MemoryStore: MemoryStore started with capacity 6.7 GB. 14/07/04 06:22:40 INFO ConnectionManager: Bound socket to port 46901 with id = ConnectionManagerId(slave1,46901) 14/07/04 06:22:40 INFO BlockManagerMaster: Trying to register BlockManager 14/07/04 06:22:40 INFO BlockManagerMaster: Registered BlockManager 14/07/04 06:22:40 INFO HttpFileServer: HTTP File server directory is /tmp/spark-9eba78f9-8ae9-477c-9338-7222ae6fe306 14/07/04 06:22:40 INFO HttpServer: Starting HTTP Server 14/07/04 06:22:42 INFO CoarseGrainedExecutorBackend: Got assigned task 0 14/07/04 06:22:42 INFO Executor: Running task ID 0 14/07/04 06:22:42 INFO CoarseGrainedExecutorBackend: Got assigned task 2 14/07/04 06:22:42 INFO Executor: Running task ID 2 ... On Fri, Jul 4, 2014 at 5:52 AM, Xiangrui Meng men...@gmail.com wrote: The feature dimension is small. You don't need a big akka.frameSize. The default one (10M) should be sufficient. Did you cache the data before calling LRWithSGD? -Xiangrui On Thu, Jul 3, 2014 at 10:02 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: I tried another run after setting the driver memory to 8G (and spark.akka.frameSize = 500 on the executors and the driver). In addition, I also tried to reduce the amount of data that a single task processes, by increasing the number of partitions (of the labeled points) to 120 (instead of 2 used earlier), and then setting max cores to 2. 
That made no difference since, at the end of 120 tasks, the familiar error message appeared on a slave: snipped earlier logs 14/07/03 16:18:48 INFO CoarseGrainedExecutorBackend: Got assigned task 1436 14/07/03 16:18:48 INFO Executor: Running task ID 1436 14/07/03 16:18:53 INFO HadoopRDD: Input split: file:~//2014-05-24-02/part-r-00014:0+2215337 14/07/03 16:18:54 INFO HadoopRDD: Input split: file:~//2014-05-24-02/part-r-00014:2215337+2215338 14/07/03 16:18:54 INFO HadoopRDD: Input split: file:~//2014-05-24-02/part-r-3:0+2196429 14/07/03 16:18:54 INFO HadoopRDD: Input split: file:~//2014-05-24-02/part-r-3:2196429+2196430 14/07/03 16:18:54 INFO HadoopRDD: Input split: file:~//2014-05-24-02/part-r-00010:0+2186751 14/07/03 16:18:54 INFO HadoopRDD: Input split: file:~//2014-05-24-02/part-r-00010:2186751+2186751 14/07/03 16:18:54 INFO Executor: Serialized size of result for 1436 is 5958822 14/07/03 16:18:54 INFO Executor: Sending result for 1436 directly to driver 14/07/03 16:18:54 INFO Executor: Finished task ID 1436 14/07/03 16:18:54 INFO CoarseGrainedExecutorBackend: Got assigned task 1438 14/07/03 16:18:54 INFO Executor: Running task ID 1438 14/07/03 16:19:00 INFO HadoopRDD: Input split: file:~//2014-05-24-02/part-r-4:0+2209615 14/07/03 16:19:00 INFO HadoopRDD: Input split: file:~//2014-05-24-02/part-r-4:2209615+2209616 14/07/03 16:19:00 INFO HadoopRDD: Input split: file:~//2014-05-24-02/part-r-00011:0+2202240 14/07/03 16:19:00 INFO HadoopRDD: Input split: file:~//2014-05-24-02/part-r-00011:2202240+2202240 14/07/03 16:19:00 INFO HadoopRDD: Input split: file:~//2014-05-24-02/part-r-9:0+2194423 14/07/03 16:19:00 INFO HadoopRDD: Input split: file:~//2014-05-24-02/part-r-9:2194423+2194424 14/07/03 16:19:00 INFO Executor: Serialized size of result for 1438 is 5958822 14/07/03 16:19:00 INFO Executor: Sending result for 1438 directly to driver 14/07/03 16:19:00 INFO Executor: Finished task ID 1438 14/07/03 16:19:14 ERROR CoarseGrainedExecutorBackend: Driver Disassociated 
[akka.tcp://sparkExecutor@slave1:51099] - [akka.tcp://spark@master:58272] disassociated! Shutting down. The corresponding master logs were: 4/07/03 16:02:14 INFO Master: Registering app LogRegExp 14/07/03 16:02:14 INFO Master: Registered app LogRegExp with ID app-20140703160214-0028 14/07/03 16:02:14 INFO Master: Launching executor app-20140703160214-0028/1 on worker worker-20140630124441-slave1-40182 14/07/03 16:19:15 INFO Master: Removing executor app-20140703160214-0028/1 because it is EXITED 14/07/03 16:19:15 INFO Master: Launching executor app-20140703160214-0028/2 on worker worker-20140630124441-slave1-40182 14/07/03 16:19:15 INFO Master: Removing executor app-20140703160214-0028/0 because it is EXITED 14/07/03 16:19:15 INFO Master: Launching executor app-20140703160214-0028/3 on worker worker-20140630102913-slave2-44735 14/07/03 16:19:18 INFO Master: Removing executor app-20140703160214-0028/2 because it is EXITED 14/07/03 16:19:18 INFO Master: Launching executor app-20140703160214-0028/4 on worker worker-20140630124441-slave1-40182 14/07/03 16:19:18 INFO Master: Removing executor app-20140703160214-0028/3 because it is EXITED 14/07/03 16:19:18 INFO Master: Launching executor app-20140703160214-0028/5 on worker worker-20140630102913-slave2-44735 14/07/03 16:19:20 INFO Master
Re: Execution stalls in LogisticRegressionWithSGD
It seems to me a setup issue. I just tested news20.binary (1355191 features) on a 2-node EC2 cluster and it worked well. I added one line to conf/spark-env.sh: export SPARK_JAVA_OPTS= -Dspark.akka.frameSize=20 and launched spark-shell with --driver-memory 20g. Could you re-try with an EC2 setup? If it still doesn't work, please attach all your code and logs. Best, Xiangrui On Sun, Jul 6, 2014 at 1:35 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Xiangrui, 1) Yes, I used the same build (compiled locally from source) on the host that has (master, slave1) and on the second host with slave2. 2) The execution was successful when run in local mode with a reduced number of partitions. Does this imply issues communicating/coordinating across processes (i.e. driver, master and workers)? Thanks, Bharath On Sun, Jul 6, 2014 at 11:37 AM, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, 1) Did you sync the spark jar and conf to the worker nodes after the build? 2) Since the dataset is not large, could you try local mode first using `spark-submit --driver-memory 12g --master local[*]`? 3) Try to use a smaller number of partitions, say 5. If the problem is still there, please attach the full master/worker log files. Best, Xiangrui On Fri, Jul 4, 2014 at 12:16 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Xiangrui, Leaving the frameSize unspecified led to an error message (and failure) stating that the task size (~11M) was larger than the frame size. I hence set it to an arbitrarily large value (I realize 500 was unrealistically large and unnecessary in this case). I've now set the size to 20M and repeated the runs. The earlier runs were on an uncached RDD. Caching the RDD (and setting spark.storage.memoryFraction=0.5) resulted in a marginal speed up of execution, but the end result remained the same. 
The cached RDD size is as follows: RDD NameStorage LevelCached Partitions Fraction CachedSize in MemorySize in TachyonSize on Disk 1084 Memory Deserialized 1x Replicated 80 100% 165.9 MB 0.0 B 0.0 B The corresponding master logs were: 14/07/04 06:29:34 INFO Master: Removing executor app-20140704062238-0033/1 because it is EXITED 14/07/04 06:29:34 INFO Master: Launching executor app-20140704062238-0033/2 on worker worker-20140630124441-slave1-40182 14/07/04 06:29:34 INFO Master: Removing executor app-20140704062238-0033/0 because it is EXITED 14/07/04 06:29:34 INFO Master: Launching executor app-20140704062238-0033/3 on worker worker-20140630102913-slave2-44735 14/07/04 06:29:37 INFO Master: Removing executor app-20140704062238-0033/2 because it is EXITED 14/07/04 06:29:37 INFO Master: Launching executor app-20140704062238-0033/4 on worker worker-20140630124441-slave1-40182 14/07/04 06:29:37 INFO Master: Removing executor app-20140704062238-0033/3 because it is EXITED 14/07/04 06:29:37 INFO Master: Launching executor app-20140704062238-0033/5 on worker worker-20140630102913-slave2-44735 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got disassociated, removing it. 14/07/04 06:29:39 INFO Master: Removing app app-20140704062238-0033 14/07/04 06:29:39 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.3.1.135%3A33061-123#1986674260] was not delivered. [39] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got disassociated, removing it. 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got disassociated, removing it. 
14/07/04 06:29:39 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@master:7077] - [akka.tcp://spark@slave2:45172]: Error [Association failed with [akka.tcp://spark@slave2:45172]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@slave2:45172] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: slave2/10.3.1.135:45172 ] 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got disassociated, removing it. 14/07/04 06:29:39 ERROR EndpointWriter: AssociationError [akka.tcp://sparkMaster@master:7077] - [akka.tcp://spark@slave2:45172]: Error [Association failed with [akka.tcp://spark@slave2:45172]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://spark@slave2:45172] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: slave2/10.3.1.135:45172 ] 14/07/04 06:29:39 ERROR EndpointWriter
Re: Dense to sparse vector converter
No, but it should be easy to add one. -Xiangrui On Mon, Jul 7, 2014 at 12:37 AM, Ulanov, Alexander alexander.ula...@hp.com wrote: Hi, Is there a method in Spark/MLlib to convert DenseVector to SparseVector? Best regards, Alexander
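The conversion itself is straightforward: keep only the nonzero entries and their indices. A sketch of the logic in plain Python, mirroring MLlib's SparseVector(size, indices, values) layout (the helper name is illustrative, not a Spark API):

```python
def dense_to_sparse(values):
    # collect the indices and values of nonzero entries; the original
    # length is kept so the sparse form represents the same vector
    indices = [i for i, v in enumerate(values) if v != 0.0]
    nonzeros = [values[i] for i in indices]
    return len(values), indices, nonzeros

size, indices, nonzeros = dense_to_sparse([0.0, 3.0, 0.0, 1.5])
# (4, [1, 3], [3.0, 1.5])
```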
Re: How to incorporate the new data in the MLlib-NaiveBayes model along with predicting?
Hi Rahul, We plan to add online model updates with Spark Streaming, perhaps in v1.1, starting with linear methods. Please open a JIRA for Naive Bayes. For Naive Bayes, we need to update the priors and conditional probabilities, which means we should also remember the number of observations for the updates. Best, Xiangrui On Tue, Jul 8, 2014 at 7:35 AM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Hi, I am using the MLlib Naive Bayes for a text classification problem. I have a very small amount of training data, and then the data will be coming in continuously and I need to classify it as either A or B. I am training the MLlib Naive Bayes model using the training data, but next time when data comes in, I want to predict its class and then incorporate that also in the model for the next prediction of new data (I think that is obvious). So I am not able to figure out what the way to do that using MLlib Naive Bayes is. Is it that I have to train the model on the whole data every time new data comes in? Thanks in advance! -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka
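Remembering the counts makes the update cheap: with per-class example counts and per-class feature sums, absorbing a new labeled point is just a few increments, and the priors and conditional probabilities can be recomputed from the counts at any time. A hypothetical sketch in plain Python (not the MLlib implementation; the class and method names are made up):

```python
from collections import defaultdict

class IncrementalCounts:
    """Raw counts from which Naive Bayes priors and conditional
    probabilities can be recomputed after each new observation."""

    def __init__(self, num_features):
        self.num_features = num_features
        self.class_counts = defaultdict(int)
        self.feature_sums = defaultdict(lambda: [0.0] * num_features)
        self.total = 0

    def update(self, label, features):
        # absorbing one new labeled example is O(num_features)
        self.class_counts[label] += 1
        sums = self.feature_sums[label]
        for i, v in enumerate(features):
            sums[i] += v
        self.total += 1

    def prior(self, label):
        return self.class_counts[label] / float(self.total)

counts = IncrementalCounts(2)
counts.update("A", [1.0, 0.0])
counts.update("B", [0.0, 1.0])
counts.update("A", [1.0, 1.0])
# counts.prior("A") is now 2/3, with no retraining on the full data
```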
Re: Is MLlib NaiveBayes implementation for Spark 0.9.1 correct?
Well, I believe this is a correct implementation, but please let us know if you run into problems. The NaiveBayes implementation in MLlib v1.0 supports sparse data, which is usually the case for text classification. I would recommend upgrading to v1.0. -Xiangrui On Tue, Jul 8, 2014 at 7:20 AM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Hi, I wanted to use Naive Bayes for a text classification problem. I am using Spark 0.9.1. I was just curious to ask: is the Naive Bayes implementation in Spark 0.9.1 correct? Or are there any bugs in the Spark 0.9.1 implementation which are taken care of in Spark 1.0? My question is specific to the MLlib Naive Bayes implementation only. Also, I am using Python (if that makes the answer any easier). Thanks -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka
Re: got java.lang.AssertionError when run sbt/sbt compile
try sbt/sbt clean first On Tue, Jul 8, 2014 at 8:25 AM, bai阿蒙 smallmonkey...@hotmail.com wrote: Hi guys, when I try to compile the latest source by sbt/sbt compile, I get an error. Can anyone help me? The following is the detail; it may be caused by TestSQLContext.scala: [error] [error] while compiling: /disk3/spark/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala [error] during phase: jvm [error] library version: version 2.10.4 [error] compiler version: version 2.10.4 [error] reconstructed args: -Xmax-classfile-name 120 -classpath
Re: Error and doubts in using Mllib Naive bayes for text clasification
1) The feature dimension should be a fixed number before you run NaiveBayes. If you use bag of words, you need to handle the word-to-index dictionary by yourself. You can either ignore the words that never appear in training (because they have no effect in prediction), or use hashing to randomly project words to a fixed-size feature space (collisions may happen). 3) Yes, we saved the log conditional probabilities. So to compute the likelihood, we only need summation. Best, Xiangrui On Tue, Jul 8, 2014 at 12:01 AM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: I am really sorry. It's actually my mistake. My problem 2 is wrong because using a single feature is a senseless thing. Sorry for the inconvenience. But I will still be waiting for the solutions to problems 1 and 3. Thanks, On Tue, Jul 8, 2014 at 12:14 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: Hello, I am a novice. I want to classify text into two classes. For this purpose I want to use a Naive Bayes model. I am using Python for it. Here are the problems I am facing: Problem 1: I wanted to use all words as features for the bag-of-words model, which means my features will be counts of individual words. In this case, whenever a new word comes in the test data (which was never present in the train data), I need to increase the size of the feature vector to incorporate that word as well. Correct me if I am wrong. Can I do that in the present MLlib NaiveBayes? Or what is the way in which I can incorporate this? Problem 2: As I was not able to proceed with all words, I did some pre-processing and figured out a few features from the text. But using this also gives errors. Right now I was testing for only one feature from the text, that is, the count of positive words.
I am submitting the code below, along with the error: #Code import tokenizer import gettingWordLists as gl from pyspark.mllib.classification import NaiveBayes from numpy import array from pyspark import SparkContext, SparkConf conf = (SparkConf().setMaster("local[6]").setAppName("My app").set("spark.executor.memory", "1g")) sc = SparkContext(conf = conf) # Getting the positive dict: pos_list = [] pos_list = gl.getPositiveList() tok = tokenizer.Tokenizer(preserve_case=False) train_data = [] with open("training_file.csv", "r") as train_file: for line in train_file: tokens = line.split(",") msg = tokens[0] sentiment = tokens[1] count = 0 tokens = set(tok.tokenize(msg)) for i in tokens: if i.encode('utf-8') in pos_list: count += 1 if sentiment.__contains__('NEG'): label = 0.0 else: label = 1.0 feature = [] feature.append(label) feature.append(float(count)) train_data.append(feature) model = NaiveBayes.train(sc.parallelize(array(train_data))) print model.pi print model.theta print "\n\n\n\n\n", model.predict(array([5.0])) ## This is the output: [-2.24512292 -0.11195389] [[ 0.] [ 0.]] Traceback (most recent call last): File "naive_bayes_analyser.py", line 77, in <module> print "\n\n\n\n\n", model.predict(array([5.0])) File "F:\spark-0.9.1\spark-0.9.1\python\pyspark\mllib\classification.py", line 101, in predict return numpy.argmax(self.pi + dot(x, self.theta)) ValueError: matrices are not aligned ## Problem 3: As you can see, the output for model.pi is negative. That is, the prior probabilities are negative. Can someone explain that as well? Is it the log of the probability? Thanks, -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka -- Rahul K Bhojwani 3rd Year B.Tech Computer Science and Engineering National Institute of Technology, Karnataka
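As a sketch of the hashing option mentioned in answer 1) — plain Python, not the MLlib API; `hashed_features` and the 1000-dimension choice are illustrative:

```python
def hashed_features(tokens, num_features=1000):
    """Project tokens into a fixed-size count vector via hashing, so the
    feature dimension never grows when unseen test-time words appear
    (distinct words may collide in the same slot).
    Note: Python's built-in hash of strings varies across processes,
    so the slot assignment is only stable within one run."""
    vec = [0.0] * num_features
    for tok in tokens:
        vec[hash(tok) % num_features] += 1.0
    return vec

v = hashed_features(["good", "movie", "good"])
# one count per token, regardless of which words they are
```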
Re: Help for the large number of the input data files
You can either use sc.wholeTextFiles and then a flatMap to reduce the number of partitions, or give more memory to the driver process by using --driver-memory 20g and then call RDD.repartition(small number) after you load the data in. -Xiangrui On Mon, Jul 7, 2014 at 7:38 PM, innowireless TaeYun Kim taeyun@innowireless.co.kr wrote: Hi, Help with the implementation best practice is needed. The operating environment is as follows: - Log data files arrive irregularly. - The size of a log data file is from 3.9KB to 8.5MB. The average is about 1MB. - The number of records of a data file is from 13 lines to 22000 lines. The average is about 2700 lines. - Data files must be post-processed before aggregation. - The post-processing algorithm can be changed. - Post-processed files are managed separately from the original data files, since the post-processing algorithm might be changed. - Daily aggregation is performed. All post-processed data files must be filtered record-by-record and aggregations (average, max, min, …) are calculated. - Since the aggregation is fine-grained, the number of records after the aggregation is not so small. It can be about half of the number of the original records. - At a point, the number of post-processed files can be about 200,000. - A data file should be able to be deleted individually. In a test, I tried to process 160,000 post-processed files with Spark, starting with sc.textFile() with a glob path. It failed with an OutOfMemory exception on the driver process. What is the best practice to handle this kind of data? Should I use HBase instead of plain files to save post-processed data? Thank you.
Re: Execution stalls in LogisticRegressionWithSGD
We have the maven-enforcer-plugin defined in the pom. I don't know why it didn't work for you. Could you try rebuilding with maven2 and confirm that there is no error message? If that is the case, please create a JIRA for it. Thanks! -Xiangrui On Wed, Jul 9, 2014 at 3:53 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Xiangrui, Thanks for all the help in resolving this issue. The cause turned out to be the build environment rather than runtime configuration. The build process had picked up maven2 while building Spark. Using binaries that were rebuilt using m3, the entire processing went through fine. While I'm aware that the build instruction page specifies m3 as the minimum requirement, declaratively preventing accidental m2 usage (e.g. through something like the maven enforcer plugin?) might help other developers avoid such issues. -Bharath On Mon, Jul 7, 2014 at 9:43 PM, Xiangrui Meng men...@gmail.com wrote: It seems to me to be a setup issue. I just tested news20.binary (1355191 features) on a 2-node EC2 cluster and it worked well. I added one line to conf/spark-env.sh: export SPARK_JAVA_OPTS="-Dspark.akka.frameSize=20" and launched spark-shell with --driver-memory 20g. Could you re-try with an EC2 setup? If it still doesn't work, please attach all your code and logs. Best, Xiangrui On Sun, Jul 6, 2014 at 1:35 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Xiangrui, 1) Yes, I used the same build (compiled locally from source) on the host that has (master, slave1) and on the second host with slave2. 2) The execution was successful when run in local mode with a reduced number of partitions. Does this imply issues communicating/coordinating across processes (i.e. driver, master and workers)? Thanks, Bharath On Sun, Jul 6, 2014 at 11:37 AM, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, 1) Did you sync the spark jar and conf to the worker nodes after the build?
2) Since the dataset is not large, could you try local mode first using `spark-submit --driver-memory 12g --master local[*]`? 3) Try to use a smaller number of partitions, say 5. If the problem is still there, please attach the full master/worker log files. Best, Xiangrui On Fri, Jul 4, 2014 at 12:16 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Xiangrui, Leaving the frameSize unspecified led to an error message (and failure) stating that the task size (~11M) was larger than the frame size. I hence set it to an arbitrarily large value (I realize 500 was unrealistically large and unnecessary in this case). I've now set the size to 20M and repeated the runs. The earlier runs were on an uncached RDD. Caching the RDD (and setting spark.storage.memoryFraction=0.5) resulted in a marginal speed-up of execution, but the end result remained the same. The cached RDD size is as follows: RDD Name: 1084, Storage Level: Memory Deserialized 1x Replicated, Cached Partitions: 80, Fraction Cached: 100%, Size in Memory: 165.9 MB, Size in Tachyon: 0.0 B, Size on Disk: 0.0 B. The corresponding master logs were: 14/07/04 06:29:34 INFO Master: Removing executor app-20140704062238-0033/1 because it is EXITED 14/07/04 06:29:34 INFO Master: Launching executor app-20140704062238-0033/2 on worker worker-20140630124441-slave1-40182 14/07/04 06:29:34 INFO Master: Removing executor app-20140704062238-0033/0 because it is EXITED 14/07/04 06:29:34 INFO Master: Launching executor app-20140704062238-0033/3 on worker worker-20140630102913-slave2-44735 14/07/04 06:29:37 INFO Master: Removing executor app-20140704062238-0033/2 because it is EXITED 14/07/04 06:29:37 INFO Master: Launching executor app-20140704062238-0033/4 on worker worker-20140630124441-slave1-40182 14/07/04 06:29:37 INFO Master: Removing executor app-20140704062238-0033/3 because it is EXITED 14/07/04 06:29:37 INFO Master: Launching executor app-20140704062238-0033/5 on worker worker-20140630102913-slave2-44735 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got
disassociated, removing it. 14/07/04 06:29:39 INFO Master: Removing app app-20140704062238-0033 14/07/04 06:29:39 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkMaster/deadLetters] to Actor[akka://sparkMaster/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkMaster%4010.3.1.135%3A33061-123#1986674260] was not delivered. [39] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2:45172 got disassociated, removing it. 14/07/04 06:29:39 INFO Master: akka.tcp://spark@slave2
Re: KMeans code is rubbish
SparkKMeans is a naive implementation. Please use mllib.clustering.KMeans in practice. I created a JIRA for this: https://issues.apache.org/jira/browse/SPARK-2434 -Xiangrui On Thu, Jul 10, 2014 at 2:45 AM, Tathagata Das tathagata.das1...@gmail.com wrote: I ran the SparkKMeans example (not the mllib KMeans that Sean ran) with your dataset as well, and I got the expected answer. And I believe that even though initialization is done using sampling, the example actually sets the seed to a constant 42, so the result should always be the same no matter how many times you run it. So I am not really sure what's going on here. Can you tell us more about which version of Spark you are running? Which Java version? == [tdas @ Xion spark2] cat input 2 1 1 2 3 2 2 3 4 1 5 1 6 1 4 2 6 2 4 3 5 3 6 3 [tdas @ Xion spark2] ./bin/run-example SparkKMeans input 2 0.001 2014-07-10 02:45:06.764 java[45244:d17] Unable to load realm info from SCDynamicStore 14/07/10 02:45:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable 14/07/10 02:45:07 WARN LoadSnappy: Snappy native library not loaded 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 14/07/10 02:45:08 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS Finished iteration (delta = 3.0) Finished iteration (delta = 0.0) Final centers: DenseVector(5.0, 2.0) DenseVector(2.0, 2.0) On Thu, Jul 10, 2014 at 2:17 AM, Wanda Hawk wanda_haw...@yahoo.com wrote: so this is what I am running: ./bin/run-example SparkKMeans ~/Documents/2dim2.txt 2 0.001 And this is the input file: ┌───[spark2013@SparkOne]──[~/spark-1.0.0].$ └───#!cat ~/Documents/2dim2.txt 2 1 1 2 3 2 2 3 4 1 5 1 6 1 4 2 6 2 4 3 5 3 6 3 This is the final output from spark: 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks 14/07/10 20:05:12 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 0 ms 14/07/10 20:05:12 INFO Executor: Serialized size of result for 14 is 1433 14/07/10 20:05:12 INFO Executor: Sending result for 14 directly to driver 14/07/10 20:05:12 INFO Executor: Finished task ID 14 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 0) 14/07/10 20:05:12 INFO TaskSetManager: Finished TID 14 in 5 ms on localhost (progress: 1/2) 14/07/10 20:05:12 INFO Executor: Serialized size of result for 15 is 1433 14/07/10 20:05:12 INFO Executor: Sending result for 15 directly to driver 14/07/10 20:05:12 INFO Executor: Finished task ID 15 14/07/10 20:05:12 INFO DAGScheduler: Completed ResultTask(6, 1) 14/07/10 
20:05:12 INFO TaskSetManager: Finished TID 15 in 7 ms on localhost (progress: 2/2) 14/07/10 20:05:12 INFO DAGScheduler: Stage 6 (collectAsMap at SparkKMeans.scala:75) finished in 0.008 s 14/07/10 20:05:12 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool 14/07/10 20:05:12 INFO SparkContext: Job finished: collectAsMap at SparkKMeans.scala:75, took 0.02472681 s Finished iteration (delta = 0.0) Final centers: DenseVector(2.8571428571428568, 2.0) DenseVector(5.6005, 2.0) On Thursday, July 10, 2014 12:02 PM, Bertrand Dechoux decho...@gmail.com wrote: A picture is worth a thousand... Well, a picture with this dataset, what you are expecting and what you get, would help answering your initial question. Bertrand On Thu, Jul 10, 2014 at 10:44 AM, Wanda Hawk wanda_haw...@yahoo.com wrote: Can someone please run the standard kMeans code on this input with 2 centers ?: 2 1 1 2 3 2 2 3 4 1 5 1 6 1 4 2 6 2 4 3 5 3 6 3 The obvious result should be (2,2) and (5,2) ... (you can draw them if you don't believe me ...) Thanks, Wanda
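For reference, plain Lloyd's iterations on this 12-point dataset do converge to the centers Wanda expects. A pure-Python sketch (the seeds here are hand-picked for illustration, one per visual cluster; SparkKMeans instead samples its initial centers with seed 42):

```python
# Wanda's 12 input points, one per line of 2dim2.txt.
points = [(2, 1), (1, 2), (3, 2), (2, 3), (4, 1), (5, 1),
          (6, 1), (4, 2), (6, 2), (4, 3), (5, 3), (6, 3)]
centers = [(1.0, 2.0), (6.0, 2.0)]  # hand-picked seeds, one per cluster

def closest(p, cs):
    # Index of the nearest center by squared Euclidean distance.
    return min(range(len(cs)),
               key=lambda i: (p[0] - cs[i][0]) ** 2 + (p[1] - cs[i][1]) ** 2)

for _ in range(10):  # far more iterations than this tiny dataset needs
    clusters = [[] for _ in centers]
    for p in points:
        clusters[closest(p, centers)].append(p)
    centers = [(sum(x for x, _ in c) / len(c), sum(y for _, y in c) / len(c))
               for c in clusters]
# centers is now [(2.0, 2.0), (5.0, 2.0)]
```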
Re: Terminal freeze during SVM
news20.binary's feature dimension is 1.35M, so the serialized task size is above the default limit of 10M. You need to set spark.akka.frameSize to, e.g., 20. Due to a bug (SPARK-1112), this parameter is not passed to executors automatically, which causes Spark to freeze. This was fixed in the latest master and v1.0.1-rc2. If you rebuild Spark, remember to sync the assembly jar to the workers. -Xiangrui On Thu, Jul 10, 2014 at 7:56 AM, AlexanderRiggers alexander.rigg...@gmail.com wrote: Tried the newest branch, but still get stuck on the same task: (kill) runJob at SlidingRDD.scala:74 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Terminal-freeze-during-SVM-Broken-pipe-tp9022p9304.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to RDD.take(middle 10 elements)
This is expensive but doable: rdd.zipWithIndex().filter { case (_, idx) => idx >= 10 && idx < 20 }.collect() -Xiangrui On Thu, Jul 10, 2014 at 12:53 PM, Nick Chammas nicholas.cham...@gmail.com wrote: Interesting question on Stack Overflow: http://stackoverflow.com/q/24677180/877069 Basically, is there a way to take() elements of an RDD at an arbitrary index? Nick View this message in context: How to RDD.take(middle 10 elements) Sent from the Apache Spark User List mailing list archive at Nabble.com.
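The same pattern in plain Python, for intuition (on an RDD, enumerate becomes zipWithIndex and the comprehension's condition becomes the filter):

```python
# Pair each element with its position, then keep the slice you want.
data = list(range(100, 200))
middle_10 = [x for i, x in enumerate(data) if 10 <= i < 20]
# middle_10 holds the elements at positions 10..19
```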
Re: ML classifier and data format for dataset with variable number of features
You can load the dataset as an RDD of JSON objects and use a flatMap to extract feature vectors at the object level. Then you can filter the training examples you want for binary classification. If you want to try multiclass, check out DB's PR at https://github.com/apache/spark/pull/1379 Best, Xiangrui On Fri, Jul 11, 2014 at 5:12 PM, SK skrishna...@gmail.com wrote: Hi, I need to perform binary classification on an image dataset. Each image is a data point described by a JSON object. The feature set for each image is a set of feature vectors, one feature vector per distinct object in the image. For example, if an image has 5 objects, its feature set will have 5 feature vectors, whereas an image that has 3 objects will have a feature set consisting of 3 feature vectors. So the number of feature vectors may differ between images, although each feature vector has the same number of attributes. The classification depends on the features of the individual objects, so I cannot aggregate them all into a flat vector. I have looked through the MLlib examples and it appears that the libSVM data format and the LabeledData format that MLlib uses require all the points to have the same number of features, and they read in a flat feature vector. I would like to know whether any of the MLlib supervised learning classifiers can be used with the JSON data format and whether they can be used to classify points with different numbers of features as described above. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ML-classifier-and-data-format-for-dataset-with-variable-number-of-features-tp9486.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
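A sketch of the flatMap approach in plain Python (the JSON field names "label" and "objects" are hypothetical; on an RDD the loop body would be the function passed to flatMap):

```python
import json

# Each line describes one image: a label plus one feature vector per
# detected object, so images can contribute different numbers of vectors.
lines = [
    '{"label": 1, "objects": [[0.1, 0.2], [0.3, 0.4]]}',
    '{"label": 0, "objects": [[0.5, 0.6]]}',
]

examples = []
for line in lines:
    img = json.loads(line)
    # flatMap: emit one (label, feature-vector) training example per object
    for vec in img["objects"]:
        examples.append((img["label"], vec))
# examples now holds 3 fixed-width training examples from 2 images
```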
Re: mapPartitionsWithIndex
You should return an iterator in mapPartitionsWithIndex. This is from the programming guide (http://spark.apache.org/docs/latest/programming-guide.html): mapPartitionsWithIndex(func): Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T. For your case, try something similar to the following: val keyval = dRDD.mapPartitionsWithIndex { (ind, iter) => iter.map(x => process(ind, x.trim().split(' ').map(_.toDouble), q, m, r)) } -Xiangrui On Sun, Jul 13, 2014 at 11:26 PM, Madhura das.madhur...@gmail.com wrote: I have a text file consisting of a large number of random floating-point values separated by spaces. I am loading this file into an RDD in Scala. I have heard of mapPartitionsWithIndex but I haven't been able to implement it. For each partition I want to call a method (process in this case) to which I want to pass the partition and its respective index as parameters. My method returns a pair of values. This is what I have done. val dRDD = sc.textFile("hdfs://master:54310/Data/input*") var ind:Int=0 val keyval = dRDD.mapPartitionsWithIndex((ind,x) => process(ind,x,...)) val res = keyval.collect() We are not able to access res(0)._1 and res(0)._2 The error log is as follows. [error] SimpleApp.scala:420: value trim is not a member of Iterator[String] [error] Error occurred in an application involving default arguments.
[error] val keyval=dRDD.mapPartitionsWithIndex( (ind,x) = process(ind,x.trim().split(' ').map(_.toDouble),q,m,r)) [error] ^ [error] SimpleApp.scala:425: value mkString is not a member of Array[Nothing] [error] println(res.mkString()) [error] ^ [error] /SimpleApp.scala:427: value _1 is not a member of Nothing [error] var final= res(0)._1 [error] ^ [error] /home/madhura/DTWspark/src/main/scala/SimpleApp.scala:428: value _2 is not a member of Nothing [error] var final1 = res(0)._2 - m +1 [error] ^ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitionsWithIndex-tp9590.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
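A plain-Python model of the mapPartitionsWithIndex semantics, for intuition (`map_partitions_with_index` and `process` here are illustrative stand-ins, not Spark API): the function receives (partition index, iterator) and must return an iterator, so per-element work goes through a map/generator over the iterator, not a direct call on it.

```python
def map_partitions_with_index(partitions, func):
    """Mimic RDD.mapPartitionsWithIndex on a list of partitions:
    func(index, iterator) must itself yield an iterator of results."""
    out = []
    for ind, part in enumerate(partitions):
        out.extend(func(ind, iter(part)))
    return out

# Two "partitions", each holding lines of space-separated numbers.
partitions = [["1.0 2.0"], ["3.0 4.0"]]

def process(ind, values):
    # Stand-in for the user's per-line method: return a pair.
    return (ind, values)

# The fix from the reply: map over the iterator, element by element.
result = map_partitions_with_index(
    partitions,
    lambda ind, it: (process(ind, [float(t) for t in x.split()]) for x in it))
# result == [(0, [1.0, 2.0]), (1, [3.0, 4.0])]
```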
Re: Error when testing with large sparse svm
You need to set a larger `spark.akka.frameSize`, e.g., 128, for the serialized weight vector. There is a JIRA about switching automatically between sending through Akka or broadcast: https://issues.apache.org/jira/browse/SPARK-2361 . -Xiangrui On Mon, Jul 14, 2014 at 12:15 AM, crater cq...@ucmerced.edu wrote: Hi, I encounter an error when testing SVM (the example one) on very large sparse data. The dataset I ran on was a toy dataset with only ten examples, but each is a 13-million-dimensional sparse vector with a few thousand non-zero entries. The error is shown below. I am wondering: is this a bug, or am I missing something? 14/07/13 23:59:44 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/07/13 23:59:44 INFO SecurityManager: Changing view acls to: chengjie 14/07/13 23:59:44 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(chengjie) 14/07/13 23:59:45 INFO Slf4jLogger: Slf4jLogger started 14/07/13 23:59:45 INFO Remoting: Starting remoting 14/07/13 23:59:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@master:53173] 14/07/13 23:59:45 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@master:53173] 14/07/13 23:59:45 INFO SparkEnv: Registering MapOutputTracker 14/07/13 23:59:45 INFO SparkEnv: Registering BlockManagerMaster 14/07/13 23:59:45 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140713235945-c78f 14/07/13 23:59:45 INFO MemoryStore: MemoryStore started with capacity 14.4 GB.
14/07/13 23:59:45 INFO ConnectionManager: Bound socket to port 37674 with id = ConnectionManagerId(master,37674) 14/07/13 23:59:45 INFO BlockManagerMaster: Trying to register BlockManager 14/07/13 23:59:45 INFO BlockManagerInfo: Registering block manager master:37674 with 14.4 GB RAM 14/07/13 23:59:45 INFO BlockManagerMaster: Registered BlockManager 14/07/13 23:59:45 INFO HttpServer: Starting HTTP Server 14/07/13 23:59:45 INFO HttpBroadcast: Broadcast server started at http://10.10.255.128:41838 14/07/13 23:59:45 INFO HttpFileServer: HTTP File server directory is /tmp/spark-ac459d4b-a3c4-4577-bad4-576ac427d0bf 14/07/13 23:59:45 INFO HttpServer: Starting HTTP Server 14/07/13 23:59:51 INFO SparkUI: Started SparkUI at http://master:4040 14/07/13 23:59:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/07/13 23:59:52 INFO EventLoggingListener: Logging events to /tmp/spark-events/binaryclassification-with-params(hdfs---master-9001-splice.small,1,1.0,svm,l1,0.1)-1405317591776 14/07/13 23:59:52 INFO SparkContext: Added JAR file:/home/chengjie/spark-1.0.1/examples/target/scala-2.10/spark-examples-1.0.1-hadoop2.3.0.jar at http://10.10.255.128:54689/jars/spark-examples-1.0.1-hadoop2.3.0.jar with timestamp 1405317592653 14/07/13 23:59:52 INFO AppClient$ClientActor: Connecting to master spark://master:7077... 
14/07/14 00:00:08 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/07/14 00:00:23 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/07/14 00:00:38 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/07/14 00:00:53 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Training: 10 14/07/14 00:01:09 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 14/07/14 00:01:09 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS *Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 20:0 was 94453098 bytes which exceeds spark.akka.frameSize (10485760 bytes). Consider using broadcast variables for large values.* at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at
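For concreteness, the frame size implied by the error above (the byte counts are taken from the log; treating frameSize as a count of MB matches the 10485760-byte default shown):

```python
import math

task_bytes = 94453098           # serialized task size from the exception
default_frame_bytes = 10485760  # the 10 MB spark.akka.frameSize default

# Smallest frameSize in MB that would fit the serialized task; a round
# value like the suggested 128 simply leaves headroom above it.
needed_mb = math.ceil(task_bytes / (1024 * 1024))
# needed_mb == 91
```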
Re: Error when testing with large sparse svm
Is it on a standalone server? There are several settings worth checking: 1) the number of partitions, which should match the number of cores; 2) driver memory (you can see it in the executor tab of the Spark WebUI and set it with --driver-memory 10g); 3) the version of Spark you were running. Best, Xiangrui On Mon, Jul 14, 2014 at 12:14 PM, Srikrishna S srikrishna...@gmail.com wrote: That is exactly the same error that I got. I am still having no success. Regards, Krishna On Mon, Jul 14, 2014 at 11:50 AM, crater cq...@ucmerced.edu wrote: Hi Krishna, Thanks for your help. Are you able to get your 29M data running yet? I fixed the previous problem by setting a larger spark.akka.frameSize, but now I get some other errors, below. Did you get these errors before? 14/07/14 11:32:20 ERROR TaskSchedulerImpl: Lost executor 1 on node7: remote Akka client disassociated 14/07/14 11:32:20 WARN TaskSetManager: Lost TID 20 (task 13.0:0) 14/07/14 11:32:21 ERROR TaskSchedulerImpl: Lost executor 3 on node8: remote Akka client disassociated 14/07/14 11:32:21 WARN TaskSetManager: Lost TID 21 (task 13.0:1) 14/07/14 11:32:23 ERROR TaskSchedulerImpl: Lost executor 6 on node3: remote Akka client disassociated 14/07/14 11:32:23 WARN TaskSetManager: Lost TID 22 (task 13.0:0) 14/07/14 11:32:25 ERROR TaskSchedulerImpl: Lost executor 0 on node4: remote Akka client disassociated 14/07/14 11:32:25 WARN TaskSetManager: Lost TID 23 (task 13.0:1) 14/07/14 11:32:26 ERROR TaskSchedulerImpl: Lost executor 5 on node1: remote Akka client disassociated 14/07/14 11:32:26 WARN TaskSetManager: Lost TID 24 (task 13.0:0) 14/07/14 11:32:28 ERROR TaskSchedulerImpl: Lost executor 7 on node6: remote Akka client disassociated 14/07/14 11:32:28 WARN TaskSetManager: Lost TID 26 (task 13.0:0) 14/07/14 11:32:28 ERROR TaskSetManager: Task 13.0:0 failed 4 times; aborting job Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 13.0:0 failed 4 times, most recent failure: TID 26 on host
node6 failed for unknown reason Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-tp9592p9623.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: ALS on EC2
Could you share the code of RecommendationALS and the complete spark-submit command line options? Thanks! -Xiangrui On Mon, Jul 14, 2014 at 11:23 PM, Srikrishna S srikrishna...@gmail.com wrote: Using properties file: null Main class: RecommendationALS Arguments: _train.csv _validation.csv _test.csv System properties: SPARK_SUBMIT - true spark.app.name - RecommendationALS spark.jars - file:/root/projects/spark-recommendation-benchmark/benchmark_mf/target/scala-2.10/recommendation-benchmark_2.10-1.0.jar spark.master - local[8] Classpath elements: file:/root/projects/spark-recommendation-benchmark/benchmark_mf/target/scala-2.10/recommendation-benchmark_2.10-1.0.jar 14/07/15 05:57:41 INFO Slf4jLogger: Slf4jLogger started 14/07/15 05:57:41 INFO Remoting: Starting remoting 14/07/15 05:57:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@ip-172-31-19-62.us-west-2.compute.internal:57349] 14/07/15 05:57:41 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@ip-172-31-19-62.us-west-2.compute.internal:57349] 14/07/15 05:57:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable --args is deprecated. Use --arg instead. 14/07/15 05:57:43 INFO RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 14/07/15 05:57:44 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS) 14/07/15 05:57:45 INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS) 14 and it continues trying and sleeping. Any help?