Brad created SPARK-22618:
----------------------------

             Summary: RDD.unpersist can cause fatal exception when used with dynamic allocation
                 Key: SPARK-22618
                 URL: https://issues.apache.org/jira/browse/SPARK-22618
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.3.0
            Reporter: Brad
            Priority: Minor
             Fix For: 2.3.0


If you call rdd.unpersist() with dynamic allocation enabled, an executor can be 
deallocated while the RDD's blocks are being removed. The resulting exception 
is not caught, so it kills your job. 

I looked into different ways of preventing this error from occurring but 
couldn't come up with anything that wouldn't require a big change. I propose 
the best fix is just to catch and log IOExceptions in unpersist() so they don't 
kill your job. This will match the effective behavior when executors are lost 
from dynamic allocation in other parts of the code.
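
As a rough illustration of the proposal, here is a minimal Scala sketch that 
does not depend on Spark. The names UnpersistSketch, hasIOCause and 
removeQuietly are hypothetical, and the removeRdd argument is only a stand-in 
for the blocking remove call; this is not the actual patch. It walks the cause 
chain because, in the trace below, the IOException arrives wrapped in a 
SparkException.

```scala
import java.io.IOException

object UnpersistSketch {
  // True if `t` or any exception in its cause chain is an IOException.
  @annotation.tailrec
  def hasIOCause(t: Throwable): Boolean =
    if (t == null) false
    else if (t.isInstanceOf[IOException]) true
    else hasIOCause(t.getCause)

  // Hypothetical helper: `removeRdd` stands in for the blocking remove call.
  // Returns true if the removal completed, false if an I/O failure was
  // logged and swallowed. Any other exception still propagates.
  def removeQuietly(rddId: Int)(removeRdd: Int => Unit): Boolean =
    try {
      removeRdd(rddId)
      true
    } catch {
      case e: Exception if hasIOCause(e) =>
        System.err.println(
          s"Ignoring I/O failure while removing RDD $rddId: ${e.getMessage}")
        false
    }
}
```

Only failures that trace back to an IOException are swallowed here, so 
unrelated errors would still fail the job as before.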

In the worst-case scenario I think this could lead to RDD partitions getting 
left on executors after they were unpersisted, but this is probably better than 
the whole job failing. I think in most cases the IOException would be due to 
the executor dying for some reason, which is effectively the same result as 
unpersisting the RDD from that executor anyway.

I noticed this exception in a job that loads a 100GB dataset on a cluster where 
we use dynamic allocation heavily. Here is the relevant stack trace:

java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)
Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
        at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:131)
        at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1806)
        at org.apache.spark.rdd.RDD.unpersist(RDD.scala:217)
        at com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.doWorkload(CacheTest.scala:62)
        at com.ibm.sparktc.sparkbench.workload.Workload$class.run(Workload.scala:40)
        at com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.run(CacheTest.scala:33)
        at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
        at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:285)
        at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$.com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially(SuiteKickoff.scala:78)
        at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$2.apply(SuiteKickoff.scala:52)
        at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$2.apply(SuiteKickoff.scala:47)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$.run(SuiteKickoff.scala:47)
        at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$MultipleSuiteKickoff$$runSuitesSerially$1.apply(MultipleSuiteKickoff.scala:24)
        at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$MultipleSuiteKickoff$$runSuitesSerially$1.apply(MultipleSuiteKickoff.scala:24)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$.com$ibm$sparktc$sparkbench$workload$MultipleSuiteKickoff$$runSuitesSerially(MultipleSuiteKickoff.scala:24)
        at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$run$1.apply(MultipleSuiteKickoff.scala:13)
        at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$run$1.apply(MultipleSuiteKickoff.scala:10)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$.run(MultipleSuiteKickoff.scala:10)
        at com.ibm.sparktc.sparkbench.cli.CLIKickoff$.main(CLIKickoff.scala:16)
        at com.ibm.sparktc.sparkbench.cli.CLIKickoff.main(CLIKickoff.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:843)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:188)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:218)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:127)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)


