[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554404#comment-16554404 ]

Brad commented on SPARK-21097:
------------------------------

Hi [~menelaus], I have stalled out on this project; if you would like to work on it, feel free. There is a *lot* of discussion on the PR (it might take a couple of attempts to load the page). The last recommendation from Squito was that I open a SPIP to get more input. I'm not sure whether the final state of the code is 100% correct at this point; I was making a lot of changes at the end.

Thanks
Brad

> Dynamic allocation will preserve cached data
> --------------------------------------------
>
>                 Key: SPARK-21097
>                 URL: https://issues.apache.org/jira/browse/SPARK-21097
>             Project: Spark
>          Issue Type: Improvement
>          Components: Block Manager, Scheduler, Spark Core
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Brad
>            Priority: Major
>         Attachments: Preserving Cached Data with Dynamic Allocation.pdf
>
> We want to use dynamic allocation to distribute resources among many notebook users on our Spark clusters. One difficulty is that if a user has cached data, then we are either prevented from de-allocating any of their executors, or we are forced to drop their cached data, which can lead to a bad user experience.
> We propose adding a feature to preserve cached data by copying it to other executors before de-allocation. This behavior would be enabled by a simple Spark config. When an executor reaches its configured idle timeout, instead of just killing it on the spot, we will stop sending it new tasks, replicate all of its RDD blocks onto other executors, and then kill it. If there is an issue while we replicate the data (an error, it takes too long, or there isn't enough space), we will fall back to the original behavior: drop the data and kill the executor.
> This feature should allow anyone with notebook users to use their cluster resources more efficiently. Also, since it is completely opt-in, it is unlikely to cause problems for other use cases.
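For reference, a minimal sketch of how the proposed opt-in behavior would sit next to Spark's existing dynamic allocation settings. The recovery flag name comes from an earlier revision of this description and was never merged into Spark, so treat it as hypothetical; the other settings are real Spark configs.

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  // Standard dynamic allocation settings that exist in Spark today.
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true") // required by dynamic allocation
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  // Hypothetical opt-in flag from an earlier draft of this proposal;
  // it does not exist in any released version of Spark.
  .set("spark.dynamicAllocation.recoverCachedData", "true")

val spark = SparkSession.builder.config(conf).getOrCreate()
{code}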
[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16552847#comment-16552847 ]

Brad commented on SPARK-21097:
------------------------------

No, I did not consider CPU utilization when redistributing the cache. It might be a good idea, but I'm not sure how you would implement it.
[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16545240#comment-16545240 ]

Brad commented on SPARK-21097:
------------------------------

Hi [~menelaus],

The processing time delay is just a way to simulate different-sized processing workloads. In the test I map over the dataframe and spin for the configured number of microseconds on each row. In the 0 µs case there is almost no processing of the data; the only remaining cost is loading it from Hadoop. The dynamic-allocation-without-recovery benchmark is much slower in the 0 µs cached-data case because it has lost all of its cached data and has to reload it from Hadoop; you can see its performance is similar to the initial load.

Thanks
Brad
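To make the setup concrete, here is a minimal sketch of the kind of synthetic workload described, assuming a SparkSession named spark and an HDFS path; the helper and the per-row delay parameter are illustrative, not the actual spark-bench workload.

{code:scala}
// Busy-wait for the given number of microseconds to simulate per-row work.
def spinMicros(us: Long): Unit = {
  val deadline = System.nanoTime() + us * 1000L
  while (System.nanoTime() < deadline) {} // spin
}

val perRowDelayUs = 50L // 0 here would mean "almost no processing"

val cached = spark.read.parquet("hdfs:///path/to/dataset").cache()
cached.count() // materialize the cache; this is the "initial load" cost

// The benchmarked pass: map over the cached data and spin on every row.
cached.rdd.map { row => spinMicros(perRowDelayUs); row }.count()
{code}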
[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16540195#comment-16540195 ]

Brad commented on SPARK-21097:
------------------------------

Hi [~jothor],

Thanks for the interest, but to be honest I got a new job and stopped working on this a while ago. I ran the tests with spark-bench, available at [https://github.com/CODAIT/spark-bench], but my config scripts and custom workload are long gone. I'd be happy to answer any specific questions you have, though.

Thanks
Brad
[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16419062#comment-16419062 ]

Brad commented on SPARK-22618:
------------------------------

Yeah, the fix for broadcast unpersist should be basically the same. Thanks Thomas.

> RDD.unpersist can cause fatal exception when used with dynamic allocation
> --------------------------------------------------------------------------
>
>                 Key: SPARK-22618
>                 URL: https://issues.apache.org/jira/browse/SPARK-22618
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: Brad
>            Assignee: Brad
>            Priority: Minor
>             Fix For: 2.3.0
>
> If you use rdd.unpersist() with dynamic allocation, then an executor can be deallocated while your RDD is being removed, which will throw an uncaught exception, killing your job.
> I looked into different ways of preventing this error from occurring but couldn't come up with anything that wouldn't require a big change. I propose the best fix is just to catch and log IOExceptions in unpersist() so they don't kill your job. This matches the effective behavior when executors are lost to dynamic allocation in other parts of the code.
> In the worst-case scenario I think this could lead to RDD partitions getting left on executors after they were unpersisted, but this is probably better than the whole job failing. I think in most cases the IOException would be due to the executor dying for some reason, which is effectively the same result as unpersisting the RDD from that executor anyway.
> I noticed this exception in a job that loads a 100GB dataset on a cluster where we use dynamic allocation heavily. Here is the relevant stack trace:
> java.io.IOException: Connection reset by peer
>         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>         at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>         at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
>         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
>         at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
>         at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>         at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
>         at java.lang.Thread.run(Thread.java:748)
> Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult:
>         at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
>         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
>         at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:131)
>         at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1806)
>         at org.apache.spark.rdd.RDD.unpersist(RDD.scala:217)
>         at com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.doWorkload(CacheTest.scala:62)
>         at com.ibm.sparktc.sparkbench.workload.Workload$class.run(Workload.scala:40)
>         at com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.run(CacheTest.scala:33)
>         at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
>         at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.immutable.List.map(List.scala:285)
>         at ...
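For illustration, a minimal sketch of the approach the description proposes, written as a caller-side wrapper rather than the actual patch inside Spark (the real fix lives in the linked PR); the SafeUnpersist name is made up.

{code:scala}
import java.io.IOException
import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory

// Sketch only: catch and log the IOException instead of letting it kill the
// job, mirroring how executor loss is tolerated elsewhere in Spark.
object SafeUnpersist {
  private val log = LoggerFactory.getLogger(getClass)

  def unpersist(rdd: RDD[_], blocking: Boolean = true): Unit =
    try rdd.unpersist(blocking)
    catch {
      case e: IOException =>
        // Most likely an executor was deallocated while its blocks were
        // being removed; its copy of the data is gone either way.
        log.warn(s"Ignoring IOException while unpersisting RDD ${rdd.id}", e)
    }
}
{code}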
[jira] [Updated] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brad updated SPARK-21097:
-------------------------
    Description:
We want to use dynamic allocation to distribute resources among many notebook users on our Spark clusters. One difficulty is that if a user has cached data, then we are either prevented from de-allocating any of their executors, or we are forced to drop their cached data, which can lead to a bad user experience.

We propose adding a feature to preserve cached data by copying it to other executors before de-allocation. This behavior would be enabled by a simple Spark config. When an executor reaches its configured idle timeout, instead of just killing it on the spot, we will stop sending it new tasks, replicate all of its RDD blocks onto other executors, and then kill it. If there is an issue while we replicate the data (an error, it takes too long, or there isn't enough space), we will fall back to the original behavior: drop the data and kill the executor.

This feature should allow anyone with notebook users to use their cluster resources more efficiently. Also, since it is completely opt-in, it is unlikely to cause problems for other use cases.

  was:
We want to use dynamic allocation to distribute resources among many notebook users on our Spark clusters. One difficulty is that if a user has cached data, then we are either prevented from de-allocating any of their executors, or we are forced to drop their cached data, which can lead to a bad user experience.

We propose adding a feature to preserve cached data by copying it to other executors before de-allocation. This behavior would be enabled by a simple Spark config like "spark.dynamicAllocation.recoverCachedData". When an executor reaches its configured idle timeout, instead of just killing it on the spot, we will stop sending it new tasks, replicate all of its RDD blocks onto other executors, and then kill it. If there is an issue while we replicate the data (an error, it takes too long, or there isn't enough space), we will fall back to the original behavior: drop the data and kill the executor.

This feature should allow anyone with notebook users to use their cluster resources more efficiently. Also, since it is completely opt-in, it is unlikely to cause problems for other use cases.
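The de-allocation sequence the description spells out can be summarized in code. The sketch below is purely illustrative: none of these helper names exist in Spark; they just name the steps (stop scheduling, bounded replication, fallback, kill) in order.

{code:scala}
object IdleExecutorRecovery {
  // Placeholder hooks; in the real proposal these would be wired into the
  // ExecutorAllocationManager and the block manager. Stubbed so it compiles.
  def stopSchedulingTasksOn(id: String): Unit = println(s"no new tasks for $id")
  def replicateAllRddBlocks(id: String, timeoutMs: Long): Boolean = true
  def killExecutor(id: String): Unit = println(s"killing $id")

  def removeIdleExecutor(id: String, replicationTimeoutMs: Long): Unit = {
    stopSchedulingTasksOn(id) // 1. stop sending the idle executor new tasks
    val replicated =          // 2. copy its cached RDD blocks elsewhere, bounded in time
      try replicateAllRddBlocks(id, replicationTimeoutMs)
      catch { case _: Exception => false } // error => treat as failed replication
    if (!replicated)
      // 3. fall back to today's behavior: the cached data is simply dropped
      println(s"could not replicate blocks from $id; dropping cached data")
    killExecutor(id)          // 4. either way, the executor is released
  }
}
{code}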
[jira] [Updated] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brad updated SPARK-22618:
-------------------------
    Fix Version/s:     (was: 2.3.0)
[jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation
[ https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16267223#comment-16267223 ]

Brad commented on SPARK-22618:
------------------------------

I have a PR for this forthcoming.
[jira] [Created] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation
Brad created SPARK-22618:
----------------------------

             Summary: RDD.unpersist can cause fatal exception when used with dynamic allocation
                 Key: SPARK-22618
                 URL: https://issues.apache.org/jira/browse/SPARK-22618
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.3.0
            Reporter: Brad
            Priority: Minor
             Fix For: 2.3.0
[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16145708#comment-16145708 ]

Brad commented on SPARK-21097:
------------------------------

Here is a document with some of my benchmark results. I am working on adding more benchmarks.

https://docs.google.com/document/d/1E6_rhAAJB8Ww0n52-LYcFTO1zhJBWgfIXzNjLi29730/edit?usp=sharing
[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113264#comment-16113264 ]

Brad commented on SPARK-21097:
------------------------------

I'm still working on thoroughly benchmarking and testing this change. If anyone is interested in this, send me a message.

Thanks
[jira] [Updated] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brad updated SPARK-21097:
-------------------------
    Attachment: Preserving Cached Data with Dynamic Allocation.pdf
[jira] [Updated] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brad updated SPARK-21097:
-------------------------
    Attachment:     (was: Preserving Cached Data with Dynamic Allocation.docx)
[jira] [Updated] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brad updated SPARK-21097:
-------------------------
    Attachment:     (was: Preserving Cached Data with Dynamic Allocation.docx)
[jira] [Updated] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brad updated SPARK-21097:
-------------------------
    Attachment: Preserving Cached Data with Dynamic Allocation.docx
[jira] [Updated] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brad updated SPARK-21097:
-------------------------
    Attachment: Preserving Cached Data with Dynamic Allocation.docx
[jira] [Comment Edited] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049485#comment-16049485 ]

Brad edited comment on SPARK-21097 at 6/14/17 6:29 PM:
-------------------------------------------------------

Hey [~srowen], thanks for your input. I would definitely like to do some benchmarks and show that the recovered data leads to a performance improvement.

I don't think it will increase complexity too much. If there are any issues copying the data, we can just go ahead and kill the executor and fall back to the current behavior. I've done some preliminary work on this, and the changes to existing code will be minimal and unlikely to damage existing functionality.

was (Author: bradkaiser):
Hey Sean, thanks for your input. I would definitely like to do some benchmarks and show that the recovered data leads to a performance improvement.

I don't think it will increase complexity too much. If there are any issues copying the data, we can just go ahead and kill the executor and fall back to the current behavior. I've done some preliminary work on this, and the changes to existing code will be minimal and unlikely to damage existing functionality.
[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049485#comment-16049485 ]

Brad commented on SPARK-21097:
------------------------------

Hey Sean, thanks for your input. I would definitely like to do some benchmarks and show that the recovered data leads to a performance improvement.

I don't think it will increase complexity too much. If there are any issues copying the data, we can just go ahead and kill the executor and fall back to the current behavior. I've done some preliminary work on this, and the changes to existing code will be minimal and unlikely to damage existing functionality.
[jira] [Commented] (SPARK-21097) Dynamic allocation will preserve cached data
[ https://issues.apache.org/jira/browse/SPARK-21097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049443#comment-16049443 ]

Brad commented on SPARK-21097:
------------------------------

I am working on this now and will be posting a more detailed design document shortly. I am definitely open to any collaboration or input.
[jira] [Created] (SPARK-21097) Dynamic allocation will preserve cached data
Brad created SPARK-21097:
----------------------------

             Summary: Dynamic allocation will preserve cached data
                 Key: SPARK-21097
                 URL: https://issues.apache.org/jira/browse/SPARK-21097
             Project: Spark
          Issue Type: Improvement
          Components: Block Manager, Scheduler, Spark Core
    Affects Versions: 2.2.0, 2.3.0
            Reporter: Brad