Yeah, without caching it gets really slow. I will try to minimize the
number of columns in my tables; that may save a lot of memory and
eventually make it work.
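E.g. (sketch, hypothetical field names): trim mappedFields down to only
the columns the model actually consumes, so the join carries less data:

  val mappedFields = Seq("tableA.f1", "tableA.f2").mkString(", ")  // minimal set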
I will let you know.

Thanks!
Gustavo

On Tue, Mar 3, 2015 at 8:58 PM, Joseph Bradley <jos...@databricks.com>
wrote:

> I would recommend caching; if you can't persist, iterative algorithms will
> not work well.
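> A minimal sketch of what I mean, assuming your training set is an
> RDD[LabeledPoint] named "training" (hypothetical name):
>
>   import org.apache.spark.storage.StorageLevel
>   training.persist(StorageLevel.MEMORY_AND_DISK_SER)
>   training.count()  // materialize once up front; each LBFGS iteration then reads the cache
>
> Without this, every iteration re-runs the whole SQL plan that produced the RDD.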
>
> I don't think calling count on the dataset is problematic; every iteration
> in LBFGS iterates over the whole dataset and does a lot more computation
> than count().
>
> It would be helpful to see some error occurring within LBFGS.  With the
> given stack trace, I'm not sure what part of LBFGS it's happening in.
>
> On Tue, Mar 3, 2015 at 2:27 PM, Gustavo Enrique Salazar Torres <
> gsala...@ime.usp.br> wrote:
>
>> Yeah, I can call count() before that and it works. I was also over-caching
>> tables, but I removed those. Now there is no caching, and it gets really
>> slow since it recomputes my table RDD many times.
>> I also hacked the LBFGS code to pass in the number of examples, which I
>> calculated beforehand with a Spark SQL query, but that just moved the
>> location of the problem.
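>> Roughly what the hack looks like (sketch: counting tableA here is just
>> illustrative, and the patched runLBFGS signature is my own modification,
>> not the stock MLlib API):
>>
>>   val numExamples =
>>     sqlContext.sql("SELECT COUNT(*) FROM tableA").first().getLong(0)
>>   // ...then pass numExamples into the patched LBFGS instead of data.count()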
>>
>> The query I'm running looks like this:
>>
>> s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB
>>  ON tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status='' "
>>
>> mappedFields contains the list of fields I'm interested in. The result of
>> that query goes through some transformations (including sampling) before
>> being fed to LBFGS, as sketched below.
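>> Roughly, the post-query pipeline looks like this (sketch; it assumes the
>> mapped fields are numeric, the label is the first column, and uses my 1%
>> sampling fraction):
>>
>>   import org.apache.spark.mllib.linalg.Vectors
>>   import org.apache.spark.mllib.regression.LabeledPoint
>>
>>   val rows = sqlContext.sql(query)                    // the SELECT ... LEFT JOIN above
>>   val sampled = rows.sample(false, 0.01, seed = 42L)  // 1% sample
>>   val training = sampled.map { row =>
>>     val label = row.getDouble(0)                      // first mapped field
>>     val features =
>>       Vectors.dense((1 until row.length).map(row.getDouble).toArray)
>>     LabeledPoint(label, features)
>>   }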
>>
>> My dataset is 180 GB just for feature selection; I'm planning to use
>> 450 GB to train the final model. I'm using 16 c3.2xlarge EC2 instances,
>> which means I have 240 GB of RAM available.
>>
>> Any suggestions? I'm starting to read through the algorithm because I
>> don't understand why it needs to count the dataset.
>>
>> Thanks
>>
>> Gustavo
>>
>> On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley <jos...@databricks.com>
>> wrote:
>>
>>> Is that error actually occurring in LBFGS?  It looks like it might be
>>> happening before the data even gets to LBFGS.  (Perhaps the outer join
>>> you're trying to do is making the dataset size explode a bit.)  Are you
>>> able to call count() (or any RDD action) on the data before you pass it to
>>> LBFGS?
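>>> For example (sketch; "training" being whatever RDD you hand to LBFGS):
>>>
>>>   training.count()  // any action works; if this OOMs, the failure is upstream of LBFGS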
>>>
>>> On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres <
>>> gsala...@ime.usp.br> wrote:
>>>
>>>> Just did, and got the same error.
>>>> I think the problem is the data.count() call in LBFGS, because for huge
>>>> datasets that's a naive thing to do.
>>>> I was thinking of writing my own version of LBFGS that, instead of
>>>> calling data.count(), takes the count as a parameter, which I would
>>>> compute with a Spark SQL query.
>>>>
>>>> I will let you know.
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> Can you try increasing your driver memory, reducing the number of
>>>>> executors, and increasing the executor memory?
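>>>>> For example, something along these lines (numbers are illustrative for
>>>>> a 16-node c3.2xlarge cluster, not tuned values):
>>>>>
>>>>> --num-executors 32 --executor-memory 4G --driver-memory 8G
>>>>>
>>>>> i.e. fewer, fatter executors instead of 128 x 1G.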
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres <
>>>>> gsala...@ime.usp.br> wrote:
>>>>>
>>>>>> Hi there:
>>>>>>
>>>>>> I'm using the LBFGS optimizer to train a logistic regression model.
>>>>>> The code I implemented follows the pattern shown in
>>>>>> https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html, but the
>>>>>> training data is obtained from a Spark SQL RDD.
>>>>>> The problem I'm having is that LBFGS tries to count the elements in my
>>>>>> RDD, and that results in an OOM exception since my dataset is huge.
>>>>>> I'm running on an AWS EMR cluster with 16 c3.2xlarge instances on
>>>>>> Hadoop YARN. My dataset is about 150 GB, but I sample it (I take only
>>>>>> 1% of the data) in order to make logistic regression scale.
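>>>>>> For reference, the training call essentially follows the linked docs
>>>>>> (sketch; "training" is the RDD[LabeledPoint] I build from the Spark
>>>>>> SQL result):
>>>>>>
>>>>>>   import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
>>>>>>   val model = new LogisticRegressionWithLBFGS().run(training)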
>>>>>> The exception I'm getting is this:
>>>>>>
>>>>>> 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal): java.lang.OutOfMemoryError: Java heap space
>>>>>>         at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>         at java.lang.String.<init>(String.java:203)
>>>>>>         at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
>>>>>>         at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
>>>>>>         at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>>>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
>>>>>>         at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>         at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>         at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>         at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
>>>>>>         at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
>>>>>>         at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
>>>>>>         at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>>>
>>>>>> I'm using these parameters at runtime:
>>>>>> --num-executors 128 --executor-memory 1G --driver-memory 4G
>>>>>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
>>>>>> --conf spark.storage.memoryFraction=0.2
>>>>>>
>>>>>> I also persisted my dataset using MEMORY_AND_DISK_SER but got the same
>>>>>> error.
>>>>>> I would appreciate any help with this problem. I have been trying to
>>>>>> solve it for days and I'm running out of time and hair.
>>>>>>
>>>>>> Thanks
>>>>>> Gustavo
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
