You can get more insight from the Spark history server
(http://spark.apache.org/docs/latest/monitoring.html). It can show you
which task is failing, along with other information that might help you
debug the issue.


On 05/10/2016 19:00, Babak Alipour wrote:
> The issue seems to lie in the RangePartitioner trying to create equal
> ranges. [1]
>
> [1]
> https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/RangePartitioner.html
>  
>
>  The /Double/ values I'm trying to sort are mostly in the range [0,1]
> (~70% of the data, which roughly equates to 1 billion records); other
> numbers in the dataset are as high as 2000. With the RangePartitioner
> trying to create equal ranges, some tasks become almost empty while
> others are extremely large, due to the heavily skewed distribution.
>
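A small illustration of the skew effect described above, as a sketch only. It buckets a synthetic sample (70% of values in [0, 1], the rest spread up to 2000, mirroring the figures quoted above) into equal-width ranges and, for comparison, into ranges cut at sample quantiles. It does not reproduce how Spark's RangePartitioner picks its boundaries (the linked documentation says they are determined by sampling); the function names, sample size, and bucket count are made up for the example.

```python
import bisect
import random


def make_sample(n, seed=0):
    """Synthetic data: ~70% of values in [0, 1], the rest in [1, 2000]."""
    rng = random.Random(seed)
    return [
        rng.uniform(0.0, 1.0) if rng.random() < 0.7 else rng.uniform(1.0, 2000.0)
        for _ in range(n)
    ]


def equal_width_bounds(lo, hi, parts):
    """Upper bounds of `parts` ranges that each span the same width."""
    step = (hi - lo) / parts
    return [lo + step * i for i in range(1, parts)]


def quantile_bounds(values, parts):
    """Upper bounds taken at evenly spaced ranks of the sorted values."""
    ordered = sorted(values)
    return [ordered[len(ordered) * i // parts] for i in range(1, parts)]


def bucket_counts(values, bounds):
    """Count how many values fall into each range defined by `bounds`."""
    counts = [0] * (len(bounds) + 1)
    for v in values:
        counts[bisect.bisect_left(bounds, v)] += 1
    return counts


data = make_sample(100_000)
by_width = bucket_counts(data, equal_width_bounds(0.0, 2000.0, 8))
by_quantile = bucket_counts(data, quantile_bounds(data, 8))
print("equal-width ranges:", by_width)
print("quantile ranges:   ", by_quantile)
```

With equal-width ranges the first bucket absorbs most of the sample, while boundaries taken from the data's own ranks keep the buckets close to the same size.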
> This is either a bug in Apache Spark or a major limitation of the
> framework. Has anyone else encountered this?
>
> */Babak Alipour ,/*
> */University of Florida/*
>
> On Sun, Oct 2, 2016 at 1:38 PM, Babak Alipour <babak.alip...@gmail.com
> <mailto:babak.alip...@gmail.com>> wrote:
>
>     Thanks Vadim for sharing your experience, but I have tried a
>     multi-JVM setup (2 workers), various sizes for
>     spark.executor.memory (8g, 16g, 20g, 32g, 64g) and
>     spark.executor.cores (2-4), with the same error all along.
>
>     As for the files, these are all .snappy.parquet files, resulting
>     from inserting some data from other tables. None of them actually
>     exceeds 25MiB (I don't know why this number). Setting the DataFrame
>     to persist using StorageLevel.MEMORY_ONLY shows a size in memory of
>     ~10g. I still cannot understand why it is trying to create such a
>     big page when sorting. The entire column (this df has only 1
>     column) is not that big, and neither are the original files. Any ideas?
>
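A back-of-envelope check of the sizes mentioned in this thread, as a sketch only. It assumes 8 bytes per Double value and ignores all per-row and storage overhead; the record counts come from the figures quoted above (about 1 billion records being ~70% of the data), the limit is the number from the exception message, and the variable names are invented for the example.

```python
BYTES_PER_DOUBLE = 8                     # assumption: raw 64-bit values, no overhead
records_in_unit_range = 1_000_000_000    # "~1 billion records" in [0, 1]
share_in_unit_range = 0.70               # "~70% of the data"

# Scale the known share up to the whole column.
total_records = records_in_unit_range / share_in_unit_range
raw_bytes = total_records * BYTES_PER_DOUBLE
page_limit_bytes = 17_179_869_176        # limit quoted in the exception message

print(f"estimated records: {total_records:.3g}")
print(f"raw column size:   {raw_bytes / 2**30:.1f} GiB")
print(f"page size limit:   {page_limit_bytes / 2**30:.1f} GiB")
print("whole column below the page limit:", raw_bytes < page_limit_bytes)
```

Under these assumptions the raw column is of the same order as the ~10g reported for the persisted DataFrame and smaller than the largest page the allocator accepts, which is why a single page request above that limit looks surprising.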
>
>     >Babak
>
>
>
>     */Babak Alipour ,/*
>     */University of Florida/*
>
>     On Sun, Oct 2, 2016 at 1:45 AM, Vadim Semenov
>     <vadim.seme...@datadoghq.com <mailto:vadim.seme...@datadoghq.com>>
>     wrote:
>
>         oh, and try to run even smaller executors, i.e. with
>         `spark.executor.memory` <= 16GiB. I wonder what result you're
>         going to get.
>
>         On Sun, Oct 2, 2016 at 1:24 AM, Vadim Semenov
>         <vadim.seme...@datadoghq.com
>         <mailto:vadim.seme...@datadoghq.com>> wrote:
>
>             > Do you mean running a multi-JVM 'cluster' on the single
>             machine? 
>             Yes, that's what I suggested.
>
>             You can get some information here:
>             http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
>
>             > How would that affect performance/memory-consumption? If
>             a multi-JVM setup can handle such a large input, then why
>             can't a single-JVM break down the job into smaller tasks?
>             I don't have an answer to these questions; answering them
>             requires understanding the internals of Spark, the JVM, and
>             your setup.
>
>             I ran into the same issue only once, when I tried to read a
>             gzipped file whose size was >16GiB. That's the only time I
>             hit this check:
>             https://github.com/apache/spark/blob/5d84c7fd83502aeb551d46a740502db4862508fe/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L238-L243
>             In the end I had to recompress my file as bzip2, which is
>             splittable, to be able to read it with Spark.
>
>
>             I'd look into the size of your files, and if they're huge I'd
>             try to connect the error you got to the size of the files
>             (but that seems strange to me, as the block size of a Parquet
>             file is 128MiB). I don't have any other suggestions, I'm sorry.
>
>
>             On Sat, Oct 1, 2016 at 11:35 PM, Babak Alipour
>             <babak.alip...@gmail.com <mailto:babak.alip...@gmail.com>>
>             wrote:
>
>                 Do you mean running a multi-JVM 'cluster' on the
>                 single machine? How would that affect
>                 performance/memory-consumption? If a multi-JVM setup
>                 can handle such a large input, then why can't a
>                 single-JVM break down the job into smaller tasks?
>
>                 I also found that SPARK-9411 mentions making the
>                 page_size configurable but it's hard-limited
>                 to ((1L<<31) -1) *8L [1]
>
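The expression quoted above can be evaluated directly to see that it matches the number in the exception message. A minimal sketch in Python; the constant names are made up here, and reading the expression as "maximum long[] length times 8 bytes per long" follows the explanation given further down in this message.

```python
# Integer.MAX_VALUE, the upper bound on a Java array's length.
MAX_LONG_ARRAY_LENGTH = (1 << 31) - 1
BYTES_PER_LONG = 8

max_page_size_bytes = MAX_LONG_ARRAY_LENGTH * BYTES_PER_LONG
print(max_page_size_bytes)           # 17179869176
print(max_page_size_bytes / 2**30)   # just under 16 GiB
```

The result is 8 bytes short of 16 GiB, which is the 17179869176 reported in the stack trace.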
>                 [1]
>                 https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
>
>
>                 SPARK-9452 also talks about larger page sizes but I
>                 don't know how that affects my use case. [2]
>
>                 [2] https://github.com/apache/spark/pull/7891
>
>
>                 The reason provided here is that the on-heap
>                 allocator's maximum page size is limited by the
>                 maximum amount of data that can be stored in a long[].
>                 Is it possible to force this specific operation to go
>                 off-heap so that it can possibly use a bigger page size?
>
>
>
>                 >Babak
>
>
>                 */Babak Alipour ,/*
>                 */University of Florida/*
>
>                 On Fri, Sep 30, 2016 at 3:03 PM, Vadim Semenov
>                 <vadim.seme...@datadoghq.com
>                 <mailto:vadim.seme...@datadoghq.com>> wrote:
>
>                     Run more, smaller executors: change
>                     `spark.executor.memory` to 32g and
>                     `spark.executor.cores` to 2-4, for example.
>
>                     Changing the driver's memory won't help because it
>                     doesn't participate in execution.
>
>                     On Fri, Sep 30, 2016 at 2:58 PM, Babak Alipour
>                     <babak.alip...@gmail.com
>                     <mailto:babak.alip...@gmail.com>> wrote:
>
>                         Thank you for your replies.
>
>                         @Mich, using LIMIT 100 in the query prevents
>                         the exception but given the fact that there's
>                         enough memory, I don't think this should
>                         happen even without LIMIT. 
>
>                         @Vadim, here's the full stack trace:
>
>                         Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
>                                 at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
>                                 at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
>                                 at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
>                                 at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
>                                 at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
>                                 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
>                                 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>                                 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>                                 at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>                                 at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>                                 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>                                 at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>                                 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>                                 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>                                 at org.apache.spark.scheduler.Task.run(Task.scala:85)
>                                 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>                                 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>                                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>                                 at java.lang.Thread.run(Thread.java:745)
>
>                         I'm running Spark in local mode, so there is
>                         only one executor (the driver), and
>                         spark.driver.memory is set to 64g.
>                         Changing the driver's memory doesn't help.
>
>                         */Babak Alipour ,/*
>                         */University of Florida/*
>
>                         On Fri, Sep 30, 2016 at 2:05 PM, Vadim Semenov
>                         <vadim.seme...@datadoghq.com
>                         <mailto:vadim.seme...@datadoghq.com>> wrote:
>
>                             Can you post the whole exception stack trace?
>                             What are your executor memory settings?
>
>                             Right now I assume that it happens in
>                             UnsafeExternalRowSorter ->
>                             UnsafeExternalSorter:insertRecord.
>                             Running more executors with lower
>                             `spark.executor.memory` should help.
>                             On Fri, Sep 30, 2016 at 12:57 PM, Babak
>                             Alipour <babak.alip...@gmail.com
>                             <mailto:babak.alip...@gmail.com>> wrote:
>
>                                 Greetings everyone,
>                                 I'm trying to read a single field of a
>                                 Hive table stored as Parquet in Spark
>                                 (~140GB for the entire table, this
>                                 single field should be just a few GB)
>                                 and look at the sorted output using
>                                 the following:
>
>                                 sql("SELECT " + field + " FROM MY_TABLE ORDER BY " + field + " DESC")
>
>                                 But this simple line of code gives:
>
>                                 Caused by:
>                                 java.lang.IllegalArgumentException:
>                                 Cannot allocate a page with more than
>                                 17179869176 bytes
>
>                                 Same error for:
>
>                                 sql("SELECT " + field + " FROM MY_TABLE").sort(field)
>
>                                 and:
>
>                                 sql("SELECT " + field + " FROM MY_TABLE").orderBy(field)
>
>                                 I'm running this on a machine with
>                                 more than 200GB of RAM, running in
>                                 local mode with spark.driver.memory
>                                 set to 64g.
>                                 I do not know why it cannot allocate a
>                                 big enough page, and why is it trying
>                                 to allocate such a big page in the
>                                 first place?
>                                 I hope someone with more knowledge of
>                                 Spark can shed some light on this.
>                                 Thank you!
>                                 Best regards,
>                                 */Babak Alipour ,/*
>                                 */University of Florida/*
>
