[GitHub] [spark] jiangxb1987 commented on a change in pull request #28746: [SPARK-31922][CORE] Fix "RpcEnv already stopped" error when exit spark-shell with local-cluster mode
jiangxb1987 commented on a change in pull request #28746:
URL: https://github.com/apache/spark/pull/28746#discussion_r440589823

## File path: core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala ##
@@ -63,23 +65,34 @@ class LocalSparkCluster(
     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
-      val workerEnv = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0, coresPerWorker,
-        memoryPerWorker, masters, null, Some(workerNum), _conf,
-        conf.get(config.Worker.SPARK_WORKER_RESOURCE_FILE))
+      val (workerEnv, workerRef) = Worker.startRpcEnvAndEndpoint(localHostname, 0, 0,
+        coresPerWorker, memoryPerWorker, masters, null, Some(workerNum), _conf,
+        conf.get(config.Worker.SPARK_WORKER_RESOURCE_FILE), isLocalCluster = true)
       workerRpcEnvs += workerEnv
+      workerRefs += workerRef
     }
     masters
   }
   def stop(): Unit = {
     logInfo("Shutting down local Spark cluster.")
+    // SPARK-31922: make sure all the workers have handled the messages(`KillExecutor`,
+    // `ApplicationFinished`) from the Master before we shutdown the workers' rpcEnvs.
+    // Otherwise, we could hit "RpcEnv already stopped" error.
+    var busyWorkers = workerRefs
+    while (busyWorkers.nonEmpty) {

Review comment:
This is wrong: what if a worker doesn't reply and throws an `RpcTimeoutException`? Actually, I doubt whether it's worth making so many changes to avoid an error message that doesn't hurt anything.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
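A minimal sketch of the failure mode raised in this comment, under stated assumptions: the real worker `RpcEndpointRef`s and `RpcTimeoutException` are replaced by a hypothetical `FakeWorker` whose `askIdle` returns a plain `Future`, and `BoundedWorkerDrain.drain` is an illustrative helper, not Spark API. Each poll is bounded by a per-ask timeout, a worker whose ask times out or fails is set aside instead of being retried, and the whole wait is capped by an overall deadline, so `stop()` cannot spin forever on an unresponsive worker.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for a worker endpoint ref: `askIdle` models an RPC ask that
// completes with true once the worker has handled KillExecutor/ApplicationFinished.
final case class FakeWorker(name: String, askIdle: () => Future[Boolean])

object BoundedWorkerDrain {
  /** Polls workers until all report idle or `total` elapses. A worker whose ask
    * times out or fails is set aside instead of being retried, so one dead worker
    * cannot keep the caller spinning. Returns the workers not confirmed idle. */
  def drain(workers: Seq[FakeWorker], perAsk: FiniteDuration,
            total: FiniteDuration): Seq[FakeWorker] = {
    val deadline = total.fromNow
    var busy = workers
    var unreachable = Seq.empty[FakeWorker]
    while (busy.nonEmpty && deadline.hasTimeLeft()) {
      // Await.result throws TimeoutException after `perAsk`; Try captures it.
      val results = busy.map(w => w -> Try(Await.result(w.askIdle(), perAsk)))
      unreachable ++= results.collect { case (w, Failure(_)) => w } // timed out or errored
      busy = results.collect { case (w, Success(false)) => w }      // still busy: poll again
      if (busy.nonEmpty) Thread.sleep(10)
    }
    busy ++ unreachable
  }
}
```

Setting timed-out workers aside keeps shutdown time bounded, at the cost that the "RpcEnv already stopped" message could still appear for those workers.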
[GitHub] [spark] jiangxb1987 commented on a change in pull request #28746: [SPARK-31922][CORE] Fix "RpcEnv already stopped" error when exit spark-shell with local-cluster mode
jiangxb1987 commented on a change in pull request #28746:
URL: https://github.com/apache/spark/pull/28746#discussion_r436493879

## File path: core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala ##
@@ -74,6 +74,10 @@ class LocalSparkCluster(
   def stop(): Unit = {
     logInfo("Shutting down local Spark cluster.")
+    // SPARK-31922: wait one more second before shutting down rpcEnvs of master and worker,
+    // in order to let the cluster have time to handle the `UnregisterApplication` message.
+    // Otherwise, we could hit "RpcEnv already stopped" error.
+    Thread.sleep(1000)

Review comment:
I don't think this is an ideal way to handle the issue: there can always be lags for various reasons, so the error message may still appear. Ideally, the Master side should know that the LocalCluster is being stopped and not output the "RpcEnv already stopped" error message, but I'm not sure how hard that would be.
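One possible reading of the suggestion above, sketched under assumptions: the cluster marks itself as stopping before any RpcEnv is shut down, and the sending side swallows the "already stopped" failure only when that flag is set. `ShutdownAwareSender`, `markStopping`, and `RpcEnvStoppedStandIn` are hypothetical names, not Spark classes. Unlike `Thread.sleep(1000)`, this does not depend on how long message handling takes.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the error raised when a message is sent through
// an RpcEnv that has already been shut down (not Spark's own class).
final class RpcEnvStoppedStandIn extends IllegalStateException("RpcEnv already stopped.")

// Hypothetical sender that knows whether the local cluster is intentionally stopping.
final class ShutdownAwareSender(logError: String => Unit) {
  private val stopping = new AtomicBoolean(false)

  /** Called by the cluster's stop() before any RpcEnv is shut down. */
  def markStopping(): Unit = stopping.set(true)

  /** Runs `doSend`; an "already stopped" failure is reported as an error
    * only when the shutdown was not initiated on purpose. */
  def send(doSend: () => Unit): Unit = {
    try doSend()
    catch {
      case _: RpcEnvStoppedStandIn if stopping.get() => () // expected during shutdown
      case e: RpcEnvStoppedStandIn                    => logError(e.getMessage)
    }
  }
}
```

The flag is an `AtomicBoolean` because `stop()` and the message-sending threads would run concurrently.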
[GitHub] [spark] jiangxb1987 commented on a change in pull request #28746: [SPARK-31922][CORE] Fix "RpcEnv already stopped" error when exit spark-shell with local-cluster mode
jiangxb1987 commented on a change in pull request #28746:
URL: https://github.com/apache/spark/pull/28746#discussion_r436492690

## File path: core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala ##
@@ -74,12 +74,17 @@ class LocalSparkCluster(
   def stop(): Unit = {
     logInfo("Shutting down local Spark cluster.")
+    // SPARK-31922: wait one more second before shutting down rpcEnvs of master and worker,
+    // in order to let the cluster have time to handle the `UnregisterApplication` message.
+    // Otherwise, we could hit "RpcEnv already stopped" error.
+    Thread.sleep(1000)
     // Stop the workers before the master so they don't get upset that it disconnected
-    workerRpcEnvs.foreach(_.shutdown())
-    masterRpcEnvs.foreach(_.shutdown())
-    workerRpcEnvs.foreach(_.awaitTermination())
-    masterRpcEnvs.foreach(_.awaitTermination())
-    masterRpcEnvs.clear()
-    workerRpcEnvs.clear()
+    Seq(workerRpcEnvs, masterRpcEnvs).foreach { rpcEnvArr =>
+      rpcEnvArr.foreach { rpcEnv => Utils.tryLog {
+        rpcEnv.shutdown()
+        rpcEnv.awaitTermination()
+      }}
+      rpcEnvArr.clear()

Review comment:
I doubt this is right; it seems that previously masterRpcEnvs were cleared before workerRpcEnvs.
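For comparison, a sketch that keeps the ordering of the removed lines (shut down every worker, then every master, then await termination in the same order, then clear `masterRpcEnvs` before `workerRpcEnvs`) while still isolating failures per env, similar in spirit to the `Utils.tryLog` wrapper in the proposed code. `EnvLike` and `OrderedStop` are hypothetical stand-ins for `RpcEnv` and `LocalSparkCluster.stop()`, and the local `tryLog` records failures into a buffer instead of logging them.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical stand-in for RpcEnv exposing only the two calls stop() makes.
trait EnvLike {
  def name: String
  def shutdown(): Unit
  def awaitTermination(): Unit
}

object OrderedStop {
  // Runs `body` and records (rather than rethrows) a non-fatal failure, so one
  // bad env does not prevent the remaining envs from being stopped.
  private def tryLog(events: ArrayBuffer[String])(body: => Unit): Unit = {
    try body
    catch {
      case NonFatal(e) => events += s"failed: ${e.getMessage}"
    }
  }

  /** Pre-change ordering: shut down all workers, then all masters, then await
    * termination in the same order, then clear masters before workers. */
  def stop(workers: ArrayBuffer[EnvLike], masters: ArrayBuffer[EnvLike],
           events: ArrayBuffer[String]): Unit = {
    val all = workers.toList ++ masters.toList // snapshot taken before clearing
    all.foreach(env => tryLog(events)(env.shutdown()))
    all.foreach(env => tryLog(events)(env.awaitTermination()))
    masters.clear()
    workers.clear()
  }
}
```

Splitting shutdown and awaitTermination into two passes means no master is shut down while a worker is still being awaited, which is the order the old code followed.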