Hi there:

Yeah, I came to that same conclusion after tuning the spark.sql.shuffle.partitions parameter. I also cut out some classes I was using to parse my dataset and finally created a schema with only the fields needed for my model (before that I was creating it with 63 fields while I just needed 15).
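The schema ended up looking roughly like this (just a sketch with invented field names; rawLines and sqlContext stand in for my real input and context):

    import org.apache.spark.sql._

    // Declare only the fields the model needs instead of all 63.
    val schema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("status", StringType, nullable = true),
      StructField("feature1", DoubleType, nullable = true)
      // ...the remaining fields the model needs, and nothing else...
    ))

    // Build Rows with just those columns and register the table for SQL.
    val rows = rawLines.map(_.split('\t')).map(p => Row(p(0).toLong, p(1), p(2).toDouble))
    sqlContext.applySchema(rows, schema).registerTempTable("tableA")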
So I came up with this set of parameters:

--num-executors 200 --executor-memory 800M \
--conf spark.executor.extraJavaOptions="-XX:+UseCompressedOops -XX:+AggressiveOpts" \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.storage.memoryFraction=0.3 \
--conf spark.rdd.compress=true \
--conf spark.sql.shuffle.partitions=4000 \
--driver-memory 4G

Now I processed 270 GB in 35 minutes with no OOM errors.

I have one question though: does Spark SQL handle skewed tables? I was
wondering about that since my data is skewed, so maybe there is more room
for performance improvement.

Thanks again.

Gustavo

On Thu, Mar 5, 2015 at 6:45 PM, DB Tsai <dbt...@dbtsai.com> wrote:
> PS, I would recommend compressing the data when you cache the RDD.
> There will be some overhead in compression/decompression and
> serialization/deserialization, but it will help a lot for iterative
> algorithms, since you will be able to cache more data.
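That is what the spark.rdd.compress=true flag above is about. A minimal sketch of what that looks like, assuming the cached RDD is called trainingData (a placeholder name):

    import org.apache.spark.storage.StorageLevel

    // A serialized storage level, so Kryo serialization and
    // spark.rdd.compress=true apply to the cached partitions; partitions
    // that don't fit in memory spill to disk instead of being recomputed.
    trainingData.persist(StorageLevel.MEMORY_AND_DISK_SER)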
>
> Sincerely,
>
> DB Tsai
> -------------------------------------------------------
> Blog: https://www.dbtsai.com
>
>
> On Tue, Mar 3, 2015 at 2:27 PM, Gustavo Enrique Salazar Torres
> <gsala...@ime.usp.br> wrote:
> > Yeah, I can call count before that and it works. I was also over-caching
> > tables, but I removed those. Now there is no caching, but it gets really
> > slow since it computes my table RDD many times.
> > I also hacked the LBFGS code to pass in the number of examples, which I
> > calculated outside in a Spark SQL query, but that just moved the problem
> > somewhere else.
> >
> > The query I'm running looks like this:
> >
> > s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB ON
> > tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status='' "
> >
> > mappedFields contains a list of the fields I'm interested in. The result
> > of that query goes through some transformations (including sampling)
> > before being input to LBFGS.
> >
> > My dataset is 180 GB just for feature selection; I'm planning to use
> > 450 GB to train the final model, and I'm using 16 c3.2xlarge EC2
> > instances, which means I have 240 GB of RAM available.
> >
> > Any suggestions? I'm starting to check the algorithm because I don't
> > understand why it needs to count the dataset.
> >
> > Thanks
> >
> > Gustavo
> >
> > On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley <jos...@databricks.com>
> > wrote:
> >>
> >> Is that error actually occurring in LBFGS? It looks like it might be
> >> happening before the data even gets to LBFGS. (Perhaps the outer join
> >> you're trying to do is making the dataset size explode a bit.) Are you
> >> able to call count() (or any RDD action) on the data before you pass it
> >> to LBFGS?
> >>
> >> On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres
> >> <gsala...@ime.usp.br> wrote:
> >>>
> >>> Just did, with the same error.
> >>> I think the problem is the data.count() call in LBFGS, because doing
> >>> that is naive for huge datasets.
> >>> I was thinking of writing my own version of LBFGS which, instead of
> >>> calling data.count(), would take that number as a parameter that I can
> >>> calculate with a Spark SQL query.
> >>>
> >>> I will let you know.
> >>>
> >>> Thanks
> >>>
> >>>
> >>> On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das <ak...@sigmoidanalytics.com>
> >>> wrote:
> >>>>
> >>>> Can you try increasing your driver memory, reducing the number of
> >>>> executors and increasing the executor memory?
> >>>>
> >>>> Thanks
> >>>> Best Regards
> >>>>
> >>>> On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres
> >>>> <gsala...@ime.usp.br> wrote:
> >>>>>
> >>>>> Hi there:
> >>>>>
> >>>>> I'm using the LBFGS optimizer to train a logistic regression model.
> >>>>> The code I implemented follows the pattern shown in
> >>>>> https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html, but
> >>>>> the training data is obtained from a Spark SQL RDD.
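My training code basically follows that docs pattern, something like this rough sketch (not the exact code: the column positions, the label rule, the sample fraction and the LBFGS settings here are made up, and sqlContext/query are assumed to exist):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.util.MLUtils

    // Rows coming back from Spark SQL -> labeled points.
    val points = sqlContext.sql(query).map { row =>
      val label = if (row.isNullAt(15)) 0.0 else 1.0  // e.g. matched rows are positives
      val features = Vectors.dense((0 until 15).map(i => row.getDouble(i)).toArray)
      LabeledPoint(label, features)
    }

    // 1% sample, with a bias column appended as in the docs example.
    val training = points.sample(withReplacement = false, fraction = 0.01, seed = 42L)
      .map(lp => (lp.label, MLUtils.appendBias(lp.features)))

    val initialWeights = Vectors.dense(new Array[Double](15 + 1)) // +1 for the bias
    val (weights, lossHistory) = LBFGS.runLBFGS(
      training,
      new LogisticGradient(),
      new SquaredL2Updater(),
      10,    // numCorrections
      1e-4,  // convergence tolerance
      50,    // max iterations
      0.1,   // regParam
      initialWeights)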
> >>>>> The problem I'm having is that LBFGS tries to count the elements in
> >>>>> my RDD, and that results in an OOM exception since my dataset is huge.
> >>>>> I'm running on an AWS EMR cluster with 16 c3.2xlarge instances on
> >>>>> Hadoop YARN. My dataset is about 150 GB, but I sample it (I take only
> >>>>> 1% of the data) in order to scale logistic regression.
> >>>>> The exception I'm getting is this:
> >>>>>
> >>>>> 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in
> >>>>> stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal):
> >>>>> java.lang.OutOfMemoryError: Java heap space
> >>>>>     at java.util.Arrays.copyOfRange(Arrays.java:2694)
> >>>>>     at java.lang.String.<init>(String.java:203)
> >>>>>     at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
> >>>>>     at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
> >>>>>     at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
> >>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
> >>>>>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
> >>>>>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
> >>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
> >>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
> >>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
> >>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
> >>>>>     at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
> >>>>>     at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
> >>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
> >>>>>     at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
> >>>>>     at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
> >>>>>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>>>>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> >>>>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>>>>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> >>>>>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> >>>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>>>     at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
> >>>>>     at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
> >>>>>     at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
> >>>>>     at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
> >>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> >>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> >>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> >>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> >>>>>
> >>>>> I'm using these parameters at runtime:
> >>>>> --num-executors 128 --executor-memory 1G --driver-memory 4G
> >>>>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
> >>>>> --conf spark.storage.memoryFraction=0.2
> >>>>>
> >>>>> I also persist my dataset using MEMORY_AND_DISK_SER, but I get the
> >>>>> same error.
> >>>>> I would appreciate any help with this problem. I have been trying to
> >>>>> solve it for days and I'm running out of time and hair.
> >>>>>
> >>>>> Thanks
> >>>>> Gustavo
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org