Hi Spark users,

I am trying to use the executor and storage decommissioning feature in 
conjunction with an S3 fallback storage, on a K8S cluster.

I am wondering how mature this feature is considered, given it is around for 
quite some time. Has this been used in anger on K8S? What is your experience 
with this setup?

The most severe issue that I am seeing is when an executor gets decommissioned 
while other executors are reading from it, they see the following error:

FetchFailed(BlockManagerId(2, 21.33.215.240, 30001, None), shuffleId=0, 
mapIndex=1235, mapId=1235, reduceId=18, message=
org.apache.spark.shuffle.FetchFailedException
        at 
org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:437)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1239)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:971)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:86)
        ...
Caused by: org.apache.spark.ExecutorDeadException: [INTERNAL_ERROR_NETWORK] The 
relative remote executor(Id: 2), which maintains the block data to fetch is 
dead.
        at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:141)
        at 
org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
        at 
org.apache.spark.network.shuffle.RetryingBlockTransferor.start(RetryingBlockTransferor.java:152)
        at 
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:151)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:374)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.send$1(ShuffleBlockFetcherIterator.scala:1209)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:1201)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:715)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.(ShuffleBlockFetcherIterator.scala:194)
        at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:73)
        ...

This causes a stage retry, which is expensive and has a maximum retry limit.

This sounds exactly what has been reported 
inhttps://issues.apache.org/jira/browse/SPARK-44389. Has anyone seen this and 
managed to work around it?

Ideally, the ShuffleBlockFetcherIterator would fetch the new location of the 
shuffle block and immediately attempts reading from there.


Cheers,
Enrico

Reply via email to