otterc commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r517814517



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -249,6 +249,8 @@ private[spark] class DAGScheduler(
   private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)
 
+  private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf)

Review comment:
       Where is this being used?

##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -95,6 +97,30 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
     shuffleId, this)
 
+  // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle is enabled
+  private[spark] var _shuffleMergeEnabled =

Review comment:
       Why does this variable name start with `_`?
   Other variables in this class don't use that prefix.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -657,6 +681,28 @@ class BlockManagerMasterEndpoint(
     }
   }
 
+  private def getShufflePushMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    val activeBlockManagers = blockManagerIdByExecutor.groupBy(_._2.host)
+        .mapValues(_.head).values.map(_._2).toSet

Review comment:
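       Nit: indentation. The `.mapValues` continuation line is indented 4 spaces past `val`; Spark style uses a 2-space indent for chained method calls.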

##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -95,6 +97,30 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
     shuffleId, this)
 
+  // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle is enabled
+  private[spark] var _shuffleMergeEnabled =
+    Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf)
+
+  def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
+    _shuffleMergeEnabled = shuffleMergeEnabled
+  }
+
+  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled

Review comment:
       Nit: `isShuffleMergeEnabled` would be a clearer name for this Boolean accessor.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
##########
@@ -125,6 +125,17 @@ class BlockManagerMaster(
     driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }
 
+  /**
+   * Get list of unique shuffle service locations where an executor is successfully

Review comment:
       Nit: this should read `Get a list of unique shuffle service locations...`.

##########
File path: core/src/main/scala/org/apache/spark/Dependency.scala
##########
@@ -95,6 +97,30 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
     shuffleId, this)
 
+  // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle is enabled
+  private[spark] var _shuffleMergeEnabled =
+    Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf)
+
+  def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
+    _shuffleMergeEnabled = shuffleMergeEnabled
+  }
+
+  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled
+
+  /**
+   * Stores the location of the list of chosen external shuffle services for handling the
+   * shuffle merge requests from mappers in this shuffle map stage.
+   */
+  private[spark] var _mergerLocs: Seq[BlockManagerId] = Nil

Review comment:
       The name `_mergerLocs` also starts with `_`.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##########
@@ -1974,7 +1974,51 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     }
   }
 
-  class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
+  test("mergerLocations should be bounded with in" +

Review comment:
       Can you reword the test name? It's not easy to understand.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to