Parquet reads in Spark need a lot of temporary heap memory due to
ColumnVectors and the write block size. See a similar issue:
https://jira.snappydata.io/browse/SNAP-3111

In addition, writes also consume a significant amount of heap due to
parquet.block.size. One solution is to reduce spark.executor.cores for
such a job (note the approximate heap calculation in the ticket) to reduce
concurrent use, and also to
reduce spark.sql.files.maxPartitionBytes/parquet.block.size to reduce the
overhead of reads and writes. Another solution is to increase the executor
heap. Or use the off-heap configuration with Spark 2.4, which removes the
pressure for reads but not for writes.
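
For illustration, a rough sketch of these knobs set on the session builder;
the values below are placeholders only, work them out from the heap
calculation in the ticket:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-copy-job") // placeholder name
  // fewer concurrent tasks per executor => fewer ColumnVectors alive at once
  .config("spark.executor.cores", "2")
  // smaller read splits reduce per-task heap in the vectorized Parquet reader
  .config("spark.sql.files.maxPartitionBytes", "67108864") // 64 MB
  // smaller Parquet row groups reduce write-side buffering
  // (the "spark.hadoop." prefix copies the key into the Hadoop configuration)
  .config("spark.hadoop.parquet.block.size", "67108864") // 64 MB
  // alternatively, the off-heap route mentioned above (Spark 2.4)
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "2g")
  .getOrCreate()
```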

regards
sumedh

On Sun, 22 Dec, 2019, 14:29 Ruijing Li, <liruijin...@gmail.com> wrote:

> I was experimenting and found something interesting. I have executor OOM
> even if I don’t write to remote clusters. So it is purely a dataframe read
> and write issue
> —————————————————————
> To recap, I have an ETL data pipeline that does some logic, repartitions
> to reduce the amount of files written, writes the output to HDFS as parquet
> files. After, it reads the output and writes it to other locations, doesn’t
> matter if on the same hadoop cluster or multiple. This is a simple piece of
> code
> ```
> import scala.util.{Failure, Success, Try}
> import org.apache.spark.sql.SaveMode
>
> destPaths.foreach { path =>
>   Try(spark.read.parquet(sourceOutputPath)
>         .write.mode(SaveMode.Overwrite).parquet(path)) match {
>     case Success(_) => // log success
>     case Failure(e) => // log failure
>   }
> }
> ```
> However this stage - reading from the source output and writing to different
> locations - is the one failing in Spark, despite all other stages succeeding,
> including the heavy-duty logic. And the data is not too big for Spark to
> handle.
>
> Only bumping memoryOverhead and repartitioning the output to more
> partitions, 40 to be precise (when it failed, we had partitioned the output to
> 20 after the logic finished but before writing to HDFS), has made the
> read-and-write stage succeed.
>
> I don't understand how a Spark read-and-write stage can experience OOM
> issues. Hoping someone can shed some light on why.
>
> On Sat, Dec 21, 2019 at 7:57 PM Chris Teoh <chris.t...@gmail.com> wrote:
>
>> I'm not entirely sure what the behaviour is when writing to a remote
>> cluster. It could be that connections are being established for every
>> element in your dataframe; perhaps using foreachPartition may reduce the
>> number of connections? You may have to look at what the executors do when
>> they reach out to the remote cluster.
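>>
>> As a rough sketch of that idea (df stands in for the dataframe being
>> written, and RemoteSink is a made-up placeholder for whatever client the
>> executors would use, not a real API):
>>
>> ```
>> import org.apache.spark.sql.Row
>>
>> // Placeholder client; substitute the real connection logic.
>> class RemoteSink extends Serializable {
>>   def send(r: Row): Unit = ()
>>   def close(): Unit = ()
>> }
>>
>> // One connection per partition instead of one per element.
>> df.foreachPartition { rows: Iterator[Row] =>
>>   val sink = new RemoteSink
>>   try rows.foreach(sink.send) finally sink.close()
>> }
>> ```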
>>
>> On Sun, 22 Dec 2019, 8:07 am Ruijing Li, <liruijin...@gmail.com> wrote:
>>
>>> I managed to make the failing stage work by increasing memoryOverhead to
>>> something ridiculous, > 50%. Our spark.executor.memory = 12g, and I bumped
>>> spark.mesos.executor.memoryOverhead=8G.
>>>
>>> *Can someone explain why this solved the issue?* As I understand it,
>>> memoryOverhead is for VM overhead and non-heap items, which a simple
>>> read and write should not use (albeit to different Hadoop clusters, but the
>>> network should be a non-issue since they are reached from the same machines).
>>>
>>> We use spark defaults for everything else.
>>>
>>> We are calling df.repartition(20) in our write after the logic is done
>>> (before the failing multiple-cluster write stage) to prevent Spark's
>>> small-files problem. We reduce from 4000 partitions to 20.
>>>
>>> On Sat, Dec 21, 2019 at 11:28 AM Ruijing Li <liruijin...@gmail.com>
>>> wrote:
>>>
>>>> Not for the stage that fails; all it does is read and write. The
>>>> number of tasks is # of cores * # of executor instances, which for us is 60
>>>> (3 cores, 20 executors).
>>>>
>>>> As for the input partition size for the failing stage: when Spark reads the
>>>> 20 files of 132 MB each, it comes out to 40 partitions.
>>>>
>>>>
>>>>
>>>> On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh <chris.t...@gmail.com>
>>>> wrote:
>>>>
>>>>> If you're using Spark SQL, that configuration setting causes a shuffle
>>>>> if the number of your input partitions to the write is larger than that
>>>>> configuration.
>>>>>
>>>>> Is there anything in the executor logs or the Spark UI DAG that
>>>>> indicates a shuffle? I don't expect a shuffle if it is a straight write.
>>>>> What's the input partition size?
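>>>>>
>>>>> A quick way to check the input partition count, as a sketch (assuming a
>>>>> SparkSession named spark and the sourceOutputPath used elsewhere in this
>>>>> thread):
>>>>>
>>>>> ```
>>>>> val df = spark.read.parquet(sourceOutputPath)
>>>>> println(s"input partitions: ${df.rdd.getNumPartitions}")
>>>>> ```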
>>>>>
>>>>> On Sat, 21 Dec 2019, 10:24 am Ruijing Li, <liruijin...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Could you explain why shuffle partitions might be a good starting
>>>>>> point?
>>>>>>
>>>>>> Some more details: when I write the output the first time after the logic
>>>>>> is complete, I repartition the files to 20 (after having
>>>>>> spark.sql.shuffle.partitions = 2000) so we don't have too many small files.
>>>>>> The data is small, about 130 MB per file. When Spark reads it back, it reads
>>>>>> in 40 partitions and tries to output that to the different cluster.
>>>>>> Unfortunately, during that read-and-write stage the executors drop off.
>>>>>>
>>>>>> We keep the HDFS block size at 128 MB.
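>>>>>>
>>>>>> For reference, a minimal sketch of that write path (resultDf and
>>>>>> sourceOutputPath stand in for the pipeline's own names):
>>>>>>
>>>>>> ```
>>>>>> import org.apache.spark.sql.SaveMode
>>>>>>
>>>>>> resultDf
>>>>>>   .repartition(20) // collapse the shuffle output into 20 files
>>>>>>   .write.mode(SaveMode.Overwrite)
>>>>>>   .parquet(sourceOutputPath)
>>>>>> ```
>>>>>> (coalesce(20) would avoid the extra shuffle when only reducing the
>>>>>> partition count, at the cost of less evenly sized output files.)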
>>>>>>
>>>>>> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh <chris.t...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> spark.sql.shuffle.partitions might be a start.
>>>>>>>
>>>>>>> Is there a difference between the number of partitions when the parquet
>>>>>>> is read and spark.sql.shuffle.partitions? Is it much higher than
>>>>>>> spark.sql.shuffle.partitions?
>>>>>>>
>>>>>>> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li, <liruijin...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I have encountered a strange executor OOM error. I have a data
>>>>>>>> pipeline using Spark 2.3 and Scala 2.11.12. This pipeline writes the
>>>>>>>> output to one HDFS location as parquet, then reads the files back in and
>>>>>>>> writes to multiple Hadoop clusters (all co-located in the same
>>>>>>>> datacenter). It should be a very simple task, but executors are being
>>>>>>>> killed off for exceeding container thresholds. From the logs, they are
>>>>>>>> exceeding the given memory (using Mesos as the cluster manager).
>>>>>>>>
>>>>>>>> The ETL process works perfectly fine with the given resources,
>>>>>>>> doing joins and adding columns. The output is written successfully the
>>>>>>>> first time. *Only when the pipeline at the end reads the output from
>>>>>>>> HDFS and writes it to different HDFS cluster paths does it fail.*
>>>>>>>> (It does a spark.read.parquet(source).write.parquet(dest))
>>>>>>>>
>>>>>>>> This doesn't really make sense and I'm wondering what
>>>>>>>> configurations I should start looking at.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Cheers,
>>>>>>>> Ruijing Li
>>>>>>>>
>>>>>>> --
>>>>>> Cheers,
>>>>>> Ruijing Li
>>>>>>
>>>>> --
>>>> Cheers,
>>>> Ruijing Li
>>>>
>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
> Cheers,
> Ruijing Li
>
