This is an automated email from the ASF dual-hosted git repository. jshao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit a253b1fed2e947e397b45b1db8f56d856eabc9fc Author: roryqi <jerqi1242949...@gmail.com> AuthorDate: Mon Jun 27 10:07:13 2022 +0800 [Improvement] Add dynamic allocation patch for Spark 3.2 (#199) ### What changes were proposed in this pull request? Add the dynamic allocation patch for Spark 3.2, solving issue #106 ### Why are the changes needed? If we don't have this patch, users can't use dynamic allocation in Spark 3.2. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual test --- README.md | 2 +- .../spark-3.2.1_dynamic_allocation_support.patch | 92 ++++++++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 0fb65e5..9ad8299 100644 --- a/README.md +++ b/README.md @@ -155,7 +155,7 @@ rss-xxx.tgz will be generated for deployment ### Support Spark dynamic allocation To support spark dynamic allocation with Firestorm, spark code should be updated. -There are 2 patches for spark-2.4.6 and spark-3.1.2 in spark-patches folder for reference. +There are 3 patches for spark (2.4.6/3.1.2/3.2.1) in spark-patches folder for reference. 
After apply the patch and rebuild spark, add following configuration in spark conf to enable dynamic allocation: ``` diff --git a/spark-patches/spark-3.2.1_dynamic_allocation_support.patch b/spark-patches/spark-3.2.1_dynamic_allocation_support.patch new file mode 100644 index 0000000..1e195df --- /dev/null +++ b/spark-patches/spark-3.2.1_dynamic_allocation_support.patch @@ -0,0 +1,92 @@ +diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala +index 1b4e7ba5106..95818ff72ca 100644 +--- a/core/src/main/scala/org/apache/spark/Dependency.scala ++++ b/core/src/main/scala/org/apache/spark/Dependency.scala +@@ -174,8 +174,10 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( + !rdd.isBarrier() + } + +- _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) +- _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId) ++ if (!_rdd.context.getConf.isRssEnable()) { ++ _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) ++ _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId) ++ } + } + + +diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +index c4b619300b5..821a01985d9 100644 +--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ++++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +@@ -207,7 +207,9 @@ private[spark] class ExecutorAllocationManager( + // If dynamic allocation shuffle tracking or worker decommissioning along with + // storage shuffle decommissioning is enabled we have *experimental* support for + // decommissioning without a shuffle service. 
+- if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) || ++ if (conf.isRssEnable()) { ++ logInfo("Dynamic allocation will use remote shuffle service") ++ } else if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) || + (decommissionEnabled && + conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) { + logWarning("Dynamic allocation without a shuffle service is an experimental feature.") +diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala +index 5f37a1abb19..af4bee1e1bb 100644 +--- a/core/src/main/scala/org/apache/spark/SparkConf.scala ++++ b/core/src/main/scala/org/apache/spark/SparkConf.scala +@@ -580,6 +580,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria + Utils.redact(this, getAll).sorted.map { case (k, v) => k + "=" + v }.mkString("\n") + } + ++ /** ++ * Return true if remote shuffle service is enabled. ++ */ ++ def isRssEnable(): Boolean = get("spark.shuffle.manager", "sort").contains("RssShuffleManager") + } + + private[spark] object SparkConf extends Logging { +diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +index a82d261d545..72e54940ca2 100644 +--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ++++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +@@ -2231,7 +2231,8 @@ private[spark] class DAGScheduler( + // if the cluster manager explicitly tells us that the entire worker was lost, then + // we know to unregister shuffle output. (Note that "worker" specifically refers to the process + // from a Standalone cluster, where the shuffle service lives in the Worker.) 
+- val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled ++ val fileLost = (workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled) && ++ !sc.getConf.isRssEnable() + removeExecutorAndUnregisterOutputs( + execId = execId, + fileLost = fileLost, +diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +index 3b72103f993..8e03754941e 100644 +--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ++++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +@@ -1015,7 +1015,8 @@ private[spark] class TaskSetManager( + // and we are not using an external shuffle server which could serve the shuffle outputs. + // The reason is the next stage wouldn't be able to fetch the data from this dead executor + // so we would need to rerun these tasks on other executors. +- if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && !isZombie) { ++ if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled && ++ !isZombie && !conf.isRssEnable()) { + for ((tid, info) <- taskInfos if info.executorId == execId) { + val index = taskInfos(tid).index + // We may have a running task whose partition has been marked as successful, +diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala +index 47d61196fe8..98a5381bef4 100644 +--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala ++++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala +@@ -174,6 +174,9 @@ class ShuffledRowRDD( + } + + override def getPreferredLocations(partition: Partition): Seq[String] = { ++ if (conf.isRssEnable()) { ++ return Nil ++ } + val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + 
partition.asInstanceOf[ShuffledRowRDDPartition].spec match { + case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) => +-- +2.32.0 +