[GitHub] [spark] Victsm commented on a change in pull request #30164: [SPARK-32919][SHUFFLE][test-maven][test-hadoop2.7] Driver side changes for coordinating push based shuffle by selecting external s

2020-11-17 Thread GitBox


Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r525835969



##
File path: core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
##
@@ -92,4 +93,16 @@ private[spark] trait SchedulerBackend {
    */
   def maxNumConcurrentTasks(rp: ResourceProfile): Int
 
+  /**
+   * Get the list of host locations for push based shuffle
+   *
+   * Currently push based shuffle is disabled for both stage retry and stage reuse cases

Review comment:
   Both are true.
   `getShufflePushMergerLocations` will be invoked only once per `ShuffleDependency`.
   Thus retried stages will get the same merger locations.
   In #30062, the block push handling logic we implemented ignores blocks received after shuffle finalization.
   
   https://github.com/apache/spark/blob/dd32f45d2058d00293330c01d3d9f53ecdbc036c/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java#L132
   
   So blocks pushed from the retried stage will be ignored, and this is what the comment means by `push based shuffle is disabled for both stage retry and stage reuse cases`.
   Ignoring blocks pushed from the retried stage is reasonable, since the block data from these retried tasks has most likely already been merged.
   Making sure the retried stage uses the same merger locations is critical to ensuring we don't run into data duplication issues.
   
   The only exception is indeterminate stage retry, for which we have created SPARK-32923.
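   
   As a minimal sketch of that invoke-once behavior (the class and method names here are hypothetical, not the PR's code), resolving merger locations at most once per shuffle id is what guarantees a retried stage sees the same list:
   ```
   import java.util.concurrent.ConcurrentHashMap
   import org.apache.spark.storage.BlockManagerId
   
   // Hypothetical cache: resolve runs at most once per shuffleId, so every
   // retry of a stage observes exactly the same merger locations.
   class MergerLocationCache(resolve: Int => Seq[BlockManagerId]) {
     private val cache = new ConcurrentHashMap[Int, Seq[BlockManagerId]]()
   
     def locationsFor(shuffleId: Int): Seq[BlockManagerId] =
       cache.computeIfAbsent(shuffleId, id => resolve(id))
   }
   ```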








[GitHub] [spark] Victsm commented on a change in pull request #30164: [SPARK-32919][SHUFFLE][test-maven][test-hadoop2.7] Driver side changes for coordinating push based shuffle by selecting external s

2020-11-17 Thread GitBox


Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r525832654



##
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##
@@ -2541,6 +2541,16 @@ private[spark] object Utils extends Logging {
     master == "local" || master.startsWith("local[")
   }
 
+  /**
+   * Push based shuffle can only be enabled when external shuffle service is enabled.
+   * In the initial version, we cannot support pushed based shuffle and adaptive execution
+   * at the same time. Will improve this in a later version.

Review comment:
   This comment is outdated. The current version can already support AQE with push-based shuffle.
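   
   The requirement that does survive, namely that the external shuffle service must be on, boils down to a simple gating check. A minimal sketch, assuming the two standard config keys (the helper name is hypothetical):
   ```
   import org.apache.spark.SparkConf
   
   // Hypothetical helper: push-based shuffle is honored only when the external
   // shuffle service is also enabled. The AQE restriction no longer applies.
   def pushBasedShuffleEnabled(conf: SparkConf): Boolean =
     conf.getBoolean("spark.shuffle.push.enabled", defaultValue = false) &&
       conf.getBoolean("spark.shuffle.service.enabled", defaultValue = false)
   ```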








[GitHub] [spark] Victsm commented on a change in pull request #30164: [SPARK-32919][SHUFFLE][test-maven][test-hadoop2.7] Driver side changes for coordinating push based shuffle by selecting external s

2020-11-16 Thread GitBox


Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r524541577



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##
@@ -657,6 +688,43 @@ class BlockManagerMasterEndpoint(
 }
   }
 
+  private def getShufflePushMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    val blockManagersWithExecutors = blockManagerIdByExecutor.groupBy(_._2.host)
+      .mapValues(_.head).values.map(_._2).toSet
+    val filteredBlockManagersWithExecutors = blockManagersWithExecutors
+      .filterNot(x => hostsToFilter.contains(x.host))
+    val filteredMergersWithExecutors = filteredBlockManagersWithExecutors.map(
+      x => BlockManagerId(x.executorId, x.host, StorageUtils.externalShuffleServicePort(conf)))
+
+    // Enough mergers are available as part of active executors list
+    if (filteredMergersWithExecutors.size >= numMergersNeeded) {
+      filteredMergersWithExecutors.toSeq
+    } else {
+      // Delta mergers added from inactive mergers list to the active mergers list
+      val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host)
+      val filteredMergersWithoutExecutors = shuffleMergerLocations.values
+        .filterNot(x => hostsToFilter.contains(x.host))
+        .filterNot(x => filteredMergersWithExecutorsHosts.contains(x.host))
+      val randomFilteredMergersLocations =
+        if (filteredMergersWithoutExecutors.size >
+          numMergersNeeded - filteredMergersWithExecutors.size) {
+          Utils.randomize(filteredMergersWithoutExecutors)
+        } else {
+          filteredMergersWithoutExecutors
+        }
+      filteredMergersWithExecutors.toSeq ++ randomFilteredMergersLocations
+        .take(numMergersNeeded - filteredMergersWithExecutors.size)

Review comment:
   If the number of elements in `randomFilteredMergersLocations` is less than `numMergersNeeded - filteredMergersWithExecutors.size`, `take()` would just return all of them.
   So why can't `take()` be called here when `randomize()` is not performed?
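   
   A quick self-contained check of the `take()` behavior described above (host names made up):
   ```
   // Seq.take(n) returns at most n elements; it never fails when fewer exist.
   val locs = Seq("host1", "host2", "host3")
   assert(locs.take(5) == Seq("host1", "host2", "host3")) // all three returned, no error
   assert(locs.take(2) == Seq("host1", "host2"))
   ```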








[GitHub] [spark] Victsm commented on a change in pull request #30164: [SPARK-32919][SHUFFLE][test-maven][test-hadoop2.7] Driver side changes for coordinating push based shuffle by selecting external s

2020-11-16 Thread GitBox


Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r524527610



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##
@@ -657,6 +688,43 @@ class BlockManagerMasterEndpoint(
 }
   }
 
+  private def getShufflePushMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    val blockManagersWithExecutors = blockManagerIdByExecutor.groupBy(_._2.host)
+      .mapValues(_.head).values.map(_._2).toSet
+    val filteredBlockManagersWithExecutors = blockManagersWithExecutors
+      .filterNot(x => hostsToFilter.contains(x.host))
+    val filteredMergersWithExecutors = filteredBlockManagersWithExecutors.map(
+      x => BlockManagerId(x.executorId, x.host, StorageUtils.externalShuffleServicePort(conf)))

Review comment:
   Thanks for pointing this one out.
   We currently use an empty String for the `executorId` part of a `BlockManagerId` for shuffle push merger locations.
   This part should conform to that convention.








[GitHub] [spark] Victsm commented on a change in pull request #30164: [SPARK-32919][SHUFFLE][test-maven][test-hadoop2.7] Driver side changes for coordinating push based shuffle by selecting external s

2020-11-16 Thread GitBox


Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r524527610



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##
@@ -657,6 +688,43 @@ class BlockManagerMasterEndpoint(
 }
   }
 
+  private def getShufflePushMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    val blockManagersWithExecutors = blockManagerIdByExecutor.groupBy(_._2.host)
+      .mapValues(_.head).values.map(_._2).toSet
+    val filteredBlockManagersWithExecutors = blockManagersWithExecutors
+      .filterNot(x => hostsToFilter.contains(x.host))
+    val filteredMergersWithExecutors = filteredBlockManagersWithExecutors.map(
+      x => BlockManagerId(x.executorId, x.host, StorageUtils.externalShuffleServicePort(conf)))

Review comment:
   Thanks for pointing this one out.
   We currently use an empty String for the `executorId` part of a `BlockManagerId` for shuffle push merger locations.
   This part should conform to that convention, and we should also dedup here since we could have multiple executors on the same host.
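   
   A minimal sketch of that convention, with dedup by host (the helper name is made up; the port would really come from the conf):
   ```
   import org.apache.spark.storage.BlockManagerId
   
   // Hypothetical helper: collapse active executors to one merger location per
   // host, identified by an empty executorId plus the external shuffle service
   // port, since the merger is the host-level shuffle service, not an executor.
   def toMergerLocations(
       activeBlockManagers: Iterable[BlockManagerId],
       externalShuffleServicePort: Int): Set[BlockManagerId] =
     activeBlockManagers
       .groupBy(_.host).keySet             // dedup: many executors, one host
       .map(host => BlockManagerId("", host, externalShuffleServicePort))
   ```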








[GitHub] [spark] Victsm commented on a change in pull request #30164: [SPARK-32919][SHUFFLE][test-maven][test-hadoop2.7] Driver side changes for coordinating push based shuffle by selecting external s

2020-11-14 Thread GitBox


Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523714996



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##
@@ -657,6 +688,38 @@ class BlockManagerMasterEndpoint(
 }
   }
 
+  private def getShufflePushMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    val blockManagersWithExecutors = blockManagerIdByExecutor.groupBy(_._2.host)
+      .mapValues(_.head).values.map(_._2).toSet
+    val filteredBlockManagersWithExecutors = blockManagersWithExecutors
+      .filterNot(x => hostsToFilter.contains(x.host))
+    val filteredMergersWithExecutors = filteredBlockManagersWithExecutors.map(
+      x => BlockManagerId(x.executorId, x.host, StorageUtils.externalShuffleServicePort(conf)))
+
+    // Enough mergers are available as part of active executors list
+    if (filteredMergersWithExecutors.size >= numMergersNeeded) {
+      filteredMergersWithExecutors.toSeq
+    } else {
+      // Delta mergers added from inactive mergers list to the active mergers list
+      val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host)
+      // Pick random hosts instead of preferring the top of the list
+      val randomizedShuffleMergerLocations = Utils.randomize(shuffleMergerLocations.values.toSeq)

Review comment:
   @mridulm shouldn't it be the following instead?
   ```
   val filteredMergersWithoutExecutors = shuffleMergerLocations.values.toSeq
     .filterNot()
     .filterNot()
   ```
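   
   For reference, a sketch of those two filters filled in, matching the shape that appears in the later revision of this method quoted above in this thread (assumes `shuffleMergerLocations`, `hostsToFilter`, and `filteredMergersWithExecutorsHosts` are in scope as in that revision):
   ```
   val filteredMergersWithoutExecutors = shuffleMergerLocations.values.toSeq
     .filterNot(x => hostsToFilter.contains(x.host))                     // drop hosts the caller excluded
     .filterNot(x => filteredMergersWithExecutorsHosts.contains(x.host)) // drop hosts already counted as active
   ```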








[GitHub] [spark] Victsm commented on a change in pull request #30164: [SPARK-32919][SHUFFLE][test-maven][test-hadoop2.7] Driver side changes for coordinating push based shuffle by selecting external s

2020-11-14 Thread GitBox


Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523713075



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1252,6 +1254,28 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
 
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage. The list of shuffle services is determined based on the list of
+   * active executors tracked by block manager master at the start of the stage.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage) {
+    // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot
+    // TODO: disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+    logDebug(s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+
+    if (mergerLocs.nonEmpty) {
+      stage.shuffleDep.setMergerLocs(mergerLocs)
+      logInfo("Shuffle merge enabled for %s (%s) with %d merger locations"
+        .format(stage, stage.name, stage.shuffleDep.getMergerLocs.size))
+    } else {
+      stage.shuffleDep.setShuffleMergeEnabled(false)
+      logInfo("Shuffle merge disabled for %s (%s)".format(stage, stage.name))
+    }

Review comment:
   Should we expose the full list of merger locations, or just a count of the number of mergers for a given ShuffleMapStage?
   Having a long list of merger locations might increase the event size.








[GitHub] [spark] Victsm commented on a change in pull request #30164: [SPARK-32919][SHUFFLE][test-maven][test-hadoop2.7] Driver side changes for coordinating push based shuffle by selecting external s

2020-11-14 Thread GitBox


Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523712631



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##
@@ -360,6 +371,17 @@ class BlockManagerMasterEndpoint(
 
   }
 
+  private def addMergerLocation(blockManagerId: BlockManagerId): Unit = {
+    if (!shuffleMergerLocations.contains(blockManagerId.host) && !blockManagerId.isDriver) {
+      val shuffleServerId = BlockManagerId(blockManagerId.executorId, blockManagerId.host,
+        StorageUtils.externalShuffleServicePort(conf))
+      if (shuffleMergerLocations.size >= maxRetainedMergerLocations) {
+        shuffleMergerLocations -= shuffleMergerLocations.head._1

Review comment:
   Just to clarify, removing a merger does not remove the merged shuffle data on that location. It only prevents using that location for future shuffles. Reducers will still be able to fetch merged shuffle data from the removed merger locations.
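   
   A minimal sketch of that retention behavior, assuming an insertion-ordered map keyed by host as in the snippet above (the cap and value type are placeholders):
   ```
   import scala.collection.mutable
   
   // Insertion-ordered map; evicting head._1 drops the oldest tracked merger.
   // Only future shuffles are affected: merged data already on the evicted host
   // stays on disk and remains fetchable by reducers.
   val maxRetainedMergerLocations = 500 // placeholder cap
   val shuffleMergerLocations = mutable.LinkedHashMap.empty[String, String]
   
   def addMergerLocation(host: String): Unit =
     if (!shuffleMergerLocations.contains(host)) {
       if (shuffleMergerLocations.size >= maxRetainedMergerLocations) {
         shuffleMergerLocations -= shuffleMergerLocations.head._1 // evict oldest
       }
       shuffleMergerLocations(host) = host // placeholder; real code stores a BlockManagerId
     }
   ```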








[GitHub] [spark] Victsm commented on a change in pull request #30164: [SPARK-32919][SHUFFLE][test-maven][test-hadoop2.7] Driver side changes for coordinating push based shuffle by selecting external s

2020-11-14 Thread GitBox


Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523711623



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1252,6 +1254,32 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
 
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage for block push/merge.
+   *
+   * Even with dynamic resource allocation kicking in and significantly reducing the number
+   * of available active executors, we would still be able to get sufficient shuffle service
+   * locations for block push/merge by getting the historical locations of past executors.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
+    // TODO SPARK-32920: Handle stage reuse/retry cases separately as without finalize
+    // TODO changes we cannot disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+
+    if (mergerLocs.nonEmpty) {
+      stage.shuffleDep.setMergerLocs(mergerLocs)
+      logInfo(s"Shuffle merge enabled for $stage (${stage.name}) with" +

Review comment:
   Nit: Change to "Push-based shuffle enabled for stage ..."








[GitHub] [spark] Victsm commented on a change in pull request #30164: [SPARK-32919][SHUFFLE][test-maven][test-hadoop2.7] Driver side changes for coordinating push based shuffle by selecting external s

2020-11-14 Thread GitBox


Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523711581



##
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##
@@ -1252,6 +1254,32 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
 
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage for block push/merge.
+   *
+   * Even with dynamic resource allocation kicking in and significantly reducing the number
+   * of available active executors, we would still be able to get sufficient shuffle service
+   * locations for block push/merge by getting the historical locations of past executors.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
+    // TODO SPARK-32920: Handle stage reuse/retry cases separately as without finalize
+    // TODO changes we cannot disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+
+    if (mergerLocs.nonEmpty) {
+      stage.shuffleDep.setMergerLocs(mergerLocs)
+      logInfo(s"Shuffle merge enabled for $stage (${stage.name}) with" +
+        s" ${stage.shuffleDep.getMergerLocs.size} merger locations")
+
+      logDebug("List of shuffle push merger locations " +
+        s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+    } else {
+      logInfo(s"No available merger locations. Shuffle merge disabled for $stage (${stage.name})")

Review comment:
   Nit: Change to "Push-based shuffle disabled for stage..."








[GitHub] [spark] Victsm commented on a change in pull request #30164: [SPARK-32919][SHUFFLE][test-maven][test-hadoop2.7] Driver side changes for coordinating push based shuffle by selecting external s

2020-11-14 Thread GitBox


Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523711229



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -1938,4 +1938,51 @@ package object config {
       .version("3.0.1")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.push.enabled")
+      .doc("Set to 'true' to enable push based shuffle on the client side and this works in " +
+        "conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl " +
+        "which needs to be set with the appropriate " +
+        "org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based " +
+        "shuffle to be enabled")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS =
+    ConfigBuilder("spark.shuffle.push.retainedMergerLocations")

Review comment:
   Should the config name be "spark.shuffle.push.maxRetainedMergerLocations" to be more consistent?








[GitHub] [spark] Victsm commented on a change in pull request #30164: [SPARK-32919][SHUFFLE][test-maven][test-hadoop2.7] Driver side changes for coordinating push based shuffle by selecting external s

2020-11-14 Thread GitBox


Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523710562



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##
@@ -657,6 +681,13 @@ class BlockManagerMasterEndpoint(
 }
   }
 
+  private def getShufflePushMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    val mergers = shuffleMergerLocations.values.filterNot(x => hostsToFilter.contains(x.host)).toSeq
+    mergers.take(numMergersNeeded)

Review comment:
   I thought the reason we didn't do that internally is that the merger locations are picked at the beginning of the map stage, but are only relevant to the executor placement of the following reduce stage.
   Since there is no guarantee that the active executors at the beginning of the map stage will still be active at the beginning of the reduce stage, we didn't do that.
   Is that reasoning not applicable any more?




