[jira] [Commented] (SPARK-22865) Publish Official Apache Spark Docker images

2018-11-21 Thread Andrew Korzhuev (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695338#comment-16695338
 ] 

Andrew Korzhuev commented on SPARK-22865:
-

I've added builds for vanilla 2.3.1, 2.3.2 and 2.4.0 images; you can grab them 
from [https://hub.docker.com/r/andrusha/spark-k8s/tags/]. The scripts were also 
updated. I would love to help make this happen for Spark, but I need somebody 
to show me around the Apache CI/CD infrastructure.
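For example, grabbing one of them (a sketch; the exact tag names are whatever 
the tags page above lists):
{code:java}
# Pull one of the prebuilt images (the tag here is an assumption; check the
# Docker Hub tags page linked above for the actual list):
docker pull andrusha/spark-k8s:2.4.0
{code}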

> Publish Official Apache Spark Docker images
> ---
>
> Key: SPARK-22865
> URL: https://issues.apache.org/jira/browse/SPARK-22865
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Anirudh Ramanathan
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22865) Publish Official Apache Spark Docker images

2018-11-21 Thread Andrew Korzhuev (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695338#comment-16695338
 ] 

Andrew Korzhuev edited comment on SPARK-22865 at 11/21/18 11:00 PM:


I've added builds for vanilla 2.3.1, 2.3.2 and 2.4.0 images; you can grab them 
from [https://hub.docker.com/r/andrusha/spark-k8s/tags/]. The scripts were also 
updated. I would love to help make this happen for Spark, but I need somebody 
to show me around the Apache CI/CD infrastructure.


was (Author: akorzhuev):
I've added builds for vanilla 2.3.1, 2.3.2 and 2.4.0 images, you can grab them 
from 
[https://hub.docker.com/r/andrusha/spark-k8s/tags/|https://hub.docker.com/r/andrusha/spark-k8s/tags/,]
 scripts were also updated. I would love to help making this happen for Spark, 
but I need somebody to show me around Apache CI/CD infrastructure.

> Publish Official Apache Spark Docker images
> ---
>
> Key: SPARK-22865
> URL: https://issues.apache.org/jira/browse/SPARK-22865
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Anirudh Ramanathan
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22865) Publish Official Apache Spark Docker images

2018-11-21 Thread Andrew Korzhuev (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695338#comment-16695338
 ] 

Andrew Korzhuev edited comment on SPARK-22865 at 11/21/18 11:00 PM:


I've added builds for vanilla 2.3.1, 2.3.2 and 2.4.0 images; you can grab them 
from [https://hub.docker.com/r/andrusha/spark-k8s/tags/]. The scripts were also 
updated. I would love to help make this happen for Spark, but I need somebody 
to show me around the Apache CI/CD infrastructure.


was (Author: akorzhuev):
I've added builds for vanilla 2.3.1, 2.3.2 and 2.4.0 images, you can grab them 
from 
[https://hub.docker.com/r/andrusha/spark-k8s/tags/|https://hub.docker.com/r/andrusha/spark-k8s/tags/,]
 scripts were also updated. I would love to help making this happen for Spark, 
but I need somebody to show me around Apache CI/CD infrastructure.

> Publish Official Apache Spark Docker images
> ---
>
> Key: SPARK-22865
> URL: https://issues.apache.org/jira/browse/SPARK-22865
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Anirudh Ramanathan
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-22865) Publish Official Apache Spark Docker images

2018-11-21 Thread Andrew Korzhuev (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16695338#comment-16695338
 ] 

Andrew Korzhuev edited comment on SPARK-22865 at 11/21/18 10:59 PM:


I've added builds for vanilla 2.3.1, 2.3.2 and 2.4.0 images; you can grab them 
from [https://hub.docker.com/r/andrusha/spark-k8s/tags/]. The scripts were also 
updated. I would love to help make this happen for Spark, but I need somebody 
to show me around the Apache CI/CD infrastructure.


was (Author: akorzhuev):
I've added builds for vanilla 2.3.1, 2.3.2 and 2.4.0 images, you can grab them 
from [https://hub.docker.com/r/andrusha/spark-k8s/tags/,] scripts were also 
updated. I would love to help making this happen for Spark, but I need somebody 
to show me around Apache CI/CD infrastructure.

> Publish Official Apache Spark Docker images
> ---
>
> Key: SPARK-22865
> URL: https://issues.apache.org/jira/browse/SPARK-22865
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Anirudh Ramanathan
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24422) Add JDK9+ in our Jenkins' build servers

2018-07-03 Thread Andrew Korzhuev (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531241#comment-16531241
 ] 

Andrew Korzhuev commented on SPARK-24422:
-

Also `.travis.yml` needs to be fixed in the following way:
{code:java}
# 2. Choose language and target JDKs for parallel builds.
language: java
jdk:
  - openjdk8
  - openjdk9
  - openjdk10
{code}

> Add JDK9+ in our Jenkins' build servers
> ---
>
> Key: SPARK-24422
> URL: https://issues.apache.org/jira/browse/SPARK-24422
> Project: Spark
>  Issue Type: Sub-task
>  Components: Project Infra
>Affects Versions: 2.3.0
>Reporter: DB Tsai
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24422) Add JDK9+ in our Jenkins' build servers

2018-07-03 Thread Andrew Korzhuev (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531241#comment-16531241
 ] 

Andrew Korzhuev edited comment on SPARK-24422 at 7/3/18 11:46 AM:
--

Also `.travis.yml` needs to be fixed in the following way:
{code:java}
# 2. Choose language and target JDKs for parallel builds.
language: java
jdk:
  - openjdk8
  - openjdk9
{code}


was (Author: akorzhuev):
Also `.travis.yml` needs to be fixed in the following way:
{code:java}
# 2. Choose language and target JDKs for parallel builds.
language: java
jdk:
  - openjdk8
  - openjdk9
  - openjdk10
{code}

> Add JDK9+ in our Jenkins' build servers
> ---
>
> Key: SPARK-24422
> URL: https://issues.apache.org/jira/browse/SPARK-24422
> Project: Spark
>  Issue Type: Sub-task
>  Components: Project Infra
>Affects Versions: 2.3.0
>Reporter: DB Tsai
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24421) sun.misc.Unsafe in JDK9+

2018-07-03 Thread Andrew Korzhuev (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531235#comment-16531235
 ] 

Andrew Korzhuev edited comment on SPARK-24421 at 7/3/18 11:44 AM:
--

If I understand this correctly, then the only deprecated JDK9+ API Spark is 
using is `sun.misc.Cleaner` (while `sun.misc.Unsafe` is still accessible) in 
`[common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java|https://github.com/andrusha/spark/commit/7da06d3c725169f9764225f5a29886eb56bee191#diff-c7483c7efce631c783676f014ba2b0ed]`,
 which is fixable in the following way:
{code:java}
@@ -22,7 +22,7 @@
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

-import sun.misc.Cleaner;
+import java.lang.ref.Cleaner;
import sun.misc.Unsafe;

public final class Platform {
@@ -169,7 +169,8 @@ public static ByteBuffer allocateDirectBuffer(int size) {
cleanerField.setAccessible(true);
long memory = allocateMemory(size);
ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
- Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory));
+ Cleaner cleaner = Cleaner.create();
+ cleaner.register(buffer, () -> freeMemory(memory));
cleanerField.set(buffer, cleaner);
return buffer;
{code}
[https://github.com/andrusha/spark/commit/7da06d3c725169f9764225f5a29886eb56bee191#diff-c7483c7efce631c783676f014ba2b0ed]
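To make the replacement concrete, here is a minimal self-contained sketch of 
the `java.lang.ref.Cleaner` API used above (assumes JDK 9+; this is an 
illustration, not Spark code):
{code:java}
import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;

public class CleanerDemo {
    // One shared Cleaner instance; each register() call tracks one object.
    private static final Cleaner CLEANER = Cleaner.create();

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
        long memory = 42L; // stand-in for a native allocation handle
        // The action runs at most once, when `buffer` becomes phantom-reachable
        // or when clean() is called explicitly; it must not capture `buffer`.
        Cleaner.Cleanable cleanable =
                CLEANER.register(buffer, () -> System.out.println("freeing " + memory));
        cleanable.clean(); // deterministic cleanup, as allocateDirectBuffer needs
    }
}
{code}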

 


was (Author: akorzhuev):
If I understand this correctly, then the only deprecated JDK9+ API Spark is 
using is `sun.misc.Cleaner` (while `sun.misc.Unsafe` is still accessible) in 
`[common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java|https://github.com/andrusha/spark/commit/7da06d3c725169f9764225f5a29886eb56bee191#diff-c7483c7efce631c783676f014ba2b0ed]`,
 which is fixable in the following way:

 
{code:java}
@@ -22,7 +22,7 @@
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

-import sun.misc.Cleaner;
+import java.lang.ref.Cleaner;
import sun.misc.Unsafe;

public final class Platform {
@@ -169,7 +169,8 @@ public static ByteBuffer allocateDirectBuffer(int size) {
cleanerField.setAccessible(true);
long memory = allocateMemory(size);
ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
- Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory));
+ Cleaner cleaner = Cleaner.create();
+ cleaner.register(buffer, () -> freeMemory(memory));
cleanerField.set(buffer, cleaner);
return buffer;
{code}
[https://github.com/andrusha/spark/commit/7da06d3c725169f9764225f5a29886eb56bee191#diff-c7483c7efce631c783676f014ba2b0ed]

 

> sun.misc.Unsafe in JDK9+
> 
>
> Key: SPARK-24421
> URL: https://issues.apache.org/jira/browse/SPARK-24421
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build
>Affects Versions: 2.3.0
>Reporter: DB Tsai
>Priority: Major
>
> Many internal APIs such as unsafe are encapsulated in JDK9+, see 
> http://openjdk.java.net/jeps/260 for detail.
> To use Unsafe, we need to add *jdk.unsupported* to our code’s module 
> declaration:
> {code:java}
> module java9unsafe {
> requires jdk.unsupported;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24421) sun.misc.Unsafe in JDK9+

2018-07-03 Thread Andrew Korzhuev (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531235#comment-16531235
 ] 

Andrew Korzhuev commented on SPARK-24421:
-

If I understand this correctly, then the only deprecated JDK9+ API Spark is 
using is `sun.misc.Cleaner` (while `sun.misc.Unsafe` is still accessible) in 
`[common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java|https://github.com/andrusha/spark/commit/7da06d3c725169f9764225f5a29886eb56bee191#diff-c7483c7efce631c783676f014ba2b0ed]`,
 which is fixable in the following way:

 
{code:java}
@@ -22,7 +22,7 @@
import java.lang.reflect.Method;
import java.nio.ByteBuffer;

-import sun.misc.Cleaner;
+import java.lang.ref.Cleaner;
import sun.misc.Unsafe;

public final class Platform {
@@ -169,7 +169,8 @@ public static ByteBuffer allocateDirectBuffer(int size) {
cleanerField.setAccessible(true);
long memory = allocateMemory(size);
ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
- Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory));
+ Cleaner cleaner = Cleaner.create();
+ cleaner.register(buffer, () -> freeMemory(memory));
cleanerField.set(buffer, cleaner);
return buffer;
{code}
[https://github.com/andrusha/spark/commit/7da06d3c725169f9764225f5a29886eb56bee191#diff-c7483c7efce631c783676f014ba2b0ed]

 

> sun.misc.Unsafe in JDK9+
> 
>
> Key: SPARK-24421
> URL: https://issues.apache.org/jira/browse/SPARK-24421
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build
>Affects Versions: 2.3.0
>Reporter: DB Tsai
>Priority: Major
>
> Many internal APIs such as unsafe are encapsulated in JDK9+, see 
> http://openjdk.java.net/jeps/260 for detail.
> To use Unsafe, we need to add *jdk.unsupported* to our code’s module 
> declaration:
> {code:java}
> module java9unsafe {
> requires jdk.unsupported;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23682) Memory issue with Spark structured streaming

2018-03-28 Thread Andrew Korzhuev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417474#comment-16417474
 ] 

Andrew Korzhuev edited comment on SPARK-23682 at 3/28/18 3:00 PM:
--

I can confirm that _HDFSBackedStateStoreProvider_ is leaking memory. Tested on:
 * AWS S3 checkpoint
 * Spark 2.3.0 on k8s
 * Structured stream - stream join

I managed to track the leak down to 
[https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L252]:
{code:java}
private lazy val loadedMaps = new mutable.HashMap[Long, MapType]{code}
which appears not to clean up the _UnsafeRow_ entries coming from:
{code:java}
type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]{code}
I noticed that the memory leaks more slowly if data is buffered to disk:
{code:java}
spark.hadoop.fs.s3a.fast.upload          true
spark.hadoop.fs.s3a.fast.upload.buffer   disk
{code}
It also seems that the state persisted to S3 is never cleaned up, as both the 
number of objects and their volume grow indefinitely.

Before worker dies:

!screen_shot_2018-03-20_at_15.23.29.png!

Heap dump of worker running for some time:

!Screen Shot 2018-03-28 at 16.44.20.png!
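To illustrate the retention pattern (a simplified sketch, not Spark's actual 
code; the names mirror the snippet above):
{code:java}
import scala.collection.mutable

object StateCacheSketch {
  type MapType = java.util.concurrent.ConcurrentHashMap[String, String]
  private val loadedMaps = new mutable.HashMap[Long, MapType]

  // Every committed batch version inserts a full state map, and nothing in
  // this sketch evicts older versions, so memory grows with the version count.
  def commit(version: Long, state: MapType): Unit = {
    loadedMaps.put(version, state)
    ()
  }
}
{code}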


was (Author: akorzhuev):
I can confirm that _HDFSBackedStateStoreProvider_ is leaking memory. Tested on:
 * AWS S3 checkpoint
 * Spark 2.3.0 on k8s
 * Structured stream - stream join

I managed to track the leak down to

[https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L252]_:_
{code:java}
private lazy val loadedMaps = new mutable.HashMap[Long, MapType]{code}
_,_ which appears not to clean up _UnsafeRow_ coming from:
{code:java}
type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, 
UnsafeRow]{code}
I noticed that memory leaks slower if data is buffered to disk:
{code:java}
spark.hadoop.fs.s3a.fast.upload   true
spark.hadoop.fs.s3a.fast.upload.bufferdisk
{code}
It also seems that the state persisted to S3 is never cleaned up, as both 
number of objects and volume grows indefinitely.

!Screen Shot 2018-03-28 at 16.44.20.png!

> Memory issue with Spark structured streaming
> 
>
> Key: SPARK-23682
> URL: https://issues.apache.org/jira/browse/SPARK-23682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.2.0
> Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails 
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC 
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 
> -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>Reporter: Yuriy Bondaruk
>Priority: Major
>  Labels: Memory, memory, memory-leak
> Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 
> 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen 
> Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, 
> Spark executors GC time.png, image-2018-03-22-14-46-31-960.png, 
> screen_shot_2018-03-20_at_15.23.29.png
>
>
> It seems like there is an issue with memory in structured streaming. A stream 
> with aggregation (dropDuplicates()) and data partitioning constantly 
> increases memory usage, and finally executors fail with exit code

[jira] [Updated] (SPARK-23682) Memory issue with Spark structured streaming

2018-03-28 Thread Andrew Korzhuev (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Korzhuev updated SPARK-23682:

Attachment: screen_shot_2018-03-20_at_15.23.29.png

> Memory issue with Spark structured streaming
> 
>
> Key: SPARK-23682
> URL: https://issues.apache.org/jira/browse/SPARK-23682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.2.0
> Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails 
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC 
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 
> -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>Reporter: Yuriy Bondaruk
>Priority: Major
>  Labels: Memory, memory, memory-leak
> Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 
> 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen 
> Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, 
> Spark executors GC time.png, image-2018-03-22-14-46-31-960.png, 
> screen_shot_2018-03-20_at_15.23.29.png
>
>
> It seems like there is an issue with memory in structured streaming. A stream 
> with aggregation (dropDuplicates()) and data partitioning constantly 
> increases memory usage, and finally executors fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running 
> tasks) Reason: Container marked as failed: 
> container_1520214726510_0001_01_03 on host: 
> ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
> Container killed on request. Exit code is 137
> Container exited with a non-zero exit code 137
> Killed by external signal{quote}
> Stream creation looks something like this:
> {quote}session
> .readStream()
> .schema(inputSchema)
> .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
> .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
> .csv("s3://test-bucket/input")
> .as(Encoders.bean(TestRecord.class))
> .flatMap(mf, Encoders.bean(TestRecord.class))
> .dropDuplicates("testId", "testName")
> .withColumn("year", 
> functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), 
> ""))
> .writeStream()
> .option("path", "s3://test-bucket/output")
> .option("checkpointLocation", "s3://test-bucket/checkpoint")
> .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
> .partitionBy("year")
> .format("parquet")
> .outputMode(OutputMode.Append())
> .queryName("test-stream")
> .start();{quote}
> Analyzing the heap dump, I found that most of the memory is used by 
> {{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}},
> which is referenced from 
> [StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196].
> At first glance it looks normal, since that is how Spark keeps aggregation 
> keys in memory. However, I did my testing by renaming files in the source 
> folder so that they would be picked up by Spark again. Since the input records 
> are the same, all further rows should be rejected as duplicates and memory 
> consumption shouldn't increase, but that is not the case. Moreover, GC time 
> took more than 30% of total processing time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (SPARK-23682) Memory issue with Spark structured streaming

2018-03-28 Thread Andrew Korzhuev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417474#comment-16417474
 ] 

Andrew Korzhuev edited comment on SPARK-23682 at 3/28/18 2:56 PM:
--

I can confirm that _HDFSBackedStateStoreProvider_ is leaking memory. Tested on:
 * AWS S3 checkpoint
 * Spark 2.3.0 on k8s
 * Structured stream - stream join

I managed to track the leak down to 
[https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L252]:
{code:java}
private lazy val loadedMaps = new mutable.HashMap[Long, MapType]{code}
which appears not to clean up the _UnsafeRow_ entries coming from:
{code:java}
type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]{code}
I noticed that the memory leaks more slowly if data is buffered to disk:
{code:java}
spark.hadoop.fs.s3a.fast.upload          true
spark.hadoop.fs.s3a.fast.upload.buffer   disk
{code}
It also seems that the state persisted to S3 is never cleaned up, as both the 
number of objects and their volume grow indefinitely.

!Screen Shot 2018-03-28 at 16.44.20.png!


was (Author: akorzhuev):
I can confirm that _HDFSBackedStateStoreProvider_ is leaking memory. Tested on:
 * AWS S3 checkpoint
 * Spark 2.3.0 on k8s
 * Structured stream - stream join

I managed to track the leak down to

[https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L252]_:_
{code:java}
private lazy val loadedMaps = new mutable.HashMap[Long, MapType]{code}
_,_ which appears not to clean up _UnsafeRow_s coming from: 
{code:java}
type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, 
UnsafeRow]{code}
I noticed that memory leaks slower if data is buffered to disk:
{code:java}
spark.hadoop.fs.s3a.fast.upload   true
spark.hadoop.fs.s3a.fast.upload.bufferdisk
{code}
It also seems that the state persisted to S3 is never cleaned up, as both 
number of objects and volume grows indefinitely.

!Screen Shot 2018-03-28 at 16.44.20.png!

> Memory issue with Spark structured streaming
> 
>
> Key: SPARK-23682
> URL: https://issues.apache.org/jira/browse/SPARK-23682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.2.0
> Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails 
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC 
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 
> -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>Reporter: Yuriy Bondaruk
>Priority: Major
>  Labels: Memory, memory, memory-leak
> Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 
> 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen 
> Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, 
> Spark executors GC time.png, image-2018-03-22-14-46-31-960.png
>
>
> It seems like there is an issue with memory in structured streaming. A stream 
> with aggregation (dropDuplicates()) and data partitioning constantly 
> increases memory usage, and finally executors fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running 
> tasks) Reason: Container marked as failed: 
> container_152021

[jira] [Comment Edited] (SPARK-23682) Memory issue with Spark structured streaming

2018-03-28 Thread Andrew Korzhuev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417474#comment-16417474
 ] 

Andrew Korzhuev edited comment on SPARK-23682 at 3/28/18 2:55 PM:
--

I can confirm that _HDFSBackedStateStoreProvider_ is leaking memory. Tested on:
 * AWS S3 checkpoint
 * Spark 2.3.0 on k8s
 * Structured stream - stream join

I managed to track the leak down to 
[https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L252]:
{code:java}
private lazy val loadedMaps = new mutable.HashMap[Long, MapType]{code}
which appears not to clean up the _UnsafeRow_ entries coming from:
{code:java}
type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]{code}
I noticed that the memory leaks more slowly if data is buffered to disk:
{code:java}
spark.hadoop.fs.s3a.fast.upload          true
spark.hadoop.fs.s3a.fast.upload.buffer   disk
{code}
It also seems that the state persisted to S3 is never cleaned up, as both the 
number of objects and their volume grow indefinitely.

!Screen Shot 2018-03-28 at 16.44.20.png!


was (Author: akorzhuev):
I can confirm that _HDFSBackedStateStoreProvider_ is leaking memory. Tested on:
 * AWS S3 checkpoint
 * Spark 2.3.0 on k8s
 * Structured stream - stream join

I managed to track the leak down to

[https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L252]_:_

 

_private lazy val loadedMaps = new mutable.HashMap[Long, MapType]_

 

_,_ which appears not to clean up _UnsafeRow_s coming from:

 
{code:java}
type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, 
UnsafeRow]{code}
I noticed that memory leaks slower if data is buffered to disk:
{code:java}
spark.hadoop.fs.s3a.fast.upload   true
spark.hadoop.fs.s3a.fast.upload.bufferdisk
{code}
It also seems that the state persisted to S3 is never cleaned up, as both 
number of objects and volume grows indefinitely.

!Screen Shot 2018-03-28 at 16.44.20.png!

> Memory issue with Spark structured streaming
> 
>
> Key: SPARK-23682
> URL: https://issues.apache.org/jira/browse/SPARK-23682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.2.0
> Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails 
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC 
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 
> -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>Reporter: Yuriy Bondaruk
>Priority: Major
>  Labels: Memory, memory, memory-leak
> Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 
> 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen 
> Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, 
> Spark executors GC time.png, image-2018-03-22-14-46-31-960.png
>
>
> It seems like there is an issue with memory in structured streaming. A stream 
> with aggregation (dropDuplicates()) and data partitioning constantly 
> increases memory usage, and finally executors fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running 
> tasks) Reason: Container marked as failed: 
> container_1520214726

[jira] [Commented] (SPARK-23682) Memory issue with Spark structured streaming

2018-03-28 Thread Andrew Korzhuev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417474#comment-16417474
 ] 

Andrew Korzhuev commented on SPARK-23682:
-

I can confirm that _HDFSBackedStateStoreProvider_ is leaking memory. Tested on:
 * AWS S3 checkpoint
 * Spark 2.3.0 on k8s
 * Structured stream - stream join

I managed to track the leak down to 
[https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L252]:
{code:java}
private lazy val loadedMaps = new mutable.HashMap[Long, MapType]{code}
which appears not to clean up the _UnsafeRow_ entries coming from:
{code:java}
type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]{code}
I noticed that the memory leaks more slowly if data is buffered to disk:
{code:java}
spark.hadoop.fs.s3a.fast.upload          true
spark.hadoop.fs.s3a.fast.upload.buffer   disk
{code}
It also seems that the state persisted to S3 is never cleaned up, as both the 
number of objects and their volume grow indefinitely.

!Screen Shot 2018-03-28 at 16.44.20.png!

> Memory issue with Spark structured streaming
> 
>
> Key: SPARK-23682
> URL: https://issues.apache.org/jira/browse/SPARK-23682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.2.0
> Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails 
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC 
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 
> -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>Reporter: Yuriy Bondaruk
>Priority: Major
>  Labels: Memory, memory, memory-leak
> Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 
> 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen 
> Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, 
> Spark executors GC time.png, image-2018-03-22-14-46-31-960.png
>
>
> It seems like there is an issue with memory in structured streaming. A stream 
> with aggregation (dropDuplicates()) and data partitioning constantly 
> increases memory usage, and finally executors fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running 
> tasks) Reason: Container marked as failed: 
> container_1520214726510_0001_01_03 on host: 
> ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
> Container killed on request. Exit code is 137
> Container exited with a non-zero exit code 137
> Killed by external signal{quote}
> Stream creation looks something like this:
> {quote}session
> .readStream()
> .schema(inputSchema)
> .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
> .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
> .csv("s3://test-bucket/input")
> .as(Encoders.bean(TestRecord.class))
> .flatMap(mf, Encoders.bean(TestRecord.class))
> .dropDuplicates("testId", "testName")
> .withColumn("year", 
> functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), 
> ""))
> .writeStream()
> .option("path", "s3://test-bucket/output")
> .option("checkpointLocation", "s3://test-bucket/checkpoint")
> .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
> .partitionBy("year")
> .format("parquet")
> .outputMode(OutputMode

[jira] [Updated] (SPARK-23682) Memory issue with Spark structured streaming

2018-03-28 Thread Andrew Korzhuev (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Korzhuev updated SPARK-23682:

Attachment: Screen Shot 2018-03-28 at 16.44.20.png

> Memory issue with Spark structured streaming
> 
>
> Key: SPARK-23682
> URL: https://issues.apache.org/jira/browse/SPARK-23682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.2.0
> Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails 
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC 
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 
> -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>Reporter: Yuriy Bondaruk
>Priority: Major
>  Labels: Memory, memory, memory-leak
> Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 
> 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen 
> Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, 
> Spark executors GC time.png, image-2018-03-22-14-46-31-960.png
>
>
> It seems like there is an issue with memory in structured streaming. A stream 
> with aggregation (dropDuplicates()) and data partitioning constantly 
> increases memory usage, and finally executors fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running 
> tasks) Reason: Container marked as failed: 
> container_1520214726510_0001_01_03 on host: 
> ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
> Container killed on request. Exit code is 137
> Container exited with a non-zero exit code 137
> Killed by external signal{quote}
> Stream creation looks something like this:
> {quote}session
> .readStream()
> .schema(inputSchema)
> .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
> .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
> .csv("s3://test-bucket/input")
> .as(Encoders.bean(TestRecord.class))
> .flatMap(mf, Encoders.bean(TestRecord.class))
> .dropDuplicates("testId", "testName")
> .withColumn("year", 
> functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), 
> ""))
> .writeStream()
> .option("path", "s3://test-bucket/output")
> .option("checkpointLocation", "s3://test-bucket/checkpoint")
> .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
> .partitionBy("year")
> .format("parquet")
> .outputMode(OutputMode.Append())
> .queryName("test-stream")
> .start();{quote}
> Analyzing the heap dump, I found that most of the memory is used by 
> {{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}},
> which is referenced from 
> [StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196].
> At first glance it looks normal, since that is how Spark keeps aggregation 
> keys in memory. However, I did my testing by renaming files in the source 
> folder so that they would be picked up by Spark again. Since the input records 
> are the same, all further rows should be rejected as duplicates and memory 
> consumption shouldn't increase, but that is not the case. Moreover, GC time 
> took more than 30% of total processing time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Updated] (SPARK-23682) Memory issue with Spark structured streaming

2018-03-28 Thread Andrew Korzhuev (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Korzhuev updated SPARK-23682:

Attachment: Screen Shot 2018-03-28 at 16.44.20.png

> Memory issue with Spark structured streaming
> 
>
> Key: SPARK-23682
> URL: https://issues.apache.org/jira/browse/SPARK-23682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.2.0
> Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails 
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC 
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 
> -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>Reporter: Yuriy Bondaruk
>Priority: Major
>  Labels: Memory, memory, memory-leak
> Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 
> 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Screen 
> Shot 2018-03-28 at 16.44.20.png, Screen Shot 2018-03-28 at 16.44.20.png, 
> Spark executors GC time.png, image-2018-03-22-14-46-31-960.png
>
>
> It seems like there is an issue with memory in structured streaming. A stream 
> with aggregation (dropDuplicates()) and data partitioning constantly 
> increases memory usage, and finally executors fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running 
> tasks) Reason: Container marked as failed: 
> container_1520214726510_0001_01_03 on host: 
> ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
> Container killed on request. Exit code is 137
> Container exited with a non-zero exit code 137
> Killed by external signal{quote}
> Stream creation looks something like this:
> {quote}session
> .readStream()
> .schema(inputSchema)
> .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
> .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
> .csv("s3://test-bucket/input")
> .as(Encoders.bean(TestRecord.class))
> .flatMap(mf, Encoders.bean(TestRecord.class))
> .dropDuplicates("testId", "testName")
> .withColumn("year", 
> functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), 
> ""))
> .writeStream()
> .option("path", "s3://test-bucket/output")
> .option("checkpointLocation", "s3://test-bucket/checkpoint")
> .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
> .partitionBy("year")
> .format("parquet")
> .outputMode(OutputMode.Append())
> .queryName("test-stream")
> .start();{quote}
> Analyzing the heap dump, I found that most of the memory is used by 
> {{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}},
> which is referenced from 
> [StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196].
> At first glance it looks normal, since that is how Spark keeps aggregation 
> keys in memory. However, I did my testing by renaming files in the source 
> folder so that they would be picked up by Spark again. Since the input records 
> are the same, all further rows should be rejected as duplicates and memory 
> consumption shouldn't increase, but that is not the case. Moreover, GC time 
> took more than 30% of total processing time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Updated] (SPARK-23682) Memory issue with Spark structured streaming

2018-03-28 Thread Andrew Korzhuev (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Korzhuev updated SPARK-23682:

Attachment: Screen Shot 2018-03-28 at 16.44.20.png

> Memory issue with Spark structured streaming
> 
>
> Key: SPARK-23682
> URL: https://issues.apache.org/jira/browse/SPARK-23682
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.2.0
> Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
> |spark.blacklist.decommissioning.enabled|true|
> |spark.blacklist.decommissioning.timeout|1h|
> |spark.cleaner.periodicGC.interval|10min|
> |spark.default.parallelism|18|
> |spark.dynamicAllocation.enabled|false|
> |spark.eventLog.enabled|true|
> |spark.executor.cores|3|
> |spark.executor.extraJavaOptions|-verbose:gc -XX:+PrintGCDetails 
> -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC 
> -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 
> -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'|
> |spark.executor.id|driver|
> |spark.executor.instances|3|
> |spark.executor.memory|22G|
> |spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version|2|
> |spark.hadoop.parquet.enable.summary-metadata|false|
> |spark.hadoop.yarn.timeline-service.enabled|false|
> |spark.jars| |
> |spark.master|yarn|
> |spark.memory.fraction|0.9|
> |spark.memory.storageFraction|0.3|
> |spark.memory.useLegacyMode|false|
> |spark.rdd.compress|true|
> |spark.resourceManager.cleanupExpiredHost|true|
> |spark.scheduler.mode|FIFO|
> |spark.serializer|org.apache.spark.serializer.KryoSerializer|
> |spark.shuffle.service.enabled|true|
> |spark.speculation|false|
> |spark.sql.parquet.filterPushdown|true|
> |spark.sql.parquet.mergeSchema|false|
> |spark.sql.warehouse.dir|hdfs:///user/spark/warehouse|
> |spark.stage.attempt.ignoreOnDecommissionFetchFailure|true|
> |spark.submit.deployMode|client|
> |spark.yarn.am.cores|1|
> |spark.yarn.am.memory|2G|
> |spark.yarn.am.memoryOverhead|1G|
> |spark.yarn.executor.memoryOverhead|3G|
>Reporter: Yuriy Bondaruk
>Priority: Major
>  Labels: Memory, memory, memory-leak
> Attachments: Screen Shot 2018-03-07 at 21.52.17.png, Screen Shot 
> 2018-03-10 at 18.53.49.png, Screen Shot 2018-03-28 at 16.44.20.png, Spark 
> executors GC time.png, image-2018-03-22-14-46-31-960.png
>
>
> It seems like there is an issue with memory in structured streaming. A stream 
> with aggregation (dropDuplicates()) and data partitioning constantly 
> increases memory usage, and finally executors fail with exit code 137:
> {quote}ExecutorLostFailure (executor 2 exited caused by one of the running 
> tasks) Reason: Container marked as failed: 
> container_1520214726510_0001_01_03 on host: 
> ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: 
> Container killed on request. Exit code is 137
> Container exited with a non-zero exit code 137
> Killed by external signal{quote}
> Stream creation looks something like this:
> {quote}session
> .readStream()
> .schema(inputSchema)
> .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
> .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
> .csv("s3://test-bucket/input")
> .as(Encoders.bean(TestRecord.class))
> .flatMap(mf, Encoders.bean(TestRecord.class))
> .dropDuplicates("testId", "testName")
> .withColumn("year", 
> functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), 
> ""))
> .writeStream()
> .option("path", "s3://test-bucket/output")
> .option("checkpointLocation", "s3://test-bucket/checkpoint")
> .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
> .partitionBy("year")
> .format("parquet")
> .outputMode(OutputMode.Append())
> .queryName("test-stream")
> .start();{quote}
> Analyzing the heap dump, I found that most of the memory is used by 
> {{org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider}},
> which is referenced from 
> [StateStore|https://github.com/apache/spark/blob/branch-2.2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L196].
> At first glance it looks normal, since that is how Spark keeps aggregation 
> keys in memory. However, I did my testing by renaming files in the source 
> folder so that they would be picked up by Spark again. Since the input records 
> are the same, all further rows should be rejected as duplicates and memory 
> consumption shouldn't increase, but that is not the case. Moreover, GC time 
> took more than 30% of total processing time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Commented] (SPARK-22865) Publish Official Apache Spark Docker images

2018-03-22 Thread Andrew Korzhuev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409609#comment-16409609
 ] 

Andrew Korzhuev commented on SPARK-22865:
-

Here is an example of how this can be done with Travis CI and prebuilt Spark 
binaries: [https://github.com/andrusha/spark-k8s-docker]

Published here: [https://hub.docker.com/r/andrusha/spark-k8s/]
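A minimal sketch of what such a Travis setup can look like (hypothetical: the 
image name, build arguments and script are assumptions, not the linked repo's 
exact layout):
{code:java}
# .travis.yml (sketch): one matrix entry per Spark version to publish
language: generic
services:
  - docker
env:
  - SPARK_VERSION=2.3.0
script:
  - docker build -t "andrusha/spark-k8s:$SPARK_VERSION" --build-arg "SPARK_VERSION=$SPARK_VERSION" .
  - docker push "andrusha/spark-k8s:$SPARK_VERSION"
{code}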

> Publish Official Apache Spark Docker images
> ---
>
> Key: SPARK-22865
> URL: https://issues.apache.org/jira/browse/SPARK-22865
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Anirudh Ramanathan
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22865) Publish Official Apache Spark Docker images

2018-03-22 Thread Andrew Korzhuev (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16409351#comment-16409351
 ] 

Andrew Korzhuev commented on SPARK-22865:
-

What is the plan for resolving this issue?

I see that there is an official Apache Docker Hub repo 
([https://hub.docker.com/u/apache/]), and some of the projects are built 
automatically. However, Spark is a big project and it requires a build matrix 
(similar to the binary downloads you have on [http://spark.apache.org]). Are 
there build servers for Spark, or is it possible to use Travis or some other 
external tool for this purpose?

> Publish Official Apache Spark Docker images
> ---
>
> Key: SPARK-22865
> URL: https://issues.apache.org/jira/browse/SPARK-22865
> Project: Spark
>  Issue Type: New Feature
>  Components: Kubernetes
>Affects Versions: 2.3.0
>Reporter: Anirudh Ramanathan
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23668) Support for imagePullSecrets k8s option

2018-03-13 Thread Andrew Korzhuev (JIRA)
Andrew Korzhuev created SPARK-23668:
---

 Summary: Support for imagePullSecrets k8s option
 Key: SPARK-23668
 URL: https://issues.apache.org/jira/browse/SPARK-23668
 Project: Spark
  Issue Type: Improvement
  Components: Kubernetes
Affects Versions: 2.3.0
Reporter: Andrew Korzhuev


In an enterprise setting it is likely that the image registry k8s pulls images 
from is private.

Credentials can be passed with the Pod specification through the 
`imagePullSecrets` parameter, which refers to a k8s secret by name (see 
[https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/]).

Implementation-wise, we only need to expose a configuration option to the user 
and then pass it along to k8s.
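A hypothetical usage sketch of what the exposed option could look like (the 
configuration key name below is an assumption for illustration, not a committed 
Spark API):
{code:java}
import org.apache.spark.SparkConf;

public class PullSecretsExample {
    public static void main(String[] args) {
        // Assumed key name; the referenced secret must already exist in the
        // namespace, e.g. created with `kubectl create secret docker-registry`.
        SparkConf conf = new SparkConf()
                .set("spark.kubernetes.container.image.pullSecrets", "my-registry-secret");
        System.out.println(conf.get("spark.kubernetes.container.image.pullSecrets"));
    }
}
{code}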



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23449) Extra java options lose order in Docker context

2018-02-16 Thread Andrew Korzhuev (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Korzhuev updated SPARK-23449:

Description: 
`spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions` do not 
preserve their ordering when processed in `entrypoint.sh`, which makes 
`-XX:+UnlockExperimentalVMOptions` unusable, as you have to pass it before any 
other experimental options.

 

Steps to reproduce:
 # Set `spark.driver.extraJavaOptions`, e.g. `-XX:+UnlockExperimentalVMOptions 
-XX:+UseG1GC -XX:+CMSClassUnloadingEnabled -XX:+UseCGroupMemoryLimitForHeap`
 # Submit application to k8s cluster.
 # Fetch the logs and observe that the order of the options differs on each 
run; when `-XX:+UnlockExperimentalVMOptions` is not first, startup fails.

 

Expected behaviour:
 # Order of `extraJavaOptions` should be preserved.

 

Cause:

`entrypoint.sh` fetches environment options with `env`, which doesn't guarantee 
ordering.
{code:java}
env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > 
/tmp/java_opts.txt{code}
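One possible fix (a sketch, assuming the variables keep their 
`SPARK_JAVA_OPT_<n>` naming) is to sort on the numeric suffix before stripping 
the names:
{code:java}
# Field 4 after splitting on "_" is "<n>=<option>", so a numeric sort on it
# restores the submission order regardless of how `env` enumerates variables.
env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt
{code}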

  was:
`spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions` when 
processed in `entrypoint.sh`, which makes `-XX:+UnlockExperimentalVMOptions` 
unusable, as you have to pass it before any other experimental options.

 

Steps to reproduce:
 # Set `spark.driver.extraJavaOptions`, e.g. `-XX:+UnlockExperimentalVMOptions 
-XX:+UseG1GC -XX:+CMSClassUnloadingEnabled -XX:+UseCGroupMemoryLimitForHeap`
 # Submit application to k8s cluster.
 # Fetch logs and observe that on each run order of options is different and 
when `-XX:+UnlockExperimentalVMOptions` is not the first startup will fail.

 

Expected behaviour:
 # Order of `extraJavaOptions` should be preserved.

 

Cause:

`entrypoint.sh` fetches environment options with `env`, which doesn't guarantee 
ordering.
{code:java}
env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > 
/tmp/java_opts.txt{code}


> Extra java options lose order in Docker context
> ---
>
> Key: SPARK-23449
> URL: https://issues.apache.org/jira/browse/SPARK-23449
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.0
> Environment: Running Spark on K8S with supplied Docker image. Passing 
> along extra java options.
>Reporter: Andrew Korzhuev
>Priority: Minor
> Fix For: 2.3.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> `spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions` do not 
> preserve their ordering when processed in `entrypoint.sh`, which makes 
> `-XX:+UnlockExperimentalVMOptions` unusable, as you have to pass it before 
> any other experimental options.
>  
> Steps to reproduce:
>  # Set `spark.driver.extraJavaOptions`, e.g. 
> `-XX:+UnlockExperimentalVMOptions -XX:+UseG1GC -XX:+CMSClassUnloadingEnabled 
> -XX:+UseCGroupMemoryLimitForHeap`
>  # Submit application to k8s cluster.
>  # Fetch the logs and observe that the order of the options differs on each 
> run; when `-XX:+UnlockExperimentalVMOptions` is not first, startup fails.
>  
> Expected behaviour:
>  # Order of `extraJavaOptions` should be preserved.
>  
> Cause:
> `entrypoint.sh` fetches environment options with `env`, which doesn't 
> guarantee ordering.
> {code:java}
> env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > 
> /tmp/java_opts.txt{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23449) Extra java options lose order in Docker context

2018-02-16 Thread Andrew Korzhuev (JIRA)
Andrew Korzhuev created SPARK-23449:
---

 Summary: Extra java options lose order in Docker context
 Key: SPARK-23449
 URL: https://issues.apache.org/jira/browse/SPARK-23449
 Project: Spark
  Issue Type: Bug
  Components: Kubernetes
Affects Versions: 2.3.0
 Environment: Running Spark on K8S with supplied Docker image. Passing 
along extra java options.
Reporter: Andrew Korzhuev
 Fix For: 2.3.0


`spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions` do not 
preserve their ordering when processed in `entrypoint.sh`, which makes 
`-XX:+UnlockExperimentalVMOptions` unusable, as you have to pass it before any 
other experimental options.

 

Steps to reproduce:
 # Set `spark.driver.extraJavaOptions`, e.g. `-XX:+UnlockExperimentalVMOptions 
-XX:+UseG1GC -XX:+CMSClassUnloadingEnabled -XX:+UseCGroupMemoryLimitForHeap`
 # Submit application to k8s cluster.
 # Fetch the logs and observe that the order of the options differs on each 
run; when `-XX:+UnlockExperimentalVMOptions` is not first, startup fails.

 

Expected behaviour:
 # Order of `extraJavaOptions` should be preserved.

 

Cause:

`entrypoint.sh` fetches environment options with `env`, which doesn't guarantee 
ordering.
{code:java}
env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > 
/tmp/java_opts.txt{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23133) Spark options are not passed to the Executor in Docker context

2018-01-17 Thread Andrew Korzhuev (JIRA)
Andrew Korzhuev created SPARK-23133:
---

 Summary: Spark options are not passed to the Executor in Docker 
context
 Key: SPARK-23133
 URL: https://issues.apache.org/jira/browse/SPARK-23133
 Project: Spark
  Issue Type: Bug
  Components: Kubernetes
Affects Versions: 2.3.0
 Environment: Running Spark on K8s using supplied Docker image.
Reporter: Andrew Korzhuev


Steps to reproduce:
 # Build the image with `bin/docker-image-tool.sh`.
 # Submit an application to k8s, setting executor options, e.g. `--conf 
"spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf"`.
 # Visit the Spark UI for the executor and notice that the option is not set.

Expected behavior: options from spark-submit should be correctly passed to the 
executor.

Cause:

`SPARK_EXECUTOR_JAVA_OPTS` is not defined in `entrypoint.sh`:

https://github.com/apache/spark/blob/branch-2.3/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L70

[https://github.com/apache/spark/blob/branch-2.3/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L44-L45]
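A simplified, hypothetical sketch of the kind of change implied (the variable 
and branch names mirror the linked branch-2.3 `entrypoint.sh`, but this is not 
the actual patch):
{code:java}
# In the executor branch of entrypoint.sh, read the executor options from the
# environment and put them on the JVM command line:
case "$SPARK_K8S_CMD" in
  executor)
    CMD=(
      ${JAVA_HOME}/bin/java
      ${SPARK_EXECUTOR_JAVA_OPTS}
      -cp "$SPARK_CLASSPATH"
      org.apache.spark.executor.CoarseGrainedExecutorBackend
    )
    ;;
esac
exec "${CMD[@]}"
{code}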



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org