[jira] [Commented] (SPARK-22865) Publish Official Apache Spark Docker images
[ https://issues.apache.org/jira/browse/SPARK-22865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695338#comment-16695338 ] Andrew Korzhuev commented on SPARK-22865: - I've added builds for vanilla 2.3.1, 2.3.2 and 2.4.0 images; you can grab them from [https://hub.docker.com/r/andrusha/spark-k8s/tags/] and the scripts were also updated. I would love to help make this happen for Spark, but I need somebody to show me around the Apache CI/CD infrastructure. > Publish Official Apache Spark Docker images > --- > > Key: SPARK-22865 > URL: https://issues.apache.org/jira/browse/SPARK-22865 > Project: Spark > Issue Type: New Feature > Components: Kubernetes >Affects Versions: 2.3.0 >Reporter: Anirudh Ramanathan >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-24422) Add JDK9+ in our Jenkins' build servers
[ https://issues.apache.org/jira/browse/SPARK-24422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531241#comment-16531241 ] Andrew Korzhuev edited comment on SPARK-24422 at 7/3/18 11:46 AM: -- Also `.travis.yml` needs to be fixed in the following way: {code:java} # 2. Choose language and target JDKs for parallel builds. language: java jdk: - openjdk8 - openjdk9 {code} was (Author: akorzhuev): Also `.travis.yml` needs to be fixed in the following way: {code:java} # 2. Choose language and target JDKs for parallel builds. language: java jdk: - openjdk8 - openjdk9 - openjdk10 {code} > Add JDK9+ in our Jenkins' build servers > --- > > Key: SPARK-24422 > URL: https://issues.apache.org/jira/browse/SPARK-24422 > Project: Spark > Issue Type: Sub-task > Components: Project Infra >Affects Versions: 2.3.0 >Reporter: DB Tsai >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24421) sun.misc.Unsafe in JDK9+
[ https://issues.apache.org/jira/browse/SPARK-24421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531235#comment-16531235 ] Andrew Korzhuev commented on SPARK-24421: - If I understand this correctly, then the only deprecated JDK9+ API Spark is using is `sun.misc.Cleaner` (while `sun.misc.Unsafe` is still accessible) in `[common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java|https://github.com/andrusha/spark/commit/7da06d3c725169f9764225f5a29886eb56bee191#diff-c7483c7efce631c783676f014ba2b0ed]`, which is fixable in the following way: {code:java} @@ -22,7 +22,7 @@ import java.lang.reflect.Method; import java.nio.ByteBuffer; -import sun.misc.Cleaner; +import java.lang.ref.Cleaner; import sun.misc.Unsafe; public final class Platform { @@ -169,7 +169,8 @@ public static ByteBuffer allocateDirectBuffer(int size) { cleanerField.setAccessible(true); long memory = allocateMemory(size); ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size); - Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory)); + Cleaner cleaner = Cleaner.create(); + cleaner.register(buffer, () -> freeMemory(memory)); cleanerField.set(buffer, cleaner); return buffer; {code} [https://github.com/andrusha/spark/commit/7da06d3c725169f9764225f5a29886eb56bee191#diff-c7483c7efce631c783676f014ba2b0ed] > sun.misc.Unsafe in JDK9+ > > > Key: SPARK-24421 > URL: https://issues.apache.org/jira/browse/SPARK-24421 > Project: Spark > Issue Type: Sub-task > Components: Build >Affects Versions: 2.3.0 >Reporter: DB Tsai >Priority: Major > > Many internal APIs such as unsafe are encapsulated in JDK9+, see > http://openjdk.java.net/jeps/260 for detail. > To use Unsafe, we need to add *jdk.unsupported* to our code’s module > declaration: > {code:java} > module java9unsafe { > requires jdk.unsupported; > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
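For context, the JDK 9+ replacement pattern the diff above switches to looks like this. This is a minimal, self-contained sketch, not Spark's actual Platform.java; the direct buffer and the print action are stand-ins:

{code:java}
import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;

public final class CleanerExample {
    // One Cleaner can be shared process-wide; it owns a single daemon thread.
    private static final Cleaner CLEANER = Cleaner.create();

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
        // java.lang.ref.Cleaner splits sun.misc.Cleaner.create(referent, thunk)
        // into create() + register(). The action must not capture `buffer`,
        // otherwise the buffer could never become phantom reachable.
        Cleaner.Cleanable cleanable =
            CLEANER.register(buffer, () -> System.out.println("buffer freed"));
        // The action runs once `buffer` is unreachable, or eagerly via clean():
        cleanable.clean();
    }
}
{code}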
[jira] [Comment Edited] (SPARK-23682) Memory issue with Spark structured streaming
[ https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417474#comment-16417474 ] Andrew Korzhuev edited comment on SPARK-23682 at 3/28/18 3:00 PM: -- I can confirm that _HDFSBackedStateStoreProvider_ is leaking memory. Tested on: * AWS S3 checkpoint * Spark 2.3.0 on k8s * Structured stream - stream join I managed to track the leak down to [https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L252]: {code:java} private lazy val loadedMaps = new mutable.HashMap[Long, MapType]{code} which appears not to clean up the _UnsafeRow_ entries coming from: {code:java} type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]{code} I noticed that the memory leaks more slowly if data is buffered to disk: {code:java} spark.hadoop.fs.s3a.fast.upload true spark.hadoop.fs.s3a.fast.upload.buffer disk {code} It also seems that the state persisted to S3 is never cleaned up, as both the number of objects and their volume grow indefinitely. Before the worker dies: !screen_shot_2018-03-20_at_15.23.29.png! Heap dump of a worker running for some time: !Screen Shot 2018-03-28 at 16.44.20.png! was (Author: akorzhuev): I can confirm that _HDFSBackedStateStoreProvider_ is leaking memory. Tested on: * AWS S3 checkpoint * Spark 2.3.0 on k8s * Structured stream - stream join I managed to track the leak down to [https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L252]: {code:java} private lazy val loadedMaps = new mutable.HashMap[Long, MapType]{code} which appears not to clean up the _UnsafeRow_ entries coming from: {code:java} type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]{code} I noticed that the memory leaks more slowly if data is buffered to disk: {code:java} spark.hadoop.fs.s3a.fast.upload true spark.hadoop.fs.s3a.fast.upload.buffer disk {code} It also seems that the state persisted to S3 is never cleaned up, as both the number of objects and their volume grow indefinitely. !Screen Shot 2018-03-28 at 16.44.20.png!
> Memory issue with Spark structured streaming > > > Key: SPARK-23682 > URL: https://issues.apache.org/jira/browse/SPARK-23682 > Project: Spark > Issue Type: Bug > Components: SQL, Structured Streaming >Affects Versions: 2.2.0 > Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3 > |spark.blacklist.decommissioning.enabled|true| > |spark.blacklist.decommissioning.timeout|1h| > |spark.cleaner.periodicGC.interval|10min| > |spark.default.parallelism|18| > |spark.dynamicAllocation.enabled|false| > |spark.eventLog.enabled|true| > |spark.executor.cores|3| > |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails > -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC > -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 > -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'| > |spark.executor.id|driver| > |spark.executor.instances|3| > |spark.executor.memory|22G| > |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2| > |spark.hadoop.parquet.enable.summary-metadata|false| > |spark.hadoop.yarn.timeline-service.enabled|false| > |spark.jars| | > |spark.master|yarn| > |spark.memory.fraction|0.9| > |spark.memory.storageFraction|0.3| > |spark.memory.useLegacyMode|false| > |spark.rdd.compress|true| > |spark.resourceManager.cleanupExpiredHost|true| > |spark.scheduler.mode|FIFO| > |spark.serializer|org.apache.spark.serializer.KryoSerializer| > |spark.shuffle.service.enabled|true| > |spark.speculation|false| > |spark.sql.parquet.filterPushdown|true| > |spark.sql.parquet.mergeSchema|false| > |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse| > |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true| > |spark.submit.deployMode|client| > |spark.yarn.am.cores|1| > |spark.yarn.am.memory|2G| > |spark.yarn.am.memoryOverhead|1G| > |spark.yarn.executor.memoryOverhead|3G| >Reporter: Yuriy Bondaruk >Priority: Major > Labels: Memory, memory, memory-leak > Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot > 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen > Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, > Spark executors GC time.png, image-2018-03-22-14-46-31-960.png, > screen_shot_2018-03-20_at_15.23.29.png > > > It seems like there is an issue with memory in structured streaming. A stream > with aggregation (dropDuplicates()) and data partitioning constantly > increases memory usage and finally executors fails with exit code
[jira] [Updated] (SPARK-23682) Memory issue with Spark structured streaming
[ https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Korzhuev updated SPARK-23682: Attachment: screen_shot_2018-03-20_at_15.23.29.png > Memory issue with Spark structured streaming > > > Key: SPARK-23682 > URL: https://issues.apache.org/jira/browse/SPARK-23682 > Project: Spark > Issue Type: Bug > Components: SQL, Structured Streaming >Affects Versions: 2.2.0 > Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3 > |spark.blacklist.decommissioning.enabled|true| > |spark.blacklist.decommissioning.timeout|1h| > |spark.cleaner.periodicGC.interval|10min| > |spark.default.parallelism|18| > |spark.dynamicAllocation.enabled|false| > |spark.eventLog.enabled|true| > |spark.executor.cores|3| > |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails > -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC > -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 > -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'| > |spark.executor.id|driver| > |spark.executor.instances|3| > |spark.executor.memory|22G| > |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2| > |spark.hadoop.parquet.enable.summary-metadata|false| > |spark.hadoop.yarn.timeline-service.enabled|false| > |spark.jars| | > |spark.master|yarn| > |spark.memory.fraction|0.9| > |spark.memory.storageFraction|0.3| > |spark.memory.useLegacyMode|false| > |spark.rdd.compress|true| > |spark.resourceManager.cleanupExpiredHost|true| > |spark.scheduler.mode|FIFO| > |spark.serializer|org.apache.spark.serializer.KryoSerializer| > |spark.shuffle.service.enabled|true| > |spark.speculation|false| > |spark.sql.parquet.filterPushdown|true| > |spark.sql.parquet.mergeSchema|false| > |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse| > |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true| > |spark.submit.deployMode|client| > |spark.yarn.am.cores|1| > |spark.yarn.am.memory|2G| > |spark.yarn.am.memoryOverhead|1G| > |spark.yarn.executor.memoryOverhead|3G| >Reporter: Yuriy Bondaruk >Priority: Major > Labels: Memory, memory, memory-leak > Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot > 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen > Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, > Spark executors GC time.png, image-2018-03-22-14-46-31-960.png, > screen_shot_2018-03-20_at_15.23.29.png > > > It seems like there is an issue with memory in structured streaming. A stream > with aggregation (dropDuplicates()) and data partitioning constantly > increases memory usage and finally executors fails with exit code 137: > {quote}ExecutorLostFailure (executor 2 exited caused by one of the running > tasks) Reason: Container marked as failed: > container_1520214726510_0001_01_03 on host: > ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: > Container killed on request. 
Exit code is 137 > Container exited with a non-zero exit code 137 > Killed by external signal{quote} > Stream creating looks something like this: > {quote}session > .readStream() > .schema(inputSchema) > .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB) > .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF) > .csv("s3://test-bucket/input") > .as(Encoders.bean(TestRecord.class)) > .flatMap(mf, Encoders.bean(TestRecord.class)) > .dropDuplicates("testId", "testName") > .withColumn("year", > functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), > "")) > .writeStream() > .option("path", "s3://test-bucket/output") > .option("checkpointLocation", "s3://test-bucket/checkpoint") > .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS)) > .partitionBy("year") > .format("parquet") > .outputMode(OutputMode.Append()) > .queryName("test-stream") > .start();{quote} > Analyzing the heap dump I found that most of the memory used by > {{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}} > that is referenced from > [StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196] > > On the first glance it looks normal since that is how Spark keeps aggregation > keys in memory. However I did my testing by renaming files in source folder, > so that they could be picked up by spark again. Since input records are the > same all further rows should be rejected as duplicates and memory consumption > shouldn't increase but it's not true. Moreover, GC time took more than 30% of > total processing time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (SPARK-23682) Memory issue with Spark structured streaming
[ https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417474#comment-16417474 ] Andrew Korzhuev commented on SPARK-23682: - I can confirm that _HDFSBackedStateStoreProvider_ is leaking memory. Tested on: * AWS S3 checkpoint * Spark 2.3.0 on k8s * Structured stream - stream join I managed to track the leak down to [https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L252]: _private lazy val loadedMaps = new mutable.HashMap[Long, MapType]_, which appears not to clean up the _UnsafeRow_ entries coming from: {code:java} type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]{code} I noticed that the memory leaks more slowly if data is buffered to disk: {code:java} spark.hadoop.fs.s3a.fast.upload true spark.hadoop.fs.s3a.fast.upload.buffer disk {code} It also seems that the state persisted to S3 is never cleaned up, as both the number of objects and their volume grow indefinitely. !Screen Shot 2018-03-28 at 16.44.20.png! > Memory issue with Spark structured streaming > > > Key: SPARK-23682 > URL: https://issues.apache.org/jira/browse/SPARK-23682 > Project: Spark > Issue Type: Bug > Components: SQL, Structured Streaming >Affects Versions: 2.2.0 > Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3 > |spark.blacklist.decommissioning.enabled|true| > |spark.blacklist.decommissioning.timeout|1h| > |spark.cleaner.periodicGC.interval|10min| > |spark.default.parallelism|18| > |spark.dynamicAllocation.enabled|false| > |spark.eventLog.enabled|true| > |spark.executor.cores|3| > |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails > -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC > -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 > -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'| > |spark.executor.id|driver| > |spark.executor.instances|3| > |spark.executor.memory|22G| > |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2| > |spark.hadoop.parquet.enable.summary-metadata|false| > |spark.hadoop.yarn.timeline-service.enabled|false| > |spark.jars| | > |spark.master|yarn| > |spark.memory.fraction|0.9| > |spark.memory.storageFraction|0.3| > |spark.memory.useLegacyMode|false| > |spark.rdd.compress|true| > |spark.resourceManager.cleanupExpiredHost|true| > |spark.scheduler.mode|FIFO| > |spark.serializer|org.apache.spark.serializer.KryoSerializer| > |spark.shuffle.service.enabled|true| > |spark.speculation|false| > |spark.sql.parquet.filterPushdown|true| > |spark.sql.parquet.mergeSchema|false| > |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse| > |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true| > |spark.submit.deployMode|client| > |spark.yarn.am.cores|1| > |spark.yarn.am.memory|2G| > |spark.yarn.am.memoryOverhead|1G| > |spark.yarn.executor.memoryOverhead|3G| >Reporter: Yuriy Bondaruk >Priority: Major > Labels: Memory, memory, memory-leak > Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot > 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen > Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, > Spark executors GC time.png, image-2018-03-22-14-46-31-960.png > > > It seems like there is an issue with memory in structured streaming. 
A stream > with aggregation (dropDuplicates()) and data partitioning constantly > increases memory usage and finally executors fails with exit code 137: > {quote}ExecutorLostFailure (executor 2 exited caused by one of the running > tasks) Reason: Container marked as failed: > container_1520214726510_0001_01_03 on host: > ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: > Container killed on request. Exit code is 137 > Container exited with a non-zero exit code 137 > Killed by external signal{quote} > Stream creating looks something like this: > {quote}session > .readStream() > .schema(inputSchema) > .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB) > .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF) > .csv("s3://test-bucket/input") > .as(Encoders.bean(TestRecord.class)) > .flatMap(mf, Encoders.bean(TestRecord.class)) > .dropDuplicates("testId", "testName") > .withColumn("year", > functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), > "")) > .writeStream() > .option("path", "s3://test-bucket/output") > .option("checkpointLocation", "s3://test-bucket/checkpoint") > .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS)) > .partitionBy("year") > .format("parquet") > .outputMode(OutputMode
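If the diagnosis above is right, the fix direction would be to bound how many old map versions `loadedMaps` retains in memory. Here is a toy sketch of that eviction idea; the class and method names are hypothetical and this is not Spark's actual code, only an illustration of retaining the newest N versions so older maps become garbage-collectable:

{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Toy version store: keeps only the newest `retain` committed versions in
// memory so that maps for older versions can be garbage-collected.
final class VersionedStore<K, V> {
    private final ConcurrentNavigableMap<Long, ConcurrentHashMap<K, V>> loadedMaps =
            new ConcurrentSkipListMap<>();
    private final int retain;

    VersionedStore(int retain) { this.retain = retain; }

    void commit(long version, ConcurrentHashMap<K, V> map) {
        loadedMaps.put(version, map);
        // Drop everything older than the newest `retain` versions.
        while (loadedMaps.size() > retain) {
            loadedMaps.pollFirstEntry();
        }
    }

    ConcurrentHashMap<K, V> get(long version) {
        // null means the version was evicted and must be reloaded from the checkpoint.
        return loadedMaps.get(version);
    }
}
{code}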
[jira] [Updated] (SPARK-23682) Memory issue with Spark structured streaming
[ https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Korzhuev updated SPARK-23682: Attachment: Screen Shot 2018-03-28 at 16.44.20.png > Memory issue with Spark structured streaming > > > Key: SPARK-23682 > URL: https://issues.apache.org/jira/browse/SPARK-23682 > Project: Spark > Issue Type: Bug > Components: SQL, Structured Streaming >Affects Versions: 2.2.0 > Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3 > |spark.blacklist.decommissioning.enabled|true| > |spark.blacklist.decommissioning.timeout|1h| > |spark.cleaner.periodicGC.interval|10min| > |spark.default.parallelism|18| > |spark.dynamicAllocation.enabled|false| > |spark.eventLog.enabled|true| > |spark.executor.cores|3| > |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails > -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC > -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 > -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'| > |spark.executor.id|driver| > |spark.executor.instances|3| > |spark.executor.memory|22G| > |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2| > |spark.hadoop.parquet.enable.summary-metadata|false| > |spark.hadoop.yarn.timeline-service.enabled|false| > |spark.jars| | > |spark.master|yarn| > |spark.memory.fraction|0.9| > |spark.memory.storageFraction|0.3| > |spark.memory.useLegacyMode|false| > |spark.rdd.compress|true| > |spark.resourceManager.cleanupExpiredHost|true| > |spark.scheduler.mode|FIFO| > |spark.serializer|org.apache.spark.serializer.KryoSerializer| > |spark.shuffle.service.enabled|true| > |spark.speculation|false| > |spark.sql.parquet.filterPushdown|true| > |spark.sql.parquet.mergeSchema|false| > |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse| > |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true| > |spark.submit.deployMode|client| > |spark.yarn.am.cores|1| > |spark.yarn.am.memory|2G| > |spark.yarn.am.memoryOverhead|1G| > |spark.yarn.executor.memoryOverhead|3G| >Reporter: Yuriy Bondaruk >Priority: Major > Labels: Memory, memory, memory-leak > Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot > 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen > Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, > Spark executors GC time.png, image-2018-03-22-14-46-31-960.png > > > It seems like there is an issue with memory in structured streaming. A stream > with aggregation (dropDuplicates()) and data partitioning constantly > increases memory usage and finally executors fails with exit code 137: > {quote}ExecutorLostFailure (executor 2 exited caused by one of the running > tasks) Reason: Container marked as failed: > container_1520214726510_0001_01_03 on host: > ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: > Container killed on request. 
Exit code is 137 > Container exited with a non-zero exit code 137 > Killed by external signal{quote} > Stream creating looks something like this: > {quote}session > .readStream() > .schema(inputSchema) > .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB) > .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF) > .csv("s3://test-bucket/input") > .as(Encoders.bean(TestRecord.class)) > .flatMap(mf, Encoders.bean(TestRecord.class)) > .dropDuplicates("testId", "testName") > .withColumn("year", > functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), > "")) > .writeStream() > .option("path", "s3://test-bucket/output") > .option("checkpointLocation", "s3://test-bucket/checkpoint") > .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS)) > .partitionBy("year") > .format("parquet") > .outputMode(OutputMode.Append()) > .queryName("test-stream") > .start();{quote} > Analyzing the heap dump I found that most of the memory used by > {{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}} > that is referenced from > [StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196] > > On the first glance it looks normal since that is how Spark keeps aggregation > keys in memory. However I did my testing by renaming files in source folder, > so that they could be picked up by spark again. Since input records are the > same all further rows should be rejected as duplicates and memory consumption > shouldn't increase but it's not true. Moreover, GC time took more than 30% of > total processing time. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - T
[jira] [Updated] (SPARK-23682) Memory issue with Spark structured streaming
[ https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Korzhuev updated SPARK-23682: Attachment: Screen Shot 2018-03-28 at 16.44.20.png > Memory issue with Spark structured streaming > > > Key: SPARK-23682 > URL: https://issues.apache.org/jira/browse/SPARK-23682 > Project: Spark > Issue Type: Bug > Components: SQL, Structured Streaming >Affects Versions: 2.2.0 > Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3 > |spark.blacklist.decommissioning.enabled|true| > |spark.blacklist.decommissioning.timeout|1h| > |spark.cleaner.periodicGC.interval|10min| > |spark.default.parallelism|18| > |spark.dynamicAllocation.enabled|false| > |spark.eventLog.enabled|true| > |spark.executor.cores|3| > |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails > -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC > -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 > -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'| > |spark.executor.id|driver| > |spark.executor.instances|3| > |spark.executor.memory|22G| > |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2| > |spark.hadoop.parquet.enable.summary-metadata|false| > |spark.hadoop.yarn.timeline-service.enabled|false| > |spark.jars| | > |spark.master|yarn| > |spark.memory.fraction|0.9| > |spark.memory.storageFraction|0.3| > |spark.memory.useLegacyMode|false| > |spark.rdd.compress|true| > |spark.resourceManager.cleanupExpiredHost|true| > |spark.scheduler.mode|FIFO| > |spark.serializer|org.apache.spark.serializer.KryoSerializer| > |spark.shuffle.service.enabled|true| > |spark.speculation|false| > |spark.sql.parquet.filterPushdown|true| > |spark.sql.parquet.mergeSchema|false| > |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse| > |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true| > |spark.submit.deployMode|client| > |spark.yarn.am.cores|1| > |spark.yarn.am.memory|2G| > |spark.yarn.am.memoryOverhead|1G| > |spark.yarn.executor.memoryOverhead|3G| >Reporter: Yuriy Bondaruk >Priority: Major > Labels: Memory, memory, memory-leak > Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot > 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Spark > executors GC time.png, image-2018-03-22-14-46-31-960.png > > > It seems like there is an issue with memory in structured streaming. A stream > with aggregation (dropDuplicates()) and data partitioning constantly > increases memory usage and finally executors fails with exit code 137: > {quote}ExecutorLostFailure (executor 2 exited caused by one of the running > tasks) Reason: Container marked as failed: > container_1520214726510_0001_01_03 on host: > ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: > Container killed on request. 
Exit code is 137 > Container exited with a non-zero exit code 137 > Killed by external signal{quote} > Stream creating looks something like this: > {quote}session > .readStream() > .schema(inputSchema) > .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB) > .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF) > .csv("s3://test-bucket/input") > .as(Encoders.bean(TestRecord.class)) > .flatMap(mf, Encoders.bean(TestRecord.class)) > .dropDuplicates("testId", "testName") > .withColumn("year", > functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), > "")) > .writeStream() > .option("path", "s3://test-bucket/output") > .option("checkpointLocation", "s3://test-bucket/checkpoint") > .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS)) > .partitionBy("year") > .format("parquet") > .outputMode(OutputMode.Append()) > .queryName("test-stream") > .start();{quote} > Analyzing the heap dump I found that most of the memory used by > {{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}} > that is referenced from > [StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196] > > On the first glance it looks normal since that is how Spark keeps aggregation > keys in memory. However I did my testing by renaming files in source folder, > so that they could be picked up by spark again. Since input records are the > same all further rows should be rejected as duplicates and memory consumption > shouldn't increase but it's not true. Moreover, GC time took more than 30% of > total processing time. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands,
[jira] [Commented] (SPARK-22865) Publish Official Apache Spark Docker images
[ https://issues.apache.org/jira/browse/SPARK-22865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409609#comment-16409609 ] Andrew Korzhuev commented on SPARK-22865: - Here is an example of how this can be done with Travis CI and prebuilt Spark binaries: [https://github.com/andrusha/spark-k8s-docker] The images are published here: [https://hub.docker.com/r/andrusha/spark-k8s/] > Publish Official Apache Spark Docker images > --- > > Key: SPARK-22865 > URL: https://issues.apache.org/jira/browse/SPARK-22865 > Project: Spark > Issue Type: New Feature > Components: Kubernetes >Affects Versions: 2.3.0 >Reporter: Anirudh Ramanathan >Priority: Minor > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
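The approach in that repo amounts to a Travis build matrix that builds one Docker image per Spark/Hadoop combination from the prebuilt binaries. A hedged sketch of the general shape such a matrix could take (not the repo's actual file; the versions, build args and image tags below are placeholders):

{code:java}
# .travis.yml sketch: one Docker build per Spark/Hadoop combination.
language: bash
services:
  - docker
env:
  - SPARK_VERSION=2.3.0 HADOOP_VERSION=2.7
  - SPARK_VERSION=2.4.0 HADOOP_VERSION=2.7
script:
  - docker build --build-arg SPARK_VERSION=$SPARK_VERSION --build-arg HADOOP_VERSION=$HADOOP_VERSION -t andrusha/spark-k8s:$SPARK_VERSION-hadoop$HADOOP_VERSION .
{code}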
[jira] [Commented] (SPARK-22865) Publish Official Apache Spark Docker images
[ https://issues.apache.org/jira/browse/SPARK-22865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409351#comment-16409351 ] Andrew Korzhuev commented on SPARK-22865: - What is the plan for resolving this issue? I see that there is an official Apache Docker Hub repo [https://hub.docker.com/u/apache/] and some of the projects are built automatically. However, Spark is a big project and it requires a build matrix (similar to the binary downloads you have on [http://spark.apache.org]). Are there build servers for Spark, or is it possible to use Travis or some other external tool for this purpose? > Publish Official Apache Spark Docker images > --- > > Key: SPARK-22865 > URL: https://issues.apache.org/jira/browse/SPARK-22865 > Project: Spark > Issue Type: New Feature > Components: Kubernetes >Affects Versions: 2.3.0 >Reporter: Anirudh Ramanathan >Priority: Minor > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23668) Support for imagePullSecrets k8s option
Andrew Korzhuev created SPARK-23668: --- Summary: Support for imagePullSecrets k8s option Key: SPARK-23668 URL: https://issues.apache.org/jira/browse/SPARK-23668 Project: Spark Issue Type: Improvement Components: Kubernetes Affects Versions: 2.3.0 Reporter: Andrew Korzhuev In an enterprise setting it's likely that the image registry k8s pulls images from is private. Credentials can be passed with the Pod specification through the `imagePullSecrets` parameter, which refers to a k8s secret by name (see [https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/]). Implementation-wise we only need to expose a configuration option to the user and then pass it along to k8s. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
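For illustration, here is a sketch of what needs to end up in the pod specification, written with the fabric8 kubernetes-client builders that the Spark k8s backend is built on. The secret, image and pod names are made-up placeholders, and the Spark configuration key that would expose this is not decided here:

{code:java}
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;

public class PullSecretExample {
    public static void main(String[] args) {
        // "regcred" must name an existing docker-registry secret, created e.g. with:
        //   kubectl create secret docker-registry regcred --docker-server=... \
        //     --docker-username=... --docker-password=...
        Pod pod = new PodBuilder()
            .withNewMetadata().withName("spark-driver").endMetadata()
            .withNewSpec()
                .addNewContainer()
                    .withName("spark")
                    .withImage("registry.example.com/spark:2.3.0")
                .endContainer()
                // The pull secret is a plain name reference in the pod spec.
                .withImagePullSecrets(new LocalObjectReference("regcred"))
            .endSpec()
            .build();
        System.out.println(pod.getSpec().getImagePullSecrets());
    }
}
{code}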
[jira] [Updated] (SPARK-23449) Extra java options lose order in Docker context
[ https://issues.apache.org/jira/browse/SPARK-23449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Korzhuev updated SPARK-23449: Description: `spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions` do not preserve their ordering when processed in `entrypoint.sh`, which makes `-XX:+UnlockExperimentalVMOptions` unusable, as you have to pass it before any other experimental options. Steps to reproduce: # Set `spark.driver.extraJavaOptions`, e.g. `-XX:+UnlockExperimentalVMOptions -XX:+UseG1GC -XX:+CMSClassUnloadingEnabled -XX:+UseCGroupMemoryLimitForHeap` # Submit application to k8s cluster. # Fetch logs and observe that on each run the order of options is different, and when `-XX:+UnlockExperimentalVMOptions` is not first, startup will fail. Expected behaviour: # Order of `extraJavaOptions` should be preserved. Cause: `entrypoint.sh` fetches environment options with `env`, which doesn't guarantee ordering. {code:java} env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt{code} was: `spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions` when processed in `entrypoint.sh`, which makes `-XX:+UnlockExperimentalVMOptions` unusable, as you have to pass it before any other experimental options. Steps to reproduce: # Set `spark.driver.extraJavaOptions`, e.g. `-XX:+UnlockExperimentalVMOptions -XX:+UseG1GC -XX:+CMSClassUnloadingEnabled -XX:+UseCGroupMemoryLimitForHeap` # Submit application to k8s cluster. # Fetch logs and observe that on each run order of options is different and when `-XX:+UnlockExperimentalVMOptions` is not the first startup will fail. Expected behaviour: # Order of `extraJavaOptions` should be preserved. Cause: `entrypoint.sh` fetches environment options with `env`, which doesn't guarantee ordering. {code:java} env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt{code} > Extra java options lose order in Docker context > --- > > Key: SPARK-23449 > URL: https://issues.apache.org/jira/browse/SPARK-23449 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 2.3.0 > Environment: Running Spark on K8S with supplied Docker image. Passing > along extra java options. >Reporter: Andrew Korzhuev >Priority: Minor > Fix For: 2.3.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > `spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions` do not > preserve their ordering when processed in `entrypoint.sh`, which makes > `-XX:+UnlockExperimentalVMOptions` unusable, as you have to pass it before > any other experimental options. > > Steps to reproduce: > # Set `spark.driver.extraJavaOptions`, e.g. > `-XX:+UnlockExperimentalVMOptions -XX:+UseG1GC -XX:+CMSClassUnloadingEnabled > -XX:+UseCGroupMemoryLimitForHeap` > # Submit application to k8s cluster. > # Fetch logs and observe that on each run the order of options is different, > and when `-XX:+UnlockExperimentalVMOptions` is not first, startup will fail. > > Expected behaviour: > # Order of `extraJavaOptions` should be preserved. > > Cause: > `entrypoint.sh` fetches environment options with `env`, which doesn't > guarantee ordering. > {code:java} > env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > > /tmp/java_opts.txt{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-23449) Extra java options lose order in Docker context
Andrew Korzhuev created SPARK-23449: --- Summary: Extra java options lose order in Docker context Key: SPARK-23449 URL: https://issues.apache.org/jira/browse/SPARK-23449 Project: Spark Issue Type: Bug Components: Kubernetes Affects Versions: 2.3.0 Environment: Running Spark on K8S with supplied Docker image. Passing along extra java options. Reporter: Andrew Korzhuev Fix For: 2.3.0 `spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions` do not preserve their ordering when processed in `entrypoint.sh`, which makes `-XX:+UnlockExperimentalVMOptions` unusable, as you have to pass it before any other experimental options. Steps to reproduce: # Set `spark.driver.extraJavaOptions`, e.g. `-XX:+UnlockExperimentalVMOptions -XX:+UseG1GC -XX:+CMSClassUnloadingEnabled -XX:+UseCGroupMemoryLimitForHeap` # Submit application to k8s cluster. # Fetch logs and observe that on each run the order of options is different, and when `-XX:+UnlockExperimentalVMOptions` is not first, startup will fail. Expected behaviour: # Order of `extraJavaOptions` should be preserved. Cause: `entrypoint.sh` fetches environment options with `env`, which doesn't guarantee ordering. {code:java} env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
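One way to make the ordering deterministic would be to sort the variables by their numeric suffix before stripping the names, since `spark-submit` emits them as `SPARK_JAVA_OPT_0`, `SPARK_JAVA_OPT_1`, and so on. A sketch against the `entrypoint.sh` line quoted above, assuming the numeric index is the fourth `_`-separated field:

{code:java}
# Sort SPARK_JAVA_OPT_<N>=<option> lines numerically on <N> so the options
# are written to java_opts.txt in submission order.
env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt
{code}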
[jira] [Created] (SPARK-23133) Spark options are not passed to the Executor in Docker context
Andrew Korzhuev created SPARK-23133: --- Summary: Spark options are not passed to the Executor in Docker context Key: SPARK-23133 URL: https://issues.apache.org/jira/browse/SPARK-23133 Project: Spark Issue Type: Bug Components: Kubernetes Affects Versions: 2.3.0 Environment: Running Spark on K8s using supplied Docker image. Reporter: Andrew Korzhuev Reproduce: # Build image with `bin/docker-image-tool.sh`. # Submit application to k8s. Set executor options, e.g. ` --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf"` # Visit the Spark UI on the executor and notice that the option is not set. Expected behavior: options from spark-submit should be correctly passed to the executor. Cause: `SPARK_EXECUTOR_JAVA_OPTS` is not defined in `entrypoint.sh` https://github.com/apache/spark/blob/branch-2.3/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L70 [https://github.com/apache/spark/blob/branch-2.3/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L44-L45] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org