vanzin commented on a change in pull request #27010: [SPARK-30313][CORE] Ensure EndpointRef is available MasterWebUI/WorkerPage
URL: https://github.com/apache/spark/pull/27010#discussion_r362071308
##########
File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
##########
@@ -68,11 +82,10 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
- if (endpoints.putIfAbsent(name, getMessageLoop(name, endpoint)) != null) {
+ if (endpoints.putIfAbsent(name, assignToMessageLoop(name, endpoint, endpointRef)) != null) {
Review comment:
While this solves the issue, I don't think it's quite right. The error path
here is wrong: even when registration fails, you'll already have modified
`endpointRefs` and, more importantly, the message loop. (`assignToMessageLoop`
mutates both, and it is called here regardless of whether the endpoint should
be registered.)
To be fair, the previous code also has that problem with respect to the
message loop being modified.
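To illustrate why the error path is wrong: Scala evaluates method arguments eagerly, so `assignToMessageLoop(...)` runs before `putIfAbsent` gets a chance to reject a duplicate name. Below is a minimal, self-contained model of that pattern; `EagerArgumentDemo` and its single-argument `assignToMessageLoop` are hypothetical stand-ins, not Spark's code.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ListBuffer

object EagerArgumentDemo {
  // Stand-in for the message-loop state that assignToMessageLoop mutates.
  val registeredWithLoop: ListBuffer[String] = ListBuffer.empty[String]
  val endpoints = new ConcurrentHashMap[String, String]()

  // Hypothetical stand-in for assignToMessageLoop: it has a side effect.
  def assignToMessageLoop(name: String): String = {
    registeredWithLoop += name
    s"loop-for-$name"
  }

  // Mirrors the pattern in the diff: the argument is computed (and its side
  // effect applied) before putIfAbsent decides whether the name is taken.
  def register(name: String): Unit = {
    if (endpoints.putIfAbsent(name, assignToMessageLoop(name)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
  }
}
```

Registering `"worker"` twice throws on the second call, yet `registeredWithLoop` ends up with two entries: the side effect escapes even though the endpoint was rejected.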
I think it would be safe here to have something like:
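```scala
// Return the right message loop for `endpoint` without modifying any state.
def findMessageLoop(endpoint: RpcEndpoint): MessageLoop = ???

val messageLoop = findMessageLoop(endpoint)
if (endpoints.putIfAbsent(name, messageLoop) != null) {
  throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
// Only mutate state once we know the name is free.
endpointRefs.put(endpoint, endpointRef)
messageLoop.register(name, endpoint)
```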
If this is done inside the `synchronized` block, it should be safe and solve
the problem. `DedicatedMessageLoop` should also implement `register` and call
`setActive` there, instead of as part of the constructor. One more small
thing: in the error case, `DedicatedMessageLoop` will leak a thread pool here,
so maybe the thread pool should also be created in the `register`
implementation.
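Roughly what I have in mind for `DedicatedMessageLoop`, as a simplified sketch: the class names below (`SketchMessageLoop`, `SketchDedicatedMessageLoop`) are hypothetical, a plain `ExecutorService` stands in for the real dispatcher threads, and an `isActive` flag stands in for `setActive(inbox)`. The point is only that the constructor allocates nothing, so a loop that is created and then discarded on the error path leaks no thread pool.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

// Hypothetical, simplified model; not Spark's actual MessageLoop API.
abstract class SketchMessageLoop {
  def register(name: String): Unit
  def stop(): Unit
}

class SketchDedicatedMessageLoop extends SketchMessageLoop {
  // Nothing is allocated at construction time.
  @volatile private var threadpool: ExecutorService = null
  @volatile private var active = false

  override def register(name: String): Unit = synchronized {
    require(threadpool == null, s"$name is already registered")
    // The pool is created only once registration is known to succeed.
    threadpool = Executors.newSingleThreadExecutor()
    // Stand-in for setActive(inbox): only now would OnStart be processed.
    active = true
  }

  def hasThreadPool: Boolean = threadpool != null
  def isActive: Boolean = active

  override def stop(): Unit = synchronized {
    if (threadpool != null) {
      threadpool.shutdown()
      threadpool.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}
```

A freshly constructed loop has no pool and is inactive; only `register` creates the pool and activates it.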
In fact... since this is inside a synchronized block anyway, you can
simplify some of the above by not using `putIfAbsent`. Just check with
`containsKey`, throw if it already exists, then find the right loop, put it in
`endpoints` and update `endpointRefs`, then call `register()`. You'll still
need `DedicatedMessageLoop.register()` to call `setActive()` at the right time.
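A self-contained sketch of the `containsKey` variant, assuming the whole registration runs under one lock. `SketchDispatcher`, `SketchLoop`, `Endpoint` and `EndpointRef` are hypothetical stand-ins for `Dispatcher`, `MessageLoop`, `RpcEndpoint` and `NettyRpcEndpointRef`, reduced to what's needed to show the ordering.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in types, not Spark's classes.
final case class Endpoint(name: String, isolated: Boolean)
final case class EndpointRef(name: String)

class SketchLoop(val label: String) {
  val registered: ListBuffer[String] = ListBuffer.empty[String]
  def register(name: String): Unit = {
    registered += name
  }
}

class SketchDispatcher {
  private val endpoints = new ConcurrentHashMap[String, SketchLoop]()
  private val endpointRefs = new ConcurrentHashMap[Endpoint, EndpointRef]()
  val sharedLoop = new SketchLoop("shared")

  // Pure lookup: picks a loop but does not register anything with it.
  private def findMessageLoop(endpoint: Endpoint): SketchLoop =
    if (endpoint.isolated) new SketchLoop(s"dedicated-${endpoint.name}") else sharedLoop

  def registerRpcEndpoint(endpoint: Endpoint): EndpointRef = synchronized {
    // 1. Fail fast, before touching any state.
    if (endpoints.containsKey(endpoint.name)) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called ${endpoint.name}")
    }
    // 2. Choose the loop and record the endpoint and its ref.
    val ref = EndpointRef(endpoint.name)
    val loop = findMessageLoop(endpoint)
    endpoints.put(endpoint.name, loop)
    endpointRefs.put(endpoint, ref)
    // 3. Register last, after the ref is already in endpointRefs.
    loop.register(endpoint.name)
    ref
  }

  def refFor(endpoint: Endpoint): Option[EndpointRef] = Option(endpointRefs.get(endpoint))
  def endpointCount: Int = endpoints.size
}
```

With this ordering a duplicate name is rejected before `endpoints`, `endpointRefs` or any loop is touched, and `endpointRefs` is populated before `register()` is called, so the ref would already be visible by the time an endpoint starts processing messages.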
----------------------------------------------------------------
This is an automated message from the Apache Git Service.