pan3793 opened a new pull request #35146:
URL: https://github.com/apache/spark/pull/35146


   ### What changes were proposed in this pull request?
   
   Change the order of `isStale || isTooLate` to `isTooLate || isStale` so that 
`||` short-circuiting avoids the NPE.
   
   ### Why are the changes needed?
   
   A late push shuffle block request can invoke 
`PushBlockStreamCallback#onData` after the merge partition has been finalized, 
which causes an NPE.
   
   ```
   2022-01-07 21:06:14,464 INFO shuffle.RemoteBlockPushResolver: shuffle partition application_1640143179334_0149_-1 102 6922, chunk_size=1, meta_length=138, data_length=112632
   2022-01-07 21:06:14,615 ERROR shuffle.RemoteBlockPushResolver: Encountered issue when merging shufflePush_102_0_279_6922
   java.lang.NullPointerException
           at org.apache.spark.network.shuffle.RemoteBlockPushResolver$AppShuffleMergePartitionsInfo.access$200(RemoteBlockPushResolver.java:1017)
           at org.apache.spark.network.shuffle.RemoteBlockPushResolver$PushBlockStreamCallback.isStale(RemoteBlockPushResolver.java:806)
           at org.apache.spark.network.shuffle.RemoteBlockPushResolver$PushBlockStreamCallback.onData(RemoteBlockPushResolver.java:840)
           at org.apache.spark.network.server.TransportRequestHandler$3.onData(TransportRequestHandler.java:209)
           at org.apache.spark.network.client.StreamInterceptor.handle(StreamInterceptor.java:79)
           at org.apache.spark.network.util.TransportFrameDecoder.feedInterceptor(TransportFrameDecoder.java:263)
           at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:87)
           at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
           at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
           at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
           at org.sparkproject.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
           at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
           at org.sparkproject.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
           at org.sparkproject.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
           at org.sparkproject.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
           at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
           at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
           at org.sparkproject.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
           at org.sparkproject.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
           at org.sparkproject.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
           at org.sparkproject.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
           at org.sparkproject.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   `isTooLate` checks for null but `isStale` does not, so checking `isTooLate` 
first avoids the NPE:
   ```java
      private boolean isTooLate(
          AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
          int reduceId) {
        return null == appShuffleMergePartitionsInfo ||
          INDETERMINATE_SHUFFLE_FINALIZED == appShuffleMergePartitionsInfo.shuffleMergePartitions ||
          !appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
      }
   ```
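
To illustrate why the operand order matters, here is a minimal, self-contained sketch. It is not the actual `RemoteBlockPushResolver` code: `PartitionsInfo`, `rejectOldOrder`, and `rejectNewOrder` are hypothetical names, and the body of `isStale` is an assumption based only on the stack trace above, which shows it dereferencing the info object without a null check.

```java
import java.util.HashMap;
import java.util.Map;

public class ShortCircuitSketch {

    /** Hypothetical stand-in for AppShuffleMergePartitionsInfo. */
    static final class PartitionsInfo {
        final int shuffleMergeId;
        final Map<Integer, String> shuffleMergePartitions = new HashMap<>();

        PartitionsInfo(int shuffleMergeId) {
            this.shuffleMergeId = shuffleMergeId;
        }
    }

    // Assumed shape of isStale: it reads a field of `info` with no null
    // check, so a null `info` throws NullPointerException here.
    static boolean isStale(PartitionsInfo info, int shuffleMergeId) {
        return info.shuffleMergeId > shuffleMergeId;
    }

    // Simplified version of isTooLate: the null check comes first, so a
    // null `info` returns true without ever being dereferenced.
    static boolean isTooLate(PartitionsInfo info, int reduceId) {
        return info == null || !info.shuffleMergePartitions.containsKey(reduceId);
    }

    // Old order: isStale runs first and throws when `info` is null.
    static boolean rejectOldOrder(PartitionsInfo info, int shuffleMergeId, int reduceId) {
        return isStale(info, shuffleMergeId) || isTooLate(info, reduceId);
    }

    // New order: isTooLate returns true for a null `info`, and `||`
    // short-circuits, so isStale is never evaluated in that case.
    static boolean rejectNewOrder(PartitionsInfo info, int shuffleMergeId, int reduceId) {
        return isTooLate(info, reduceId) || isStale(info, shuffleMergeId);
    }
}
```

With a null info object, `rejectNewOrder(null, 0, 0)` returns `true`, while `rejectOldOrder(null, 0, 0)` throws a `NullPointerException`. For a non-null info object both orders return the same result, which is why the reordering does not change behavior in the non-null case.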
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, it is a bug fix that avoids an NPE in the YARN ESS.
   
   ### How was this patch tested?
   I don't think it is easy to write a unit test for this case based on the 
current code. Since it's a minor change, the existing unit tests are used to 
ensure the change doesn't break current behavior.
   




