Spark 2.2.0 GC Overhead Limit Exceeded and OOM errors in the executors

2017-10-27 Thread Supun Nakandala
Hi all,

I am trying to run an image-analytics-style workload using Spark. The
images are read in JPEG format and then converted to a raw pixel format in
map functions, which causes the size of the partitions to grow by roughly an
order of magnitude. In addition to this, I am caching some of the data
because my pipeline is iterative.
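
Roughly, the pipeline looks like the following sketch (the paths and the
decoding library here are illustrative placeholders, not my actual code):

  import java.io.ByteArrayInputStream
  import javax.imageio.ImageIO
  import org.apache.spark.storage.StorageLevel

  // Read compressed JPEG bytes, decode to raw pixels inside a map,
  // and cache the decoded data because it is reused across iterations.
  val decoded = spark.sparkContext
    .binaryFiles("hdfs:///data/images/*.jpg")      // (path, PortableDataStream)
    .map { case (path, stream) =>
      val img = ImageIO.read(new ByteArrayInputStream(stream.toArray))
      // Raw ARGB ints are much larger than the compressed JPEG bytes.
      val pixels = img.getRGB(0, 0, img.getWidth, img.getHeight, null, 0, img.getWidth)
      (path, pixels)
    }
    .persist(StorageLevel.MEMORY_AND_DISK)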

I get OOM and "GC overhead limit exceeded" errors in the executors. I can
work around them by increasing the heap size or the number of partitions,
but even after doing that there is still high GC pressure.

I know that my partitions should be small enough that they can fit in
memory. But when I did the calculation using the cached-partition sizes
shown in the Spark UI, the individual partitions seem small enough given the
heap size and storage fraction. I am interested in getting your input on
what else can cause OOM errors in executors. Could caching the data itself
be a problem (SPARK-1777)?
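
To make my back-of-the-envelope check concrete, this is the kind of
arithmetic I did (the numbers here are illustrative, not my actual settings):

  // Assumed: --executor-memory 8g, with the Spark 2.2 defaults
  // spark.memory.fraction = 0.6 and spark.memory.storageFraction = 0.5,
  // ignoring the ~300 MB of reserved memory.
  val executorHeapMb  = 8 * 1024.0
  val unifiedMemoryMb = executorHeapMb * 0.6   // shared execution + storage pool
  val storageFloorMb  = unifiedMemoryMb * 0.5  // storage protected from eviction (~2.4 GB)
  // If the Storage tab reports ~100 MB per cached partition, the cached blocks
  // fit comfortably. However, the partition being built inside the map (decoded
  // pixels plus temporary objects) can be much larger than what the Storage tab
  // later reports, so execution memory and GC can still be under pressure.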

Thank you in advance.
-Supun


Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-14 Thread Supun Nakandala
Hi Weichen,

Thank you very much for the explanation.
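
For my own notes, here is a small self-contained sketch of the difference as
you explain it below (illustrative code, not taken from this thread):

  val df = spark.range(0, 1000000).toDF("id")

  // df.cache() caches the dataframe's own binary / columnar representation.
  df.cache()
  df.count()            // materializes the dataframe cache (visible on the Storage tab)

  // df.rdd derives a separate RDD[Row]; rows are deserialized when it is computed.
  val rowRdd = df.rdd
  rowRdd.cache()
  rowRdd.count()        // caches the RDD[Row], but df.count() will not use this cache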

On Fri, Oct 13, 2017 at 6:56 PM, Weichen Xu <weichen...@databricks.com>
wrote:

> Hi Supun,
>
> The Dataframe API is NOT using the old RDD implementation under the covers;
> dataframes have their own implementation. (A dataframe uses a binary row
> format, and columnar storage when cached.) So the dataframe has no
> relationship with the `RDD[Row]` you want to get.
>
> When you call `df.rdd` and then cache, Spark needs to turn the dataframe
> into an RDD: it extracts each row from the dataframe, deserializes it, and
> composes a new RDD.
>
> Thanks!
>
> On Sat, Oct 14, 2017 at 6:17 AM, Stephen Boesch <java...@gmail.com> wrote:
>
>> @Vadim   Would it be true to say the `.rdd` *may* be creating a new job -
>> depending on whether the DataFrame/DataSet had already been materialized
>> via an action or checkpoint?   If the only prior operations on the
>> DataFrame had been transformations then the dataframe would still not have
>> been calculated.  In that case would it also be true that a subsequent
>> action/checkpoint on the DataFrame (not the rdd) would then generate a
>> separate job?
>>
>> 2017-10-13 14:50 GMT-07:00 Vadim Semenov <vadim.seme...@datadoghq.com>:
>>
>>> When you do `Dataset.rdd` you actually create a new job
>>>
>>> here you can see what it does internally:
>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2816-L2828
>>>
>>>
>>>
>>> On Fri, Oct 13, 2017 at 5:24 PM, Supun Nakandala <
>>> supun.nakand...@gmail.com> wrote:
>>>
>>>> Hi Weichen,
>>>>
>>>> Thank you for the reply.
>>>>
>>>> My understanding was that the Dataframe API uses the old RDD implementation
>>>> under the covers, though it presents a different API, and that calling
>>>> df.rdd simply gives access to the underlying RDD. Is this assumption
>>>> wrong? I would appreciate it if you could shed more light on this or
>>>> point me to documentation where I can learn more.
>>>>
>>>> Thank you in advance.
>>>>
>>>> On Fri, Oct 13, 2017 at 3:19 AM, Weichen Xu <weichen...@databricks.com>
>>>> wrote:
>>>>
>>>>> You should use `df.cache()`.
>>>>> `df.rdd.cache()` won't work, because `df.rdd` generates a new RDD from
>>>>> the original `df`, and you would then be caching that new RDD.
>>>>>
>>>>> On Fri, Oct 13, 2017 at 3:35 PM, Supun Nakandala <
>>>>> supun.nakand...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have been experimenting with the cache/persist/unpersist methods with
>>>>>> respect to both the Dataframe and RDD APIs. However, I am seeing
>>>>>> different behavior with the Dataframe API compared to the RDD API, such
>>>>>> as Dataframes not getting cached when count() is called.
>>>>>>
>>>>>> Is there a difference in how these operations behave with respect to the
>>>>>> Dataframe and RDD APIs?
>>>>>>
>>>>>> Thank You.
>>>>>> -Supun
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Supun Nakandala
Hi Weichen,

Thank you for the reply.

My understanding was that the Dataframe API uses the old RDD implementation
under the covers, though it presents a different API, and that calling
df.rdd simply gives access to the underlying RDD. Is this assumption
wrong? I would appreciate it if you could shed more light on this or
point me to documentation where I can learn more.

Thank you in advance.

On Fri, Oct 13, 2017 at 3:19 AM, Weichen Xu <weichen...@databricks.com>
wrote:

> You should use `df.cache()`.
> `df.rdd.cache()` won't work, because `df.rdd` generates a new RDD from the
> original `df`, and you would then be caching that new RDD.
>
> On Fri, Oct 13, 2017 at 3:35 PM, Supun Nakandala <
> supun.nakand...@gmail.com> wrote:
>
>> Hi all,
>>
>> I have been experimenting with the cache/persist/unpersist methods with
>> respect to both the Dataframe and RDD APIs. However, I am seeing different
>> behavior with the Dataframe API compared to the RDD API, such as Dataframes
>> not getting cached when count() is called.
>>
>> Is there a difference in how these operations behave with respect to the
>> Dataframe and RDD APIs?
>>
>> Thank You.
>> -Supun
>>
>
>


Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Supun Nakandala
Hi all,

I have been experimenting with the cache/persist/unpersist methods with
respect to both the Dataframe and RDD APIs. However, I am seeing different
behavior with the Dataframe API compared to the RDD API, such as Dataframes
not getting cached when count() is called.

Is there a difference in how these operations behave with respect to the
Dataframe and RDD APIs?

Thank You.
-Supun