Yeah, without caching makes it gets really slow. I will try to minimize the
number of columns on my tables, that may save lots of memory and will
eventually work.
I will let you know.


On Tue, Mar 3, 2015 at 8:58 PM, Joseph Bradley <>

> I would recommend caching; if you can't persist, iterative algorithms will
> not work well.
> I don't think calling count on the dataset is problematic; every iteration
> in LBFGS iterates over the whole dataset and does a lot more computation
> than count().
> It would be helpful to see some error occurring within LBFGS.  With the
> given stack trace, I'm not sure what part of LBFGS it's happening in.
> On Tue, Mar 3, 2015 at 2:27 PM, Gustavo Enrique Salazar Torres <
>> wrote:
>> Yeah, I can call count before that and it works. Also I was over caching
>> tables but I removed those. Now there is no caching but it gets really slow
>> since it calculates my table RDD many times.
>> Also hacked the LBFGS code to pass the number of examples which I
>> calculated outside in a Spark SQL query but just moved the location of the
>> problem.
>> The query I'm running looks like this:
>> s"SELECT $mappedFields, as b_id FROM tableA LEFT JOIN tableB
>>  ON WHERE tableA.status='' OR tableB.status='' "
>> mappedFields contains a list of fields which I'm interested in. The
>> result of that query goes through (including sampling) some transformations
>> before being input to LBFGS.
>> My dataset has 180GB just for feature selection, I'm planning to use
>> 450GB to train the final model and I'm using 16 c3.2xlarge EC2 instances,
>> that means I have 240GB of RAM available.
>> Any suggestion? I'm starting to check the algorithm because I don't
>> understand why it needs to count the dataset.
>> Thanks
>> Gustavo
>> On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley <>
>> wrote:
>>> Is that error actually occurring in LBFGS?  It looks like it might be
>>> happening before the data even gets to LBFGS.  (Perhaps the outer join
>>> you're trying to do is making the dataset size explode a bit.)  Are you
>>> able to call count() (or any RDD action) on the data before you pass it to
>>> LBFGS?
>>> On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres <
>>>> wrote:
>>>> Just did with the same error.
>>>> I think the problem is the "data.count()" call in LBFGS because for
>>>> huge datasets that's naive to do.
>>>> I was thinking to write my version of LBFGS but instead of doing
>>>> data.count() I will pass that parameter which I will calculate from a Spark
>>>> SQL query.
>>>> I will let you know.
>>>> Thanks
>>>> On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das <>
>>>> wrote:
>>>>> Can you try increasing your driver memory, reducing the executors and
>>>>> increasing the executor memory?
>>>>> Thanks
>>>>> Best Regards
>>>>> On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres <
>>>>>> wrote:
>>>>>> Hi there:
>>>>>> I'm using LBFGS optimizer to train a logistic regression model. The
>>>>>> code I implemented follows the pattern showed in
>>>>>> but
>>>>>> training data is obtained from a Spark SQL RDD.
>>>>>> The problem I'm having is that LBFGS tries to count the elements in
>>>>>> my RDD and that results in a OOM exception since my dataset is huge.
>>>>>> I'm running on a AWS EMR cluster with 16 c3.2xlarge instances on
>>>>>> Hadoop YARN. My dataset is about 150 GB but I sample (I take only 1% of 
>>>>>> the
>>>>>> data) it in order to scale logistic regression.
>>>>>> The exception I'm getting is this:
>>>>>> 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in
>>>>>> stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal):
>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>         at java.util.Arrays.copyOfRange(
>>>>>>         at java.lang.String.<init>(
>>>>>>         at
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.serializers.DefaultSerializers$
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.serializers.DefaultSerializers$
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$
>>>>>>         at com.esotericsoftware.kryo.Kryo.readObject(
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$
>>>>>>         at
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(
>>>>>>         at
>>>>>>         at
>>>>>>         at
>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(
>>>>>>         at
>>>>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
>>>>>>         at
>>>>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>>         at
>>>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>         at
>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>         at
>>>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>         at
>>>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>         at
>>>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>         at
>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>         at
>>>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>         at
>>>>>> $apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
>>>>>>         at
>>>>>> org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
>>>>>>         at
>>>>>> org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
>>>>>>         at
>>>>>> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>>>>         at
>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>>>         at
>>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>         at
>>>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>>> I'm using this parameters at runtime:
>>>>>> --num-executors 128 --executor-memory 1G --driver-memory 4G
>>>>>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
>>>>>> --conf
>>>>>> I also persist my dataset using MEMORY_AND_DISK_SER but get the same
>>>>>> error.
>>>>>> I will appreciate any help on this problem. I have been trying to
>>>>>> solve it for days and I'm running out of time and hair.
>>>>>> Thanks
>>>>>> Gustavo

Reply via email to