Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/9210#discussion_r42701122
--- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
@@ -399,41 +445,41 @@ private[netty] case class Ack(sender: NettyRpcEndpointRef) extends ResponseMessa
private[netty] case class RpcFailure(e: Throwable)
/**
- * Maintain the mapping relations between client addresses and [[RpcEnv]] addresses, broadcast
- * network events and forward messages to [[Dispatcher]].
+ * Dispatches incoming RPCs to registered endpoints.
+ *
+ * The handler keeps track of all client instances that communicate with it, so that the RpcEnv
+ * knows which `TransportClient` instance to use when sending RPCs to a client endpoint (i.e.,
+ * one that is not listening for incoming connections, but rather needs to be contacted via the
+ * client socket).
+ *
+ * Events are sent on a per-connection basis, so if a client opens multiple connections to the
+ * RpcEnv, multiple connection / disconnection events will be created for that client (albeit
+ * with different `RpcAddress` information).
*/
private[netty] class NettyRpcHandler(
dispatcher: Dispatcher, nettyEnv: NettyRpcEnv) extends RpcHandler with Logging {
- private type ClientAddress = RpcAddress
- private type RemoteEnvAddress = RpcAddress
-
- // Store all client addresses and their NettyRpcEnv addresses.
- // TODO: Is this even necessary?
- @GuardedBy("this")
- private val remoteAddresses = new mutable.HashMap[ClientAddress, RemoteEnvAddress]()
+ private val clients = new ConcurrentHashMap[TransportClient, JBoolean]()
override def receive(
- client: TransportClient, message: Array[Byte], callback: RpcResponseCallback): Unit = {
- val requestMessage = nettyEnv.deserialize[RequestMessage](message)
- val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
+ client: TransportClient,
+ message: Array[Byte],
+ callback: RpcResponseCallback): Unit = {
+ val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
assert(addr != null)
- val remoteEnvAddress = requestMessage.senderAddress
val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
-
- // TODO: Can we add connection callback (channel registered) to the underlying framework?
--- End diff --
Maybe I should keep this comment.
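
For context, the per-connection tracking described in the new doc comment can be sketched roughly as below. This is a minimal illustration, not the code from the pull request: `Client` and `ConnectionTracker` are hypothetical stand-ins (one `Client` per connection, playing the role of a `TransportClient`), and only the `ConcurrentHashMap[..., JBoolean]` shape is taken from the diff.

```scala
import java.lang.{Boolean => JBoolean}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for a per-connection client handle.
// It deliberately does not override equals/hashCode, so two connections
// from the same host are tracked as two distinct keys.
final class Client(val host: String, val port: Int)

// Hypothetical tracker: remembers every connection it has seen.
final class ConnectionTracker {
  private val clients = new ConcurrentHashMap[Client, JBoolean]()

  /** Returns true only the first time a given connection is seen,
    * i.e. when a "connected" event would be due. */
  def onReceive(client: Client): Boolean =
    clients.putIfAbsent(client, JBoolean.TRUE) == null

  /** Returns true if the connection was tracked,
    * i.e. when a "disconnected" event would be due. */
  def onDisconnect(client: Client): Boolean =
    clients.remove(client) != null
}
```

Because `putIfAbsent` is atomic, the first-contact check needs no explicit lock even when several threads receive messages from the same connection at once. It does not, however, replace a connection callback from the transport layer: a connection that never sends a message is never seen by `onReceive`.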