Ngone51 commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r602266320



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -833,33 +1129,44 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
    *
    * (It would be nice to remove this restriction in the future.)
    */
-  private def getStatuses(shuffleId: Int, conf: SparkConf): Array[MapStatus] = 
{
-    val statuses = mapStatuses.get(shuffleId).orNull
-    if (statuses == null) {
-      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching 
them")
+  private def getStatuses(
+      shuffleId: Int, conf: SparkConf): (Array[MapStatus], Array[MergeStatus]) 
= {
+    val mapOutputStatuses = mapStatuses.get(shuffleId).orNull
+    val mergeResultStatuses = mergeStatuses.get(shuffleId).orNull
+    if (mapOutputStatuses == null || (fetchMergeResult && mergeResultStatuses 
== null)) {
+      logInfo("Don't have map/merge outputs for shuffle " + shuffleId + ", 
fetching them")

Review comment:
       nit:
   ```suggestion
         logInfo(s"Don't have map/merge outputs for shuffle $shuffleId, 
fetching them")
   ```

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -833,33 +1129,44 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
    *
    * (It would be nice to remove this restriction in the future.)
    */
-  private def getStatuses(shuffleId: Int, conf: SparkConf): Array[MapStatus] = 
{
-    val statuses = mapStatuses.get(shuffleId).orNull
-    if (statuses == null) {
-      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching 
them")
+  private def getStatuses(
+      shuffleId: Int, conf: SparkConf): (Array[MapStatus], Array[MergeStatus]) 
= {
+    val mapOutputStatuses = mapStatuses.get(shuffleId).orNull
+    val mergeResultStatuses = mergeStatuses.get(shuffleId).orNull
+    if (mapOutputStatuses == null || (fetchMergeResult && mergeResultStatuses 
== null)) {
+      logInfo("Don't have map/merge outputs for shuffle " + shuffleId + ", 
fetching them")
       val startTimeNs = System.nanoTime()
       fetchingLock.withLock(shuffleId) {
-        var fetchedStatuses = mapStatuses.get(shuffleId).orNull
-        if (fetchedStatuses == null) {
-          logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
+        var fetchedMapStatuses = mapStatuses.get(shuffleId).orNull
+        if (fetchedMapStatuses == null) {
+          logInfo("Doing the map fetch; tracker endpoint = " + trackerEndpoint)
           val fetchedBytes = 
askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
-          fetchedStatuses = 
MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
-          logInfo("Got the output locations")
-          mapStatuses.put(shuffleId, fetchedStatuses)
+          fetchedMapStatuses = 
MapOutputTracker.deserializeOutputStatuses(fetchedBytes, conf)
+          logInfo("Got the map output locations")
+          mapStatuses.put(shuffleId, fetchedMapStatuses)
+        }
+        var fetchedMergeStatues = mergeStatuses.get(shuffleId).orNull
+        if (fetchMergeResult && fetchedMergeStatues == null) {
+          logInfo("Doing the merge fetch; tracker endpoint = " + 
trackerEndpoint)
+          val fetchedBytes = 
askTracker[Array[Byte]](GetMergeResultStatuses(shuffleId))
+          fetchedMergeStatues = 
MapOutputTracker.deserializeOutputStatuses(fetchedBytes, conf)
+          logInfo("Got the merge output locations")
+          mergeStatuses.put(shuffleId, fetchedMergeStatues)
         }
-        logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
+        logDebug(s"Fetching map/merge output statuses for shuffle $shuffleId 
took " +

Review comment:
       nit:
   ```suggestion
           logDebug(s"Fetching map${if (fetchMergeResult) "/merge" else ""} output 
statuses for shuffle $shuffleId took " +
   ```

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -86,36 +90,62 @@ private class ShuffleStatus(numPartitions: Int) extends 
Logging {
   // Exposed for testing
   val mapStatuses = new Array[MapStatus](numPartitions)
 
+  /**
+   * MergeStatus for each shuffle partition when push-based shuffle is 
enabled. The index of the
+   * array is the shuffle partition id (reduce id). Each value in the array is 
the MergeStatus for
+   * a shuffle partition, or null if not available. When push-based shuffle is 
enabled, this array
+   * provides a reducer oriented view of the shuffle status specifically for 
the results of
+   * merging shuffle partition blocks into per-partition merged shuffle files.

Review comment:
       "shuffle partition" is a bit confusing here. The original `MapStatus` also 
uses the word `partition`. Maybe change all occurrences of "shuffle partition" to 
"shuffle reduce partition"?

##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -833,33 +1106,44 @@ private[spark] class MapOutputTrackerWorker(conf: 
SparkConf) extends MapOutputTr
    *
    * (It would be nice to remove this restriction in the future.)
    */
-  private def getStatuses(shuffleId: Int, conf: SparkConf): Array[MapStatus] = 
{
-    val statuses = mapStatuses.get(shuffleId).orNull
-    if (statuses == null) {
-      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching 
them")
+  private def getStatuses(
+      shuffleId: Int, conf: SparkConf): (Array[MapStatus], Array[MergeStatus]) 
= {
+    val mapOutputStatuses = mapStatuses.get(shuffleId).orNull
+    val mergeResultStatuses = mergeStatuses.get(shuffleId).orNull
+    if (mapOutputStatuses == null || (fetchMergeResult && mergeResultStatuses 
== null)) {
+      logInfo("Don't have map/merge outputs for shuffle " + shuffleId + ", 
fetching them")
       val startTimeNs = System.nanoTime()
       fetchingLock.withLock(shuffleId) {
-        var fetchedStatuses = mapStatuses.get(shuffleId).orNull
-        if (fetchedStatuses == null) {
-          logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
+        var fetchedMapStatuses = mapStatuses.get(shuffleId).orNull
+        if (fetchedMapStatuses == null) {
+          logInfo("Doing the map fetch; tracker endpoint = " + trackerEndpoint)
           val fetchedBytes = 
askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
-          fetchedStatuses = 
MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf)
-          logInfo("Got the output locations")
-          mapStatuses.put(shuffleId, fetchedStatuses)
+          fetchedMapStatuses = 
MapOutputTracker.deserializeOutputStatuses(fetchedBytes, conf)
+          logInfo("Got the map output locations")
+          mapStatuses.put(shuffleId, fetchedMapStatuses)
         }
-        logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
+        var fetchedMergeStatues = mergeStatuses.get(shuffleId).orNull
+        if (fetchMergeResult && fetchedMergeStatues == null) {
+          logInfo("Doing the merge fetch; tracker endpoint = " + 
trackerEndpoint)
+          val fetchedBytes = 
askTracker[Array[Byte]](GetMergeResultStatuses(shuffleId))
+          fetchedMergeStatues = 
MapOutputTracker.deserializeOutputStatuses(fetchedBytes, conf)
+          logInfo("Got the merge output locations")
+          mergeStatuses.put(shuffleId, fetchedMergeStatues)
+        }
+        logDebug(s"Fetching map/merge output statuses for shuffle $shuffleId 
took " +
           s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} 
ms")
-        fetchedStatuses
+        (fetchedMapStatuses, fetchedMergeStatues)
       }

Review comment:
       I'd prefer to combine them. Actually, when I first reviewed 
this PR, I began to think about a unified way to provide a consistent API for 
both map status and merged status in `MapOutputTracker` & `ShuffleStatus`. 
Unfortunately, I couldn't come up with a good idea.
   
   I think one RPC would ease the error handling for us. How much 
complexity would you expect?
   
   And I'd suggest adding a new RPC for the combined case and leaving 
the current one as it is, so that we don't affect the existing code path when 
push-based shuffle is disabled.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to