LuciferYang opened a new pull request, #37024:
URL: https://github.com/apache/spark/pull/37024

   ### What changes were proposed in this pull request?
   This PR adds a `shuffleStatus != null` check to the `o.a.s.MapOutputTrackerMaster#unregisterShuffle` method to avoid throwing an NPE when running with Scala 2.13.
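   
   A minimal sketch of the guarded removal follows. It assumes the tracker keeps its statuses in a Java `ConcurrentHashMap` viewed through `asScala`. `ShuffleStatusStub`, `TrackerSketch`, and `invalidateCaches` are hypothetical names used only for illustration; this is not the patch itself.
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import scala.jdk.CollectionConverters._

   // Hypothetical stand-in for Spark's ShuffleStatus: it only records whether
   // its cached serialized statuses were invalidated.
   final class ShuffleStatusStub {
     @volatile var invalidated: Boolean = false
     def invalidateCaches(): Unit = invalidated = true
   }

   // Hypothetical, simplified tracker. The map type (a Java ConcurrentHashMap
   // viewed through asScala) is an assumption about how Spark stores statuses.
   final class TrackerSketch {
     val shuffleStatuses: scala.collection.concurrent.Map[Int, ShuffleStatusStub] =
       new ConcurrentHashMap[Int, ShuffleStatusStub]().asScala

     def registerShuffle(shuffleId: Int): Unit =
       shuffleStatuses.update(shuffleId, new ShuffleStatusStub)

     def unregisterShuffle(shuffleId: Int): Unit =
       shuffleStatuses.remove(shuffleId).foreach { shuffleStatus =>
         // Defensive guard: skip the status if the Option unexpectedly wraps null.
         if (shuffleStatus != null) {
           shuffleStatus.invalidateCaches()
         }
       }
   }
   ```
   
   Unregistering the same ID twice is a no-op the second time, because `remove` then returns `None`.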
   
   ### Why are the changes needed?
   Ensure that no NPE is thrown when `o.a.s.MapOutputTrackerMaster#unregisterShuffle` is called by multiple threads. This fix only targets Scala 2.13.
   
   The `o.a.s.MapOutputTrackerMaster#unregisterShuffle` method may be called concurrently from the following two paths:
   
   - BlockManagerStorageEndpoint:
   
     https://github.com/apache/spark/blob/6f1046afa40096f477b29beecca5ca6286dfa7f3/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala#L56-L62
   
   - ContextCleaner:
   
     https://github.com/apache/spark/blob/6f1046afa40096f477b29beecca5ca6286dfa7f3/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L234-L241
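   
   To make the race concrete, the hypothetical harness below starts two tasks that stand in for the `BlockManagerStorageEndpoint` and `ContextCleaner` paths and unregister the same shuffle ID at the same time. It calls `java.util.concurrent.ConcurrentHashMap.remove` directly, which is atomic, so exactly one caller should get the value and the other should get `null`. The object name `TwoPathRace` and the string statuses are assumptions for illustration.
   
   ```scala
   import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
   import java.util.concurrent.atomic.AtomicInteger

   // Hypothetical harness: two tasks play the roles of the two unregister paths.
   object TwoPathRace {
     def run(shuffleId: Int): (Int, Int) = {
       val statuses = new ConcurrentHashMap[Int, String]()
       statuses.put(shuffleId, s"status-$shuffleId")

       val winners = new AtomicInteger(0) // callers that removed a real value
       val losers = new AtomicInteger(0)  // callers that found nothing to remove
       val start = new CountDownLatch(1)  // releases both callers together
       val pool = Executors.newFixedThreadPool(2)

       val path: Runnable = () => {
         start.await()
         // ConcurrentHashMap.remove is atomic: one caller gets the value,
         // the other gets null.
         val removed = statuses.remove(shuffleId)
         if (removed != null) winners.incrementAndGet() else losers.incrementAndGet()
       }
       pool.execute(path)
       pool.execute(path)
       start.countDown()
       pool.shutdown()
       pool.awaitTermination(10, TimeUnit.SECONDS)
       (winners.get, losers.get)
     }
   }
   ```
   
   Calling `TwoPathRace.run(87)` should return `(1, 1)`: the losing caller must cope with "nothing to remove", which `unregisterShuffle` expresses as `None`.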
   
   When testing with Scala 2.13 (for example, the `sql/core` module), many log entries like the following appear, although they do not cause any unit test failures:
   
   ```
   17:44:09.957 WARN org.apache.spark.storage.BlockManagerMaster: Failed to remove shuffle 87 - null
   java.lang.NullPointerException
        at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1(MapOutputTracker.scala:882)
        at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1$adapted(MapOutputTracker.scala:881)
        at scala.Option.foreach(Option.scala:437)
        at org.apache.spark.MapOutputTrackerMaster.unregisterShuffle(MapOutputTracker.scala:881)
        at org.apache.spark.storage.BlockManagerStorageEndpoint$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$3(BlockManagerStorageEndpoint.scala:59)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.scala:17)
        at org.apache.spark.storage.BlockManagerStorageEndpoint.$anonfun$doAsync$1(BlockManagerStorageEndpoint.scala:89)
        at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:678)
        at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   17:44:09.958 ERROR org.apache.spark.ContextCleaner: Error cleaning shuffle 94
   java.lang.NullPointerException
        at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1(MapOutputTracker.scala:882)
        at org.apache.spark.MapOutputTrackerMaster.$anonfun$unregisterShuffle$1$adapted(MapOutputTracker.scala:881)
        at scala.Option.foreach(Option.scala:437)
        at org.apache.spark.MapOutputTrackerMaster.unregisterShuffle(MapOutputTracker.scala:881)
        at org.apache.spark.ContextCleaner.doCleanupShuffle(ContextCleaner.scala:241)
        at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3(ContextCleaner.scala:202)
        at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3$adapted(ContextCleaner.scala:195)
        at scala.Option.foreach(Option.scala:437)
        at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:195)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1432)
        at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:189)
        at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:79)
   ```
   
   I think this is a bug in Scala 2.13.8 and have already filed an issue at https://github.com/scala/bug/issues/12613. This PR only adds defensive protection; we should remove it once Scala 2.13 fixes the issue.
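   
   The linked Scala issue is the authority on the root cause. Purely as an illustration under an assumed mechanism, the sketch below shows how a check-then-act `remove` could return `Some(null)` when another caller removes the key between the check and the removal, which is the shape that makes the `Option.foreach` callback in the stack traces above dereference `null`. `CheckThenActSketch`, `racyRemove`, and `badInterleaving` are hypothetical and are not Scala library code.
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap

   // Hypothetical check-then-act remove, written only to illustrate the failure
   // shape. It is NOT the Scala standard library's wrapper implementation.
   object CheckThenActSketch {
     def racyRemove[K, V](m: ConcurrentHashMap[K, V], k: K): Option[V] =
       if (m.containsKey(k)) Some(m.remove(k)) else None

     // Replays one bad interleaving of racyRemove step by step on one thread.
     def badInterleaving(): Option[String] = {
       val m = new ConcurrentHashMap[Int, String]()
       m.put(87, "status-87")
       val sawKey = m.containsKey(87) // caller A: the check succeeds
       m.remove(87)                   // caller B: removes the entry first
       // Caller A acts on its stale check: remove now returns null,
       // but A has already committed to returning Some.
       if (sawKey) Some(m.remove(87)) else None
     }
   }
   ```
   
   Single-threaded, `racyRemove` behaves like a normal `remove`; only the interleaved case yields `Some(null)`, which a `shuffleStatus != null` guard tolerates.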
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   
   - Pass GA
   - Add a new test `SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE` to `MapOutputTrackerSuite`, which can be run manually as follows:
   
   ```
   dev/change-scala-version.sh 2.13
   mvn clean install -DskipTests -pl core -am -Pscala-2.13
   mvn clean test -pl core -Pscala-2.13 -Dtest=none -DwildcardSuites=org.apache.spark.MapOutputTrackerSuite
   ```
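   
   For a similar check outside Spark, here is a hypothetical stress harness in the spirit of that test (not the suite's actual code): many tasks unregister the same shuffle IDs concurrently and the number of `NullPointerException`s is counted, which should stay at 0 with the guard in place. The object name `UnregisterStress`, the pool size, and the `hashCode` call standing in for cache invalidation are assumptions.
   
   ```scala
   import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
   import java.util.concurrent.atomic.AtomicInteger
   import scala.jdk.CollectionConverters._

   // Hypothetical stress harness; returns (NPEs observed, entries left in the map).
   object UnregisterStress {
     def run(numShuffles: Int, callersPerShuffle: Int): (Int, Int) = {
       val statuses: scala.collection.concurrent.Map[Int, AnyRef] =
         new ConcurrentHashMap[Int, AnyRef]().asScala
       (0 until numShuffles).foreach(id => statuses.update(id, new Object))

       val npeCount = new AtomicInteger(0)
       val pool = Executors.newFixedThreadPool(8)
       for (id <- 0 until numShuffles; _ <- 0 until callersPerShuffle) {
         pool.execute(() => {
           try {
             // Same guard as the change: skip a status that is unexpectedly null.
             statuses.remove(id).foreach { status =>
               if (status != null) status.hashCode()
             }
           } catch {
             case _: NullPointerException => npeCount.incrementAndGet()
           }
         })
       }
       pool.shutdown()
       pool.awaitTermination(30, TimeUnit.SECONDS)
       (npeCount.get, statuses.size)
     }
   }
   ```
   
   `UnregisterStress.run(200, 4)` should return `(0, 0)`: no NPEs, and every shuffle ID removed.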
   
   **Before**
   
   ```
   - SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE *** FAILED ***
     3 did not equal 0 (MapOutputTrackerSuite.scala:971)
   Run completed in 17 seconds, 505 milliseconds.
   Total number of tests run: 25
   Suites: completed 2, aborted 0
   Tests: succeeded 24, failed 1, canceled 0, ignored 1, pending 0
   *** 1 TEST FAILED ***
   ```
   
   **After**
   
   ```
   - SPARK-39553: Multi-thread unregister shuffle shouldn't throw NPE
   Run completed in 17 seconds, 996 milliseconds.
   Total number of tests run: 25
   Suites: completed 2, aborted 0
   Tests: succeeded 25, failed 0, canceled 0, ignored 1, pending 0
   All tests passed.
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

