Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7276#discussion_r35226184
  
    --- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
 ---
    @@ -170,101 +207,64 @@ class ReceiverTracker(ssc: StreamingContext, 
skipReceiverLaunch: Boolean = false
         // Signal the receivers to delete old block data
         if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
           logInfo(s"Cleanup old received batch data: $cleanupThreshTime")
    -      receiverInfo.values.flatMap { info => Option(info.endpoint) }
    -        .foreach { _.send(CleanupOldBlocks(cleanupThreshTime)) }
    +      endpoint.send(CleanupOldBlocks(cleanupThreshTime))
         }
       }
     
    -  /** Register a receiver */
    -  private def registerReceiver(
    -      streamId: Int,
    -      typ: String,
    -      host: String,
    -      receiverEndpoint: RpcEndpointRef,
    -      senderAddress: RpcAddress
    -    ): Boolean = {
    -    if (!receiverInputStreamIds.contains(streamId)) {
    -      throw new SparkException("Register received for unexpected id " + 
streamId)
    -    }
    -
    -    if (isTrackerStopping || isTrackerStopped) {
    -      false
    -    } else {
    -      // "stopReceivers" won't happen at the same time because both 
"registerReceiver" and are
    -      // called in the event loop. So here we can assume "stopReceivers" 
has not yet been called. If
    -      // "stopReceivers" is called later, it should be able to see this 
receiver.
    -      receiverInfo(streamId) = ReceiverInfo(
    -        streamId, s"${typ}-${streamId}", receiverEndpoint, true, host)
    -      
listenerBus.post(StreamingListenerReceiverStarted(receiverInfo(streamId)))
    -      logInfo("Registered receiver for stream " + streamId + " from " + 
senderAddress)
    -      true
    -    }
    +  /** Check if any blocks are left to be processed */
    +  def hasUnallocatedBlocks: Boolean = {
    +    receivedBlockTracker.hasUnallocatedReceivedBlocks
       }
     
    -  /** Deregister a receiver */
    -  private def deregisterReceiver(streamId: Int, message: String, error: 
String) {
    -    val newReceiverInfo = receiverInfo.get(streamId) match {
    -      case Some(oldInfo) =>
    -        val lastErrorTime =
    -          if (error == null || error == "") -1 else 
ssc.scheduler.clock.getTimeMillis()
    -        oldInfo.copy(endpoint = null, active = false, lastErrorMessage = 
message,
    -          lastError = error, lastErrorTime = lastErrorTime)
    -      case None =>
    -        logWarning("No prior receiver info")
    -        val lastErrorTime =
    -          if (error == null || error == "") -1 else 
ssc.scheduler.clock.getTimeMillis()
    -        ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = 
message,
    -          lastError = error, lastErrorTime = lastErrorTime)
    -    }
    -    receiverInfo -= streamId
    -    listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
    -    val messageWithError = if (error != null && !error.isEmpty) {
    -      s"$message - $error"
    -    } else {
    -      s"$message"
    -    }
    -    logError(s"Deregistered receiver for stream $streamId: 
$messageWithError")
    +  /**
    +   * Get the list of executors excluding driver
    +   */
    +  private def getExecutors(ssc: StreamingContext): List[String] = {
    +    val executors = 
ssc.sparkContext.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
    +    val driver = ssc.sparkContext.getConf.get("spark.driver.host")
    +    executors.diff(List(driver))
       }
     
    -  /** Add new blocks for the given stream */
    -  private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    -    receivedBlockTracker.addBlock(receivedBlockInfo)
    -  }
    +  /** Check if tracker has been marked for starting */
    +  private def isTrackerStarted(): Boolean = trackerState == Started
     
    -  /** Report error sent by a receiver */
    -  private def reportError(streamId: Int, message: String, error: String) {
    -    val newReceiverInfo = receiverInfo.get(streamId) match {
    -      case Some(oldInfo) =>
    -        oldInfo.copy(lastErrorMessage = message, lastError = error)
    -      case None =>
    -        logWarning("No prior receiver info")
    -        ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = 
message,
    -          lastError = error, lastErrorTime = 
ssc.scheduler.clock.getTimeMillis())
    -    }
    -    receiverInfo(streamId) = newReceiverInfo
    -    
listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))
    -    val messageWithError = if (error != null && !error.isEmpty) {
    -      s"$message - $error"
    -    } else {
    -      s"$message"
    -    }
    -    logWarning(s"Error reported by receiver for stream $streamId: 
$messageWithError")
    -  }
    +  /** Check if tracker has been marked for stopping */
    +  private def isTrackerStopping(): Boolean = trackerState == Stopping
     
    -  /** Check if any blocks are left to be processed */
    -  def hasUnallocatedBlocks: Boolean = {
    -    receivedBlockTracker.hasUnallocatedReceivedBlocks
    -  }
    +  /** Check if tracker has been marked for stopped */
    +  private def isTrackerStopped(): Boolean = trackerState == Stopped
     
       /** RpcEndpoint to receive messages from the receivers. */
       private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) 
extends ThreadSafeRpcEndpoint {
     
    +    /**
    +     * Track all receivers' information. The key is the receiver id, the 
value is the receiver info.
    +     */
    +    private val receiverTrackingInfos = new HashMap[Int, 
ReceiverTrackingInfo]
    +
    +    /**
    +     * Store all preferred locations for all receivers. We need this 
information to schedule
    +     * receivers
    +     */
    +    private val receiverPreferredLocations = new HashMap[Int, 
Option[String]]
    +
    +    // TODO Remove this thread pool after 
https://github.com/apache/spark/issues/7385 is merged
    +    private val submitJobThreadPool = ExecutionContext.fromExecutorService(
    +      ThreadUtils.newDaemonCachedThreadPool("submit-job-thead-pool"))
    +
         override def receive: PartialFunction[Any, Unit] = {
    --- End diff --
    
    Actually, `receiveAndReply` needs to create a temp `Actor` to send the 
message. Since we don't need to reply with anything, it's better to use `receive`. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to