Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2828#issuecomment-59602394
  
    **tl;dr**: _this patch looks pretty good to me based on the testing that 
I've done so far.  For my own interest / fun, I'd like to find a way to extend 
my test coverage to include the "worker-initiated reconnect" and "master 
restart" cases, but my tests shouldn't necessarily block the merging / review 
of this patch._
    
    To summarize my understanding of the failure scenarios that this PR 
addresses:
    
    - A worker becomes disassociated from the master, due to either:
      - The master failing and restarting
      - A transient network issue
    
    These scenarios are similar but there's one subtle distinction.  In the 
first scenario, the master forgets all previously-registered workers.  In the 
second scenario, the master can remember that a worker was 
previously-registered even though it may now be disassociated.
    
    In either scenario, the disconnection may be noticed by the master, the worker, 
or both (and perhaps at different times).  For example, the master might 
deregister a worker because it has not received Spark-level heartbeats from it, 
while a worker might disassociate from the master because Akka's failure 
detector was triggered.
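    
    To make that asymmetry concrete, here's a rough sketch of the master-side 
liveness check as I understand it (names and structure are illustrative, not 
the actual `Master.scala` code): workers that haven't sent a Spark-level 
heartbeat within the timeout window are simply removed, without ever being told.
    
    ```scala
    // Simplified sketch of the master's periodic worker-timeout check.
    class WorkerTimeoutSketch(workerTimeoutMs: Long) {
      case class WorkerInfo(id: String, var lastHeartbeat: Long)
      private val workers = scala.collection.mutable.ArrayBuffer.empty[WorkerInfo]

      def recordHeartbeat(id: String): Unit =
        workers.find(_.id == id).foreach(_.lastHeartbeat = System.currentTimeMillis())

      // Invoked on a timer; any worker that has been silent for longer than the
      // timeout is dropped on the master side only.
      def timeOutDeadWorkers(): Unit = {
        val now = System.currentTimeMillis()
        for (w <- workers.toList if now - w.lastHeartbeat > workerTimeoutMs) {
          println(s"Removing worker ${w.id}; no heartbeat in $workerTimeoutMs ms")
          workers -= w
        }
      }
    }
    ```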
     
    After this PR, there are two paths that can lead to a worker reconnection:
    
    - A master (which stayed alive) receives a heartbeat from a 
previously-registered but now deregistered worker and asks that worker to 
reconnect.
    - A worker discovers that it has become disassociated from the master and 
attempts to initiate a reconnection.
    
    I've been working on building a Docker-based integration testing framework 
for testing these sorts of Spark Standalone fault-tolerance issues (to 
hopefully be released publicly sometime soon).
    
    I thought it would be interesting to test the "master stays alive but 
deregisters workers due to not receiving heartbeats" case by simulating network 
issues.  In my testing framework, I added a 
[Jepsen](https://github.com/aphyr/jepsen)-inspired network fault-injector which 
updates `iptables` rules in a boot2docker VM in order to temporarily break 
network links.  Here's the actual code that I wrote to test this PR:
    
    ```scala
    test("workers should reconnect to master if disconnected due to transient 
network issues") {
      // Regression test for SPARK-3736
      val env = Seq(
        "SPARK_MASTER_OPTS" -> "-Dspark.worker.timeout=2",
        "SPARK_WORKER_OPTS" -> "-Dspark.worker.timeout=2 -Dspark.akka.timeout=1 
-Dspark.akka.failure-detector.threshold=1 -Dspark.akka.heartbeat.interval=1"
      )
      cluster = SparkClusters.createStandaloneCluster(env, numWorkers = 1)
      val master = cluster.masters.head
      val worker = cluster.workers.head
      master.getState.liveWorkerIPs.size should be (1)
      println("Cluster launched with one worker")
    
      networkFaultInjector.dropTraffic(master.container, worker.container)
      networkFaultInjector.dropTraffic(worker.container, master.container)
      eventually(timeout(30 seconds), interval(1 seconds)) {
        master.getState.liveWorkerIPs.size should be (0)
      }
      println("Master shows that zero workers are registered after network 
connection fails")
    
      networkFaultInjector.restore()
      eventually(timeout(30 seconds), interval(1 seconds)) {
        master.getState.liveWorkerIPs.size should be (1)
      }
      println("Master shows one worker after network connection is restored")
    }
    ```
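    
    In case it's useful context, the fault injector itself is conceptually very 
simple.  Here's a stripped-down sketch of the idea (not the framework's real 
implementation; this variant takes raw container IPs and shells out to the 
boot2docker VM, whereas the real one resolves IPs from the container handles):
    
    ```scala
    import scala.sys.process._

    // Jepsen-inspired fault injection: add/remove iptables DROP rules inside the
    // boot2docker VM so that traffic between two container IPs is silently dropped.
    class NetworkFaultInjectorSketch {
      // Run a command inside the boot2docker VM (assumes `boot2docker ssh` works).
      private def runInVM(cmd: String): String =
        Seq("boot2docker", "ssh", s"sudo $cmd").!!

      def dropTraffic(fromIp: String, toIp: String): Unit = {
        // Insert a rule at the top of the FORWARD chain dropping fromIp -> toIp.
        runInVM(s"iptables -I FORWARD -s $fromIp -d $toIp -j DROP")
      }

      def restore(): Unit = {
        // Flush the FORWARD chain to undo all injected rules.
        runInVM("iptables -F FORWARD")
      }
    }
    ```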
    
    While running this against the current Spark master, after I kill the 
network connection between the master and worker, the master more-or-less 
immediately times out the worker and disconnects it.  However, the worker 
doesn't realize that it has been deregistered from the master.  This happens 
because the master detects worker liveness using our own heartbeat mechanism, 
whereas the worker detects master liveness using Akka's failure-detection 
mechanisms (to see this, note that the worker's `masterDisconnected()` function 
is only invoked from the `DisassociatedEvent` message handler).
    
    As a result, we end up in a scenario where the master receives heartbeats 
from a deregistered worker that does not realize it has been 
deregistered.  This PR addresses this case by having the master explicitly ask 
the worker to reconnect (via the `ReconnectWorker` message).  Thanks to this 
mechanism, my test passes with this PR's code!
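    
    For reference, here's my reading of the new master-side behavior, boiled 
down to a self-contained sketch (paraphrased, not the PR's actual code; 
`reply` stands in for Akka's `sender ! msg`):
    
    ```scala
    case class Heartbeat(workerId: String)
    case class ReconnectWorker(masterUrl: String)

    class MasterHeartbeatSketch(masterUrl: String) {
      // workerId -> last heartbeat timestamp, for workers the master still knows about
      private val idToWorker = scala.collection.mutable.Map.empty[String, Long]

      def receiveHeartbeat(hb: Heartbeat, reply: Any => Unit): Unit = {
        idToWorker.get(hb.workerId) match {
          case Some(_) =>
            // Known worker: just refresh its timestamp.
            idToWorker(hb.workerId) = System.currentTimeMillis()
          case None =>
            // Previously-registered worker that has since been timed out: instead
            // of silently ignoring it, ask it to reconnect so it can re-register.
            reply(ReconnectWorker(masterUrl))
        }
      }
    }
    ```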
    
    I'm still working on testing the case where the worker receives a 
`DisassociatedEvent` and initiates the reconnection itself.  To do this, I'll 
need to figure out how to configure the Akka failure detector so that it 
triggers quickly in my test suite.  I'll also need to add a way to query the 
worker to check whether it has become disconnected from the master, so that I 
can tell when I've dropped packets for long enough to cause a disassociation.
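    
    My current plan is to start from worker-side settings along these lines 
(the property names are the existing `spark.akka.*` options used above, just 
tuned down aggressively; the values are guesses I haven't verified yet):
    
    ```scala
    // Candidate settings for making Akka's failure detector fire quickly in tests.
    val aggressiveFailureDetection = Seq(
      "SPARK_WORKER_OPTS" ->
        ("-Dspark.akka.heartbeat.interval=1 " +
         "-Dspark.akka.heartbeat.pauses=5 " +
         "-Dspark.akka.failure-detector.threshold=30")
    )
    ```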
    
    For completeness, I should also test the case where I kill the master and 
bring it back up using the same hostname.  This may require a bit of extra 
scaffolding in my framework (which currently uses container IPs rather than 
hostnames that I control), but I think it's doable.
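    
    Here's roughly the shape of the test I have in mind (a sketch only: the 
`restart()` helper is hypothetical and doesn't exist in my framework yet, and 
it would need to bring the master back up on the same hostname):
    
    ```scala
    test("workers should re-register after the master is killed and restarted") {
      cluster = SparkClusters.createStandaloneCluster(Seq.empty, numWorkers = 1)
      val master = cluster.masters.head
      eventually(timeout(30 seconds), interval(1 seconds)) {
        master.getState.liveWorkerIPs.size should be (1)
      }

      // Hypothetical helper: stop and start the master container, keeping its hostname.
      master.restart()
      eventually(timeout(60 seconds), interval(1 seconds)) {
        master.getState.liveWorkerIPs.size should be (1)
      }
    }
    ```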
    
    That said, the code here seems reasonable.  Don't block on me here 
:smile: 

