Github user dragos commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11272#discussion_r53798220
  
    --- Diff: 
core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
 ---
    @@ -17,69 +17,88 @@
     
     package org.apache.spark.deploy.mesos
     
    -import java.net.SocketAddress
     import java.nio.ByteBuffer
    +import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
     
    -import scala.collection.mutable
    +import scala.collection.JavaConverters._
     
     import org.apache.spark.{Logging, SecurityManager, SparkConf}
     import org.apache.spark.deploy.ExternalShuffleService
     import org.apache.spark.network.client.{RpcResponseCallback, 
TransportClient}
     import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
     import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
    -import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver
    +import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, 
ShuffleServiceHeartbeat}
     import org.apache.spark.network.util.TransportConf
    +import org.apache.spark.util.ThreadUtils
     
     /**
      * An RPC endpoint that receives registration requests from Spark drivers 
running on Mesos.
      * It detects driver termination and calls the cleanup callback to 
[[ExternalShuffleService]].
      */
    -private[mesos] class MesosExternalShuffleBlockHandler(transportConf: 
TransportConf)
    +private[mesos] class MesosExternalShuffleBlockHandler(
    +  transportConf: TransportConf, cleanerIntervalS: Long)
       extends ExternalShuffleBlockHandler(transportConf, null) with Logging {
     
    -  // Stores a map of driver socket addresses to app ids
    -  private val connectedApps = new mutable.HashMap[SocketAddress, String]
    +  
ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher")
    +    .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, 
TimeUnit.SECONDS)
    +
    +  // Stores a map of app id to app state (timeout value and last heartbeat)
    +  private val connectedApps = new ConcurrentHashMap[String, AppState]()
     
       protected override def handleMessage(
           message: BlockTransferMessage,
           client: TransportClient,
           callback: RpcResponseCallback): Unit = {
         message match {
    -      case RegisterDriverParam(appId) =>
    +      case RegisterDriverParam(appId, appState) =>
             val address = client.getSocketAddress
    -        logDebug(s"Received registration request from app $appId (remote 
address $address).")
    -        if (connectedApps.contains(address)) {
    -          val existingAppId = connectedApps(address)
    -          if (!existingAppId.equals(appId)) {
    -            logError(s"A new app '$appId' has connected to existing 
address $address, " +
    -              s"removing previously registered app '$existingAppId'.")
    -            applicationRemoved(existingAppId, true)
    -          }
    +        val timeout = appState.heartbeatTimeout
    +        logInfo(s"Received registration request from app $appId (remote 
address $address, " +
    +          s"heartbeat timeout $timeout ms).")
    +        if (connectedApps.containsKey(appId)) {
    +          logWarning(s"Received a registration request from app $appId, 
but it was already " +
    +            s"registered")
             }
    -        connectedApps(address) = appId
    +        connectedApps.put(appId, appState)
             callback.onSuccess(ByteBuffer.allocate(0))
    +      case Heartbeat(appId) =>
    +        val address = client.getSocketAddress
    +        Option(connectedApps.get(appId)) match {
    +          case Some(existingAppState) =>
    +            logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' 
(remote " +
    +              s"address $address).")
    +            existingAppState.lastHeartbeat = System.nanoTime()
    +          case None =>
    +            logWarning(s"Received ShuffleServiceHeartbeat from an unknown 
app (remote " +
    +              s"address $address, appId '$appId').")
    +        }
           case _ => super.handleMessage(message, client, callback)
         }
       }
     
    -  /**
    -   * On connection termination, clean up shuffle files written by the 
associated application.
    -   */
    -  override def channelInactive(client: TransportClient): Unit = {
    -    val address = client.getSocketAddress
    -    if (connectedApps.contains(address)) {
    -      val appId = connectedApps(address)
    -      logInfo(s"Application $appId disconnected (address was $address).")
    -      applicationRemoved(appId, true /* cleanupLocalDirs */)
    -      connectedApps.remove(address)
    -    } else {
    -      logWarning(s"Unknown $address disconnected.")
    -    }
    -  }
    -
       /** An extractor object for matching [[RegisterDriver]] message. */
       private object RegisterDriverParam {
    -    def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId)
    +    def unapply(r: RegisterDriver): Option[(String, AppState)] =
    +      Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, 
System.nanoTime())))
    --- End diff --
    
    Good point. So, in the case of a 1.6 shuffle service with a newer driver:
    
    - The shuffle server will see a RegisterDriver message that has an 
additional parameter. The old shuffle service will decode it without looking at 
the timeout value, and registration will continue as usual.
    - Heartbeat messages, if they arrive, are going to be dropped as "unknown 
message".
    
    But, since Spark on Mesos in 1.6.0 is broken in this way, I would say this 
isn't a setup we should be very concerned with.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to