[jira] [Comment Edited] (SPARK-23682) Memory issue with Spark structured streaming

2018-03-28 Thread Andrew Korzhuev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417474#comment-16417474
 ] 

Andrew Korzhuev edited comment on SPARK-23682 at 3/28/18 3:00 PM:
--

I can confirm that _HDFSBackedStateStoreProvider_ is leaking memory. Tested on:
 * AWS S3 checkpoint
 * Spark 2.3.0 on k8s
 * Structured Streaming stream-stream join

I managed to track the leak down to

[https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L252]_:_
{code:java}
private lazy val loadedMaps = new mutable.HashMap[Long, MapType]{code}
This map appears never to release the _UnsafeRow_ entries held in its values, which are of type:
{code:java}
type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]{code}
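To illustrate why a per-version map of this shape can grow without bound, here is a minimal, self-contained sketch. It is not Spark's code: the class {{VersionedStateCache}} and its methods are hypothetical, {{String}} stands in for _UnsafeRow_, and it assumes that every committed version keeps its own full copy of the state until something evicts it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of a per-version state cache; not Spark's implementation.
final class VersionedStateCache {
    // version -> full copy of the state at that version (String stands in for UnsafeRow)
    private final Map<Long, ConcurrentHashMap<String, String>> loadedMaps = new HashMap<>();
    private long latestVersion = 0L;

    // Copy the latest state, apply the updates, and store the result under a new version.
    long commit(Map<String, String> updates) {
        ConcurrentHashMap<String, String> next = new ConcurrentHashMap<>();
        ConcurrentHashMap<String, String> previous = loadedMaps.get(latestVersion);
        if (previous != null) {
            next.putAll(previous);
        }
        next.putAll(updates);
        latestVersion += 1;
        loadedMaps.put(latestVersion, next);
        return latestVersion;
    }

    // Drop every cached version except the newest `versionsToRetain` ones.
    void retainNewest(int versionsToRetain) {
        long oldestToKeep = latestVersion - versionsToRetain + 1;
        loadedMaps.keySet().removeIf(version -> version < oldestToKeep);
    }

    int cachedVersions() {
        return loadedMaps.size();
    }

    // Total number of rows held across all cached versions.
    long cachedRows() {
        long total = 0;
        for (ConcurrentHashMap<String, String> map : loadedMaps.values()) {
            total += map.size();
        }
        return total;
    }
}
```

Without a call such as {{retainNewest}}, the number of rows held in memory grows with every batch even when the live state is small, because each version holds its own copy of all earlier keys.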
I noticed that memory leaks more slowly when uploads are buffered to disk:
{code:java}
spark.hadoop.fs.s3a.fast.upload          true
spark.hadoop.fs.s3a.fast.upload.buffer   disk
{code}
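For readers unfamiliar with the {{spark.hadoop.}} prefix: Spark passes such settings to the Hadoop configuration with the prefix removed, so the options above reach the S3A connector under their {{fs.s3a.}} names. The sketch below only models that name mapping. The class {{SparkHadoopConf}} and its methods are hypothetical, and it assumes the second setting is {{fs.s3a.fast.upload.buffer}} with the value {{disk}}.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of how "spark.hadoop."-prefixed settings map to Hadoop option names.
final class SparkHadoopConf {
    static final String PREFIX = "spark.hadoop.";

    // Keep only the prefixed entries and strip the prefix from their keys.
    static Map<String, String> toHadoopOptions(Map<String, String> sparkConf) {
        Map<String, String> hadoop = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                hadoop.put(entry.getKey().substring(PREFIX.length()), entry.getValue());
            }
        }
        return hadoop;
    }

    // The two settings from the comment, written as explicit key/value pairs.
    static Map<String, String> diskBufferedUpload() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.hadoop.fs.s3a.fast.upload", "true");
        conf.put("spark.hadoop.fs.s3a.fast.upload.buffer", "disk");
        return conf;
    }
}
```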
It also seems that the state persisted to S3 is never cleaned up, as both the 
number of objects and their total volume grow indefinitely.
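
One way to reason about whether persisted state is being cleaned up is to compare a listing of the checkpoint's state files against a retention window. The sketch below is a hedged illustration only. It assumes files are named {{<version>.delta}} or {{<version>.snapshot}}, the class {{StateFileRetention}} and the {{versionsToRetain}} parameter are hypothetical, and it works on a plain list of names rather than talking to S3.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: decides which state files fall outside a retention window.
// Assumes file names of the form "<version>.delta" or "<version>.snapshot".
final class StateFileRetention {
    static long versionOf(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
    }

    static List<String> deletable(List<String> fileNames, int versionsToRetain) {
        long latest = -1L;
        for (String name : fileNames) {
            latest = Math.max(latest, versionOf(name));
        }
        long earliestToRetain = latest - versionsToRetain + 1;

        // Newest snapshot at or before the earliest retained version. Deltas written
        // after it are still needed to rebuild that version, so they must be kept.
        long baseSnapshot = -1L;
        for (String name : fileNames) {
            long version = versionOf(name);
            if (name.endsWith(".snapshot") && version <= earliestToRetain) {
                baseSnapshot = Math.max(baseSnapshot, version);
            }
        }

        List<String> result = new ArrayList<>();
        if (baseSnapshot < 0) {
            return result; // no snapshot to rebuild from, so nothing is safe to delete
        }
        for (String name : fileNames) {
            if (versionOf(name) < baseSnapshot) {
                result.add(name);
            }
        }
        return result;
    }
}
```

If a listing taken from the checkpoint location keeps returning a growing set of files that such a check would consider deletable, that supports the observation that cleanup is not happening.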

Before the worker dies:

!screen_shot_2018-03-20_at_15.23.29.png!

Heap dump of a worker that has been running for some time:

!Screen Shot 2018-03-28 at 16.44.20.png!


> Memory issue with Spark structured streaming
> 
>
> Key: SPARK-23682
> URL: https://issues.apache.org/jira/browse/SPARK-23682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.2.0
> Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails 
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC 
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 
> -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>Reporter: Yuriy Bondaruk
>Priority: Major
>  Labels: Memory, memory, memory-leak
> Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 
> 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen 
> Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, 
> Spark executors GC time.png, image-2018-03-22-14-46-31-960.png, 
> screen_shot_2018-03-20_at_15.23.29.png
>
>
> There seems to be a memory issue in Structured Streaming. A stream with 
> aggregation (dropDuplicates()) and data partitioning steadily increases its 
> memory usage until the executors eventually fail with exit code 137:
> 

> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running 
> tasks) Reason: Container marked as failed: 
> 
