[jira] [Comment Edited] (SPARK-23682) Memory issue with Spark structured streaming
[ https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417474#comment-16417474 ] Andrew Korzhuev edited comment on SPARK-23682 at 3/28/18 3:00 PM:
--
I can confirm that _HDFSBackedStateStoreProvider_ is leaking memory. Tested on:
* AWS S3 checkpoint
* Spark 2.3.0 on k8s
* Structured stream-stream join

I managed to track the leak down to [https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L252]:
{code:java}
private lazy val loadedMaps = new mutable.HashMap[Long, MapType]
{code}
which does not appear to clean up the _UnsafeRow_ entries coming from:
{code:java}
type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]
{code}
I noticed that memory leaks more slowly if data is buffered to disk:
{code:java}
spark.hadoop.fs.s3a.fast.upload true
spark.hadoop.fs.s3a.fast.upload.buffer disk
{code}
It also seems that the state persisted to S3 is never cleaned up, as both the number of objects and their total volume grow indefinitely.

Before the worker dies: !screen_shot_2018-03-20_at_15.23.29.png!

Heap dump of a worker that has been running for some time: !Screen Shot 2018-03-28 at 16.44.20.png!
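For illustration only, here is a minimal, hypothetical sketch of the growth pattern described above: a version-keyed cache of per-batch state that keeps every version strongly reachable unless old versions are explicitly evicted. This is not the actual provider code; the names (StateCacheSketch, StateMap, commit, evictOldVersions, maxVersionsToRetain) are made up.
{code:java}
import scala.collection.mutable

object StateCacheSketch {
  // Stand-in for the real MapType of UnsafeRow -> UnsafeRow.
  type StateMap = java.util.concurrent.ConcurrentHashMap[String, Array[Byte]]

  // Version -> full state map for that version; grows by one entry per batch.
  private val loadedMaps = new mutable.HashMap[Long, StateMap]

  // Every committed batch adds the state for that version to the cache.
  def commit(version: Long, state: StateMap): Unit =
    loadedMaps.put(version, state)

  // Without an eviction step like this, every version (and all rows it
  // references) stays reachable from loadedMaps and the heap keeps growing.
  def evictOldVersions(latest: Long, maxVersionsToRetain: Int): Unit =
    loadedMaps.keys.toList
      .filter(_ <= latest - maxVersionsToRetain)
      .foreach(loadedMaps.remove)
}
{code}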
> Memory issue with Spark structured streaming
> ---------------------------------------------
>
>                 Key: SPARK-23682
>                 URL: https://issues.apache.org/jira/browse/SPARK-23682
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>            Reporter: Yuriy Bondaruk
>            Priority: Major
>              Labels: Memory, memory, memory-leak
>         Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Spark executors GC time.png, image-2018-03-22-14-46-31-960.png, screen_shot_2018-03-20_at_15.23.29.png
>
>
> It seems like there is an issue with memory in structured streaming. A stream with aggregation (dropDuplicates()) and data partitioning constantly increases memory usage and finally executors fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed:
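For context, a minimal sketch of the query shape the reporter describes (a stream deduplicated with dropDuplicates() and written out partitioned). The source format, schema, column names, paths, and the partitioning column are assumptions, not taken from the report; the watermark comment reflects how deduplication state is bounded in structured streaming.
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

object DedupStreamSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dedup-stream-sketch").getOrCreate()

    // Hypothetical input schema.
    val schema = new StructType()
      .add("id", StringType)
      .add("ts", TimestampType)
      .add("payload", StringType)

    val events = spark.readStream
      .format("json")
      .schema(schema)
      .load("s3://bucket/input/")

    // Without a watermark, dropDuplicates() keeps every key it has ever seen
    // in the state store; adding withWatermark lets old deduplication state
    // be evicted once the watermark passes it.
    val deduped = events
      .withWatermark("ts", "1 hour")
      .dropDuplicates("id", "ts")

    deduped.writeStream
      .format("parquet")
      .option("checkpointLocation", "s3://bucket/checkpoints/dedup/")
      .option("path", "s3://bucket/output/")
      .partitionBy("id")
      .start()
      .awaitTermination()
  }
}
{code}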