[jira] [Updated] (SPARK-26116) Spark SQL - Sort when writing partitioned parquet leads to OOM errors

2018-11-23 Thread Pierre Lienhart (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-26116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pierre Lienhart updated SPARK-26116:

Description: 
When writing partitioned parquet using {{partitionBy}}, it looks like Spark 
sorts each partition before writing, but this sort consumes a huge amount of 
memory compared to the size of the data. The executors can then go OOM and get 
killed by YARN. As a consequence, it also forces users to provision a huge 
amount of memory relative to the data being written.
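
For context, the write pattern that triggers this code path looks like the following (a minimal sketch; the actual dataset, partition column and paths are not part of this report and are only placeholders):

{code:scala}
import org.apache.spark.sql.SparkSession

// Hypothetical reproduction sketch. Any DataFrame written with partitionBy goes
// through FileFormatWriter's dynamic-partition path, which sorts rows by the
// partition column(s) before writing the per-partition parquet files (see the
// DynamicPartitionWriteTask / UnsafeKVExternalSorter frames in the stack trace below).
val spark = SparkSession.builder().appName("partitioned-parquet-write").getOrCreate()

val df = spark.read.parquet("/path/to/input")   // placeholder input
df.write
  .partitionBy("event_date")                    // placeholder partition column
  .mode("overwrite")
  .parquet("/path/to/output")                   // placeholder output path
{code}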

Error messages found in the Spark UI look like the following:

{code:java}
Spark UI description of failure : Job aborted due to stage failure: Task 169 in stage 2.0 failed 1 times, most recent failure: Lost task 169.0 in stage 2.0 (TID 98, x.xx.x.xx, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 8.1 GB of 8 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
{code}
 
{code:java}
Job aborted due to stage failure: Task 66 in stage 4.0 failed 1 times, most recent failure: Lost task 66.0 in stage 4.0 (TID 56, xxx.x.x.xx, executor 1): org.apache.spark.SparkException: Task failed while writing rows
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
 at org.apache.spark.scheduler.Task.run(Task.scala:99)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@75194804 : /app/hadoop/yarn/local/usercache/at053351/appcache/application_1537536072724_17039/blockmgr-a4ba7d59-e780-4385-99b4-a4c4fe95a1ec/25/temp_local_a542a412-5845-45d2-9302-bbf5ee4113ad (No such file or directory)
 at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:188)
 at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:254)
 at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:92)
 at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:347)
 at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:425)
 at org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:160)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.execute(FileFormatWriter.scala:364)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1353)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
 ... 8 more{code}
 
In the stderr logs, we can see that a huge amount of sort data (the partition 
being sorted here is 250 MB when persisted in memory, deserialized) is being 
spilled to disk ({{INFO UnsafeExternalSorter: Thread 155 spilling sort data of 
3.6 GB to disk}}). Sometimes the data is spilled to disk in time and the sort 
completes ({{INFO FileFormatWriter: Sorting complete. Writing out partition 
files one at a time.}}), but sometimes it does not and we see multiple 
{{TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.}} 
messages until the application finally runs OOM with logs such as {{ERROR 
UnsafeExternalSorter: Unable to grow the pointer array}}. Even when the write 
succeeds, GC time is quite high (~20% of the total write task duration) and I 
suspect these disk spills further impair performance.

Contrary to what the above error message suggests, increasing off-heap memory 
(to 4, 8 and then 16 GiB) through either {{spark.yarn.executor.memoryOverhead}} 
or {{-XX:MaxDirectMemorySize}} does not solve the problem (see the configuration 
sketch below).

I should mention that when looking at individual (successful) write tasks in 
the Spark UI, the Peak Execution Memory metric is always 0.

It looks like a known issue: SPARK-12546 is explicitly related.
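
For reference, a sketch of how the off-heap settings mentioned above can be supplied (these are launch-time options, normally passed via {{spark-submit --conf}} or {{spark-defaults.conf}}; the exact values shown are only examples based on the sizes quoted in this report):

{code:scala}
import org.apache.spark.sql.SparkSession

// Hypothetical configuration sketch for Spark 2.1 on YARN. The settings below must
// be in effect when executors are requested; setting them here only works if this
// code runs before the SparkContext is created.
val spark = SparkSession.builder()
  .appName("partitioned-parquet-write")
  .config("spark.yarn.executor.memoryOverhead", "8192")                     // overhead in MiB (4, 8 and 16 GiB were tried)
  .config("spark.executor.extraJavaOptions", "-XX:MaxDirectMemorySize=8g")  // cap on direct (off-heap) buffers
  .getOrCreate()
{code}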

[jira] [Updated] (SPARK-26116) Spark SQL - Sort when writing partitioned parquet leads to OOM errors

2018-11-19 Thread Pierre Lienhart (JIRA)


 [ https://issues.apache.org/jira/browse/SPARK-26116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pierre Lienhart updated SPARK-26116:

Summary: Spark SQL - Sort when writing partitioned parquet leads to OOM 
errors  (was: Spark SQL - Sort when writing partitioned parquet lead to OOM 
errors)

> Spark SQL - Sort when writing partitioned parquet leads to OOM errors
> ----------------------------------------------------------------------
>
> Key: SPARK-26116
> URL: https://issues.apache.org/jira/browse/SPARK-26116
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.1.1
> Reporter: Pierre Lienhart
> Priority: Major
>