[spark] branch master updated (b623c03 -> cf74901)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from b623c03  [SPARK-32381][CORE][FOLLOWUP][TEST-HADOOP2.7] Don't remove SerializableFileStatus and SerializableBlockLocation for Hadoop 2.7
add  cf74901  Revert "[SPARK-28704][SQL][TEST] Add back Skiped HiveExternalCatalogVersionsSuite in HiveSparkSubmitSuite at JDK9+"

No new revisions were added by this update.

Summary of changes:
 .../hive/HiveExternalCatalogVersionsSuite.scala | 22 ++
 1 file changed, 14 insertions(+), 8 deletions(-)
[spark] branch master updated (530c0a8 -> b623c03)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 530c0a8  [SPARK-33505][SQL][TESTS] Fix adding new partitions by INSERT INTO `InMemoryPartitionTable`
add  b623c03  [SPARK-32381][CORE][FOLLOWUP][TEST-HADOOP2.7] Don't remove SerializableFileStatus and SerializableBlockLocation for Hadoop 2.7

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/util/HadoopFSUtils.scala | 61 +-
 1 file changed, 60 insertions(+), 1 deletion(-)
[spark] branch master updated (67c6ed9 -> 530c0a8)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 67c6ed9  [SPARK-33223][SS][FOLLOWUP] Clarify the meaning of "number of rows dropped by watermark" in SS UI page
add  530c0a8  [SPARK-33505][SQL][TESTS] Fix adding new partitions by INSERT INTO `InMemoryPartitionTable`

No new revisions were added by this update.

Summary of changes:
 .../sql/connector/InMemoryPartitionTable.scala     |  4
 .../apache/spark/sql/connector/InMemoryTable.scala |  3 +++
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 21 +
 3 files changed, 28 insertions(+)
[spark] branch master updated (de0f50a -> 67c6ed9)
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from de0f50a  [SPARK-32670][SQL] Group exception messages in Catalyst Analyzer in one file
add  67c6ed9  [SPARK-33223][SS][FOLLOWUP] Clarify the meaning of "number of rows dropped by watermark" in SS UI page

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala | 10 +-
 .../org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala   |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)
[spark] branch master updated (2479778 -> de0f50a)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 2479778  [SPARK-33492][SQL] DSv2: Append/Overwrite/ReplaceTable should invalidate cache
add  de0f50a  [SPARK-32670][SQL] Group exception messages in Catalyst Analyzer in one file

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/QueryCompilationErrors.scala | 164 +
 .../spark/sql/catalyst/analysis/Analyzer.scala    |  84 ---
 2 files changed, 192 insertions(+), 56 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
[spark] branch master updated (a1a3d5c -> 2479778)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from a1a3d5c  [MINOR][TESTS][DOCS] Use fully-qualified class name in docker integration test
add  2479778  [SPARK-33492][SQL] DSv2: Append/Overwrite/ReplaceTable should invalidate cache

No new revisions were added by this update.

Summary of changes:
 .../datasources/v2/DataSourceV2Strategy.scala      | 13 ++--
 .../datasources/v2/V1FallbackWriters.scala         | 21 --
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 38 ++-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 78 +
 .../spark/sql/connector/V1WriteFallbackSuite.scala | 79 +-
 5 files changed, 212 insertions(+), 17 deletions(-)
[spark] branch master updated (116b7b7 -> a1a3d5c)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 116b7b7  [SPARK-33466][ML][PYTHON] Imputer support mode(most_frequent) strategy
add  a1a3d5c  [MINOR][TESTS][DOCS] Use fully-qualified class name in docker integration test

No new revisions were added by this update.

Summary of changes:
 .../src/test/scala/org/apache/spark/sql/jdbc/DB2IntegrationSuite.scala  | 3 ++-
 .../scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala   | 3 ++-
 .../test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala    | 3 ++-
 .../scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala      | 3 ++-
 4 files changed, 8 insertions(+), 4 deletions(-)
[spark] branch branch-2.4 updated: [SPARK-33472][SQL][2.4] Adjust RemoveRedundantSorts rule order
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 6705912  [SPARK-33472][SQL][2.4] Adjust RemoveRedundantSorts rule order

6705912 is described below

commit 67059127a8e8218dc2a8abae6095c1aa299ce175
Author: allisonwang-db <66282705+allisonwang...@users.noreply.github.com>
AuthorDate: Fri Nov 20 10:02:26 2020 -0800

[SPARK-33472][SQL][2.4] Adjust RemoveRedundantSorts rule order

Backport #30373 for branch-2.4.

### What changes were proposed in this pull request?
This PR switches the order of the rules `RemoveRedundantSorts` and `EnsureRequirements` so that `EnsureRequirements` is invoked before `RemoveRedundantSorts`, avoiding an IllegalArgumentException when instantiating PartitioningCollection.

### Why are the changes needed?
The `RemoveRedundantSorts` rule uses a SparkPlan's `outputPartitioning` to check whether a sort node is redundant. Currently, it is added before `EnsureRequirements`. Since `PartitioningCollection` requires its left and right partitionings to have the same number of partitions, which is not necessarily true before applying `EnsureRequirements`, the rule can fail with the following exception:
```
IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #30437 from allisonwang-db/spark-33472-2.4.

Authored-by: allisonwang-db <66282705+allisonwang...@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun
---
 .../spark/sql/execution/QueryExecution.scala       |  4 +++-
 .../org/apache/spark/sql/execution/SparkPlan.scala |  7 +-
 .../sql/execution/RemoveRedundantSortsSuite.scala  | 27 ++
 3 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index b92f346..0b9c469 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -97,8 +97,10 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   /** A sequence of rules that will be applied in order to the physical plan before execution. */
   protected def preparations: Seq[Rule[SparkPlan]] = Seq(
     PlanSubqueries(sparkSession),
-    RemoveRedundantSorts(sparkSession.sessionState.conf),
     EnsureRequirements(sparkSession.sessionState.conf),
+    // `RemoveRedundantSorts` needs to be added after `EnsureRequirements` to guarantee the same
+    // number of partitions when instantiating PartitioningCollection.
+    RemoveRedundantSorts(sparkSession.sessionState.conf),
     CollapseCodegenStages(sparkSession.sessionState.conf),
     ReuseExchange(sparkSession.sessionState.conf),
     ReuseSubquery(sparkSession.sessionState.conf))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 7646f96..28addf6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -91,7 +91,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   def longMetric(name: String): SQLMetric = metrics(name)

   // TODO: Move to `DistributedPlan`
-  /** Specifies how data is partitioned across different nodes in the cluster. */
+  /**
+   * Specifies how data is partitioned across different nodes in the cluster.
+   * Note this method may fail if it is invoked before `EnsureRequirements` is applied
+   * since `PartitioningCollection` requires all its partitionings to have
+   * the same number of partitions.
+   */
   def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!

   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
index f7987e2..b82e5cb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.execution

 import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.sql.catalyst.plans.physical.{RangePartitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.in
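For reference, the exception quoted above comes from an invariant checked when a `PartitioningCollection` is constructed. A minimal sketch of that check (assuming only spark-catalyst on the classpath; `UnknownPartitioning` is used here purely as a stand-in for a join child's partitioning, not what the planner would actually produce):

```scala
import org.apache.spark.sql.catalyst.plans.physical.{PartitioningCollection, UnknownPartitioning}

object PartitioningCollectionSketch {
  def main(args: Array[String]): Unit = {
    // Children that agree on the partition count are accepted.
    PartitioningCollection(Seq(UnknownPartitioning(5), UnknownPartitioning(5)))

    // Before EnsureRequirements has inserted the required shuffles, a join's children can still
    // disagree on numPartitions, so constructing the collection fails fast with:
    //   java.lang.IllegalArgumentException: requirement failed: PartitioningCollection
    //   requires all of its partitionings have the same numPartitions.
    PartitioningCollection(Seq(UnknownPartitioning(5), UnknownPartitioning(10)))
  }
}
```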
[spark] branch branch-3.0 updated: [SPARK-33472][SQL][3.0] Adjust RemoveRedundantSorts rule order
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1e525c1  [SPARK-33472][SQL][3.0] Adjust RemoveRedundantSorts rule order

1e525c1 is described below

commit 1e525c1156ee5558bda3937751588a50bc132c38
Author: allisonwang-db <66282705+allisonwang...@users.noreply.github.com>
AuthorDate: Fri Nov 20 09:47:54 2020 -0800

[SPARK-33472][SQL][3.0] Adjust RemoveRedundantSorts rule order

Backport #30373 for branch-3.0.

### What changes were proposed in this pull request?
This PR switches the order of the rules `RemoveRedundantSorts` and `EnsureRequirements` so that `EnsureRequirements` is invoked before `RemoveRedundantSorts`, avoiding an IllegalArgumentException when instantiating PartitioningCollection.

### Why are the changes needed?
The `RemoveRedundantSorts` rule uses a SparkPlan's `outputPartitioning` to check whether a sort node is redundant. Currently, it is added before `EnsureRequirements`. Since `PartitioningCollection` requires its left and right partitionings to have the same number of partitions, which is not necessarily true before applying `EnsureRequirements`, the rule can fail with the following exception:
```
IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #30438 from allisonwang-db/spark-33472-3.0.

Authored-by: allisonwang-db <66282705+allisonwang...@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun
---
 .../spark/sql/execution/QueryExecution.scala       |  4 +++-
 .../org/apache/spark/sql/execution/SparkPlan.scala |  7 +-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  4 ++--
 .../sql/execution/RemoveRedundantSortsSuite.scala  | 25 ++
 4 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 574a67f..7f5a5e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -297,8 +297,10 @@ object QueryExecution {
     Seq(
       PlanDynamicPruningFilters(sparkSession),
       PlanSubqueries(sparkSession),
-      RemoveRedundantSorts(sparkSession.sessionState.conf),
       EnsureRequirements(sparkSession.sessionState.conf),
+      // `RemoveRedundantSorts` needs to be added after `EnsureRequirements` to guarantee the same
+      // number of partitions when instantiating PartitioningCollection.
+      RemoveRedundantSorts(sparkSession.sessionState.conf),
       ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
         sparkSession.sessionState.columnarRules),
       CollapseCodegenStages(sparkSession.sessionState.conf),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index ead8c00..062aa69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -135,7 +135,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   def longMetric(name: String): SQLMetric = metrics(name)

   // TODO: Move to `DistributedPlan`
-  /** Specifies how data is partitioned across different nodes in the cluster. */
+  /**
+   * Specifies how data is partitioned across different nodes in the cluster.
+   * Note this method may fail if it is invoked before `EnsureRequirements` is applied
+   * since `PartitioningCollection` requires all its partitionings to have
+   * the same number of partitions.
+   */
   def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!

   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 4e73f06..187827c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -90,8 +90,8 @@ case class AdaptiveSparkPlanExec(
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
   private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
-    removeRedundantSorts,
-    ensureRequirements
+    ensureRequire
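The fix itself is only a reordering: preparation rules are applied to the physical plan strictly in sequence, so placing `RemoveRedundantSorts` after `EnsureRequirements` means it only ever sees plans whose children already agree on the number of partitions. A self-contained, REPL-style sketch of that ordering behaviour, using stand-in types rather than Spark's real `Rule[SparkPlan]` classes:

```scala
// Stand-ins for SparkPlan and Rule[SparkPlan]; only the ordering behaviour is modelled here.
sealed trait ToyPlan { def partitionCountsConsistent: Boolean }
case object RawPlan extends ToyPlan { val partitionCountsConsistent = false }
case object ShuffledPlan extends ToyPlan { val partitionCountsConsistent = true }

trait ToyRule { def apply(plan: ToyPlan): ToyPlan }

// Plays the role of EnsureRequirements: after it runs, children agree on numPartitions.
object EnsureRequirementsLike extends ToyRule {
  def apply(plan: ToyPlan): ToyPlan = ShuffledPlan
}

// Plays the role of RemoveRedundantSorts: it inspects outputPartitioning, which is only safe
// once partition counts are consistent.
object RemoveRedundantSortsLike extends ToyRule {
  def apply(plan: ToyPlan): ToyPlan = {
    require(plan.partitionCountsConsistent,
      "PartitioningCollection requires all of its partitionings have the same numPartitions.")
    plan
  }
}

// Rules are folded over the plan in order, as in preparations / queryStagePreparationRules.
def prepare(plan: ToyPlan, rules: Seq[ToyRule]): ToyPlan =
  rules.foldLeft(plan)((p, rule) => rule(p))

prepare(RawPlan, Seq(EnsureRequirementsLike, RemoveRedundantSortsLike)) // new order: succeeds
// prepare(RawPlan, Seq(RemoveRedundantSortsLike, EnsureRequirementsLike)) // old order: throws
```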
[spark] branch master updated (47326ac -> 116b7b7)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 47326ac  [SPARK-28704][SQL][TEST] Add back Skiped HiveExternalCatalogVersionsSuite in HiveSparkSubmitSuite at JDK9+
add  116b7b7  [SPARK-33466][ML][PYTHON] Imputer support mode(most_frequent) strategy

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/ml/feature/Imputer.scala      |  49 +++--
 .../org/apache/spark/ml/feature/ImputerSuite.scala | 211 +++--
 python/pyspark/ml/feature.py                       |   5 +-
 3 files changed, 144 insertions(+), 121 deletions(-)
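At the API level, the change adds a third accepted value for `Imputer`'s strategy parameter. A hedged usage sketch (spark-shell style, assuming a build that already contains this change; column names and data are made up):

```scala
import org.apache.spark.ml.feature.Imputer
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("imputer-mode-sketch").getOrCreate()
import spark.implicits._

// Missing entries are NaN, Imputer's default missingValue.
val df = Seq(
  (1.0, Double.NaN),
  (1.0, 3.0),
  (Double.NaN, 3.0),
  (4.0, 3.0)
).toDF("a", "b")

val imputed = new Imputer()
  .setInputCols(Array("a", "b"))
  .setOutputCols(Array("a_out", "b_out"))
  .setStrategy("mode") // the newly supported most-frequent strategy; "mean" and "median" predate it
  .fit(df)
  .transform(df)

// NaNs in "a" become 1.0 and NaNs in "b" become 3.0, the most frequent value in each column.
imputed.show()
```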
[spark] branch master updated (3384bda -> 47326ac)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 3384bda  [SPARK-33468][SQL] ParseUrl in ANSI mode should fail if input string is not a valid url
add  47326ac  [SPARK-28704][SQL][TEST] Add back Skiped HiveExternalCatalogVersionsSuite in HiveSparkSubmitSuite at JDK9+

No new revisions were added by this update.

Summary of changes:
 .../hive/HiveExternalCatalogVersionsSuite.scala | 22 --
 1 file changed, 8 insertions(+), 14 deletions(-)
[spark] branch master updated (cbc8be2 -> 3384bda)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from cbc8be2  [SPARK-33422][DOC] Fix the correct display of left menu item
add  3384bda  [SPARK-33468][SQL] ParseUrl in ANSI mode should fail if input string is not a valid url

No new revisions were added by this update.

Summary of changes:
 docs/sql-ref-ansi-compliance.md                             |  1 +
 .../spark/sql/catalyst/expressions/stringExpressions.scala  |  7 +--
 .../sql/catalyst/expressions/StringExpressionsSuite.scala   | 14 ++
 3 files changed, 20 insertions(+), 2 deletions(-)
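Behaviourally, the change only matters when `spark.sql.ansi.enabled` is set. A hedged sketch of the difference (spark-shell style; the malformed URL literal is just an example input):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("parse-url-ansi-sketch").getOrCreate()

// Default (non-ANSI) behaviour: an unparsable URL quietly yields NULL.
spark.conf.set("spark.sql.ansi.enabled", "false")
spark.sql("SELECT parse_url('inva lid://spark.apache.org', 'HOST')").show() // -> null

// With ANSI mode on (and a build containing this change), the same query is expected to fail
// instead of masking the bad input with NULL.
spark.conf.set("spark.sql.ansi.enabled", "true")
spark.sql("SELECT parse_url('inva lid://spark.apache.org', 'HOST')").show() // throws
```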
[spark] branch branch-3.0 updated: [SPARK-33422][DOC] Fix the correct display of left menu item
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d7c2dae  [SPARK-33422][DOC] Fix the correct display of left menu item

d7c2dae is described below

commit d7c2daebeed2ac2b858dbe91025894f0ff964a21
Author: liucht
AuthorDate: Fri Nov 20 22:19:35 2020 +0900

[SPARK-33422][DOC] Fix the correct display of left menu item

### What changes were proposed in this pull request?
Limit the height of the left-hand menu area so that a vertical scroll bar is shown.

### Why are the changes needed?
When the left menu tree is long, the bottom menu items cannot be displayed.

### Does this PR introduce any user-facing change?
Yes; when the menu has more items, they can now be reached with the vertical scroll bar.

Before:
![image](https://user-images.githubusercontent.com/28332082/98805115-16995d80-2452-11eb-933a-3b72c14bea78.png)

After:
![image](https://user-images.githubusercontent.com/28332082/98805418-7e4fa880-2452-11eb-9a9b-8d265078297c.png)

### How was this patch tested?
NA

Closes #30335 from liucht-inspur/master.

Authored-by: liucht
Signed-off-by: HyukjinKwon
(cherry picked from commit cbc8be24c896ed25be63ef9a111ff015af4fabec)
Signed-off-by: HyukjinKwon
---
 docs/css/main.css | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/docs/css/main.css b/docs/css/main.css
index bb34d6e..b873992 100755
--- a/docs/css/main.css
+++ b/docs/css/main.css
@@ -79,6 +79,7 @@ body .container-wrapper {
   margin-right: auto;
   border-radius: 15px;
   position: relative;
+  min-height: 100vh;
 }

 .title {
@@ -159,6 +160,7 @@ a:hover code {
   max-width: 914px;
   line-height: 1.6; /* Inspired by Github's wiki style */
   padding-left: 30px;
+  min-height: 100vh;
 }

 .dropdown-menu {
@@ -239,6 +241,7 @@ a.anchorjs-link:hover { text-decoration: none; }
   border-bottom-width: 0px;
   margin-top: 0px;
   width: 210px;
+  height: 80%;
   float: left;
   position: fixed;
   overflow-y: scroll;
[spark] branch master updated (870d409 -> cbc8be2)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 870d409  [SPARK-32512][SQL][TESTS][FOLLOWUP] Remove duplicate tests for ALTER TABLE .. PARTITIONS from DataSourceV2SQLSuite
add  cbc8be2  [SPARK-33422][DOC] Fix the correct display of left menu item

No new revisions were added by this update.

Summary of changes:
 docs/css/main.css | 3 +++
 1 file changed, 3 insertions(+)
[spark] branch master updated (2289389 -> 870d409)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 2289389  [SPARK-33441][BUILD][FOLLOWUP] Make unused-imports check for SBT specific
add  870d409  [SPARK-32512][SQL][TESTS][FOLLOWUP] Remove duplicate tests for ALTER TABLE .. PARTITIONS from DataSourceV2SQLSuite

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 53 --
 1 file changed, 53 deletions(-)
[spark] branch master updated (8218b48 -> 2289389)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 8218b48  [SPARK-32919][SHUFFLE][TEST-MAVEN][TEST-HADOOP2.7] Driver side changes for coordinating push based shuffle by selecting external shuffle services for merging partitions
add  2289389  [SPARK-33441][BUILD][FOLLOWUP] Make unused-imports check for SBT specific

No new revisions were added by this update.

Summary of changes:
 pom.xml                  | 5 +
 project/SparkBuild.scala | 3 +++
 2 files changed, 4 insertions(+), 4 deletions(-)
[spark] branch master updated: [SPARK-32919][SHUFFLE][TEST-MAVEN][TEST-HADOOP2.7] Driver side changes for coordinating push based shuffle by selecting external shuffle services for merging partitions
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 8218b48  [SPARK-32919][SHUFFLE][TEST-MAVEN][TEST-HADOOP2.7] Driver side changes for coordinating push based shuffle by selecting external shuffle services for merging partitions

8218b48 is described below

commit 8218b488035049434271dc9e3bd5af45ffadf0fd
Author: Venkata krishnan Sowrirajan
AuthorDate: Fri Nov 20 06:00:30 2020 -0600

[SPARK-32919][SHUFFLE][TEST-MAVEN][TEST-HADOOP2.7] Driver side changes for coordinating push based shuffle by selecting external shuffle services for merging partitions

### What changes were proposed in this pull request?
Driver side changes for coordinating push based shuffle by selecting external shuffle services for merging partitions.

This PR includes the changes related to `ShuffleMapStage` preparation: selecting merger locations and initializing them as part of `ShuffleDependency`. Currently this code is not exercised, as the remaining pieces come subsequently as part of https://issues.apache.org/jira/browse/SPARK-32917 (shuffle block push as part of `ShuffleMapTask`), https://issues.apache.org/jira/browse/SPARK-32918 (support for a finalize API) and https://issues.apache.org/jira/browse/SPARK-32920 (finalization of the push/merge phase). This is also why the tests here are partial; once the above changes are raised as PRs we will have enough tests for DAGS [...]

### Why are the changes needed?
A new API is added in `SchedulerBackend` to get merger locations for push based shuffle. It is currently implemented for YARN; other cluster managers can have separate implementations, which is why a new API is introduced.

### Does this PR introduce _any_ user-facing change?
Yes, a user-facing config to enable push based shuffle is introduced.

### How was this patch tested?
Added unit tests partially; some of the changes in DAGScheduler depend on future changes, and DAGScheduler tests will be added along with those changes.

Lead-authored-by: Venkata krishnan Sowrirajan vsowrirajanlinkedin.com
Co-authored-by: Min Shen mshenlinkedin.com

Closes #30164 from venkata91/upstream-SPARK-32919.

Lead-authored-by: Venkata krishnan Sowrirajan
Co-authored-by: Min Shen
Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../main/scala/org/apache/spark/Dependency.scala    |  15 +
 .../org/apache/spark/internal/config/package.scala  |  47 
 .../org/apache/spark/scheduler/DAGScheduler.scala   |  40 +
 .../apache/spark/scheduler/SchedulerBackend.scala   |  13 +
 .../org/apache/spark/storage/BlockManagerId.scala   |   2 +
 .../apache/spark/storage/BlockManagerMaster.scala   |  20 +++
 .../spark/storage/BlockManagerMasterEndpoint.scala  |  65 ++
 .../spark/storage/BlockManagerMessages.scala        |   6 ++
 .../main/scala/org/apache/spark/util/Utils.scala    |   8 +++
 .../apache/spark/storage/BlockManagerSuite.scala    |  49 +++-
 .../scala/org/apache/spark/util/UtilsSuite.scala    |  12 
 .../scheduler/cluster/YarnSchedulerBackend.scala    |  50 +++--
 12 files changed, 320 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index ba8e4d6..d21b9d9 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -23,6 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor}
+import org.apache.spark.storage.BlockManagerId

 /**
  * :: DeveloperApi ::
@@ -95,6 +96,20 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
     shuffleId, this)

+  /**
+   * Stores the location of the list of chosen external shuffle services for handling the
+   * shuffle merge requests from mappers in this shuffle map stage.
+   */
+  private[spark] var mergerLocs: Seq[BlockManagerId] = Nil
+
+  def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
+    if (mergerLocs != null) {
+      this.mergerLocs = mergerLocs
+    }
+  }
+
+  def getMergerLocs: Seq[BlockManagerId] = mergerLocs
+
   _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
   _rdd.sparkContext.shuffleDriverComponents.registerShuffle(shuffleId)
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 4bc4951..b38d0e5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/in
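Since the user-facing piece of this commit is the opt-in configuration, here is a heavily hedged sketch of what enabling it might look like once the follow-up work in SPARK-32917, SPARK-32918 and SPARK-32920 lands. The flag name below is an assumption, and at the time of this commit the driver-side coordination alone does not give a working end-to-end push-based shuffle:

```scala
import org.apache.spark.SparkConf

// Hypothetical opt-in; push-based shuffle merges blocks on the external shuffle service,
// and (per this commit) merger selection is currently only implemented for YARN.
val conf = new SparkConf()
  .setAppName("push-based-shuffle-sketch")
  .set("spark.shuffle.service.enabled", "true") // prerequisite: external shuffle service
  .set("spark.shuffle.push.enabled", "true")    // assumed name of the new user-facing flag
```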