Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9917#discussion_r45802883
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala ---
    @@ -22,13 +22,51 @@ import javax.annotation.concurrent.GuardedBy
     
     import scala.util.control.NonFatal
     
    -import org.apache.spark.SparkException
    +import org.apache.spark.{Logging, SparkException}
     import org.apache.spark.network.client.{RpcResponseCallback, 
TransportClient}
     import org.apache.spark.rpc.RpcAddress
     
    -private[netty] case class OutboxMessage(content: Array[Byte],
    +private[netty] sealed trait OutboxMessage {
    +
    +  def send(client: TransportClient): Unit
    +
    +  def onFailure(e: Throwable): Unit
    +
    +}
    +
    +private[netty] case class OneWayOutboxMessage(content: Array[Byte]) 
extends OutboxMessage
    +  with Logging {
    +
    +  override def send(client: TransportClient): Unit = {
    +    client.send(content)
    +  }
    +
    +  override def onFailure(e: Throwable): Unit = {
    +    logWarning(s"Failed to send one-way RPC.", e)
    +  }
    +
    +}
    +
    +private[netty] case class RpcOutboxMessage(content: Array[Byte],
       _onFailure: (Throwable) => Unit,
    -  _onSuccess: (TransportClient, Array[Byte]) => Unit) {
    +  _onSuccess: (TransportClient, Array[Byte]) => Unit) extends 
OutboxMessage {
    +
    +  private var client: TransportClient = _
    +  private var requestId: Long = _
    +
    +  override def send(client: TransportClient): Unit = {
    +    this.client = client
    +    this.requestId = client.sendRpc(content, createCallback(client))
    +  }
    +
    +  override def onFailure(e: Throwable): Unit = {
    +    _onFailure(e)
    +  }
    +
    +  def onTimeout(): Unit = {
    +    require(client != null, "TransportClient has not yet been set.")
    +    client.removeRpcRequest(requestId)
    +  }
     
       def createCallback(client: TransportClient): RpcResponseCallback = new 
RpcResponseCallback() {
    --- End diff --
    
    `createCallback` can be made private now that it is only used inside `RpcOutboxMessage`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to