[GitHub] [spark] Victsm commented on a change in pull request #30164: [SPARK-32919][SHUFFLE][test-maven][test-hadoop2.7] Driver side changes for coordinating push based shuffle by selecting external s
Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r525835969

## File path: core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala ##
@@ -92,4 +93,16 @@ private[spark] trait SchedulerBackend {
    */
   def maxNumConcurrentTasks(rp: ResourceProfile): Int
+  /**
+   * Get the list of host locations for push based shuffle
+   *
+   * Currently push based shuffle is disabled for both stage retry and stage reuse cases

Review comment:
Both are true.

`getShufflePushMergerLocations` will be invoked only once per `ShuffleDependency`, so retried stages will get the same merger locations.

In #30062, the block push handling logic we implemented ignores blocks received after shuffle finalization.
https://github.com/apache/spark/blob/dd32f45d2058d00293330c01d3d9f53ecdbc036c/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java#L132

So blocks pushed from the retry stage will be ignored, which is what this comment means by `push based shuffle is disabled for both stage retry and stage reuse cases`.

Ignoring blocks pushed from the retry stage is reasonable, since the block data from these retried tasks has most likely already been merged. Making sure the retried stage uses the same merger locations is critical to ensure we don't run into data duplication issues.

The only exception is indeterminate stage retry, for which we have created SPARK-32923.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
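The "ignore blocks received after finalization" behaviour described in the comment above can be illustrated with a minimal sketch. `MergeState`, `push`, and `finalizeMerge` are hypothetical names chosen for illustration; they are not the API of `RemoteBlockPushResolver`, and the real logic is more involved.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the per-shuffle merge state kept by a shuffle service.
// This is an illustrative sketch, not Spark's RemoteBlockPushResolver.
final class MergeState {
  private val merged = mutable.ArrayBuffer.empty[String]
  private var finalized = false

  /** Returns true if the block was accepted, false if it was ignored. */
  def push(blockId: String): Boolean = synchronized {
    if (finalized) {
      false // pushed after finalization (e.g. from a retried stage): ignored
    } else {
      merged += blockId
      true
    }
  }

  /** Marks the merge as finalized and returns the ids merged so far. */
  def finalizeMerge(): List[String] = synchronized {
    finalized = true
    merged.toList
  }
}
```

Under this assumption, a retried stage that pushes to the same merger after finalization leaves the merged result unchanged, which is why reusing the same merger locations matters.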
Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r525832654

## File path: core/src/main/scala/org/apache/spark/util/Utils.scala ##
@@ -2541,6 +2541,16 @@ private[spark] object Utils extends Logging {
     master == "local" || master.startsWith("local[")
   }
+  /**
+   * Push based shuffle can only be enabled when external shuffle service is enabled.
+   * In the initial version, we cannot support pushed based shuffle and adaptive execution
+   * at the same time. Will improve this in a later version.

Review comment:
This comment is outdated. The current version can already support AQE with push-based shuffle.
Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r524541577

## File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ##
@@ -657,6 +688,43 @@ class BlockManagerMasterEndpoint(
     }
   }
+  private def getShufflePushMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    val blockManagersWithExecutors = blockManagerIdByExecutor.groupBy(_._2.host)
+      .mapValues(_.head).values.map(_._2).toSet
+    val filteredBlockManagersWithExecutors = blockManagersWithExecutors
+      .filterNot(x => hostsToFilter.contains(x.host))
+    val filteredMergersWithExecutors = filteredBlockManagersWithExecutors.map(
+      x => BlockManagerId(x.executorId, x.host, StorageUtils.externalShuffleServicePort(conf)))
+
+    // Enough mergers are available as part of active executors list
+    if (filteredMergersWithExecutors.size >= numMergersNeeded) {
+      filteredMergersWithExecutors.toSeq
+    } else {
+      // Delta mergers added from inactive mergers list to the active mergers list
+      val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host)
+      val filteredMergersWithoutExecutors = shuffleMergerLocations.values
+        .filterNot(x => hostsToFilter.contains(x.host))
+        .filterNot(x => filteredMergersWithExecutorsHosts.contains(x.host))
+      val randomFilteredMergersLocations =
+        if (filteredMergersWithoutExecutors.size >
+          numMergersNeeded - filteredMergersWithExecutors.size) {
+          Utils.randomize(filteredMergersWithoutExecutors)
+        } else {
+          filteredMergersWithoutExecutors
+        }
+      filteredMergersWithExecutors.toSeq ++ randomFilteredMergersLocations
+        .take(numMergersNeeded - filteredMergersWithExecutors.size)

Review comment:
If the number of elements in `randomFilteredMergersLocations` is less than `numMergersNeeded - filteredMergersWithExecutors.size`, `take()` just returns all elements. Why can't `take()` be called here when `randomize()` is not performed?
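The `take()` behaviour the comment relies on is standard Scala collections semantics and can be checked in isolation. The host names below are made up for illustration.

```scala
object TakeDemo {
  // Illustrative host names, not taken from the pull request.
  val mergers: Seq[String] = Seq("host-a", "host-b")

  // Asking for more elements than exist returns the whole collection.
  val all: Seq[String] = mergers.take(5)

  // A zero or negative count yields an empty collection rather than throwing.
  val none: Seq[String] = mergers.take(-1)
}
```

So calling `take()` unconditionally is safe whether or not the candidates were randomized first.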
Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r524527610

## File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ##
@@ -657,6 +688,43 @@ class BlockManagerMasterEndpoint(
     }
   }
+  private def getShufflePushMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    val blockManagersWithExecutors = blockManagerIdByExecutor.groupBy(_._2.host)
+      .mapValues(_.head).values.map(_._2).toSet
+    val filteredBlockManagersWithExecutors = blockManagersWithExecutors
+      .filterNot(x => hostsToFilter.contains(x.host))
+    val filteredMergersWithExecutors = filteredBlockManagersWithExecutors.map(
+      x => BlockManagerId(x.executorId, x.host, StorageUtils.externalShuffleServicePort(conf)))

Review comment:
Thanks for pointing this one out. We currently use an empty String for the `executorId` part of a `BlockManagerId` for shuffle push merger locations. This part should conform to that convention, and we should also dedup here, since we could have multiple executors on the same host.
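A minimal sketch of the two points in the comment above (empty `executorId`, one merger per host). `Location` is a hypothetical stand-in for `BlockManagerId`, and the port value is an assumption; in Spark it would come from `StorageUtils.externalShuffleServicePort(conf)`.

```scala
object MergerDedup {
  // Hypothetical stand-in for Spark's BlockManagerId.
  final case class Location(executorId: String, host: String, port: Int)

  // Assumed port for illustration only.
  val ShuffleServicePort = 7337

  /** Builds one merger location per distinct host, with an empty executorId. */
  def mergersFor(executors: Seq[Location]): Seq[Location] =
    executors
      .map(_.host)
      .distinct // several executors may share a host; keep the first occurrence
      .map(host => Location("", host, ShuffleServicePort))
}
```

For example, two executors on `h1` and one on `h2` would yield exactly two merger locations, both with an empty `executorId`.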
Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523714996

## File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ##
@@ -657,6 +688,38 @@ class BlockManagerMasterEndpoint(
     }
   }
+  private def getShufflePushMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    val blockManagersWithExecutors = blockManagerIdByExecutor.groupBy(_._2.host)
+      .mapValues(_.head).values.map(_._2).toSet
+    val filteredBlockManagersWithExecutors = blockManagersWithExecutors
+      .filterNot(x => hostsToFilter.contains(x.host))
+    val filteredMergersWithExecutors = filteredBlockManagersWithExecutors.map(
+      x => BlockManagerId(x.executorId, x.host, StorageUtils.externalShuffleServicePort(conf)))
+
+    // Enough mergers are available as part of active executors list
+    if (filteredMergersWithExecutors.size >= numMergersNeeded) {
+      filteredMergersWithExecutors.toSeq
+    } else {
+      // Delta mergers added from inactive mergers list to the active mergers list
+      val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host)
+      // Pick random hosts instead of preferring the top of the list
+      val randomizedShuffleMergerLocations = Utils.randomize(shuffleMergerLocations.values.toSeq)

Review comment:
@mridulm shouldn't it be the following instead?
```
val filteredMergersWithoutExecutors = shuffleMergerLocations.values.toSeq
  .filterNot()
  .filterNot()
```
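One way to read the suggestion above is "filter the candidates first, then randomize only what remains". The sketch below works on plain host names and uses `scala.util.Random.shuffle` in place of Spark's `Utils.randomize`; `select` and its parameters are hypothetical names, not the method in the pull request.

```scala
import scala.util.Random

object MergerSelection {
  /**
   * Prefers hosts with active executors, then tops up from the other known hosts.
   * Excluded and already-chosen hosts are filtered out before shuffling.
   */
  def select(
      active: Seq[String],
      known: Seq[String],
      excluded: Set[String],
      needed: Int,
      rng: Random = new Random()): Seq[String] = {
    val activeOk = active.distinct.filterNot(h => excluded.contains(h))
    if (activeOk.size >= needed) {
      activeOk
    } else {
      val candidates = known.distinct
        .filterNot(h => excluded.contains(h))
        .filterNot(h => activeOk.contains(h))
      // take() returns everything when fewer candidates remain than requested.
      activeOk ++ rng.shuffle(candidates).take(needed - activeOk.size)
    }
  }
}
```

Filtering before shuffling avoids randomizing hosts that would be discarded anyway, and it means the `take()` count is applied only to eligible hosts.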
Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523713075

## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ##
@@ -1252,6 +1254,28 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage. The list of shuffle services is determined based on the list of
+   * active executors tracked by block manager master at the start of the stage.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage) {
+    // TODO: Handle stage reuse/retry cases separately as without finalize changes we cannot
+    // TODO: disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+    logDebug(s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+
+    if (mergerLocs.nonEmpty) {
+      stage.shuffleDep.setMergerLocs(mergerLocs)
+      logInfo("Shuffle merge enabled for %s (%s) with %d merger locations"
+        .format(stage, stage.name, stage.shuffleDep.getMergerLocs.size))
+    } else {
+      stage.shuffleDep.setShuffleMergeEnabled(false)
+      logInfo("Shuffle merge disabled for %s (%s)".format(stage, stage.name))
+    }

Review comment:
Should we expose the list of merger locations, or just their count, for a given ShuffleMapStage? A long list of merger locations might increase the event size.
Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523712631

## File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ##
@@ -360,6 +371,17 @@ class BlockManagerMasterEndpoint(
   }
+  private def addMergerLocation(blockManagerId: BlockManagerId): Unit = {
+    if (!shuffleMergerLocations.contains(blockManagerId.host) && !blockManagerId.isDriver) {
+      val shuffleServerId = BlockManagerId(blockManagerId.executorId, blockManagerId.host,
+        StorageUtils.externalShuffleServicePort(conf))
+      if (shuffleMergerLocations.size >= maxRetainedMergerLocations) {
+        shuffleMergerLocations -= shuffleMergerLocations.head._1

Review comment:
Just to clarify, removing a merger does not remove the merged shuffle data on that location. It only prevents using that location for future shuffles. Reducers will still be able to fetch merged shuffle data from the removed merger locations.
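The eviction in the hunk above can be sketched as a bounded, insertion-ordered registry. This assumes the underlying map iterates in insertion order (as `mutable.LinkedHashMap` does), so that `head` is the oldest entry; `MergerRegistry` and its members are hypothetical names for illustration.

```scala
import scala.collection.mutable

// Hypothetical bounded registry keyed by host; not Spark's actual class.
final class MergerRegistry(maxRetained: Int) {
  require(maxRetained > 0, "maxRetained must be positive")

  // LinkedHashMap iterates in insertion order, so head is the oldest entry.
  private val locations = mutable.LinkedHashMap.empty[String, Int]

  /** Registers a host once; evicts the oldest host when the registry is full. */
  def add(host: String, port: Int): Unit = {
    if (!locations.contains(host)) {
      if (locations.size >= maxRetained) {
        locations -= locations.head._1 // only affects selection for future shuffles
      }
      locations(host) = port
    }
  }

  /** Hosts currently eligible as merger locations, oldest first. */
  def hosts: Seq[String] = locations.keys.toList
}
```

As the comment notes, eviction here only removes a host from the candidate list; it says nothing about data already merged on that host.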
Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523711623

## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ##
@@ -1252,6 +1254,32 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage for block push/merge.
+   *
+   * Even with dynamic resource allocation kicking in and significantly reducing the number
+   * of available active executors, we would still be able to get sufficient shuffle service
+   * locations for block push/merge by getting the historical locations of past executors.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
+    // TODO SPARK-32920: Handle stage reuse/retry cases separately as without finalize
+    // TODO changes we cannot disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+
+    if (mergerLocs.nonEmpty) {
+      stage.shuffleDep.setMergerLocs(mergerLocs)
+      logInfo(s"Shuffle merge enabled for $stage (${stage.name}) with" +

Review comment:
Nit: Change to "Push-based shuffle enabled for stage ..."
Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523711581

## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ##
@@ -1252,6 +1254,32 @@ private[spark] class DAGScheduler(
     execCores.map(cores => properties.setProperty(EXECUTOR_CORES_LOCAL_PROPERTY, cores))
   }
+  /**
+   * If push based shuffle is enabled, set the shuffle services to be used for the given
+   * shuffle map stage for block push/merge.
+   *
+   * Even with dynamic resource allocation kicking in and significantly reducing the number
+   * of available active executors, we would still be able to get sufficient shuffle service
+   * locations for block push/merge by getting the historical locations of past executors.
+   */
+  private def prepareShuffleServicesForShuffleMapStage(stage: ShuffleMapStage): Unit = {
+    // TODO SPARK-32920: Handle stage reuse/retry cases separately as without finalize
+    // TODO changes we cannot disable shuffle merge for the retry/reuse cases
+    val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
+      stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
+
+    if (mergerLocs.nonEmpty) {
+      stage.shuffleDep.setMergerLocs(mergerLocs)
+      logInfo(s"Shuffle merge enabled for $stage (${stage.name}) with" +
+        s" ${stage.shuffleDep.getMergerLocs.size} merger locations")
+
+      logDebug("List of shuffle push merger locations " +
+        s"${stage.shuffleDep.getMergerLocs.map(_.host).mkString(", ")}")
+    } else {
+      logInfo(s"No available merger locations. Shuffle merge disabled for $stage (${stage.name})")

Review comment:
Nit: Change to "Push-based shuffle disabled for stage..."
Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523711229

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##
@@ -1938,4 +1938,51 @@ package object config {
       .version("3.0.1")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.push.enabled")
+      .doc("Set to 'true' to enable push based shuffle on the client side and this works in " +
+        "conjunction with the server side flag spark.shuffle.server.mergedShuffleFileManagerImpl " +
+        "which needs to be set with the appropriate " +
+        "org.apache.spark.network.shuffle.MergedShuffleFileManager implementation for push-based " +
+        "shuffle to be enabled")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val SHUFFLE_MERGERS_MAX_RETAINED_LOCATIONS =
+    ConfigBuilder("spark.shuffle.push.retainedMergerLocations")

Review comment:
Should the config name be "spark.shuffle.push.maxRetainedMergerLocations" to be more consistent?
Victsm commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r523710562

## File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ##
@@ -657,6 +681,13 @@ class BlockManagerMasterEndpoint(
     }
   }
+  private def getShufflePushMergerLocations(
+      numMergersNeeded: Int,
+      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
+    val mergers = shuffleMergerLocations.values.filterNot(x => hostsToFilter.contains(x.host)).toSeq
+    mergers.take(numMergersNeeded)

Review comment:
I thought the reason we didn't do that internally is that the merger locations are picked at the beginning of the map stage and are only relevant to executor placement in the following reduce stage. Since there is no guarantee that the executors active at the beginning of the map stage will still be active at the beginning of the reduce stage, we didn't do that. Is that reasoning no longer applicable?