[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data

2018-07-24 Thread Brad (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554404#comment-16554404
 ] 

Brad commented on SPARK-21097:
--

Hi [~menelaus],

I have stalled out on this project; if you would like to work on it, feel free. 
There is a *lot* of discussion on the PR (it might take a couple of attempts to 
load the page). The last recommendation from Squito was that I open a SPIP 
to get more input. I'm not sure if the final state of the code is 100% correct 
at this point, since I was making a lot of changes at the end.

 

Thanks

Brad

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
>Priority: Major
> Attachments: Preserving Cached Data with Dynamic Allocation.pdf
>
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config. Now when an executor reaches its configured idle timeout, 
> instead of just killing it on the spot, we will stop sending it new tasks, 
> replicate all of its rdd blocks onto other executors, and then kill it. If 
> there is an issue while we replicate the data, such as an error, the 
> replication taking too long, or not enough space, then we will fall back to 
> the original behavior and drop the data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.
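
A rough sketch of the flow described above, in Scala. The names below are 
illustrative stand-ins, not the code from the actual PR:

object CacheRecoverySketch {
  // Stand-ins for the real scheduler / block manager plumbing.
  def stopSchedulingOn(executorId: String): Unit =
    println(s"no longer scheduling tasks on $executorId")
  def replicateAllCachedBlocks(executorId: String): Unit =
    println(s"replicating RDD blocks from $executorId to other executors")
  def killExecutor(executorId: String): Unit =
    println(s"killing $executorId")

  // Proposed handling of an executor that hits its idle timeout while holding
  // cached blocks: stop giving it tasks, try to copy its blocks elsewhere,
  // then kill it. Any problem during replication (an error, a timeout, not
  // enough space) falls back to the current behavior of dropping the data.
  def retireIdleExecutor(executorId: String): Unit = {
    stopSchedulingOn(executorId)
    try replicateAllCachedBlocks(executorId)
    catch { case e: Exception =>
      println(s"replication failed (${e.getMessage}); dropping cached data")
    }
    killExecutor(executorId)
  }
}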



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data

2018-07-23 Thread Brad (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16552847#comment-16552847
 ] 

Brad commented on SPARK-21097:
--

No, I did not consider CPU utilization when redistributing the cache. It might 
be a good idea, but I'm not sure how you would implement it.

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
>Priority: Major
> Attachments: Preserving Cached Data with Dynamic Allocation.pdf
>
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config. Now when an executor reaches its configured idle timeout, 
> instead of just killing it on the spot, we will stop sending it new tasks, 
> replicate all of its rdd blocks onto other executors, and then kill it. If 
> there is an issue while we replicate the data, such as an error, the 
> replication taking too long, or not enough space, then we will fall back to 
> the original behavior and drop the data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data

2018-07-16 Thread Brad (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545240#comment-16545240
 ] 

Brad commented on SPARK-21097:
--

Hi [~menelaus]

The processing time delay is just a way to simulate different-sized processing 
workloads. In the test I just do a map over the dataframe and spin for a few 
microseconds on each row, as configured. In the case of 0 µs it's as if there is 
almost no processing on the data; there is still the time of loading the data 
from Hadoop.
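
Roughly, the per-row spin looks like the following sketch (reconstructed from 
memory, using the RDD API to keep it self-contained; "spinMicros" is the 
configured per-row delay, not the real workload's parameter name):

import org.apache.spark.sql.DataFrame

// Simulate per-row processing cost: busy-spin for spinMicros on every row.
// With spinMicros = 0 the job is little more than a scan of the cached data.
def spinPerRow(df: DataFrame, spinMicros: Long): Long = {
  df.rdd.map { _ =>
    val deadline = System.nanoTime() + spinMicros * 1000
    while (System.nanoTime() < deadline) {}  // burn CPU to mimic real work
    1L
  }.reduce(_ + _)  // force evaluation; result is the row count
}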

The dynamic-allocation-without-recovery benchmark is much slower in the 0 µs 
cached-data case because it has lost all of its cached data and has to reload 
from Hadoop. You can see the performance is similar to the initial load.

Thanks

Brad

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
>Priority: Major
> Attachments: Preserving Cached Data with Dynamic Allocation.pdf
>
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config. Now when an executor reaches its configured idle timeout, 
> instead of just killing it on the spot, we will stop sending it new tasks, 
> replicate all of its rdd blocks onto other executors, and then kill it. If 
> there is an issue while we replicate the data, such as an error, the 
> replication taking too long, or not enough space, then we will fall back to 
> the original behavior and drop the data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data

2018-07-11 Thread Brad (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16540195#comment-16540195
 ] 

Brad commented on SPARK-21097:
--

Hi [~jothor],

Thanks for the interest, but to be honest I got a new job and stopped working 
on this a while ago. I used spark-bench to run the tests available here, 
[https://github.com/CODAIT/spark-bench], but my config scripts and custom 
workload are long gone. I'd be happy to answer any specific questions you have 
though.

Thanks

Brad

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
>Priority: Major
> Attachments: Preserving Cached Data with Dynamic Allocation.pdf
>
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config. Now when an executor reaches its configured idle timeout, 
> instead of just killing it on the spot, we will stop sending it new tasks, 
> replicate all of its rdd blocks onto other executors, and then kill it. If 
> there is an issue while we replicate the data, such as an error, the 
> replication taking too long, or not enough space, then we will fall back to 
> the original behavior and drop the data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation

2018-03-29 Thread Brad (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16419062#comment-16419062
 ] 

Brad commented on SPARK-22618:
--

Yeah, the fix for broadcast unpersist should be basically the same. Thanks 
Thomas.

> RDD.unpersist can cause fatal exception when used with dynamic allocation
> -
>
> Key: SPARK-22618
> URL: https://issues.apache.org/jira/browse/SPARK-22618
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Brad
>Assignee: Brad
>Priority: Minor
> Fix For: 2.3.0
>
>
> If you use rdd.unpersist() with dynamic allocation, then an executor can be 
> deallocated while your rdd is being removed, which will throw an uncaught 
> exception killing your job. 
> I looked into different ways of preventing this error from occurring but 
> couldn't come up with anything that wouldn't require a big change. I propose 
> the best fix is just to catch and log IOExceptions in unpersist() so they 
> don't kill your job. This will match the effective behavior when executors 
> are lost from dynamic allocation in other parts of the code.
> In the worst case scenario I think this could lead to RDD partitions getting 
> left on executors after they were unpersisted, but this is probably better 
> than the whole job failing. I think in most cases the IOException would be 
> due to the executor dying for some reason, which is effectively the same 
> result as unpersisting the rdd from that executor anyway.
> I noticed this exception in a job that loads a 100GB dataset on a cluster 
> where we use dynamic allocation heavily. Here is the relevant stack trace
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at 
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
> at 
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> at java.lang.Thread.run(Thread.java:748)
> Exception in thread "main" org.apache.spark.SparkException: Exception thrown 
> in awaitResult:
> at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
> at 
> org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:131)
> at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1806)
> at org.apache.spark.rdd.RDD.unpersist(RDD.scala:217)
> at 
> com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.doWorkload(CacheTest.scala:62)
> at 
> com.ibm.sparktc.sparkbench.workload.Workload$class.run(Workload.scala:40)
> at 
> com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.run(CacheTest.scala:33)
> at 
> com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
> at 
> com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at 
> 

[jira] [Updated] (SPARK-21097) Dynamic allocation will preserve cached data

2018-02-06 Thread Brad (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brad updated SPARK-21097:
-
Description: 
We want to use dynamic allocation to distribute resources among many notebook 
users on our spark clusters. One difficulty is that if a user has cached data 
then we are either prevented from de-allocating any of their executors, or we 
are forced to drop their cached data, which can lead to a bad user experience.

We propose adding a feature to preserve cached data by copying it to other 
executors before de-allocation. This behavior would be enabled by a simple 
spark config. Now when an executor reaches its configured idle timeout, instead 
of just killing it on the spot, we will stop sending it new tasks, replicate 
all of its rdd blocks onto other executors, and then kill it. If there is an 
issue while we replicate the data, such as an error, the replication taking too 
long, or not enough space, then we will fall back to the original behavior and 
drop the data and kill the executor.

This feature should allow anyone with notebook users to use their cluster 
resources more efficiently. Also, since it is completely opt-in, it is unlikely 
to cause problems for other use cases.

  was:
We want to use dynamic allocation to distribute resources among many notebook 
users on our spark clusters. One difficulty is that if a user has cached data 
then we are either prevented from de-allocating any of their executors, or we 
are forced to drop their cached data, which can lead to a bad user experience.

We propose adding a feature to preserve cached data by copying it to other 
executors before de-allocation. This behavior would be enabled by a simple 
spark config like "spark.dynamicAllocation.recoverCachedData". Now when an 
executor reaches its configured idle timeout, instead of just killing it on the 
spot, we will stop sending it new tasks, replicate all of its rdd blocks onto 
other executors, and then kill it. If there is an issue while we replicate the 
data, such as an error, the replication taking too long, or not enough space, 
then we will fall back to the original behavior and drop the data and kill the executor.

This feature should allow anyone with notebook users to use their cluster 
resources more efficiently. Also, since it is completely opt-in, it is unlikely 
to cause problems for other use cases.



> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
>Priority: Major
> Attachments: Preserving Cached Data with Dynamic Allocation.pdf
>
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config. Now when an executor reaches its configured idle timeout, 
> instead of just killing it on the spot, we will stop sending it new tasks, 
> replicate all of its rdd blocks onto other executors, and then kill it. If 
> there is an issue while we replicate the data, such as an error, the 
> replication taking too long, or not enough space, then we will fall back to 
> the original behavior and drop the data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation

2017-11-28 Thread Brad (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brad updated SPARK-22618:
-
Fix Version/s: (was: 2.3.0)

> RDD.unpersist can cause fatal exception when used with dynamic allocation
> -
>
> Key: SPARK-22618
> URL: https://issues.apache.org/jira/browse/SPARK-22618
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Brad
>Priority: Minor
>
> If you use rdd.unpersist() with dynamic allocation, then an executor can be 
> deallocated while your rdd is being removed, which will throw an uncaught 
> exception killing your job. 
> I looked into different ways of preventing this error from occurring but 
> couldn't come up with anything that wouldn't require a big change. I propose 
> the best fix is just to catch and log IOExceptions in unpersist() so they 
> don't kill your job. This will match the effective behavior when executors 
> are lost from dynamic allocation in other parts of the code.
> In the worst case scenario I think this could lead to RDD partitions getting 
> left on executors after they were unpersisted, but this is probably better 
> than the whole job failing. I think in most cases the IOException would be 
> due to the executor dying for some reason, which is effectively the same 
> result as unpersisting the rdd from that executor anyway.
> I noticed this exception in a job that loads a 100GB dataset on a cluster 
> where we use dynamic allocation heavily. Here is the relevant stack trace
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at 
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
> at 
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> at java.lang.Thread.run(Thread.java:748)
> Exception in thread "main" org.apache.spark.SparkException: Exception thrown 
> in awaitResult:
> at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
> at 
> org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:131)
> at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1806)
> at org.apache.spark.rdd.RDD.unpersist(RDD.scala:217)
> at 
> com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.doWorkload(CacheTest.scala:62)
> at 
> com.ibm.sparktc.sparkbench.workload.Workload$class.run(Workload.scala:40)
> at 
> com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.run(CacheTest.scala:33)
> at 
> com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
> at 
> com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at 
> com.ibm.sparktc.sparkbench.workload.SuiteKickoff$.com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially(SuiteKickoff.scala:78)
> at 
> com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$2.apply(SuiteKickoff.scala:52)
> at 
> 

[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation

2017-11-27 Thread Brad (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16267223#comment-16267223
 ] 

Brad commented on SPARK-22618:
--

I have a PR for this forthcoming.

> RDD.unpersist can cause fatal exception when used with dynamic allocation
> -
>
> Key: SPARK-22618
> URL: https://issues.apache.org/jira/browse/SPARK-22618
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Brad
>Priority: Minor
> Fix For: 2.3.0
>
>
> If you use rdd.unpersist() with dynamic allocation, then an executor can be 
> deallocated while your rdd is being removed, which will throw an uncaught 
> exception killing your job. 
> I looked into different ways of preventing this error from occurring but 
> couldn't come up with anything that wouldn't require a big change. I propose 
> the best fix is just to catch and log IOExceptions in unpersist() so they 
> don't kill your job. This will match the effective behavior when executors 
> are lost from dynamic allocation in other parts of the code.
> In the worst case scenario I think this could lead to RDD partitions getting 
> left on executors after they were unpersisted, but this is probably better 
> than the whole job failing. I think in most cases the IOException would be 
> due to the executor dying for some reason, which is effectively the same 
> result as unpersisting the rdd from that executor anyway.
> I noticed this exception in a job that loads a 100GB dataset on a cluster 
> where we use dynamic allocation heavily. Here is the relevant stack trace
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at 
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
> at 
> io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
> at 
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
> at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
> at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
> at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
> at 
> io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> at java.lang.Thread.run(Thread.java:748)
> Exception in thread "main" org.apache.spark.SparkException: Exception thrown 
> in awaitResult:
> at 
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
> at 
> org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:131)
> at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1806)
> at org.apache.spark.rdd.RDD.unpersist(RDD.scala:217)
> at 
> com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.doWorkload(CacheTest.scala:62)
> at 
> com.ibm.sparktc.sparkbench.workload.Workload$class.run(Workload.scala:40)
> at 
> com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.run(CacheTest.scala:33)
> at 
> com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
> at 
> com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at 
> com.ibm.sparktc.sparkbench.workload.SuiteKickoff$.com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially(SuiteKickoff.scala:78)
> at 
> 

[jira] [Created] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation

2017-11-27 Thread Brad (JIRA)
Brad created SPARK-22618:


 Summary: RDD.unpersist can cause fatal exception when used with 
dynamic allocation
 Key: SPARK-22618
 URL: https://issues.apache.org/jira/browse/SPARK-22618
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: Brad
Priority: Minor
 Fix For: 2.3.0


If you use rdd.unpersist() with dynamic allocation, then an executor can be 
deallocated while your rdd is being removed, which will throw an uncaught 
exception killing your job. 

I looked into different ways of preventing this error from occurring but 
couldn't come up with anything that wouldn't require a big change. I propose 
the best fix is just to catch and log IOExceptions in unpersist() so they don't 
kill your job. This will match the effective behavior when executors are lost 
from dynamic allocation in other parts of the code.
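
In spirit the change is just this (a simplified sketch of the idea, not the 
patch as merged; removeRddBlocks stands in for the BlockManagerMaster.removeRdd 
call visible in the stack trace below):

import java.io.IOException

// If removing an RDD's blocks fails because an executor disappeared
// mid-unpersist, log the failure and carry on instead of letting the
// IOException propagate and kill the job.
def unpersistQuietly(rddId: Int, removeRddBlocks: Int => Unit): Unit = {
  try removeRddBlocks(rddId)
  catch {
    case e: IOException =>
      println(s"Failed to remove all blocks of RDD $rddId: ${e.getMessage}")
  }
}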

In the worst case scenario I think this could lead to RDD partitions getting 
left on executors after they were unpersisted, but this is probably better than 
the whole job failing. I think in most cases the IOException would be due to 
the executor dying for some reason, which is effectively the same result as 
unpersisting the rdd from that executor anyway.

I noticed this exception in a job that loads a 100GB dataset on a cluster where 
we use dynamic allocation heavily. Here is the relevant stack trace

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at 
io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
at 
io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "main" org.apache.spark.SparkException: Exception thrown in 
awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at 
org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:131)
at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1806)
at org.apache.spark.rdd.RDD.unpersist(RDD.scala:217)
at 
com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.doWorkload(CacheTest.scala:62)
at 
com.ibm.sparktc.sparkbench.workload.Workload$class.run(Workload.scala:40)
at 
com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.run(CacheTest.scala:33)
at 
com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
at 
com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at 
com.ibm.sparktc.sparkbench.workload.SuiteKickoff$.com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially(SuiteKickoff.scala:78)
at 
com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$2.apply(SuiteKickoff.scala:52)
at 
com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$2.apply(SuiteKickoff.scala:47)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at 

[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data

2017-08-29 Thread Brad (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16145708#comment-16145708
 ] 

Brad commented on SPARK-21097:
--

Here is a document with some of my benchmark results. I am working on adding 
more benchmarks.

https://docs.google.com/document/d/1E6_rhAAJB8Ww0n52-LYcFTO1zhJBWgfIXzNjLi29730/edit?usp=sharing

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
> Attachments: Preserving Cached Data with Dynamic Allocation.pdf
>
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config like "spark.dynamicAllocation.recoverCachedData". Now when an 
> executor reaches its configured idle timeout, instead of just killing it on 
> the spot, we will stop sending it new tasks, replicate all of its rdd blocks 
> onto other executors, and then kill it. If there is an issue while we 
> replicate the data, such as an error, the replication taking too long, or not 
> enough space, then we will fall back to the original behavior and drop the 
> data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data

2017-08-03 Thread Brad (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113264#comment-16113264
 ] 

Brad commented on SPARK-21097:
--

I'm still working on thoroughly benchmarking and testing this change. If anyone 
is interested in this, send me a message. Thanks

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
> Attachments: Preserving Cached Data with Dynamic Allocation.pdf
>
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config like "spark.dynamicAllocation.recoverCachedData". Now when an 
> executor reaches its configured idle timeout, instead of just killing it on 
> the spot, we will stop sending it new tasks, replicate all of its rdd blocks 
> onto other executors, and then kill it. If there is an issue while we 
> replicate the data, such as an error, the replication taking too long, or not 
> enough space, then we will fall back to the original behavior and drop the 
> data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21097) Dynamic allocation will preserve cached data

2017-06-15 Thread Brad (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brad updated SPARK-21097:
-
Attachment: Preserving Cached Data with Dynamic Allocation.pdf

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
> Attachments: Preserving Cached Data with Dynamic Allocation.pdf
>
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config like "spark.dynamicAllocation.recoverCachedData". Now when an 
> executor reaches its configured idle timeout, instead of just killing it on 
> the spot, we will stop sending it new tasks, replicate all of its rdd blocks 
> onto other executors, and then kill it. If there is an issue while we 
> replicate the data, such as an error, the replication taking too long, or not 
> enough space, then we will fall back to the original behavior and drop the 
> data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21097) Dynamic allocation will preserve cached data

2017-06-15 Thread Brad (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brad updated SPARK-21097:
-
Attachment: (was: Preserving Cached Data with Dynamic Allocation.docx)

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config like "spark.dynamicAllocation.recoverCachedData". Now when an 
> executor reaches its configured idle timeout, instead of just killing it on 
> the spot, we will stop sending it new tasks, replicate all of its rdd blocks 
> onto other executors, and then kill it. If there is an issue while we 
> replicate the data, such as an error, the replication taking too long, or not 
> enough space, then we will fall back to the original behavior and drop the 
> data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21097) Dynamic allocation will preserve cached data

2017-06-15 Thread Brad (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brad updated SPARK-21097:
-
Attachment: (was: Preserving Cached Data with Dynamic Allocation.docx)

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config like "spark.dynamicAllocation.recoverCachedData". Now when an 
> executor reaches its configured idle timeout, instead of just killing it on 
> the spot, we will stop sending it new tasks, replicate all of its rdd blocks 
> onto other executors, and then kill it. If there is an issue while we 
> replicate the data, such as an error, the replication taking too long, or not 
> enough space, then we will fall back to the original behavior and drop the 
> data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21097) Dynamic allocation will preserve cached data

2017-06-15 Thread Brad (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brad updated SPARK-21097:
-
Attachment: Preserving Cached Data with Dynamic Allocation.docx

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config like "spark.dynamicAllocation.recoverCachedData". Now when an 
> executor reaches its configured idle timeout, instead of just killing it on 
> the spot, we will stop sending it new tasks, replicate all of its rdd blocks 
> onto other executors, and then kill it. If there is an issue while we 
> replicate the data, such as an error, the replication taking too long, or not 
> enough space, then we will fall back to the original behavior and drop the 
> data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21097) Dynamic allocation will preserve cached data

2017-06-15 Thread Brad (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brad updated SPARK-21097:
-
Attachment: Preserving Cached Data with Dynamic Allocation.docx

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
> Attachments: Preserving Cached Data with Dynamic Allocation.docx
>
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config like "spark.dynamicAllocation.recoverCachedData". Now when an 
> executor reaches its configured idle timeout, instead of just killing it on 
> the spot, we will stop sending it new tasks, replicate all of its rdd blocks 
> onto other executors, and then kill it. If there is an issue while we 
> replicate the data, such as an error, the replication taking too long, or not 
> enough space, then we will fall back to the original behavior and drop the 
> data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-21097) Dynamic allocation will preserve cached data

2017-06-14 Thread Brad (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16049485#comment-16049485
 ] 

Brad edited comment on SPARK-21097 at 6/14/17 6:29 PM:
---

Hey [~srowen], thanks for your input. I would definitely like to do some 
benchmarks and show that the recovered data leads to a performance improvement. 
I don't think it will increase complexity too much, If there are any issues 
copying the data, we can just go ahead and kill the executor and fall back to 
the current behavior. I've done some preliminary work on this and the changes 
to existing code will be minimal and unlikely to damage existing functionality.


was (Author: bradkaiser):
Hey Sean, thanks for your input. I would definitely like to do some benchmarks 
and show that the recovered data leads to a performance improvement. I don't 
think it will increase complexity too much. If there are any issues copying the 
data, we can just go ahead and kill the executor and fall back to the current 
behavior. I've done some preliminary work on this and the changes to existing 
code will be minimal and unlikely to damage existing functionality.

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config like "spark.dynamicAllocation.recoverCachedData". Now when an 
> executor reaches its configured idle timeout, instead of just killing it on 
> the spot, we will stop sending it new tasks, replicate all of its rdd blocks 
> onto other executors, and then kill it. If there is an issue while we 
> replicate the data, such as an error, the replication taking too long, or not 
> enough space, then we will fall back to the original behavior and drop the 
> data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data

2017-06-14 Thread Brad (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16049485#comment-16049485
 ] 

Brad commented on SPARK-21097:
--

Hey Sean, thanks for your input. I would definitely like to do some benchmarks 
and show that the recovered data leads to a performance improvement. I don't 
think it will increase complexity too much. If there are any issues copying the 
data, we can just go ahead and kill the executor and fall back to the current 
behavior. I've done some preliminary work on this and the changes to existing 
code will be minimal and unlikely to damage existing functionality.

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config like "spark.dynamicAllocation.recoverCachedData". Now when an 
> executor reaches its configured idle timeout, instead of just killing it on 
> the spot, we will stop sending it new tasks, replicate all of its rdd blocks 
> onto other executors, and then kill it. If there is an issue while we 
> replicate the data, such as an error, the replication taking too long, or not 
> enough space, then we will fall back to the original behavior and drop the 
> data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data

2017-06-14 Thread Brad (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16049443#comment-16049443
 ] 

Brad commented on SPARK-21097:
--

I am working on this now and will be posting a more detailed design document 
shortly. I am definitely open to any collaboration or input. 

> Dynamic allocation will preserve cached data
> 
>
> Key: SPARK-21097
> URL: https://issues.apache.org/jira/browse/SPARK-21097
> Project: Spark
>  Issue Type: Improvement
>  Components: Block Manager, Scheduler, Spark Core
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Brad
>
> We want to use dynamic allocation to distribute resources among many notebook 
> users on our spark clusters. One difficulty is that if a user has cached data 
> then we are either prevented from de-allocating any of their executors, or we 
> are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other 
> executors before de-allocation. This behavior would be enabled by a simple 
> spark config like "spark.dynamicAllocation.recoverCachedData". Now when an 
> executor reaches its configured idle timeout, instead of just killing it on 
> the spot, we will stop sending it new tasks, replicate all of its rdd blocks 
> onto other executors, and then kill it. If there is an issue while we 
> replicate the data, such as an error, the replication taking too long, or not 
> enough space, then we will fall back to the original behavior and drop the 
> data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster 
> resources more efficiently. Also, since it is completely opt-in, it is 
> unlikely to cause problems for other use cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21097) Dynamic allocation will preserve cached data

2017-06-14 Thread Brad (JIRA)
Brad created SPARK-21097:


 Summary: Dynamic allocation will preserve cached data
 Key: SPARK-21097
 URL: https://issues.apache.org/jira/browse/SPARK-21097
 Project: Spark
  Issue Type: Improvement
  Components: Block Manager, Scheduler, Spark Core
Affects Versions: 2.2.0, 2.3.0
Reporter: Brad


We want to use dynamic allocation to distribute resources among many notebook 
users on our spark clusters. One difficulty is that if a user has cached data 
then we are either prevented from de-allocating any of their executors, or we 
are forced to drop their cached data, which can lead to a bad user experience.

We propose adding a feature to preserve cached data by copying it to other 
executors before de-allocation. This behavior would be enabled by a simple 
spark config like "spark.dynamicAllocation.recoverCachedData". Now when an 
executor reaches its configured idle timeout, instead of just killing it on the 
spot, we will stop sending it new tasks, replicate all of its rdd blocks onto 
other executors, and then kill it. If there is an issue while we replicate the 
data, such as an error, the replication taking too long, or not enough space, 
then we will fall back to the original behavior and drop the data and kill the executor.

This feature should allow anyone with notebook users to use their cluster 
resources more efficiently. Also, since it is completely opt-in, it is unlikely 
to cause problems for other use cases.
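
If this were merged, turning it on would presumably look like any other dynamic 
allocation setting. A hypothetical example using the config name proposed above 
(this key does not exist in released Spark; the other two settings do):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")             // existing setting
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")  // existing setting
  .set("spark.dynamicAllocation.recoverCachedData", "true")   // proposed in this ticket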




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org