Hi Spark users,

I am trying to use the executor and storage decommissioning feature in conjunction with S3 fallback storage, on a K8S cluster.
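For reference, this is roughly how I configure the feature (a minimal sketch; the bucket path is a placeholder and the values are what I am currently testing with):

import org.apache.spark.sql.SparkSession

// Sketch of the decommissioning + fallback storage setup, assuming an
// s3a:// bucket is reachable from the executors.
val spark = SparkSession.builder()
  .appName("decommission-fallback-test")
  // let executors migrate their state before the pod is removed
  .config("spark.decommission.enabled", "true")
  .config("spark.storage.decommission.enabled", "true")
  .config("spark.storage.decommission.shuffleBlocks.enabled", "true")
  .config("spark.storage.decommission.rddBlocks.enabled", "true")
  // blocks that cannot be migrated to a peer executor are written here instead
  .config("spark.storage.decommission.fallbackStorage.path", "s3a://my-bucket/spark-fallback/")
  .getOrCreate()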
I am wondering how mature this feature is considered, given it has been around for quite some time. Has this been used in anger on K8S? What is your experience with this setup?

The most severe issue I am seeing is that when an executor gets decommissioned while other executors are reading from it, they see the following error:

FetchFailed(BlockManagerId(2, 21.33.215.240, 30001, None), shuffleId=0, mapIndex=1235, mapId=1235, reduceId=18, message=
org.apache.spark.shuffle.FetchFailedException
  at org.apache.spark.errors.SparkCoreErrors$.fetchFailedError(SparkCoreErrors.scala:437)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1239)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:971)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:86)
  ...
Caused by: org.apache.spark.ExecutorDeadException: [INTERNAL_ERROR_NETWORK] The relative remote executor(Id: 2), which maintains the block data to fetch is dead.
  at org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:141)
  at org.apache.spark.network.shuffle.RetryingBlockTransferor.transferAllOutstanding(RetryingBlockTransferor.java:173)
  at org.apache.spark.network.shuffle.RetryingBlockTransferor.start(RetryingBlockTransferor.java:152)
  at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:151)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:374)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.send$1(ShuffleBlockFetcherIterator.scala:1209)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:1201)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:715)
  at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:194)
  at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:73)
  ...

This causes a stage retry, which is expensive and has a maximum retry limit. This sounds exactly like what has been reported in https://issues.apache.org/jira/browse/SPARK-44389. Has anyone seen this and managed to work around it?

Ideally, the ShuffleBlockFetcherIterator would fetch the new location of the shuffle block and immediately attempt reading from there.

Cheers,
Enrico
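P.S. As a stop-gap I am considering raising the stage retry limit so the job at least survives the extra attempts. A minimal sketch (the value is just an example; this only tolerates the retries, it does not avoid the FetchFailedException):

import org.apache.spark.sql.SparkSession

// Hypothetical stop-gap: allow more consecutive failed attempts of a stage
// before the whole job is aborted (the default is 4).
val spark = SparkSession.builder()
  .config("spark.stage.maxConsecutiveAttempts", "8")
  .getOrCreate()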