Github user javadba commented on the pull request:

    https://github.com/apache/spark/pull/1542#issuecomment-50194492
  
    Hi again. On closer inspection of the existing code and functionality, we
    have an opportunity here to:
      (a) reduce the code size and complexity
      (b) at the same time, improve concurrency by a factor of the number of
          concurrent MapOutput fetch requests
    
    Here is an initial version of the rewrite of getServerStatuses. Notice that
    it has a single synchronized block and reduces the SLOC from 53 to 32. I am
    going to write a thorough unit test to demonstrate (a) correctness and
    (b) improved concurrency of this new version versus the existing one.
    
      def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
        // One monitor per shuffle id: equal interned strings are the same object.
        val monitor = shuffleId.toString.intern
        monitor.synchronized {
          val statuses = mapStatuses.get(shuffleId).orNull
          if (statuses == null) {
            // Not cached yet: fetch from the tracker while holding this shuffle's monitor.
            logInfo(s"Fetching for shuffle $shuffleId ; tracker actor = $trackerActor")
            val fetchedBytes =
              askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]
            val fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
            if (fetchedStatuses != null) {
              logInfo(s"We got the output locations for shuffle $shuffleId ; " +
                s"tracker actor = $trackerActor")
              mapStatuses.put(shuffleId, fetchedStatuses)
              MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
            } else {
              throw new MetadataFetchFailedException(
                shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
            }
          } else {
            MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
          }
        }
      }
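
    To make the locking idea above concrete, here is a minimal, self-contained
    sketch of the same pattern outside Spark. The names PerShuffleLockSketch,
    fetchRemote, getStatuses and fetchesAfterConcurrentCalls are hypothetical,
    and the cached value is a plain String standing in for the real statuses.
    It assumes the goal is that callers for the same shuffle id trigger only
    one fetch, while callers for other ids use a different monitor.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object PerShuffleLockSketch {
  // Stand-in for the mapStatuses cache (hypothetical, simplified value type).
  private val cache = new ConcurrentHashMap[Int, String]()

  // Counts how many times the simulated remote fetch actually ran.
  val fetchCount = new AtomicInteger(0)

  // Hypothetical stand-in for the call to the tracker.
  private def fetchRemote(shuffleId: Int): String = {
    fetchCount.incrementAndGet()
    Thread.sleep(20) // simulate network latency
    s"statuses-$shuffleId"
  }

  def getStatuses(shuffleId: Int): String = {
    // Equal interned strings are the same object, so this is one monitor per id.
    val monitor = shuffleId.toString.intern
    monitor.synchronized {
      val cached = cache.get(shuffleId)
      if (cached != null) {
        cached
      } else {
        val fetched = fetchRemote(shuffleId)
        cache.put(shuffleId, fetched)
        fetched
      }
    }
  }

  // Starts `callers` threads that all ask for the same id, waits for them,
  // and returns the total number of remote fetches performed so far.
  def fetchesAfterConcurrentCalls(shuffleId: Int, callers: Int): Int = {
    val threads = (1 to callers).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = { getStatuses(shuffleId); () }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    fetchCount.get
  }
}
```

    One design caveat: interned strings are shared across the whole JVM, so any
    other code that synchronizes on the same interned string would contend for
    the same monitor. A map from shuffle id to a dedicated lock object would be
    one way to avoid that.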
    
    By the way, the synchronization on statuses was moved into the private
    method. This reduces code size and also avoids the risk of someone
    forgetting to include the synchronization:
    
      private def convertMapStatuses(
          ..
        statuses.synchronized {
          ..
        }
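
    As an illustration of why locking inside the helper is safer, here is a
    small sketch with simplified, hypothetical types (StatusConversionSketch,
    Status and convert are not Spark names, and the real method works on
    MapStatus). Because the helper takes the lock itself, no call site can
    forget to do so.

```scala
object StatusConversionSketch {
  // Hypothetical simplified status: a location plus one size per reduce partition.
  final case class Status(location: String, sizes: Array[Long])

  // The helper synchronizes on the array itself, so callers need no locking.
  def convert(reduceId: Int, statuses: Array[Status]): Array[(String, Long)] = {
    statuses.synchronized {
      statuses.map { s =>
        if (s == null) {
          throw new IllegalStateException(s"Missing output location for reduce $reduceId")
        }
        (s.location, s.sizes(reduceId))
      }
    }
  }
}
```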


