I am running into this issue as well when storing large Arrays as the value
in a key-value pair and then doing a reduceByKey.

Can one of the experts please comment on whether it would make sense to add
an operation that adds values in place, as accumulators do? This would
essentially merge the vectors for a given key in place, avoiding repeated
allocation of temporary arrays/vectors. It should be faster for datasets with
frequently repeated keys.
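
To make the in-place idea concrete, here is a minimal local sketch using only
the Scala standard library. The names `InPlaceMerge`, `addInPlace` and
`sumByKey` are made up for illustration, and the `HashMap` is just a stand-in
for a per-key reduce. On Spark, the existing `aggregateByKey` on pair RDDs is
documented as allowing both of its functions to modify and return their first
argument, so the same `addInPlace` could probably be passed there; I have not
measured whether that helps in this case.

```scala
import scala.collection.mutable

object InPlaceMerge {
  // Adds `v` into `acc` element by element and returns `acc`;
  // no new array is allocated.
  def addInPlace(acc: Array[Double], v: Array[Double]): Array[Double] = {
    require(acc.length == v.length, "vectors must have the same length")
    var i = 0
    while (i < acc.length) { acc(i) += v(i); i += 1 }
    acc
  }

  // Local stand-in for a per-key reduce (hypothetical helper): one zeroed
  // accumulator is created per distinct key and then updated in place.
  def sumByKey(pairs: Seq[(String, Array[Double])], dim: Int): Map[String, Array[Double]] = {
    val accs = mutable.HashMap.empty[String, Array[Double]]
    for ((k, v) <- pairs) addInPlace(accs.getOrElseUpdate(k, new Array[Double](dim)), v)
    accs.toMap
  }

  // On Spark the same function could serve as both seqOp and combOp
  // (assumption, not run here):
  //   pairs.aggregateByKey(new Array[Double](dim))(addInPlace, addInPlace)
}
```

With this shape, the number of arrays allocated is one per distinct key rather
than one per merged pair, which is where the saving for frequently repeated
keys would come from.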

On Tue, Jan 27, 2015 at 11:03 AM, Xiangrui Meng <men...@gmail.com> wrote:

> A 60M-element vector costs 480MB of memory. You have 12 of them to be
> reduced to the driver, so you need ~6GB of memory, not counting the temp
> vectors generated by '_+_'. You need to increase driver memory to make it
> work. That being said, ~10^7 hits the limit for the current impl of glm.
> -Xiangrui
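
The arithmetic behind these figures can be written out as a small sketch. It
counts only the raw 8 bytes per Double and ignores JVM object headers,
serialization buffers and the temporary vectors, so treat the results as lower
bounds. `ReduceMemory` and its methods are names made up for illustration.

```scala
object ReduceMemory {
  val bytesPerDouble: Long = 8L

  // Raw payload of one dense vector of `n` doubles.
  def vectorBytes(n: Long): Long = n * bytesPerDouble

  // Payload the driver holds if every partition's vector is resident at once.
  def allResultsBytes(n: Long, partitions: Int): Long = vectorBytes(n) * partitions
}
```

For n = 60000000 and 12 partitions this gives 480,000,000 bytes per vector
(480MB in decimal units) and 5,760,000,000 bytes in total, which is the ~6GB
mentioned above and already most of an 8G driver heap.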
> On Jan 23, 2015 2:19 PM, "DB Tsai" <dbt...@dbtsai.com> wrote:
>
> > Hi Alexander,
> >
> > `reduce` is an action that collects the output of every mapper
> > partition on the driver and performs the aggregation there. As a
> > result, if the output of each mapper is very large and the mapper has
> > many partitions, it can cause a problem.
> >
> > `treeReduce`, as the name indicates, works in layers: the first
> > layer aggregates the mapper outputs two by two, which halves the
> > number of outputs, and the aggregation then continues layer by
> > layer. The final aggregation is still done on the driver, but by
> > then the amount of data is small.
> >
> > By default, depth 2 is used, so if you have many partitions of
> > large vectors, this may still cause an issue. You can increase the
> > depth so that, in the final reduce on the driver, the number of
> > partitions is very small.
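
As a rough illustration of the effect of depth, here is a simplified model of
how the number of partial results could shrink level by level. It is not
Spark's actual code: the fan-in formula and the stopping rule are assumptions
made for this sketch, and `TreeLevels` is a made-up name.

```scala
object TreeLevels {
  // Fan-in per level, chosen so that roughly `depth` levels cover all
  // partitions (never less than 2). Assumed for this model.
  def fanIn(partitions: Int, depth: Int): Int =
    math.max(math.ceil(math.pow(partitions.toDouble, 1.0 / depth)).toInt, 2)

  // Number of partial results present at each level, starting with the
  // original partitions and ending with the count that the final step on
  // the driver has to combine.
  def levels(partitions: Int, depth: Int): List[Int] = {
    val scale = fanIn(partitions, depth)
    var n = partitions
    val out = List.newBuilder[Int]
    out += n
    while (n > scale) {
      n = (n + scale - 1) / scale // ceiling division: groups of `scale` merge into one
      out += n
    }
    out.result()
  }
}
```

In this model, `TreeLevels.levels(12, 2)` ends with 3 partial results, so with
480MB vectors the driver would still have to hold on the order of 1.4GB before
any temporaries, while `TreeLevels.levels(1000, 4)` ends with 5.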
> >
> > Sincerely,
> >
> > DB Tsai
> > -------------------------------------------------------
> > Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> >
> > On Fri, Jan 23, 2015 at 12:07 PM, Ulanov, Alexander
> > <alexander.ula...@hp.com> wrote:
> > > Hi DB Tsai,
> > >
> > > Thank you for your suggestion. Actually, I started my experiments
> > > with "treeReduce". Originally, I had "vv.treeReduce(_ + _, 2)" in my
> > > script exactly because the MLlib optimizers use it, as you pointed out
> > > with LBFGS. However, it leads to the same problems as "reduce", though
> > > presumably less directly. As far as I understand, treeReduce limits the
> > > number of communications between workers and master by forcing workers
> > > to partially compute the reduce operation.
> > >
> > > Are you sure that the driver first collects all results (or all
> > > partial results in treeReduce) and ONLY then performs the aggregation?
> > > If that is the problem, how can I force it to aggregate after receiving
> > > each portion of data from the workers?
> > >
> > > Best regards, Alexander
> > >
> > > -----Original Message-----
> > > From: DB Tsai [mailto:dbt...@dbtsai.com]
> > > Sent: Friday, January 23, 2015 11:53 AM
> > > To: Ulanov, Alexander
> > > Cc: dev@spark.apache.org
> > > Subject: Re: Maximum size of vector that reduce can handle
> > >
> > > Hi Alexander,
> > >
> > > When you use `reduce` to aggregate the vectors, they are actually
> > > pulled into the driver and merged there. Obviously, this is not
> > > scalable given that you are doing deep neural networks, which have so
> > > many coefficients.
> > >
> > > Please try treeReduce instead, which is what we do in linear
> > > regression and logistic regression.
> > >
> > > See
> > > https://github.com/apache/spark/blob/branch-1.1/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
> > > for example.
> > >
> > > val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
> > >   seqOp = (c, v) => (c, v) match {
> > >     case ((grad, loss), (label, features)) =>
> > >       val l = localGradient.compute(features, label, bcW.value, grad)
> > >       (grad, loss + l)
> > >   },
> > >   combOp = (c1, c2) => (c1, c2) match {
> > >     case ((grad1, loss1), (grad2, loss2)) =>
> > >       axpy(1.0, grad2, grad1)
> > >       (grad1, loss1 + loss2)
> > >   })
> > >
> > > Sincerely,
> > >
> > > DB Tsai
> > > -------------------------------------------------------
> > > Blog: https://www.dbtsai.com
> > > LinkedIn: https://www.linkedin.com/in/dbtsai
> > >
> > >
> > >
> > > On Fri, Jan 23, 2015 at 10:00 AM, Ulanov, Alexander
> > > <alexander.ula...@hp.com> wrote:
> > >> Dear Spark developers,
> > >>
> > >> I am trying to measure Spark's reduce performance for big vectors. My
> > >> motivation is related to machine learning gradients. The gradient is a
> > >> vector that is computed on each worker; all results then need to be
> > >> summed up and broadcast back to the workers. Present machine learning
> > >> applications involve very long parameter vectors; for deep neural
> > >> networks they can be up to 2 billion elements long. So, I want to
> > >> measure the time needed for this operation depending on the size of
> > >> the vector and the number of workers. I wrote a few lines of code that
> > >> assume that Spark will distribute partitions among all available
> > >> workers. I have a 6-machine cluster (Xeon 3.3GHz, 4 cores, 16GB RAM);
> > >> each machine runs 2 Workers.
> > >>
> > >> import org.apache.spark.mllib.rdd.RDDFunctions._
> > >> import breeze.linalg._
> > >> import org.apache.log4j._
> > >> Logger.getRootLogger.setLevel(Level.OFF)
> > >> val n = 60000000
> > >> val p = 12
> > >> val vv = sc.parallelize(0 until p, p).map(i => DenseVector.rand[Double](n))
> > >> vv.reduce(_ + _)
> > >>
> > >> When executed in the shell with a 60M vector, it crashes after some
> > >> time. One of the nodes contains the following in stdout:
> > >> Java HotSpot(TM) 64-Bit Server VM warning: INFO:
> > >> os::commit_memory(0x0000000755500000, 2863661056, 0) failed;
> > >> error='Cannot allocate memory' (errno=12)
> > >> #
> > >> # There is insufficient memory for the Java Runtime Environment to continue.
> > >> # Native memory allocation (malloc) failed to allocate 2863661056 bytes for committing reserved memory.
> > >>
> > >> I run the shell with --executor-memory 8G --driver-memory 8G, so
> > >> handling a 60M vector of Double should not be a problem. Is there any
> > >> big overhead for this? What is the maximum size of vector that reduce
> > >> can handle?
> > >>
> > >> Best regards, Alexander
> > >>
> > >> P.S.
> > >>
> > >> "spark.driver.maxResultSize 0" needs to be set in order to run this
> > >> code. I also needed to change the "java.io.tmpdir" and
> > >> "spark.local.dir" folders because my /tmp folder, which is the default,
> > >> was too small and Spark swaps heavily into this folder. Without these
> > >> settings I get either "no space left on device" or "out of memory"
> > >> exceptions.
> > >>
> > >> I also submitted a bug
> > >> https://issues.apache.org/jira/browse/SPARK-5386
> > >>
> > >> ---------------------------------------------------------------------
> > >> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> > >> For additional commands, e-mail: dev-h...@spark.apache.org
> > >>
> >
> >
> >
>
