Hi there:

Yeah, I came to the same conclusion after tuning the Spark SQL shuffle
partitions parameter. I also cut out some classes I was using to parse my
dataset and finally created a schema with only the fields needed for my
model (before that I was creating it with 63 fields while I only needed 15).
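Roughly what that trimmed schema looks like (just a sketch; the field names,
rowRDD, and parsing code here are placeholders, not my real columns):

// Spark 1.2-style programmatic schema (in 1.3+ these types live in
// org.apache.spark.sql.types)
import org.apache.spark.sql._

// only the ~15 fields the model needs, instead of all 63 in the raw data
val trimmedSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("status", StringType, nullable = true),
  StructField("feature_1", DoubleType, nullable = true)
  // ...the rest of the needed fields only
))

// rowRDD is an RDD[Row] built by parsing only those fields out of each record
val tableA = sqlContext.applySchema(rowRDD, trimmedSchema)
tableA.registerTempTable("tableA")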
So I ended up with this set of parameters:

--num-executors 200
--executor-memory 800M
--conf spark.executor.extraJavaOptions="-XX:+UseCompressedOops
-XX:+AggressiveOpts"
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.storage.memoryFraction=0.3
--conf spark.rdd.compress=true
--conf spark.sql.shuffle.partitions=4000
--driver-memory 4G

With these settings I processed 270 GB in 35 minutes with no OOM errors.
I have one question though: does Spark SQL handle skewed tables? I ask
because my data is skewed, and maybe there is more room for performance
improvement there.
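For instance, would I have to salt the join key by hand, along these lines
(rough sketch only; tableARdd/tableBRdd stand for my two tables keyed by the
join id, and the salt factor is arbitrary)? Or does the planner already take
care of it?

import org.apache.spark.SparkContext._  // pair RDD functions on Spark 1.2
import scala.util.Random

val SALT = 16

// skewed left side: append a random salt to each join key
val saltedA = tableARdd.map { case (key, value) =>
  ((key, Random.nextInt(SALT)), value)
}

// right side: replicate each record once per salt value so every salted
// left key still finds its matches
val saltedB = tableBRdd.flatMap { case (key, value) =>
  (0 until SALT).map(salt => ((key, salt), value))
}

// drop the salt again after the join
val joined = saltedA.leftOuterJoin(saltedB).map { case ((key, _), pair) => (key, pair) }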

Thanks again.

Gustavo


On Thu, Mar 5, 2015 at 6:45 PM, DB Tsai <dbt...@dbtsai.com> wrote:

> PS, I would recommend you compress the data when you cache the RDD.
> There will be some overhead in compression/decompression and
> serialization/deserialization, but it will help a lot for iterative
> algorithms, since you will be able to cache more data.
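> Something like this, for example (just a sketch; conf / trainingData stand
> for your own SparkConf and cached RDD, storage level names are from the
> 1.x API):
>
> import org.apache.spark.storage.StorageLevel
>
> // spark.rdd.compress only applies to serialized partitions, so pair it
> // with a *_SER storage level
> conf.set("spark.rdd.compress", "true")
> conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>
> trainingData.persist(StorageLevel.MEMORY_ONLY_SER)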
>
> Sincerely,
>
> DB Tsai
> -------------------------------------------------------
> Blog: https://www.dbtsai.com
>
>
> On Tue, Mar 3, 2015 at 2:27 PM, Gustavo Enrique Salazar Torres
> <gsala...@ime.usp.br> wrote:
> > Yeah, I can call count before that and it works. I was also over-caching
> > tables, but I removed those. Now there is no caching, but it gets really
> > slow since it calculates my table RDD many times.
> > I also hacked the LBFGS code to pass in the number of examples, which I
> > calculated outside in a Spark SQL query, but that just moved the location
> > of the problem.
> >
> > The query I'm running looks like this:
> >
> > s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB
> ON
> > tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status='' "
> >
> > mappedFields contains the list of fields I'm interested in. The result
> > of that query goes through some transformations (including sampling)
> > before being input to LBFGS.
> >
> > My dataset is 180 GB just for feature selection; I'm planning to use
> > 450 GB to train the final model. I'm using 16 c3.2xlarge EC2 instances,
> > which means I have 240 GB of RAM available.
> >
> > Any suggestions? I'm starting to dig into the algorithm because I don't
> > understand why it needs to count the dataset.
> >
> > Thanks
> >
> > Gustavo
> >
> > On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley <jos...@databricks.com>
> > wrote:
> >>
> >> Is that error actually occurring in LBFGS?  It looks like it might be
> >> happening before the data even gets to LBFGS.  (Perhaps the outer join
> >> you're trying to do is making the dataset size explode a bit.)  Are you
> >> able to call count() (or any RDD action) on the data before you pass it
> >> to LBFGS?
> >>
> >> On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres
> >> <gsala...@ime.usp.br> wrote:
> >>>
> >>> Just did, with the same error.
> >>> I think the problem is the "data.count()" call in LBFGS, because for
> >>> huge datasets that's a naive thing to do.
> >>> I was thinking of writing my own version of LBFGS, but instead of doing
> >>> data.count() I would pass in that number, which I would calculate from
> >>> a Spark SQL query.
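> >>> Roughly like this (sketch only; the table name is just an example):
> >>>
> >>> // count once via Spark SQL instead of letting LBFGS call data.count()
> >>> val numExamples = sqlContext.sql("SELECT COUNT(*) FROM training_table")
> >>>   .first()
> >>>   .getLong(0)
> >>> // ...and hand numExamples to the modified LBFGS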
> >>>
> >>> I will let you know.
> >>>
> >>> Thanks
> >>>
> >>>
> >>> On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das <ak...@sigmoidanalytics.com>
> >>> wrote:
> >>>>
> >>>> Can you try increasing your driver memory, reducing the number of
> >>>> executors, and increasing the executor memory?
> >>>>
> >>>> Thanks
> >>>> Best Regards
> >>>>
> >>>> On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres
> >>>> <gsala...@ime.usp.br> wrote:
> >>>>>
> >>>>> Hi there:
> >>>>>
> >>>>> I'm using the LBFGS optimizer to train a logistic regression model. The
> >>>>> code I implemented follows the pattern shown in
> >>>>> https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html, but the
> >>>>> training data is obtained from a Spark SQL RDD.
> >>>>> The problem I'm having is that LBFGS tries to count the elements in my
> >>>>> RDD, and that results in an OOM exception since my dataset is huge.
> >>>>> I'm running on an AWS EMR cluster with 16 c3.2xlarge instances on Hadoop
> >>>>> YARN. My dataset is about 150 GB, but I sample it (I take only 1% of the
> >>>>> data) in order to scale logistic regression.
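> >>>>> The sampling step is just something like this (names and seed are only
> >>>>> illustrative):
> >>>>>
> >>>>> // keep ~1% of the labeled points before handing them to LBFGS
> >>>>> val sampled = trainingData.sample(withReplacement = false, fraction = 0.01, seed = 42L)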
> >>>>> The exception I'm getting is this:
> >>>>>
> >>>>> 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in
> >>>>> stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal):
> >>>>> java.lang.OutOfMemoryError: Java heap space
> >>>>>         at java.util.Arrays.copyOfRange(Arrays.java:2694)
> >>>>>         at java.lang.String.<init>(String.java:203)
> >>>>>         at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
> >>>>>         at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
> >>>>>         at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
> >>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
> >>>>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
> >>>>>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
> >>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
> >>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
> >>>>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
> >>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
> >>>>>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
> >>>>>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
> >>>>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
> >>>>>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
> >>>>>         at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
> >>>>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>>>>         at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> >>>>>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>>>>         at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> >>>>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> >>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>>>         at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
> >>>>>         at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
> >>>>>         at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
> >>>>>         at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
> >>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> >>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
> >>>>>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> >>>>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
> >>>>>
> >>>>> I'm using these parameters at runtime:
> >>>>> --num-executors 128 --executor-memory 1G --driver-memory 4G
> >>>>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
> >>>>> --conf spark.storage.memoryFraction=0.2
> >>>>>
> >>>>> I also persist my dataset using MEMORY_AND_DISK_SER but get the same
> >>>>> error.
> >>>>> I would appreciate any help with this problem. I have been trying to
> >>>>> solve it for days and I'm running out of time and hair.
> >>>>>
> >>>>> Thanks
> >>>>> Gustavo
> >>>>
> >>>>
> >>>
> >>
> >
>