Github user JoshRosen commented on the pull request:
https://github.com/apache/spark/pull/2828#issuecomment-59602394
**tl;dr**: _this patch looks pretty good to me based on the testing that
I've done so far. For my own interest / fun, I'd like to find a way to extend
my test coverage to include the "worker-initiated reconnect" and "master
restart" cases, but my tests shouldn't necessarily block the merging / review
of this patch._
To summarize my understanding of the failure scenarios that this PR
addresses:
- A worker becomes disassociated from the master, due to either:
  - The master failing and restarting
  - A transient network issue
These scenarios are similar but there's one subtle distinction. In the
first scenario, the master forgets all previously-registered workers. In the
second scenario, the master can remember that a worker was
previously-registered even though it may now be disassociated.
In some of these scenarios, a disconnection may be reflected at the master,
worker, or both (perhaps at different times). For example, a master might
deregister a worker if it has not received Spark-level heartbeats from it, or a
worker might disassociate from a master due to the Akka failure detector being
triggered.
After this PR, there are two paths that can lead to a worker reconnection:
- A master (which stayed alive) receives a heartbeat from a
previously-registered but now de-registered worker and asks that worker to
reconnect.
- A worker discovers that it has become disassociated from the master and
attempts to initiate a reconnection itself (a rough sketch of this path follows
the list).
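To make that second, worker-initiated path concrete, here's a minimal sketch of
the idea. This is not Spark's actual Worker code; `ToyWorker`, `tryRegister`,
the retry bound, and the backoff are placeholders I've made up purely for
illustration:
```scala
// Minimal sketch (not Spark's real Worker) of the worker-initiated path:
// on losing the master link, keep retrying registration until it succeeds
// or we give up. The retry bound and backoff are illustrative assumptions.
class ToyWorker(tryRegister: () => Boolean, maxRetries: Int = 16) {
  @volatile private var connected = true

  // Invoked when the transport layer reports the master link as broken
  // (in Spark this corresponds to the DisassociatedEvent handler).
  def onMasterDisconnected(): Unit = {
    connected = false
    reregisterWithMaster()
  }

  private def reregisterWithMaster(): Unit = {
    var attempts = 0
    while (!connected && attempts < maxRetries) {
      attempts += 1
      connected = tryRegister()
      if (!connected) Thread.sleep(1000)  // back off before the next attempt
    }
  }
}
```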
I've been working on building a Docker-based integration testing framework
for testing these sorts of Spark Standalone fault-tolerance issues (to
hopefully be released publicly sometime soon).
I thought it would be interesting to test the "master stays alive but
deregisters workers due to not receiving heartbeats" case by simulating network
issues. In my testing framework, I added a
[Jepsen](https://github.com/aphyr/jepsen)-inspired network fault-injector which
updates `iptables` rules in a boot2docker VM in order to temporarily break
network links. Here's the actual code that I wrote to test this PR:
```scala
test("workers should reconnect to master if disconnected due to transient network issues") {
  // Regression test for SPARK-3736
  val env = Seq(
    "SPARK_MASTER_OPTS" -> "-Dspark.worker.timeout=2",
    "SPARK_WORKER_OPTS" -> "-Dspark.worker.timeout=2 -Dspark.akka.timeout=1 -Dspark.akka.failure-detector.threshold=1 -Dspark.akka.heartbeat.interval=1"
  )
  cluster = SparkClusters.createStandaloneCluster(env, numWorkers = 1)
  val master = cluster.masters.head
  val worker = cluster.workers.head
  master.getState.liveWorkerIPs.size should be (1)
  println("Cluster launched with one worker")
  networkFaultInjector.dropTraffic(master.container, worker.container)
  networkFaultInjector.dropTraffic(worker.container, master.container)
  eventually(timeout(30 seconds), interval(1 seconds)) {
    master.getState.liveWorkerIPs.size should be (0)
  }
  println("Master shows that zero workers are registered after network connection fails")
  networkFaultInjector.restore()
  eventually(timeout(30 seconds), interval(1 seconds)) {
    master.getState.liveWorkerIPs.size should be (1)
  }
  println("Master shows one worker after network connection is restored")
}
```
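For context on what `networkFaultInjector` does under the hood, here's a rough
sketch of the idea rather than the real implementation. It assumes the test
process can run commands inside the boot2docker VM via `boot2docker ssh` and
that inter-container traffic traverses the VM's `FORWARD` chain; the real
injector takes container handles, but plain IP strings are used here for
simplicity:
```scala
import scala.collection.mutable.ArrayBuffer
import scala.sys.process._

// Rough sketch of a dropTraffic/restore pair built on iptables rules in the
// boot2docker VM; not the actual fault-injector code.
class ToyNetworkFaultInjector {
  private val activeRules = ArrayBuffer.empty[String]

  // Run a command as root inside the boot2docker VM.
  private def vmExec(cmd: String): String =
    Seq("boot2docker", "ssh", s"sudo $cmd").!!

  /** Drop all packets flowing from `fromIp` to `toIp`. */
  def dropTraffic(fromIp: String, toIp: String): Unit = {
    val rule = s"FORWARD -s $fromIp -d $toIp -j DROP"
    vmExec(s"iptables -I $rule")
    activeRules += rule
  }

  /** Remove every rule this injector added, restoring connectivity. */
  def restore(): Unit = {
    activeRules.foreach(rule => vmExec(s"iptables -D $rule"))
    activeRules.clear()
  }
}
```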
Here's what happens when I run this against the current Spark master: after I
kill the network connection between the master and the worker, the master
more-or-less immediately times out the worker and disconnects it. However, the
worker doesn't realize that it has been deregistered from the master. This
happens because the master detects worker liveness using our own heartbeat
mechanism, whereas the worker detects master liveness using Akka's
failure-detection mechanisms (to see this, note that the worker's
`masterDisconnected()` function is only invoked from the `DisassociatedEvent`
message handler).
As a result, we end up in a scenario where the master receives heartbeats from
a de-registered worker that doesn't realize it has been deregistered. This PR
addresses this case by having the master explicitly ask the worker to reconnect
(via the `ReconnectWorker` message). Thanks to this mechanism, my test passes
with this PR's code!
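For illustration, here's roughly how I picture that master-side decision. This
is a simplified toy, not the actual Master code; `ToyMaster`, `registered`, and
`reply` are stand-ins I made up (the real Master tracks much richer per-worker
state):
```scala
// Toy sketch of the master-side reconnect path; not Spark's real Master.
case class Heartbeat(workerId: String)
case class ReconnectWorker(masterUrl: String)

class ToyMaster(masterUrl: String) {
  private val registered = scala.collection.mutable.Set.empty[String]

  def registerWorker(id: String): Unit = registered += id
  def timeOutWorker(id: String): Unit = registered -= id

  // When a heartbeat arrives from a worker the master no longer considers
  // registered, explicitly ask it to re-register instead of ignoring it.
  def handleHeartbeat(hb: Heartbeat, reply: Any => Unit): Unit = {
    if (registered.contains(hb.workerId)) {
      // normal case: record the latest heartbeat time for this worker
    } else {
      reply(ReconnectWorker(masterUrl))
    }
  }
}
```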
I'm still working on testing the case where the worker receives a
`DisassociatedEvent` and initiates the reconnection itself. To do this, I'll
need to figure out how to configure the Akka failure detector so that it
quickly fails in my testing suite. I'll also need to add a way to query the
worker to ask whether it has become disconnected from the master so that I can
drop packets for long enough in order to cause a disassociation.
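My current guess at the relevant knobs looks something like the following; the
particular settings and values are assumptions on my part that I still need to
verify:
```scala
// Guessed-at worker-side settings for making the Akka failure detector fire
// quickly in tests; values are illustrative, not verified recommendations.
val aggressiveWorkerOpts = Seq(
  "-Dspark.akka.heartbeat.interval=1",          // heartbeat often
  "-Dspark.akka.heartbeat.pauses=5",            // tolerate only short silences
  "-Dspark.akka.failure-detector.threshold=1.0" // trip the phi detector early
).mkString(" ")

val env = Seq("SPARK_WORKER_OPTS" -> aggressiveWorkerOpts)
```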
For completeness, I should also test the case where I kill the master and
bring it back up using the same hostname. This may require a bit of extra
scaffolding in my framework (which currently uses container IPs rather than
hostnames that I control), but I think it's doable.
That said, the code here seems reasonable. Don't block on me here :smile: