vanzin commented on a change in pull request #27010: [SPARK-30313][CORE] Ensure EndpointRef is available MasterWebUI/WorkerPage
URL: https://github.com/apache/spark/pull/27010#discussion_r362071308
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##########
 @@ -68,11 +82,10 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
       if (stopped) {
         throw new IllegalStateException("RpcEnv has been stopped")
       }
-      if (endpoints.putIfAbsent(name, getMessageLoop(name, endpoint)) != null) {
+      if (endpoints.putIfAbsent(name, assignToMessageLoop(name, endpoint, endpointRef)) != null) {
 
 Review comment:
   While this solves the issue, I don't think it's quite right. The error path here (when `name` is already registered) is wrong: you'll modify `endpointRefs` and, more importantly, the message loop, because `assignToMessageLoop` mutates both and is called here regardless of whether the endpoint should be registered.
   
   To be fair, the previous code also has that problem w.r.t. the message loop being modified.
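
   To make the error-path problem concrete, here is a minimal, self-contained sketch. `RecordingLoop`, `EagerDispatcher` and the string refs are hypothetical stand-ins, not Spark classes, and endpoints are keyed by name for simplicity; the toy `assignToMessageLoop` only shares the diff's name and its habit of mutating state. The point is that Scala evaluates the argument to `putIfAbsent` before the check runs, so its side effects happen even when registration fails.

   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import scala.collection.mutable.ArrayBuffer

   // Hypothetical stand-in for a shared message loop that just records
   // which endpoint names it has been asked to handle.
   class RecordingLoop {
     val registered = ArrayBuffer.empty[String]
     def register(name: String): Unit = { registered += name }
   }

   object EagerDispatcher {
     val sharedLoop = new RecordingLoop
     val endpoints = new ConcurrentHashMap[String, RecordingLoop]()
     val endpointRefs = new ConcurrentHashMap[String, String]()

     // Same shape as the code under review: it mutates shared state before
     // the caller knows whether the registration will be accepted.
     private def assignToMessageLoop(name: String, ref: String): RecordingLoop = {
       endpointRefs.put(name, ref)
       sharedLoop.register(name)
       sharedLoop
     }

     def registerEndpoint(name: String, ref: String): Unit = synchronized {
       // The argument is evaluated before putIfAbsent runs, so the mutations
       // above happen even when `name` is already taken.
       if (endpoints.putIfAbsent(name, assignToMessageLoop(name, ref)) != null) {
         throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
       }
     }
   }
   ```

   Registering `"worker"` twice throws on the second call, but by then `endpointRefs` already holds the second ref and the shared loop has been told about the name twice.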
   
   I think it would be safe here to have something like:
   
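   ```
   // Picks the right loop for `endpoint` without modifying any state.
   def findMessageLoop(endpoint: RpcEndpoint): MessageLoop = {
     ??? // return the right message loop without modification
   }

   val messageLoop = findMessageLoop(endpoint)
   if (endpoints.putIfAbsent(name, messageLoop) != null) {
     throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
   }
   endpointRefs.put(endpoint, endpointRef)
   messageLoop.register(name, endpoint)
   ```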
   
   If this is done inside the `synchronized` block, it should be safe and solve the problem. `DedicatedMessageLoop` should also implement `register` and call `setActive` there, instead of in its constructor. One more small thing: in the error case, `DedicatedMessageLoop` will leak a thread pool here, so the thread pool should probably also be created in the `register` implementation.
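
   A rough sketch of the `DedicatedMessageLoop` suggestion, assuming a loop whose constructor has no side effects: the thread pool is created and the endpoint marked active only in `register()`. `LazyDedicatedLoop` is hypothetical; a string stands in for the endpoint's inbox, and a plain queue stands in for the loop's set of active inboxes.

   ```scala
   import java.util.concurrent.{ExecutorService, Executors, LinkedBlockingQueue, TimeUnit}

   // Hypothetical stand-in for DedicatedMessageLoop. Constructing it has no
   // side effects, so a loop built for a registration that is then rejected
   // can simply be dropped without leaking anything.
   class LazyDedicatedLoop(name: String, numThreads: Int) {
     // Stand-in for the loop's queue of inboxes with pending work; the
     // endpoint's inbox is represented by its name.
     val active = new LinkedBlockingQueue[String]()
     @volatile private var threadpool: ExecutorService = null

     def isStarted: Boolean = threadpool != null

     private def setActive(inbox: String): Unit = {
       active.offer(inbox)
     }

     // Called by the dispatcher only after the endpoint has been accepted:
     // this is where the thread pool is created and the inbox made active.
     def register(): Unit = synchronized {
       require(threadpool == null, s"$name is already registered")
       threadpool = Executors.newFixedThreadPool(numThreads)
       setActive(name)
     }

     def stop(): Unit = synchronized {
       if (threadpool != null) {
         threadpool.shutdown()
         threadpool.awaitTermination(10, TimeUnit.SECONDS)
       }
     }
   }
   ```

   If the dispatcher throws before calling `register()`, the discarded loop owns no threads, so nothing leaks.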
   
   In fact, since this is inside a synchronized block anyway, you can simplify some of the above by not using `putIfAbsent`. Just check with `containsKey` and throw if the name already exists; then find the right loop, put it in `endpoints`, update `endpointRefs`, and call `register()`. You'll still need `DedicatedMessageLoop.register()` to call `setActive()` at the right time.
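
   The simplified flow might look like the following sketch. Every class here (`LoopStub`, `SharedLoopStub`, `DedicatedLoopStub`, `StubDispatcher`) is a hypothetical stand-in with string refs keyed by name; only the ordering inside `registerEndpoint` mirrors the suggestion: validate, pick the loop, record it, then register.

   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import scala.collection.mutable.ArrayBuffer

   // Hypothetical, heavily simplified stand-ins for Spark's message loops.
   trait LoopStub {
     def register(name: String): Unit
   }

   class SharedLoopStub extends LoopStub {
     val registered = ArrayBuffer.empty[String]
     def register(name: String): Unit = { registered += name }
   }

   // Like the suggested DedicatedMessageLoop, it does nothing until register().
   class DedicatedLoopStub extends LoopStub {
     var registeredName: Option[String] = None
     def register(name: String): Unit = { registeredName = Some(name) }
   }

   class StubDispatcher {
     val sharedLoop = new SharedLoopStub
     val endpoints = new ConcurrentHashMap[String, LoopStub]()
     val endpointRefs = new ConcurrentHashMap[String, String]()

     // Picks (or builds) a loop without touching any shared state.
     private def findMessageLoop(isolated: Boolean): LoopStub =
       if (isolated) new DedicatedLoopStub else sharedLoop

     def registerEndpoint(name: String, ref: String, isolated: Boolean = false): LoopStub =
       synchronized {
         // 1. Validate first: if this throws, nothing has been mutated.
         if (endpoints.containsKey(name)) {
           throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
         }
         // 2. Pick the loop and record the endpoint and its ref.
         val loop = findMessageLoop(isolated)
         endpoints.put(name, loop)
         endpointRefs.put(name, ref)
         // 3. Register last, so the ref is already in endpointRefs by the
         //    time the loop starts handling this endpoint.
         loop.register(name)
         loop
       }
   }
   ```

   A duplicate name is rejected before anything is touched, so a failed call leaves `endpoints`, `endpointRefs` and the shared loop exactly as they were.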

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure.


With regards,
Apache Git Services