Ngone51 opened a new pull request #28357: URL: https://github.com/apache/spark/pull/28357
### What changes were proposed in this pull request? Use `TaskSetManger.abort` to abort a barrier stage instead of throwing exception within `resourceOffers`. ### Why are the changes needed? Any non fatal exception thrown within Spark RPC framework can be swallowed: https://github.com/apache/spark/blob/100fc58da54e026cda87832a10e2d06eaeccdf87/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala#L202-L211 The method `TaskSchedulerImpl.resourceOffers` is also within the scope of Spark RPC framework. Thus, throw exception inside `resourceOffers` won't fail the application. As a result, if a barrier stage fail the require check at `require(addressesWithDescs.size == taskSet.numTasks, ...)`, the barrier stage will fail the check again and again util all tasks from `TaskSetManager` dequeued. But since the barrier stage isn't really executed, the application will hang. The issue can be reproduced by the following test: ```scala initLocalClusterSparkContext(2) val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2) val dep = new OneToOneDependency[Int](rdd0) val rdd = new MyRDD(sc, 2, List(dep), Seq(Seq("executor_h_0"),Seq("executor_h_0"))) rdd.barrier().mapPartitions { iter => BarrierTaskContext.get().barrier() iter }.collect() ``` ### Does this PR introduce any user-facing change? Yes, application hang previously but fail-fast after this fix. ### How was this patch tested? Added a regression test. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
