I am running into this issue as well, when storing large arrays as the value in a key-value pair and then doing a `reduceByKey`. Could one of the experts comment on whether it would make sense to add an operation that adds values in place, as accumulators do? This would essentially merge the vectors for a given key in place, avoiding multiple allocations of temporary arrays/vectors, and should be faster for datasets with frequently repeated keys.
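For illustration, the in-place merge I have in mind could look like the following plain-Scala sketch (`mergeInPlace` is a hypothetical helper, not an existing Spark API):

```scala
// Hypothetical helper: add the elements of `b` into `a`, reusing
// `a`'s storage instead of allocating a fresh result array.
def mergeInPlace(a: Array[Double], b: Array[Double]): Array[Double] = {
  require(a.length == b.length, "vectors must be the same length")
  var i = 0
  while (i < a.length) {
    a(i) += b(i)
    i += 1
  }
  a // same object as `a`; no new allocation
}
```

Note that `aggregateByKey` already documents that its functions may mutate and return their first argument, so something like the above can be used as its `seqOp`/`combOp` today; `reduceByKey` makes no such mutation guarantee.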
On Tue, Jan 27, 2015 at 11:03 AM, Xiangrui Meng <men...@gmail.com> wrote:
> A 60M-element vector costs 480MB of memory. You have 12 of them to be reduced on the driver, so you need ~6GB of memory, not counting the temp vectors generated from `_ + _`. You need to increase driver memory to make it work. That being said, ~10^7 hits the limit for the current impl of GLM. -Xiangrui
>
> On Jan 23, 2015 2:19 PM, "DB Tsai" <dbt...@dbtsai.com> wrote:
>> Hi Alexander,
>>
>> `reduce` is an action that collects all the data from the mappers to the driver and performs the aggregation there. As a result, if the output from the mappers is very large and the number of mapper partitions is large, it might cause a problem.
>>
>> `treeReduce`, as the name indicates, works in layers: the first layer aggregates the mapper outputs two by two, halving the number of outputs. The aggregation then continues layer by layer. The final aggregation is still done on the driver, but by that point the amount of data is small.
>>
>> By default, depth 2 is used, so if you have many partitions of large vectors, this may still cause issues. You can increase the depth so that in the final reduce on the driver, the number of remaining partial results is very small.
>>
>> Sincerely,
>>
>> DB Tsai
>> -------------------------------------------------------
>> Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>> On Fri, Jan 23, 2015 at 12:07 PM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:
>>> Hi DB Tsai,
>>>
>>> Thank you for your suggestion. Actually, I started my experiments with `treeReduce`. Originally, I had `vv.treeReduce(_ + _, 2)` in my script, exactly because MLlib optimizers use it, as you pointed out with LBFGS. However, it leads to the same problems as `reduce`, though presumably less directly.
>>> As far as I understand, treeReduce limits the amount of communication between workers and master by forcing workers to partially compute the reduce operation.
>>>
>>> Are you sure that the driver will first collect all results (or all partial results, in treeReduce) and ONLY then perform the aggregation? If that is the problem, how can I force it to aggregate after receiving each portion of data from the workers?
>>>
>>> Best regards, Alexander
>>>
>>> -----Original Message-----
>>> From: DB Tsai [mailto:dbt...@dbtsai.com]
>>> Sent: Friday, January 23, 2015 11:53 AM
>>> To: Ulanov, Alexander
>>> Cc: dev@spark.apache.org
>>> Subject: Re: Maximum size of vector that reduce can handle
>>>
>>> Hi Alexander,
>>>
>>> When you use `reduce` to aggregate the vectors, they are actually pulled into the driver and merged there. Obviously, that is not scalable given you are doing deep neural networks, which have so many coefficients.
>>>
>>> Please try treeReduce instead, which is what we do in linear regression and logistic regression.
>>>
>>> See https://github.com/apache/spark/blob/branch-1.1/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala for example.
>>> val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
>>>   seqOp = (c, v) => (c, v) match {
>>>     case ((grad, loss), (label, features)) =>
>>>       val l = localGradient.compute(features, label, bcW.value, grad)
>>>       (grad, loss + l)
>>>   },
>>>   combOp = (c1, c2) => (c1, c2) match {
>>>     case ((grad1, loss1), (grad2, loss2)) =>
>>>       axpy(1.0, grad2, grad1)
>>>       (grad1, loss1 + loss2)
>>>   })
>>>
>>> Sincerely,
>>>
>>> DB Tsai
>>> -------------------------------------------------------
>>> Blog: https://www.dbtsai.com
>>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>>
>>> On Fri, Jan 23, 2015 at 10:00 AM, Ulanov, Alexander <alexander.ula...@hp.com> wrote:
>>>> Dear Spark developers,
>>>>
>>>> I am trying to measure Spark's reduce performance for big vectors. My motivation is related to machine learning gradients. A gradient is a vector that is computed on each worker, and then all the results need to be summed up and broadcast back to the workers. Present machine learning applications involve very long parameter vectors; for deep neural networks it can be up to 2 billion elements. So, I want to measure the time this operation takes depending on the size of the vector and the number of workers. I wrote a few lines of code that assume Spark will distribute partitions among all available workers. I have a 6-machine cluster (Xeon 3.3GHz 4 cores, 16GB RAM), each running 2 workers.
>>>>
>>>> import org.apache.spark.mllib.rdd.RDDFunctions._
>>>> import breeze.linalg._
>>>> import org.apache.log4j._
>>>> Logger.getRootLogger.setLevel(Level.OFF)
>>>> val n = 60000000
>>>> val p = 12
>>>> val vv = sc.parallelize(0 until p, p).map(i => DenseVector.rand[Double](n))
>>>> vv.reduce(_ + _)
>>>>
>>>> When executing this in the shell with a 60M vector, it crashes after some period of time.
>>>> One of the nodes contains the following in stdout:
>>>>
>>>> Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000755500000, 2863661056, 0) failed; error='Cannot allocate memory' (errno=12)
>>>> # There is insufficient memory for the Java Runtime Environment to continue.
>>>> # Native memory allocation (malloc) failed to allocate 2863661056 bytes for committing reserved memory.
>>>>
>>>> I run the shell with --executor-memory 8G --driver-memory 8G, so handling a 60M vector of Double should not be a problem. Are there any big overheads for this? What is the maximum size of vector that reduce can handle?
>>>>
>>>> Best regards, Alexander
>>>>
>>>> P.S.
>>>>
>>>> "spark.driver.maxResultSize 0" needs to be set in order to run this code. I also needed to change the "java.io.tmpdir" and "spark.local.dir" folders, because my /tmp folder, which is the default, was too small and Spark swaps heavily into it. Without these settings I get either "no space left on device" or "out of memory" exceptions.
>>>>
>>>> I also submitted a bug: https://issues.apache.org/jira/browse/SPARK-5386
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: dev-h...@spark.apache.org
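For anyone following the thread: the layer-by-layer pairwise aggregation that `treeReduce` performs, as DB Tsai describes above, can be simulated in plain Scala without Spark. This is only an illustrative sketch of the scheme, not Spark's actual implementation (which distributes the work across executors and is parameterized by depth rather than running to a single element locally):

```scala
// Simulate treeReduce's pairwise aggregation: each layer combines
// partial results two by two, roughly halving their number, until
// a single result remains.
def layeredReduce[T](parts: Seq[T])(op: (T, T) => T): T = {
  require(parts.nonEmpty, "cannot reduce an empty collection")
  var level = parts
  while (level.size > 1) {
    level = level.grouped(2).map {
      case Seq(a, b) => op(a, b) // combine a pair
      case Seq(a)    => a        // odd element carries over
    }.toSeq
  }
  level.head
}
```

The point of the scheme is that the driver only ever merges the few results remaining after the last layer, which is why increasing the depth helps when individual partial results (such as 60M-element gradient vectors) are large.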