This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 109b1e4a79d [SPARK-46346][CORE] Fix Master to update a worker from `UNKNOWN` to `ALIVE` on `RegisterWorker` msg
109b1e4a79d is described below

commit 109b1e4a79d9a5ec33944887a34c92d453016902
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Sun Dec 10 11:19:46 2023 -0800

    [SPARK-46346][CORE] Fix Master to update a worker from `UNKNOWN` to `ALIVE` on `RegisterWorker` msg
    
    ### What changes were proposed in this pull request?
    
    This PR aims to fix `Spark Master`'s recovery process to update a worker's status from `UNKNOWN` to `ALIVE` when it receives a `RegisterWorker` message from that worker.
    
    ### Why are the changes needed?
    
    This happens only during recovery.
    - `Master` already has the recovered worker information in memory with `UNKNOWN` status.
    - `Worker` sends a `RegisterWorker` message correctly.
    - `Master` keeps the worker's status as `UNKNOWN` and replies to the worker with a `RegisteredWorker` message with the `duplicated` flag set.
    - `Worker` receives that reply, logs the following, and does not try to reconnect.
    ```
    23/12/09 23:49:57 INFO Worker: Retrying connection to master (attempt # 3)
    23/12/09 23:49:57 INFO Worker: Connecting to master ...:7077...
    23/12/09 23:50:04 INFO TransportClientFactory: Successfully created connection to master...:7077 after 7089 ms (0 ms spent in bootstraps)
    23/12/09 23:50:04 WARN Worker: Duplicate registration at master spark://...
    23/12/09 23:50:04 INFO Worker: Successfully registered with master spark://...
    ```
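    
    The sequence above can be sketched with a small stand-alone model. This is only an illustration under stated assumptions: `MasterModel`, `WorkerEntry`, `recover`, `register`, and the `applyFix` switch are hypothetical names that do not exist in Spark, and the model ignores RPC, persistence, and the standby state.
    
    ```scala
    // Hypothetical, simplified model of the Master's registration handling.
    // These are NOT Spark's real classes; all names are illustrative only.
    object WorkerState extends Enumeration {
      val ALIVE, DEAD, UNKNOWN = Value
    }

    final class WorkerEntry(val id: String, var state: WorkerState.Value)

    // Model of the reply; `duplicate` stands in for the `duplicated` flag above.
    final case class RegisteredWorker(duplicate: Boolean)

    final class MasterModel(applyFix: Boolean) {
      val idToWorker = scala.collection.mutable.HashMap.empty[String, WorkerEntry]

      // Recovery reloads a known worker; it starts in UNKNOWN.
      def recover(id: String): Unit =
        idToWorker(id) = new WorkerEntry(id, WorkerState.UNKNOWN)

      // Handles a RegisterWorker message from the worker with this id.
      def register(id: String): RegisteredWorker =
        idToWorker.get(id) match {
          case Some(worker) =>
            // With the fix, a recovered worker that re-registers becomes ALIVE.
            if (applyFix && worker.state == WorkerState.UNKNOWN) {
              worker.state = WorkerState.ALIVE
            }
            RegisteredWorker(duplicate = true)
          case None =>
            idToWorker(id) = new WorkerEntry(id, WorkerState.ALIVE)
            RegisteredWorker(duplicate = false)
        }
    }
    ```
    
    In this model, `recover("w1")` followed by `register("w1")` leaves the entry in `UNKNOWN` when `applyFix` is `false` and moves it to `ALIVE` when `applyFix` is `true`; the reply is flagged as a duplicate in both cases.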
    
    The `UNKNOWN`-status workers block the recovery process and cause a long delay.
    
    
https://github.com/apache/spark/blob/bac3492980a3e793065a9e9d511ddf0fb66357b3/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L604-L606
    
    After the delay, master simply kills them all.
    
    
https://github.com/apache/spark/blob/bac3492980a3e793065a9e9d511ddf0fb66357b3/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L647-L649
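    
    The effect of a lingering `UNKNOWN` worker can be illustrated with a rough model. This sketch is based only on the description above, not on the linked lines, which remain the authoritative code. `RecoveryModel`, `canCompleteRecovery`, and `completeRecovery` are names chosen for illustration, and the model tracks workers only.
    
    ```scala
    // Hypothetical model of how UNKNOWN workers affect recovery.
    // Illustrative only; it does not reproduce Master.scala.
    object RecoveryModel {
      sealed trait State
      case object Alive extends State
      case object Unknown extends State

      final case class Worker(id: String, state: State)

      // Recovery can finish without waiting only if no worker is still UNKNOWN.
      def canCompleteRecovery(workers: Seq[Worker]): Boolean =
        !workers.exists(_.state == Unknown)

      // After the delay, workers that are still UNKNOWN are dropped.
      def completeRecovery(workers: Seq[Worker]): Seq[Worker] =
        workers.filterNot(_.state == Unknown)
    }
    ```
    
    Under this model, a single worker left in `Unknown` makes `canCompleteRecovery` return `false`, and `completeRecovery` then discards that worker even if it had already re-registered.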
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This case is hard to cover with a unit test, so it was tested manually.
    
    - Master
    ```
    23/12/10 04:58:30 WARN OneWayOutboxMessage: Failed to send one-way RPC.
    java.io.IOException: Connecting to /***:1024 timed out (10000 ms)
            at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:291)
            at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214)
            at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:226)
            at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
            at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:202)
            at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:198)
            at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
            at java.base/java.lang.Thread.run(Thread.java:840)
    23/12/10 04:58:54 INFO Master: Registering worker ***:1024 with 2 cores, 23.0 GiB RAM
    23/12/10 04:58:54 INFO Master: Worker has been re-registered: worker-20231210045613-***-1024
    ```
    
    - Worker
    ```
    23/12/10 04:58:45 INFO Worker: Retrying connection to master (attempt # 5)
    23/12/10 04:58:45 INFO Worker: Connecting to master master:7077...
    23/12/10 04:58:54 INFO TransportClientFactory: Successfully created connection to master/...:7077 after 63957 ms (0 ms spent in bootstraps)
    23/12/10 04:58:54 WARN Worker: Duplicate registration at master spark://master:7077
    23/12/10 04:58:54 INFO Worker: Successfully registered with master spark://master:7077
    23/12/10 04:58:54 INFO Worker: WorkerWebUI is available at https://...-1***-1024
    23/12/10 04:58:54 INFO Worker: Worker cleanup enabled; old application directories will be deleted in: /data/spark
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44280 from dongjoon-hyun/SPARK-46346.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7346c80aff4..a550f44fc0a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -286,6 +286,10 @@ private[deploy] class Master(
       if (state == RecoveryState.STANDBY) {
         workerRef.send(MasterInStandby)
       } else if (idToWorker.contains(id)) {
+        if (idToWorker(id).state == WorkerState.UNKNOWN) {
+          logInfo("Worker has been re-registered: " + id)
+          idToWorker(id).state = WorkerState.ALIVE
+        }
         workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, true))
       } else {
         val workerResources =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
