Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9210#discussion_r42756277
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -294,47 +319,68 @@ private[netty] class NettyRpcEnvFactory extends 
RpcEnvFactory with Logging {
           new 
JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
         val nettyEnv =
           new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, 
config.securityManager)
    -    val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
    -      nettyEnv.start(actualPort)
    -      (nettyEnv, actualPort)
    -    }
    -    try {
    -      Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, 
"NettyRpcEnv")._1
    -    } catch {
    -      case NonFatal(e) =>
    -        nettyEnv.shutdown()
    -        throw e
    +    if (!config.clientMode) {
    +      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
    +        nettyEnv.startServer(actualPort)
    +        (nettyEnv, actualPort)
    +      }
    +      try {
    +        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, 
"NettyRpcEnv")._1
    +      } catch {
    +        case NonFatal(e) =>
    +          nettyEnv.shutdown()
    +          throw e
    +      }
         }
    +    nettyEnv
       }
     }
     
    -private[netty] class NettyRpcEndpointRef(@transient private val conf: 
SparkConf)
    +/**
    + * The NettyRpcEnv version of RpcEndpointRef.
    + *
    + * This class behaves differently depending on where it's created. On the 
node that "owns" the
    + * RpcEndpoint, it's a simple wrapper around the RpcEndpointAddress 
instance.
    + *
    + * On other machines that receive a serialized version of the reference, 
the behavior changes. The
    + * instance will keep track of the TransportClient that sent the 
reference, so that messages
    + * to the endpoint are sent over the client connection, instead of needing 
a new connection to
    + * be opened.
    + *
    + * The RpcAddress of this ref can be null; what that means is that the ref 
can only be used through
    + * a client connection, since the process hosting the endpoint is not 
listening for incoming
    + * connections. These refs should not be shared with 3rd parties, since 
they will not be able to
    + * send messages to the endpoint.
    + *
    + * @param conf Spark configuration.
    + * @param endpointAddress The address where the endpoint is listening.
    + * @param nettyEnv The RpcEnv associated with this ref.
    + * @param local Whether the referenced endpoint lives in the same process.
    + */
    +private[netty] class NettyRpcEndpointRef(
    +    @transient private val conf: SparkConf,
    +    endpointAddress: RpcEndpointAddress,
    +    @transient @volatile private var nettyEnv: NettyRpcEnv)
       extends RpcEndpointRef(conf) with Serializable with Logging {
     
    -  @transient @volatile private var nettyEnv: NettyRpcEnv = _
    +  @transient @volatile var client: TransportClient = _
     
    -  @transient @volatile private var _address: RpcEndpointAddress = _
    +  private var _address = if (endpointAddress.rpcAddress != null) 
endpointAddress else null
    --- End diff --
    
    nit: `_address` and `_name` can be `val`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to