[GitHub] spark pull request #20807: SPARK-23660: Fix exception in yarn cluster mode w...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20807

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/20807#discussion_r175156772

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
```diff
@@ -418,7 +418,19 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   }

   private def sparkContextInitialized(sc: SparkContext) = {
+    // Notify runDriver function that SparkContext is available
     sparkContextPromise.success(sc)
+    // Pause the user class thread in order to make proper initialization in runDriver function.
+    // When it happened the thread has to be resumed with resumeDriver function.
+    sparkContextPromise.synchronized {
```
--- End diff ---

Fixed.
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/20807#discussion_r175156659

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
```diff
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         // if the user app did not create a SparkContext.
         throw new IllegalStateException("User did not initialize spark context!")
       }
+      // After initialisation notify user class thread to continue
```
--- End diff ---

Moved.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20807#discussion_r175151620

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
```diff
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         // if the user app did not create a SparkContext.
         throw new IllegalStateException("User did not initialize spark context!")
       }
+      // After initialisation notify user class thread to continue
```
--- End diff ---

You should remove this comment and add one to the `resumeDriver` method.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20807#discussion_r175151438

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
```diff
@@ -418,7 +418,19 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   }

   private def sparkContextInitialized(sc: SparkContext) = {
+    // Notify runDriver function that SparkContext is available
     sparkContextPromise.success(sc)
+    // Pause the user class thread in order to make proper initialization in runDriver function.
+    // When it happened the thread has to be resumed with resumeDriver function.
+    sparkContextPromise.synchronized {
```
--- End diff ---

Hmm, there's a race now. You're updating the promise outside the lock, so it's possible that the `runDriver` thread can see that and notify the lock before this thread grabs it, so this thread would hang forever in that case.
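The hang described above comes from a classic lost-notify: if the promise is completed before the monitor is taken, the `runDriver` thread can observe the completed future and call `notify()` before the user class thread has entered `wait()`. The following is a minimal self-contained sketch (all names hypothetical, not the actual `ApplicationMaster` code) of the race-free ordering: publish the value and start waiting while holding the same monitor, with a flag guarding against both the race and spurious wakeups.

```scala
import scala.concurrent.Promise

object PausePoint {
  private val sparkContextPromise = Promise[Int]()
  private var resumed = false

  def future = sparkContextPromise.future

  // User class thread: complete the promise and wait under the SAME monitor,
  // so no notify() can slip in between the two steps.
  def initialized(value: Int): Unit = sparkContextPromise.synchronized {
    sparkContextPromise.success(value)
    while (!resumed) {           // flag also guards against spurious wakeups
      sparkContextPromise.wait()
    }
  }

  // runDriver thread: called after the future completes; releases the user thread.
  // Even if this runs "early", the flag means the waiter never blocks forever.
  def resume(): Unit = sparkContextPromise.synchronized {
    resumed = true
    sparkContextPromise.notify()
  }
}
```

Because `resume()` must acquire the monitor that `initialized()` only releases inside `wait()`, the notify can no longer be lost; the `resumed` flag makes the pattern safe under any interleaving.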
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/20807#discussion_r174983468

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
```diff
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         // if the user app did not create a SparkContext.
         throw new IllegalStateException("User did not initialize spark context!")
       }
+      // After initialisation notify user class thread to continue
+      synchronized { notify() }
```
--- End diff ---

Moved into `resumeDriver` function right below `sparkContextInitialized`.
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/20807#discussion_r174983449

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
```diff
@@ -417,8 +417,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     }
   }

-  private def sparkContextInitialized(sc: SparkContext) = {
+  private def sparkContextInitialized(sc: SparkContext) = synchronized {
```
--- End diff ---

Used `sparkContextPromise` as lock.
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/20807#discussion_r174983476

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
```diff
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         // if the user app did not create a SparkContext.
         throw new IllegalStateException("User did not initialize spark context!")
       }
+      // After initialisation notify user class thread to continue
```
--- End diff ---

Fixed and switched to US spell checker.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20807#discussion_r174964951

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
```diff
@@ -417,8 +417,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     }
   }

-  private def sparkContextInitialized(sc: SparkContext) = {
+  private def sparkContextInitialized(sc: SparkContext) = synchronized {
```
--- End diff ---

Some other code in this class uses synchronization on `this`, so I think it would be better to synchronize on `sparkContextPromise` in this case.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20807#discussion_r174965130

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
```diff
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         // if the user app did not create a SparkContext.
         throw new IllegalStateException("User did not initialize spark context!")
       }
+      // After initialisation notify user class thread to continue
+      synchronized { notify() }
```
--- End diff ---

Since you have to do this in two places, I'd create a method (e.g. `resumeDriver`) close to where `sparkContextInitialized` is declared, so that it's easier to find the context of why this is needed.
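The extraction suggested here can be sketched as follows (a hypothetical skeleton, not the actual `ApplicationMaster` code): the duplicated inline `synchronized { notify() }` becomes one named, documented helper, invoked from both the success and the failure path so the paused user class thread is always released.

```scala
class AppMasterSketch {
  private val lock = new Object
  private var resumed = false

  /**
   * Resumes the user class thread paused in pauseUntilResumed(). Extracted
   * into one method so both call sites share the documented wait/notify pair.
   */
  private def resumeDriver(): Unit = lock.synchronized {
    resumed = true
    lock.notify()
  }

  /** Blocks the user class thread until resumeDriver() is called. */
  def pauseUntilResumed(): Unit = lock.synchronized {
    while (!resumed) lock.wait() // loop guards against spurious wakeups
  }

  /** Driver-side setup; resumes the user thread on success AND failure paths. */
  def runDriver(): Unit = {
    try {
      // ... register the AM, set up RPC, etc. (omitted in this sketch) ...
    } finally {
      resumeDriver() // one helper instead of repeating synchronized { notify() }
    }
  }
}
```

Putting the notify in a `finally` block means even an exception during registration cannot leave the user class thread hanging, which is the "two places" concern the review raises.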
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20807#discussion_r174965182

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
```diff
@@ -497,6 +500,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         // if the user app did not create a SparkContext.
         throw new IllegalStateException("User did not initialize spark context!")
       }
+      // After initialisation notify user class thread to continue
```
--- End diff ---

nit: rest of the code uses American spelling ("initialization"), so this should be consistent.
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/20807#discussion_r174045577

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
```diff
@@ -496,7 +497,7 @@ private[yarn] class YarnAllocator(
       executorIdCounter += 1
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
-      val executorId = executorIdCounter.toString
+      val executorId = (initialExecutorIdCounter + executorIdCounter).toString
```
--- End diff ---

Yeah, this is an issue only when the application is quite fast. Do you have concerns with solving this in general, or with the fix in the first commit? I'm asking because pausing the user class thread would definitely be better, as I've written below.
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20807#discussion_r174035432

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
```diff
@@ -496,7 +497,7 @@ private[yarn] class YarnAllocator(
       executorIdCounter += 1
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
-      val executorId = executorIdCounter.toString
+      val executorId = (initialExecutorIdCounter + executorIdCounter).toString
```
--- End diff ---

I get the point of the fix, but it still seems a little strange to me. Besides, do we really need to fix this issue? As far as I know, the case here is not a normal one.
Github user gaborgsomogyi commented on a diff in the pull request: https://github.com/apache/spark/pull/20807#discussion_r174032329

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
```diff
@@ -496,7 +497,7 @@ private[yarn] class YarnAllocator(
       executorIdCounter += 1
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
-      val executorId = executorIdCounter.toString
+      val executorId = (initialExecutorIdCounter + executorIdCounter).toString
```
--- End diff ---

The initial problem was that `initialExecutorIdCounter` comes from the driver, which is already stopped. Making it lazy solved that. The other integer is necessary because a `lazy var` is not possible.
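The technique behind this comment is that a Scala `lazy val` defers its right-hand side until first access, so a value fetched from the driver is not requested during construction. A minimal standalone sketch (all names hypothetical, with the driver RPC simulated by a local function, not the actual `YarnAllocator` code):

```scala
object LazyInitDemo {
  // Simulates whether the driver's RPC endpoint is still alive.
  @volatile var driverAvailable = true

  // Stand-in for the askSync call that fetches the max executor id so far;
  // throws once the driver has stopped, like RpcEnvStoppedException would.
  private def askDriverForMaxId(): Int = {
    if (!driverAvailable) throw new IllegalStateException("RpcEnv already stopped")
    1000
  }

  class Allocator {
    // An eager `val` would issue the RPC in the constructor and fail if the
    // driver already exited; `lazy val` defers it until the first allocation.
    lazy val initialExecutorIdCounter: Int = askDriverForMaxId()

    private var executorIdCounter = 0

    def nextExecutorId(): String = {
      executorIdCounter += 1
      // The two counters are added because `lazy var` does not exist in Scala:
      // the lazy base stays fixed while the plain var does the counting.
      (initialExecutorIdCounter + executorIdCounter).toString
    }
  }
}
```

Construction of `Allocator` succeeds even with the driver gone; the RPC only happens (and can only fail) once an executor id is actually needed, which is the deferral this PR originally relied on.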
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/20807#discussion_r174027869

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
```diff
@@ -496,7 +497,7 @@ private[yarn] class YarnAllocator(
      executorIdCounter += 1
      val executorHostname = container.getNodeId.getHost
      val containerId = container.getId
-     val executorId = executorIdCounter.toString
+     val executorId = (initialExecutorIdCounter + executorIdCounter).toString
```
--- End diff ---

it seems a bit strange to me to "add" the Ids? @vanzin @jerryshao
GitHub user gaborgsomogyi opened a pull request: https://github.com/apache/spark/pull/20807

SPARK-23660: Fix exception in yarn cluster mode when application ended fast

## What changes were proposed in this pull request?

Yarn throws the following exception in cluster mode when the application is really small:

```
18/03/07 23:34:22 WARN netty.NettyRpcEnv: Ignored failure: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@7c974942 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@1eea9d2d[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
18/03/07 23:34:22 ERROR yarn.ApplicationMaster: Uncaught exception: org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
	at org.apache.spark.deploy.yarn.YarnAllocator.<init>(YarnAllocator.scala:102)
	at org.apache.spark.deploy.yarn.YarnRMClient.register(YarnRMClient.scala:77)
	at org.apache.spark.deploy.yarn.ApplicationMaster.registerAM(ApplicationMaster.scala:450)
	at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:493)
	at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:345)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply$mcV$sp(ApplicationMaster.scala:260)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$5.run(ApplicationMaster.scala:810)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
	at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:809)
	at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:259)
	at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:834)
	at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: org.apache.spark.rpc.RpcEnvStoppedException: RpcEnv already stopped.
	at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:158)
	at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:135)
	at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:229)
	at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
	... 17 more
18/03/07 23:34:22 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Uncaught exception: org.apache.spark.SparkException: Exception thrown in awaitResult: )
```

Example application:

```scala
object ExampleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ExampleApp")
    val sc = new SparkContext(conf)
    try {
      // Do nothing
    } finally {
      sc.stop()
    }
  }
}
```

This PR makes `initialExecutorIdCounter` lazy. This way `YarnAllocator` can be instantiated even if the driver already ended.

## How was this patch tested?

Automated: Additional unit test added
Manual: Application submitted into small cluster

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gaborgsomogyi/spark SPARK-23660

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20807.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20807

commit 114ac05102c9d563c922447423ec8445bb37e9ef
Author: Gabor Somogyi
Date: 2018-03-13T04:23:59Z

    SPARK-23660: Fix exception in yarn cluster mode when application ended fast