Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9210#discussion_r42830999
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
---
    @@ -399,41 +445,41 @@ private[netty] case class Ack(sender: 
NettyRpcEndpointRef) extends ResponseMessa
     private[netty] case class RpcFailure(e: Throwable)
     
     /**
    - * Maintain the mapping relations between client addresses and [[RpcEnv]] 
addresses, broadcast
    - * network events and forward messages to [[Dispatcher]].
    + * Dispatches incoming RPCs to registered endpoints.
    + *
    + * The handler keeps track of all client instances that communicate with 
it, so that the RpcEnv
    + * knows which `TransportClient` instance to use when sending RPCs to a 
client endpoint (i.e.,
    + * one that is not listening for incoming connections, but rather needs to 
be contacted via the
    + * client socket).
    + *
    + * Events are sent on a per-connection basis, so if a client opens 
multiple connections to the
    + * RpcEnv, multiple connection / disconnection events will be created for 
that client (albeit
    + * with different `RpcAddress` information).
      */
     private[netty] class NettyRpcHandler(
         dispatcher: Dispatcher, nettyEnv: NettyRpcEnv) extends RpcHandler with 
Logging {
     
    -  private type ClientAddress = RpcAddress
    -  private type RemoteEnvAddress = RpcAddress
    -
    -  // Store all client addresses and their NettyRpcEnv addresses.
    -  // TODO: Is this even necessary?
    -  @GuardedBy("this")
    -  private val remoteAddresses = new mutable.HashMap[ClientAddress, 
RemoteEnvAddress]()
    +  private val clients = new ConcurrentHashMap[TransportClient, JBoolean]()
     
       override def receive(
    -      client: TransportClient, message: Array[Byte], callback: 
RpcResponseCallback): Unit = {
    -    val requestMessage = nettyEnv.deserialize[RequestMessage](message)
    -    val addr = 
client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
    +      client: TransportClient,
    +      message: Array[Byte],
    +      callback: RpcResponseCallback): Unit = {
    +    val addr = 
client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
         assert(addr != null)
    -    val remoteEnvAddress = requestMessage.senderAddress
         val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
    -
    -    // TODO: Can we add connection callback (channel registered) to the 
underlying framework?
    --- End diff --
    
    that'd be great...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to