Maximum size of vector that reduce can handle
Dear Spark developers,

I am trying to measure Spark's reduce performance for big vectors. My motivation is related to machine-learning gradients: the gradient is a vector that is computed on each worker, and then all the results need to be summed up and broadcast back to the workers. Present machine-learning applications involve very long parameter vectors; for deep neural networks they can reach 2 billion elements. So I want to measure the time this operation needs depending on the size of the vector and the number of workers.

I wrote a few lines of code that assume Spark will distribute the partitions among all available workers. I have a 6-machine cluster (Xeon 3.3GHz, 4 cores, 16GB RAM), and each machine runs 2 Workers.

  import org.apache.spark.mllib.rdd.RDDFunctions._
  import breeze.linalg._
  import org.apache.log4j._
  Logger.getRootLogger.setLevel(Level.OFF)
  val n = 60000000
  val p = 12
  val vv = sc.parallelize(0 until p, p).map(i => DenseVector.rand[Double](n))
  vv.reduce(_ + _)

When executing this in the shell with a 60M vector, it crashes after some period of time. One of the nodes has the following in stdout:

  Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00075550, 2863661056, 0) failed; error='Cannot allocate memory' (errno=12)
  #
  # There is insufficient memory for the Java Runtime Environment to continue.
  # Native memory allocation (malloc) failed to allocate 2863661056 bytes for committing reserved memory.

I run the shell with --executor-memory 8G --driver-memory 8G, so handling a 60M vector of Double should not be a problem. Are there any big overheads involved? What is the maximum size of vector that reduce can handle?

Best regards,
Alexander

P.S. spark.driver.maxResultSize 0 needs to be set in order to run this code. I also needed to change the java.io.tmpdir and spark.local.dir folders, because my /tmp folder, which is the default, was too small and Spark swaps heavily into it. Without these settings I get either "no space left on device" or out-of-memory exceptions. I also submitted a bug: https://issues.apache.org/jira/browse/SPARK-5386
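For a rough sense of where the memory goes in this benchmark, here is a back-of-envelope sketch, assuming the usual 8 bytes per Double and ignoring serialization buffers and object overhead:

  val n = 60000000L                       // elements per vector
  val bytesPerVector = n * 8L             // 480,000,000 bytes, roughly 0.45 GiB per DenseVector[Double]
  val p = 12L                             // partitions, i.e. task results
  val pulledToDriver = p * bytesPerVector // roughly 5.4 GiB shipped to the driver by reduce()

With reduce() collecting all twelve ~0.45 GiB task results on the driver before merging them, an 8G driver heap leaves little headroom, which is consistent with the crash above.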
Re: query planner design doc?
Okay, thanks. The design document mostly details the infrastructure for optimization strategies but doesn't detail the strategies themselves. I take it the set of strategies is basically embodied in SparkStrategies.scala... is there a design doc/roadmap/JIRA issue detailing what strategies exist and which are planned?

Thanks,
Nick

On Jan 22, 2015, at 7:45 PM, Michael Armbrust mich...@databricks.com wrote:

Here is the initial design document for Catalyst: https://docs.google.com/document/d/1Hc_Ehtr0G8SQUg69cmViZsMi55_Kf3tISD9GPGU5M1Y/edit

Strategies (many of which are in SparkStrategies.scala) are the part that creates the physical operators from a Catalyst logical plan. These operators have execute() methods that actually call RDD operations.

On Thu, Jan 22, 2015 at 3:19 PM, Nicholas Murphy halcyo...@gmail.com wrote:

Hi - quick question: is there a design doc (or something more than "look at the code") for the query planner for Spark SQL (i.e., the component that takes (Catalyst?) operator trees and translates them into Spark operations)?

Thanks,
Nick
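To make that concrete, here is a schematic sketch of the shape such a strategy takes. It is illustrative only: Strategy, planLater, and the operator names approximate Spark SQL internals rather than reproducing the actual source.

  // Schematic: a strategy pattern-matches logical plan nodes and returns
  // candidate physical operators, deferring children back to the planner.
  object FilterStrategy extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case logical.Filter(condition, child) =>
        // Emit the physical Filter; planLater plans the child subtree later.
        execution.Filter(condition, planLater(child)) :: Nil
      case _ => Nil  // not applicable here; other strategies get a chance
    }
  }

The planner tries each strategy in turn on every logical node, collecting the physical alternatives they emit.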
Re: Optimize encoding/decoding strings when using Parquet
Added PR https://github.com/apache/spark/pull/4139 - I think tests have been re-arranged, so a merge will be necessary.

Mick

On 19 Jan 2015, at 18:31, Reynold Xin r...@databricks.com wrote:

Definitely go for a pull request!

On Mon, Jan 19, 2015 at 10:10 AM, Mick Davies michael.belldav...@gmail.com wrote:

Looking at the Parquet code, it looks like hooks are already in place to support this. In particular, PrimitiveConverter has the methods hasDictionarySupport and addValueFromDictionary for this purpose. These are not used by CatalystPrimitiveConverter. I think it would be pretty straightforward to add this. Has anyone considered it? Shall I get a pull request together for it?

Mick
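For context on those hooks, below is a hedged sketch of how a dictionary-aware string converter plugs into the Parquet methods Mick mentions. The class name and update callback are illustrative, not the actual Catalyst implementation, and the packages follow parquet-mr 1.6 (later releases moved to org.apache.parquet):

  import parquet.column.Dictionary
  import parquet.io.api.{Binary, PrimitiveConverter}

  // Decodes each distinct string once per dictionary page instead of once
  // per value occurrence.
  class DictionaryStringConverter(update: String => Unit) extends PrimitiveConverter {

    private var dictionaryStrings: Array[String] = _

    // Opt in: the reader will hand us the page's dictionary when one exists.
    override def hasDictionarySupport: Boolean = true

    override def setDictionary(dictionary: Dictionary): Unit = {
      dictionaryStrings = Array.tabulate(dictionary.getMaxId + 1) { i =>
        dictionary.decodeToBinary(i).toStringUsingUTF8
      }
    }

    // Dictionary-encoded pages hit this cheap array lookup...
    override def addValueFromDictionary(dictionaryId: Int): Unit =
      update(dictionaryStrings(dictionaryId))

    // ...while plain-encoded pages still decode each binary value.
    override def addBinary(value: Binary): Unit =
      update(value.toStringUsingUTF8)
  }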
RE: Maximum size of vector that reduce can handle
Hi DB Tsai,

Thank you for your suggestion. Actually, I started my experiments with treeReduce. Originally I had vv.treeReduce(_ + _, 2) in my script, exactly because the MLlib optimizers use it, as you pointed out with LBFGS. However, it leads to the same problems as reduce, though presumably less directly. As far as I understand, treeReduce limits the number of communications between the workers and the master by forcing workers to partially compute the reduce operation. Are you sure that the driver will first collect all results (or all partial results, in treeReduce) and ONLY then perform aggregation? If that is the problem, then how do I force it to do the aggregation after receiving each portion of data from the Workers?

Best regards,
Alexander

-----Original Message-----
From: DB Tsai [mailto:dbt...@dbtsai.com]
Sent: Friday, January 23, 2015 11:53 AM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Maximum size of vector that reduce can handle

Hi Alexander,

When you use `reduce` to aggregate the vectors, those will actually be pulled into the driver and merged there. Obviously, that is not scalable given you are doing deep neural networks, which have so many coefficients. Please try treeReduce instead, which is what we do in linear regression and logistic regression. See https://github.com/apache/spark/blob/branch-1.1/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala for example.

  val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
    seqOp = (c, v) => (c, v) match {
      case ((grad, loss), (label, features)) =>
        val l = localGradient.compute(features, label, bcW.value, grad)
        (grad, loss + l)
    },
    combOp = (c1, c2) => (c1, c2) match {
      case ((grad1, loss1), (grad2, loss2)) =>
        axpy(1.0, grad2, grad1)
        (grad1, loss1 + loss2)
    })

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai

On Fri, Jan 23, 2015 at 10:00 AM, Ulanov, Alexander alexander.ula...@hp.com wrote:

Dear Spark developers, I am trying to measure Spark's reduce performance for big vectors. [...]
Re: query planner design doc?
No, are you looking for something in particular?

On Fri, Jan 23, 2015 at 9:44 AM, Nicholas Murphy halcyo...@gmail.com wrote:

Okay, thanks. The design document mostly details the infrastructure for optimization strategies but doesn't detail the strategies themselves. I take it the set of strategies is basically embodied in SparkStrategies.scala... is there a design doc/roadmap/JIRA issue detailing what strategies exist and which are planned? [...]
Re: Spark 1.2.0: MissingRequirementError
Much appreciated if somebody could help fix this issue -- or at least give me some hints about what might be wrong.

thanks,
Peter

2015-01-15 14:04 GMT+01:00 PierreB pierre.borckm...@realimpactanalytics.com:

Hi guys,

A few people seem to have the same problem with Spark 1.2.0, so I figured I would push it here. See: http://apache-spark-user-list.1001560.n3.nabble.com/MissingRequirementError-with-spark-td21149.html

In a nutshell, for sbt test to work, we now need to fork a JVM and also give it more memory to be able to run the tests. See also: https://github.com/deanwampler/spark-workshop/blob/master/project/Build.scala

This all used to work fine until 1.2.0. Could you have a look please?

Thanks,
P.

--
Peter Prettenhofer
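For anyone hitting the same error, the workaround referenced above boils down to sbt settings along these lines (a sketch; the exact memory values are illustrative, in the spirit of the linked Build.scala):

  // build.sbt (sbt 0.13): run tests in a forked JVM and give it more memory.
  fork in Test := true
  javaOptions in Test ++= Seq("-Xmx2G", "-XX:MaxPermSize=256m")

Without fork, javaOptions has no effect, since the tests run inside sbt's own JVM.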
Re: Maximum size of vector that reduce can handle
Hi Alexander,

`reduce` is an action that collects all the data from the mappers to the driver and performs the aggregation on the driver. As a result, if the output of each mapper is very large and the number of partitions is large, it can cause a problem.

`treeReduce`, as the name indicates, aggregates the mappers' output pairwise in a first layer, halving the number of outputs, and then continues the aggregation layer by layer. The final aggregation is still done in the driver, but by that time the amount of data is small. By default a depth of 2 is used, so if you have many partitions of large vectors this may still cause an issue. You can increase the depth so that in the final reduce on the driver the number of partial results is very small.

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai

On Fri, Jan 23, 2015 at 12:07 PM, Ulanov, Alexander alexander.ula...@hp.com wrote:

Hi DB Tsai, thank you for your suggestion. Actually, I started my experiments with treeReduce. [...]
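Applied to the benchmark that started this thread, the suggestion looks like the following sketch (depth 4 is an arbitrary illustrative value):

  import org.apache.spark.mllib.rdd.RDDFunctions._  // adds treeReduce to RDDs in 1.1/1.2

  // With a higher depth, more pairwise merging happens on the executors,
  // so the driver's final reduce sees fewer, pre-combined partial results.
  val sum = vv.treeReduce(_ + _, depth = 4)  // default depth is 2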
Re: Spark performance gains for small queries
Hey Matei,

Thanks for your reply. We will keep the JDBC server in mind for smaller queries. Regarding the MapReduce job start-up: are you pointing towards JVM initialization latencies in MR? Other than JVM initialization, does Spark do any optimization (that is not done by MapReduce) to speed up the start-up?

--
Regards,
Saumitra Shahapure

On Fri, Jan 23, 2015 at 2:08 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

It's hard to tell without more details, but the start-up latency in Hive can sometimes be high, especially if you are running Hive on MapReduce. [...]
Find the two storage locations of each partition of a replicated RDD
Hi, I want to find the storage locations (BlockManagerIds) of each partition when an RDD is replicated twice. That is, if a twice-replicated RDD has 5 partitions, I would like to know the first and second storage location of each partition. Basically, I am trying to modify the list of nodes selected for replicating an RDD, and I just want to check where exactly the first and second copies of each partition get stored. I tried the RDD storage details in the web UI but couldn't learn much from them. Any help please!!

Thank you,
Karthik
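One way to inspect this from the shell, sketched under the assumption of a Spark 1.x cluster: SparkContext.getPreferredLocs is a @DeveloperApi, and for a partition that is already cached its answer is derived from the BlockManagerMaster's view of where the block replicas live, so a 2x-replicated partition should report two locations.

  import org.apache.spark.storage.StorageLevel

  val rdd = sc.parallelize(1 to 1000, 5).persist(StorageLevel.MEMORY_ONLY_2)
  rdd.count()  // materialize so the blocks are actually stored and replicated

  for (p <- rdd.partitions) {
    val locs = sc.getPreferredLocs(rdd, p.index)
    println(s"partition ${p.index} -> ${locs.mkString(", ")}")
  }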
Re: Spark performance gains for small queries
It's hard to tell without more details, but the start-up latency in Hive can sometimes be high, especially if you are running Hive on MapReduce. MR just takes 20-30 seconds per job to spin up even if the job is doing nothing.

For real use of Spark SQL for short queries, by the way, I'd recommend using the JDBC server so that you can have a long-running Spark process. It gets quite a bit faster after the first few queries.

Matei

On Jan 22, 2015, at 10:22 PM, Saumitra Shahapure (Vizury) saumitra.shahap...@vizury.com wrote:

Hello,

We were comparing the performance of some of our production Hive queries between Hive and Spark. We compared Hive (0.13) + Hadoop (1.2.1) against both Spark 0.9 and 1.1, and the performance gains in Spark have been good.

We tried a very simple query, select count(*) from T where col3=123, in both Spark SQL and Hive (with hive.map.aggr=true) and found that Spark's performance was 2x better than Hive's (120 sec vs 60 sec). Table T is stored in S3 as a single 600MB GZIP file.

My question is: why is Spark faster than Hive here? In both cases the file will be downloaded, uncompressed, and the lines counted by a single process. In Hive's case the reducer will be the identity function, since hive.map.aggr is true. Note that disk spills and network I/O are very low in Hive's case as well.

--
Regards,
Saumitra Shahapure
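For reference, the long-running process Matei recommends can be started with the scripts that ship with Spark 1.1+. A sketch, assuming a standalone master URL that you would replace with your own:

  # Start the long-running Spark SQL JDBC/Thrift server.
  ./sbin/start-thriftserver.sh --master spark://your-master:7077

  # Issue queries over JDBC with the bundled beeline client
  # (10000 is the default HiveServer2 port).
  ./bin/beeline -u jdbc:hive2://localhost:10000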
Re: spark 1.1.0 (w/ hadoop 2.4) vs aws java sdk 1.7.2
Did you use spark.files.userClassPathFirst = true? It's exactly for this kind of problem.

On Fri, Jan 23, 2015 at 4:42 AM, William-Smith williamsmith.m...@gmail.com wrote:

I have had the same issue while using HttpClient from AWS EMR Spark Streaming to post to a nodejs server. I have found, using ClassLoader.getResource("org/apache/http/client/HttpClient"), that the class is being loaded from spark-assembly-1.1.0-hadoop2.4.0.jar.

That in itself is not the issue, because the version is 4.2.5, the same version I am using with success on my local machine with Hadoop CDH 5. The issue is that HttpClient relies on httpcore, and there is an old commons-httpcore-1.3.jar as well as httpcore-4.5.2 in the spark-assembly jar. It looks like the old one is getting loaded first.

So the fix might be to build the Spark jar myself without httpcore-1.3 and replace it on bootstrap. I will keep you posted on the outcome.
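For reference, a minimal sketch of setting that flag programmatically. The property is marked experimental in the Spark 1.x docs, so verify the behavior on your version; the app name below is a placeholder:

  import org.apache.spark.{SparkConf, SparkContext}

  // Make executors prefer classes from user-added jars (e.g. your own
  // httpclient/httpcore) over the copies baked into the Spark assembly.
  val conf = new SparkConf()
    .setAppName("HttpClientApp")
    .set("spark.files.userClassPathFirst", "true")
  val sc = new SparkContext(conf)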