Yeah, without caching it gets really slow. I will try to minimize the number of columns in my tables; that may save a lot of memory, and it may eventually work. I will let you know.
Thanks!
Gustavo

On Tue, Mar 3, 2015 at 8:58 PM, Joseph Bradley <jos...@databricks.com> wrote:

> I would recommend caching; if you can't persist, iterative algorithms will
> not work well.
>
> I don't think calling count on the dataset is problematic; every iteration
> in LBFGS iterates over the whole dataset and does a lot more computation
> than count().
>
> It would be helpful to see some error occurring within LBFGS. With the
> given stack trace, I'm not sure what part of LBFGS it's happening in.
>
> On Tue, Mar 3, 2015 at 2:27 PM, Gustavo Enrique Salazar Torres <
> gsala...@ime.usp.br> wrote:
>
>> Yeah, I can call count before that and it works. Also, I was over-caching
>> tables but I removed those. Now there is no caching, but it gets really
>> slow since it calculates my table RDD many times.
>> I also hacked the LBFGS code to pass in the number of examples, which I
>> calculated outside in a Spark SQL query, but that just moved the location
>> of the problem.
>>
>> The query I'm running looks like this:
>>
>> s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB
>> ON tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status='' "
>>
>> mappedFields contains a list of fields which I'm interested in. The
>> result of that query goes through some transformations (including
>> sampling) before being input to LBFGS.
>>
>> My dataset has 180GB just for feature selection; I'm planning to use
>> 450GB to train the final model, and I'm using 16 c3.2xlarge EC2 instances,
>> which means I have 240GB of RAM available.
>>
>> Any suggestions? I'm starting to check the algorithm because I don't
>> understand why it needs to count the dataset.
>>
>> Thanks
>>
>> Gustavo
>>
>> On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley <jos...@databricks.com>
>> wrote:
>>
>>> Is that error actually occurring in LBFGS? It looks like it might be
>>> happening before the data even gets to LBFGS. (Perhaps the outer join
>>> you're trying to do is making the dataset size explode a bit.) Are you
>>> able to call count() (or any RDD action) on the data before you pass it to
>>> LBFGS?
>>>
>>> On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres <
>>> gsala...@ime.usp.br> wrote:
>>>
>>>> Just did, with the same error.
>>>> I think the problem is the "data.count()" call in LBFGS because for
>>>> huge datasets that's naive to do.
>>>> I was thinking of writing my own version of LBFGS, but instead of doing
>>>> data.count() I will pass that parameter, which I will calculate from a
>>>> Spark SQL query.
>>>>
>>>> I will let you know.
>>>>
>>>> Thanks
>>>>
>>>>
>>>> On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> Can you try increasing your driver memory, reducing the executors and
>>>>> increasing the executor memory?
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres <
>>>>> gsala...@ime.usp.br> wrote:
>>>>>
>>>>>> Hi there:
>>>>>>
>>>>>> I'm using the LBFGS optimizer to train a logistic regression model. The
>>>>>> code I implemented follows the pattern shown in
>>>>>> https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html but
>>>>>> training data is obtained from a Spark SQL RDD.
>>>>>> The problem I'm having is that LBFGS tries to count the elements in
>>>>>> my RDD and that results in an OOM exception since my dataset is huge.
>>>>>> I'm running on an AWS EMR cluster with 16 c3.2xlarge instances on
>>>>>> Hadoop YARN. My dataset is about 150 GB but I sample it (I take only
>>>>>> 1% of the data) in order to scale logistic regression.
>>>>>> The exception I'm getting is this:
>>>>>>
>>>>>> 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal):
>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>         at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>>>         at java.lang.String.<init>(String.java:203)
>>>>>>         at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
>>>>>>         at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
>>>>>>         at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>>>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>>>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>>>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
>>>>>>         at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>         at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>         at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>         at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
>>>>>>         at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
>>>>>>         at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
>>>>>>         at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>>>
>>>>>> I'm using these parameters at runtime:
>>>>>> --num-executors 128 --executor-memory 1G --driver-memory 4G
>>>>>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
>>>>>> --conf spark.storage.memoryFraction=0.2
>>>>>>
>>>>>> I also persist my dataset using MEMORY_AND_DISK_SER but get the same
>>>>>> error.
>>>>>> I would appreciate any help on this problem. I have been trying to
>>>>>> solve it for days and I'm running out of time and hair.
>>>>>>
>>>>>> Thanks
>>>>>> Gustavo
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
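
For anyone reading this thread later, the runtime parameters quoted above can be turned into a rough memory budget. The sketch below is only back-of-envelope arithmetic in plain Scala (no Spark dependency). The cluster and flag values are copied from the thread; the 0.9 safety fraction is an assumption about the Spark 1.x default for spark.storage.safetyFraction, and JVM and YARN overheads are ignored, so treat the results as approximate.

```scala
// Back-of-envelope memory budget for the settings quoted in this thread.
// This is an illustrative sketch, not a measurement of the actual job.
object MemoryBudget {
  def main(args: Array[String]): Unit = {
    // Figures taken from the thread.
    val instances        = 16     // c3.2xlarge nodes
    val clusterRamGb     = 240.0  // total RAM stated by the poster
    val numExecutors     = 128    // --num-executors
    val executorMemoryGb = 1.0    // --executor-memory 1G
    val storageFraction  = 0.2    // spark.storage.memoryFraction
    val datasetGb        = 180.0  // dataset size used for feature selection

    // Assumption (not from the thread): Spark 1.x default value of
    // spark.storage.safetyFraction.
    val safetyFraction = 0.9

    val ramPerInstanceGb     = clusterRamGb / instances
    val executorsPerInstance = numExecutors.toDouble / instances
    val totalHeapGb          = numExecutors * executorMemoryGb
    // Heap that the block manager may use for cached partitions, cluster-wide.
    val cacheCapacityGb      = totalHeapGb * storageFraction * safetyFraction

    println(f"RAM per instance:       $ramPerInstanceGb%.1f GB")
    println(f"Executors per instance: $executorsPerInstance%.1f")
    println(f"Total executor heap:    $totalHeapGb%.1f GB")
    println(f"Approx. cache capacity: $cacheCapacityGb%.1f GB")
    println(f"Dataset size:           $datasetGb%.1f GB")
  }
}
```

Under these assumptions the cluster-wide in-memory cache comes to roughly 23 GB against a 180 GB dataset, and every task, including the hash table built for the outer join, has to fit in a 1 GB heap. That is consistent with Akhil's suggestion to run fewer executors with more memory each, although the right values would still need to be found by experiment.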