weixiuli opened a new pull request, #36141:
URL: https://github.com/apache/spark/pull/36141

   
   ### What changes were proposed in this pull request?
   When enabled push-based shuffle in our production, there will be a 
rejectedExecutionException error, this is  because that the shuffle pusher pool 
has been shutdowned before using it. 
   
   This is the rejectedExecutionException error :
   ```
   FetchFailed(BlockManagerId(26,xxxxx.hadoop.jd.local, 7337, None), 
shuffleId=0, mapIndex=6424, mapId=4177, reduceId=1031, message=
   org.apache.spark.shuffle.FetchFailedException
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1181)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:919)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:81)
        at 
org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown
 Source)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:815)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:133)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.util.concurrent.RejectedExecutionException: Task 
org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2$$Lambda$1045/583658475@3492bd6f
 rejected from java.util.concurrent.ThreadPoolExecutor@2e63bad5[Terminated, 
pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 243134]
        at 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at 
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at 
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at 
org.apache.spark.shuffle.ShuffleBlockPusher.submitTask(ShuffleBlockPusher.scala:147)
        at 
org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.handleResult(ShuffleBlockPusher.scala:235)
        at 
org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockPushSuccess(ShuffleBlockPusher.scala:245)
        at 
org.apache.spark.network.shuffle.BlockPushingListener.onBlockTransferSuccess(BlockPushingListener.java:42)
        at 
org.apache.spark.shuffle.ShuffleBlockPusher$$anon$2.onBlockTransferSuccess(ShuffleBlockPusher.scala:224)
        at 
org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.handleBlockTransferSuccess(RetryingBlockTransferor.java:258)
        at 
org.apache.spark.network.shuffle.RetryingBlockTransferor$RetryingBlockTransferListener.onBlockPushSuccess(RetryingBlockTransferor.java:304)
        at 
org.apache.spark.network.shuffle.OneForOneBlockPusher$BlockPushCallback.onSuccess(OneForOneBlockPusher.java:97)
        at 
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:197)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more
   
   )
   ```
   
   ### Why are the changes needed?
   To fix it
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Pass CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to