[spark] branch branch-3.0 updated: [SPARK-38570][SQL][3.0] Incorrect DynamicPartitionPruning caused by Literal
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4fc718fb [SPARK-38570][SQL][3.0] Incorrect DynamicPartitionPruning caused by Literal
4fc718fb is described below

commit 4fc718fbc7e70784c250ea9315ccfc56cfaa5893
Author: mcdull-zhang
AuthorDate: Sat Mar 26 12:48:08 2022 +0800

[SPARK-38570][SQL][3.0] Incorrect DynamicPartitionPruning caused by Literal

This is a backport of #35878 to branch 3.0.

The return value of Literal.references is an empty AttributeSet, so a Literal is mistaken for a partition column. For example, the SQL in the test case generates the following physical plan when adaptive query execution is disabled:

```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
   :  :     :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  :           +- SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :  :              +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
   :  :                 +- *(1) Project [store_id#5291, state_province#5292]
   :  :                    +- *(1) Filter (((isnotnull(country#5293) AND (country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND isnotnull(store_id#5291))
   :  :                       +- *(1) ColumnarToRow
   :  :                          +- FileScan parquet default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: true, DataFilters: [isnotnull(country#5293), (country#5293 = US), ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ...], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), Or(EqualNullSafe(store_id,4), [...]
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
   :        :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   :        +- *(2) ColumnarToRow
   :           +- FileScan parquet default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: [isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: struct<date_id:int>
   :              +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   +- ReusedExchange [store_id#5291, state_province#5292], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#335]
```

After this PR:

```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
   :  :     +- *(1) ColumnarToRow
   :  :        +- FileScan parquet default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: [isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: CatalogFileIndex(1 paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s..., PartitionFilters: [], PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], ReadSchema: struct<date_id:int>
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   :     +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
   :        +- *(2) ColumnarToRow
```
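To make the root cause above concrete, here is a minimal Scala sketch (illustrative only, not the actual pruning-rule code) of why a `Literal` join key slips past a reference-based partition-column check:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, Literal}

// After `4 AS store_id` is constant-folded, the join key on the pruned side
// is a plain Literal rather than an AttributeReference.
val joinKey: Expression = Literal(4)

// A stand-in for the scan's partition attributes; empty here for brevity.
val partitionCols: AttributeSet = AttributeSet.empty

// Literal.references is an empty AttributeSet, and an empty set is a subset
// of any set, so a guard of the form "the key references only partition
// columns" passes trivially and DPP inserts a useless
// `4 IN dynamicpruning#...` filter, as in the plan above.
println(joinKey.references.isEmpty)                 // true
println(joinKey.references.subsetOf(partitionCols)) // true
```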
[spark] branch branch-3.1 updated: [SPARK-38570][SQL][3.1] Incorrect DynamicPartitionPruning caused by Literal
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 789ec13 [SPARK-38570][SQL][3.1] Incorrect DynamicPartitionPruning caused by Literal
789ec13 is described below

commit 789ec137c1e240de58152a06746a7defa001343c
Author: mcdull-zhang
AuthorDate: Sat Mar 26 12:48:08 2022 +0800

[SPARK-38570][SQL][3.1] Incorrect DynamicPartitionPruning caused by Literal

This is a backport of #35878 to branch 3.1. The description, including the before/after physical plans, is identical to the branch-3.0 backport above.
[spark] branch branch-3.2 updated: [SPARK-38570][SQL][3.2] Incorrect DynamicPartitionPruning caused by Literal
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 8621914 [SPARK-38570][SQL][3.2] Incorrect DynamicPartitionPruning caused by Literal
8621914 is described below

commit 8621914e2052eeab25e6ac4e7d5f48b5570c71f7
Author: mcdull-zhang
AuthorDate: Sat Mar 26 12:48:08 2022 +0800

[SPARK-38570][SQL][3.2] Incorrect DynamicPartitionPruning caused by Literal

This is a backport of #35878 to branch 3.2.

### What changes were proposed in this pull request?

The return value of Literal.references is an empty AttributeSet, so a Literal is mistaken for a partition column. The description, including the before/after physical plans, is identical to the branch-3.0 backport above.
[spark] branch master updated: [SPARK-38336][SQL] Support DEFAULT column values in CREATE/REPLACE TABLE statements
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 4e95738 [SPARK-38336][SQL] Support DEFAULT column values in CREATE/REPLACE TABLE statements
4e95738 is described below

commit 4e95738fdfc334c25f44689ff8c2db5aa7c726f2
Author: Daniel Tenedorio
AuthorDate: Sat Mar 26 12:09:56 2022 +0800

[SPARK-38336][SQL] Support DEFAULT column values in CREATE/REPLACE TABLE statements

### What changes were proposed in this pull request?

Extend CREATE TABLE and REPLACE TABLE statements to support columns with DEFAULT values. This information will be stored in the column metadata.

### Why are the changes needed?

This builds the foundation for future work (not included in this PR) to support INSERT INTO statements, which may then omit the default values or refer to them explicitly with the DEFAULT keyword, in which case the Spark analyzer will automatically insert the appropriate corresponding values in the right places.

Example:

```
CREATE TABLE T(a INT DEFAULT 4, b INT NOT NULL DEFAULT 5);
INSERT INTO T VALUES (1);
INSERT INTO T VALUES (1, DEFAULT);
INSERT INTO T VALUES (DEFAULT, 6);
SELECT * FROM T;
(1, 5)
(1, 5)
(4, 6)
```

### How was this patch tested?

This change is covered by new and existing unit test coverage as well as new INSERT INTO query test cases covering a variety of positive and negative scenarios.

Closes #35855 from dtenedor/default-cols-create-table.

Authored-by: Daniel Tenedorio
Signed-off-by: Gengliang Wang
---
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  34 -
 .../sql/catalyst/util/ResolveDefaultColumns.scala  | 153 +
 .../spark/sql/errors/QueryParsingErrors.scala      |   4 +
 .../org/apache/spark/sql/internal/SQLConf.scala    |  13 ++
 .../catalyst/catalog/ExternalCatalogSuite.scala    |  44 --
 .../sql/catalyst/catalog/SessionCatalogSuite.scala |  62 +
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |  56 ++--
 .../datasources/v2/DataSourceV2Strategy.scala      |  15 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |  14 +-
 .../org/apache/spark/sql/hive/InsertSuite.scala    |  22 +--
 10 files changed, 369 insertions(+), 48 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 3fcd8d8..01e627f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, Set}
 
 import org.antlr.v4.runtime.{ParserRuleContext, Token}
+import org.antlr.v4.runtime.misc.Interval
 import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
 import org.apache.commons.codec.DecoderException
 import org.apache.commons.codec.binary.Hex
@@ -39,7 +40,7 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, IntervalUtils}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, IntervalUtils, ResolveDefaultColumns}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, stringToTimestamp, stringToTimestampWithoutTimeZone}
 import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -2788,13 +2789,18 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     Option(commentSpec()).map(visitCommentSpec).foreach {
       builder.putString("comment", _)
     }
+    // Add the 'DEFAULT expression' clause in the column definition, if any, to the column metadata.
+    Option(ctx.defaultExpression()).map(visitDefaultExpression).foreach { field =>
+      if (conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) {
+        // Add default to metadata
+        builder.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, field)
+        builder.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, field)
+      } else {
+        throw QueryParsingErrors.defaultColumnNotEnabledError(ctx)
+      }
+    }
 
-    // Process the 'DEFAULT expression' clause in the column definition, if any.
     val name: String = colName.getText
-    val defaultExpr =
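As a hedged sketch of the new surface, the snippet below creates a table with defaults and reads the stored expression back from the column metadata. The conf key string and the `CURRENT_DEFAULT` metadata key are inferred from the diff above and should be treated as assumptions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Gated by SQLConf.ENABLE_DEFAULT_COLUMNS; the string form of that conf
// is an assumption here.
spark.conf.set("spark.sql.defaultColumn.enabled", "true")

spark.sql("CREATE TABLE t (a INT DEFAULT 4, b INT NOT NULL DEFAULT 5) USING parquet")

// Per the AstBuilder change, the DEFAULT expression text is stored in the
// column metadata (the CURRENT_DEFAULT / EXISTS_DEFAULT keys from
// ResolveDefaultColumns) rather than being evaluated at parse time.
val meta = spark.table("t").schema("a").metadata
println(meta.getString("CURRENT_DEFAULT")) // expected: 4
```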
[spark] branch master updated (9a7596e -> 8262a7b)
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 9a7596e  [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors
     add 8262a7b  [SPARK-38219][SQL] Support ANSI aggregation function `percentile_cont` as window function

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |   2 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      |   6 +-
 .../expressions/aggregate/PercentileCont.scala     |  41 +++
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  17 ++-
 .../sql/catalyst/parser/PlanParserSuite.scala      |   6 +-
 .../src/test/resources/sql-tests/inputs/window.sql |  57 +
 .../resources/sql-tests/results/group-by.sql.out   |   4 +-
 .../resources/sql-tests/results/window.sql.out     | 131 -
 8 files changed, 251 insertions(+), 13 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileCont.scala
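A hedged usage sketch of the new window form (runnable in spark-shell; the `employees` table and its columns are assumptions):

```scala
// ANSI percentile_cont as a window function, per SPARK-38219: the WITHIN
// GROUP ordering defines the percentile, the OVER clause defines the window.
spark.sql("""
  SELECT dept, salary,
         percentile_cont(0.5) WITHIN GROUP (ORDER BY salary)
           OVER (PARTITION BY dept) AS median_salary
  FROM employees
""").show()
```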
[spark] branch branch-3.3 updated: [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors
This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 2d47076 [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors
2d47076 is described below

commit 2d470763ecbcccde418956b03e503461352ab4c2
Author: Adam Binford
AuthorDate: Fri Mar 25 13:00:17 2022 -0500

[SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors

### What changes were proposed in this pull request?

Add support for removing shuffle files on released executors via the external shuffle service. The shuffle service already supports removing shuffle service cached RDD blocks, so I reused this mechanism to remove shuffle blocks as well, so as not to require updating the shuffle service itself.

To support this change functioning in a secure Yarn environment, I updated permissions on some of the block manager folders and files. Specifically:

- Block manager sub directories have the group write posix permission added to them. This gives the shuffle service permission to delete files from within these folders.
- Shuffle files have the world readable posix permission added to them. This is because when the sub directories are marked group writable, they lose the setgid bit that gets set in a secure Yarn environment. Without this, the permissions on the files would be `rw-r-----`, and since the group running Yarn (and therefore the shuffle service) is no longer the group owner of the file, it does not have access to read the file. The sub directories still do not have world execute permissio [...]

Both of these changes are done after creating a file so that umasks don't affect the resulting permissions.

### Why are the changes needed?

External shuffle services are very useful for long running jobs and dynamic allocation. However, currently if an executor is removed (either through dynamic deallocation or through some error), the shuffle files created by that executor will live until the application finishes. This results in local disks slowly filling up over time, eventually causing problems for long running applications.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test. Not sure if there's a better way I could have tested for the files being deleted or any other tests I should add.

Closes #35085 from Kimahriman/shuffle-service-remove-shuffle-blocks.

Authored-by: Adam Binford
Signed-off-by: Thomas Graves
(cherry picked from commit 9a7596e1dde0f1dd596aa6d3b2efbcb5d1ef70ea)
Signed-off-by: Thomas Graves
---
 .../network/shuffle/ExternalBlockStoreClient.java  |   4 +-
 .../sort/io/LocalDiskShuffleMapOutputWriter.java   |   3 +-
 .../scala/org/apache/spark/ContextCleaner.scala    |   4 +-
 .../src/main/scala/org/apache/spark/SparkEnv.scala |   6 +-
 .../org/apache/spark/internal/config/package.scala |  10 ++
 .../spark/shuffle/IndexShuffleBlockResolver.scala  |  18 ++-
 .../spark/shuffle/ShuffleBlockResolver.scala       |   8 ++
 .../spark/storage/BlockManagerMasterEndpoint.scala |  89 +++
 .../apache/spark/storage/DiskBlockManager.scala    |  61 +-
 .../scala/org/apache/spark/storage/DiskStore.scala |  10 ++
 .../shuffle/sort/UnsafeShuffleWriterSuite.java     |   8 ++
 .../apache/spark/ExternalShuffleServiceSuite.scala | 127 -
 .../sort/BypassMergeSortShuffleWriterSuite.scala   |  11 ++
 .../sort/IndexShuffleBlockResolverSuite.scala      |   5 +
 .../io/LocalDiskShuffleMapOutputWriterSuite.scala  |   5 +
 .../storage/BlockManagerReplicationSuite.scala     |   3 +-
 .../apache/spark/storage/BlockManagerSuite.scala   |   3 +-
 .../spark/storage/DiskBlockManagerSuite.scala      |  26 -
 docs/configuration.md                              |  11 ++
 .../streaming/ReceivedBlockHandlerSuite.scala      |   3 +-
 20 files changed, 372 insertions(+), 43 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index d2df776..b066d99 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -299,7 +299,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
           BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
           numRemovedBlocksFuture.complete(((BlocksRemoved) msgObj).numRemovedBlocks);
         } catch (Throwable t) {
-
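The permission changes described in the commit message boil down to two java.nio calls, sketched below in Scala (illustrative paths; this is a paraphrase of the described behavior, not the PR's DiskBlockManager code):

```scala
import java.nio.file.{Files, Path, Paths}
import java.nio.file.attribute.PosixFilePermissions

// Both adjustments happen after creation so the process umask cannot mask
// the bits back out.

// Group write lets the shuffle service delete files in the sub directory;
// note there is still no world execute bit on the directory.
def allowShuffleServiceDeletes(subDir: Path): Unit =
  Files.setPosixFilePermissions(subDir, PosixFilePermissions.fromString("rwxrwx---"))

// World read compensates for the setgid bit that a group-writable directory
// loses on secure Yarn, which would otherwise leave files as rw-r----- and
// unreadable by the group running the shuffle service.
def allowShuffleServiceReads(shuffleFile: Path): Unit =
  Files.setPosixFilePermissions(shuffleFile, PosixFilePermissions.fromString("rw-r--r--"))

allowShuffleServiceDeletes(Paths.get("/tmp/blockmgr-example/0b"))
allowShuffleServiceReads(Paths.get("/tmp/blockmgr-example/0b/shuffle_0_0_0.data"))
```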
[spark] branch master updated: [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors
This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 9a7596e [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors
9a7596e is described below

commit 9a7596e1dde0f1dd596aa6d3b2efbcb5d1ef70ea
Author: Adam Binford
AuthorDate: Fri Mar 25 13:00:17 2022 -0500

[SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors

Closes #35085 from Kimahriman/shuffle-service-remove-shuffle-blocks.

Authored-by: Adam Binford
Signed-off-by: Thomas Graves

The commit message and diff are identical to those in the branch-3.3 email above, which was cherry-picked from this commit.
[spark] branch master updated (b112528 -> 8ef0159)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from b112528  [SPARK-38569][BUILD] Rename `external` top level dir to `connector`
     add 8ef0159  [SPARK-38654][SQL][PYTHON] Show default index type in SQL plans for pandas API on Spark

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/internal.py                                  | 6 +-
 .../sql/catalyst/expressions/MonotonicallyIncreasingID.scala       | 2 +-
 .../spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala  | 7 +++
 .../sql/execution/python/AttachDistributedSequenceExec.scala       | 7 +++
 4 files changed, 20 insertions(+), 2 deletions(-)
[spark] branch branch-3.3 updated: [SPARK-38654][SQL][PYTHON] Show default index type in SQL plans for pandas API on Spark
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 3328615 [SPARK-38654][SQL][PYTHON] Show default index type in SQL plans for pandas API on Spark
3328615 is described below

commit 332861569d09d404da48b63846c0fa5920da0a6e
Author: Hyukjin Kwon
AuthorDate: Fri Mar 25 22:00:48 2022 +0900

[SPARK-38654][SQL][PYTHON] Show default index type in SQL plans for pandas API on Spark

### What changes were proposed in this pull request?

This PR proposes to show the default index type in SQL plans for pandas API on Spark. Note that this PR does not handle the `sequence` case because that's discouraged in production, and tricky to insert an alias.

### Why are the changes needed?

When users set `compute.default_index_type`, it's difficult to know which DataFrame uses which index. We should at least note that in Spark SQL so users can tell which plans are for the default index.

### Does this PR introduce _any_ user-facing change?

Yes, when users call `pyspark.pandas.DataFrame.spark.explain(True)`:

**distributed**

```python
import pyspark.pandas as ps
ps.set_option("compute.default_index_type", "distributed")
ps.range(1).spark.explain()
```

```
== Physical Plan ==
*(1) Project [distributed_index() AS __index_level_0__#15L, id#13L]
+- *(1) Range (0, 1, step=1, splits=16)
```

**distributed-sequence**

```python
import pyspark.pandas as ps
ps.set_option("compute.default_index_type", "distributed-sequence")
ps.range(1).spark.explain()
```

```
== Physical Plan ==
AttachDistributedSequence[__index_level_0__#16L, id#13L] Index: __index_level_0__#16L
+- *(1) Range (0, 1, step=1, splits=16)
```

### How was this patch tested?

Manually tested.

Closes #35968 from HyukjinKwon/SPARK-38654.

Authored-by: Hyukjin Kwon
Signed-off-by: Hyukjin Kwon
(cherry picked from commit 8ef0159550c143e07fa79b120b2d1fdf9d535fdc)
Signed-off-by: Hyukjin Kwon
---
 python/pyspark/pandas/internal.py                                  | 6 +-
 .../sql/catalyst/expressions/MonotonicallyIncreasingID.scala       | 2 +-
 .../spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala  | 7 +++
 .../sql/execution/python/AttachDistributedSequenceExec.scala       | 7 +++
 4 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/pandas/internal.py b/python/pyspark/pandas/internal.py
index f79f0ad..b2e6749 100644
--- a/python/pyspark/pandas/internal.py
+++ b/python/pyspark/pandas/internal.py
@@ -887,7 +887,11 @@ class InternalFrame:
     @staticmethod
     def attach_distributed_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
         scols = [scol_for(sdf, column) for column in sdf.columns]
-        return sdf.select(F.monotonically_increasing_id().alias(column_name), *scols)
+        jvm = sdf.sparkSession._jvm
+        tag = jvm.org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FUNC_ALIAS()
+        jexpr = F.monotonically_increasing_id()._jc.expr()
+        jexpr.setTagValue(tag, "distributed_index")
+        return sdf.select(Column(jvm.Column(jexpr)).alias(column_name), *scols)
 
     @staticmethod
     def attach_distributed_sequence_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
index f228b36..ecf254f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
@@ -85,7 +85,7 @@ case class MonotonicallyIncreasingID() extends LeafExpression with Stateful {
         $countTerm++;""", isNull = FalseLiteral)
   }
 
-  override def prettyName: String = "monotonically_increasing_id"
+  override def nodeName: String = "monotonically_increasing_id"
 
   override def sql: String = s"$prettyName()"
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
index 13a40db..c2f74b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 import
[spark] branch master updated (53908be -> b112528)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 53908be  [SPARK-38644][SQL] DS V2 topN push-down supports project with alias
     add b112528  [SPARK-38569][BUILD] Rename `external` top level dir to `connector`

No new revisions were added by this update.

Summary of changes:
 R/run-tests.sh                                         | 2 +-
 .../benchmarks/AvroReadBenchmark-jdk11-results.txt     | 0
 .../benchmarks/AvroReadBenchmark-jdk17-results.txt     | 0
 .../avro/benchmarks/AvroReadBenchmark-results.txt      | 0
 .../benchmarks/AvroWriteBenchmark-jdk11-results.txt    | 0
 .../benchmarks/AvroWriteBenchmark-jdk17-results.txt    | 0
 .../avro/benchmarks/AvroWriteBenchmark-results.txt     | 0
 {external => connector}/avro/pom.xml                   | 0
 .../spark/sql/avro/SparkAvroKeyOutputFormat.java       | 0
 .../org.apache.spark.sql.sources.DataSourceRegister    | 0
 .../apache/spark/sql/avro/AvroDataToCatalyst.scala     | 0
 .../org/apache/spark/sql/avro/AvroDeserializer.scala   | 0
 .../org/apache/spark/sql/avro/AvroFileFormat.scala     | 0
 .../scala/org/apache/spark/sql/avro/AvroOptions.scala  | 0
 .../org/apache/spark/sql/avro/AvroOutputWriter.scala   | 0
 .../spark/sql/avro/AvroOutputWriterFactory.scala       | 0
 .../org/apache/spark/sql/avro/AvroSerializer.scala     | 0
 .../scala/org/apache/spark/sql/avro/AvroUtils.scala    | 0
 .../apache/spark/sql/avro/CatalystDataToAvro.scala     | 0
 .../org/apache/spark/sql/avro/SchemaConverters.scala   | 0
 .../scala/org/apache/spark/sql/avro/functions.scala    | 0
 .../scala/org/apache/spark/sql/avro/package.scala      | 0
 .../apache/spark/sql/v2/avro/AvroDataSourceV2.scala    | 0
 .../sql/v2/avro/AvroPartitionReaderFactory.scala       | 0
 .../scala/org/apache/spark/sql/v2/avro/AvroScan.scala  | 0
 .../apache/spark/sql/v2/avro/AvroScanBuilder.scala     | 0
 .../org/apache/spark/sql/v2/avro/AvroTable.scala       | 0
 .../org/apache/spark/sql/v2/avro/AvroWrite.scala       | 0
 .../apache/spark/sql/avro/JavaAvroFunctionsSuite.java  | 0
 .../src/test/resources/before_1582_date_v2_4_5.avro    | Bin
 .../src/test/resources/before_1582_date_v2_4_6.avro    | Bin
 .../src/test/resources/before_1582_date_v3_2_0.avro    | Bin
 .../before_1582_timestamp_micros_v2_4_5.avro           | Bin
 .../before_1582_timestamp_micros_v2_4_6.avro           | Bin
 .../before_1582_timestamp_micros_v3_2_0.avro           | Bin
 .../before_1582_timestamp_millis_v2_4_5.avro           | Bin
 .../before_1582_timestamp_millis_v2_4_6.avro           | Bin
 .../before_1582_timestamp_millis_v3_2_0.avro           | Bin
 .../avro/src/test/resources/episodes.avro              | Bin
 .../avro}/src/test/resources/log4j2.properties         | 0
 .../test-random-partitioned/part-r-0.avro              | Bin
 .../test-random-partitioned/part-r-1.avro              | Bin
 .../test-random-partitioned/part-r-2.avro              | Bin
 .../test-random-partitioned/part-r-3.avro              | Bin
 .../test-random-partitioned/part-r-4.avro              | Bin
 .../test-random-partitioned/part-r-5.avro              | Bin
 .../test-random-partitioned/part-r-6.avro              | Bin
 .../test-random-partitioned/part-r-7.avro              | Bin
 .../test-random-partitioned/part-r-8.avro              | Bin
 .../test-random-partitioned/part-r-9.avro              | Bin
 .../test-random-partitioned/part-r-00010.avro          | Bin
 .../avro/src/test/resources/test.avro                  | Bin
 .../avro/src/test/resources/test.avsc                  | 0
 .../avro/src/test/resources/test.json                  | 0
 .../avro/src/test/resources/test_sub.avsc              | 0
 .../sql/avro/AvroCatalystDataConversionSuite.scala     | 0
 .../org/apache/spark/sql/avro/AvroCodecSuite.scala     | 0
 .../apache/spark/sql/avro/AvroFunctionsSuite.scala     | 0
 .../apache/spark/sql/avro/AvroLogicalTypeSuite.scala   | 0
 .../apache/spark/sql/avro/AvroRowReaderSuite.scala     | 0
 .../org/apache/spark/sql/avro/AvroScanSuite.scala      | 0
 .../apache/spark/sql/avro/AvroSchemaHelperSuite.scala  | 0
 .../org/apache/spark/sql/avro/AvroSerdeSuite.scala     | 0
 .../scala/org/apache/spark/sql/avro/AvroSuite.scala    | 0
 .../spark/sql/avro/DeprecatedAvroFunctionsSuite.scala  | 0
 .../sql/execution/benchmark/AvroReadBenchmark.scala    | 0
 .../sql/execution/benchmark/AvroWriteBenchmark.scala   | 0
 .../execution/datasources/AvroReadSchemaSuite.scala    | 0
 .../docker-integration-tests/pom.xml                   | 0
 .../src/test/resources/db2_krb_setup.sh                | 0
 .../src/test/resources/log4j2.properties               | 0
 .../src/test/resources/mariadb_docker_entrypoint.sh    | 0
 .../src/test/resources/mariadb_krb_setup.sh            | 0
 .../src/test/resources/postgres_krb_setup.sh           | 0
 .../apache/spark/sql/jdbc/DB2IntegrationSuite.scala    | 0
[spark] branch branch-3.3 updated: [SPARK-38644][SQL] DS V2 topN push-down supports project with alias
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 9277353 [SPARK-38644][SQL] DS V2 topN push-down supports project with alias
9277353 is described below

commit 9277353b23df4b54dfb65e948e1b3d001806929b
Author: Jiaan Geng
AuthorDate: Fri Mar 25 20:00:39 2022 +0800

[SPARK-38644][SQL] DS V2 topN push-down supports project with alias

### What changes were proposed in this pull request?

Currently, Spark DS V2 topN push-down doesn't support a project with an alias. This PR makes it work with aliases.

**Example**: the original plan is shown below:

```
Sort [mySalary#10 ASC NULLS FIRST], true
+- Project [NAME#1, SALARY#2 AS mySalary#10]
   +- ScanBuilderHolder [DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4], RelationV2[DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4] test.employee, JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession7fd4b9ec,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true),StructField(IS_MANAGER,BooleanType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions3c8e4a82)
```

Here the `pushedLimit` and `sortOrders` of `JDBCScanBuilder` are empty. If we can push down the top N, the plan becomes:

```
Project [NAME#1, SALARY#2 AS mySalary#10]
+- ScanBuilderHolder [DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4], RelationV2[DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4] test.employee, JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession7fd4b9ec,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true),StructField(IS_MANAGER,BooleanType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions3c8e4a82)
```

where the `pushedLimit` of `JDBCScanBuilder` is `1` and its `sortOrders` is `SALARY ASC NULLS FIRST`.

### Why are the changes needed?

Supporting aliases makes the push-down more useful.

### Does this PR introduce _any_ user-facing change?

Yes. DS V2 topN push-down now supports a project with an alias.

### How was this patch tested?

New tests.

Closes #35961 from beliefer/SPARK-38644.

Authored-by: Jiaan Geng
Signed-off-by: Wenchen Fan
---
 .../datasources/v2/V2ScanRelationPushDown.scala    | 15 --
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    | 24 ++
 2 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index c699e92..eaa30f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -19,14 +19,14 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, And, Attribute, AttributeReference, Cast, Divide, DivideDTInterval, DivideYMInterval, EqualTo, Expression, If, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, And, Attribute, AttributeReference, Cast, Divide, DivideDTInterval, DivideYMInterval, EqualTo, Expression, If, IntegerLiteral, Literal, NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.CollapseProject
 import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LeafNode, Limit, LocalLimit, LogicalPlan, Project, Sample, Sort}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.expressions.SortOrder
+import org.apache.spark.sql.connector.expressions.{SortOrder => V2SortOrder}
 import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Avg, Count, GeneralAggregateFunc, Sum}
 import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan}
@@ -374,9 +374,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit
             sHolder.pushedLimit = Some(limit)
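A hedged reproduction of the scenario from the commit message (the JDBC URL and driver are assumptions; any JDBC source with a `test.employee` table would do):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = spark.read.format("jdbc")
  .option("url", "jdbc:h2:mem:testdb") // assumed test database
  .option("dbtable", "test.employee")
  .load()
  .select($"NAME", $"SALARY".as("mySalary")) // alias over the sort column
  .orderBy($"mySalary")
  .limit(1)

// With this change, the top-N (sort + limit) is pushed through the aliased
// project: explain() should show pushedLimit=1 and a SALARY ASC NULLS FIRST
// sort order on the JDBC scan instead of a Spark-side Sort node.
df.explain()
```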
[spark] branch master updated (6d3149a -> 53908be)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 6d3149a  [SPARK-38643][ML] Validate input dataset of ml.regression
     add 53908be  [SPARK-38644][SQL] DS V2 topN push-down supports project with alias

No new revisions were added by this update.

Summary of changes:
 .../datasources/v2/V2ScanRelationPushDown.scala    | 15 --
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    | 24 ++
 2 files changed, 33 insertions(+), 6 deletions(-)
[spark] branch master updated: [SPARK-38643][ML] Validate input dataset of ml.regression
This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 6d3149a [SPARK-38643][ML] Validate input dataset of ml.regression
6d3149a is described below

commit 6d3149a0d5fe0652197841a589bbeb8654471e58
Author: Ruifeng Zheng
AuthorDate: Thu Mar 24 23:46:31 2022 -0700

[SPARK-38643][ML] Validate input dataset of ml.regression

### What changes were proposed in this pull request?

Validate the input dataset and fail fast when it contains invalid values.

### Why are the changes needed?

To avoid silently returning a bad model.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added test suites.

Closes #35958 from zhengruifeng/regression_validate_training_dataset.

Authored-by: Ruifeng Zheng
Signed-off-by: huaxingao
---
 .../ml/regression/AFTSurvivalRegression.scala      | 26 ++-
 .../ml/regression/DecisionTreeRegressor.scala      | 13 ++--
 .../apache/spark/ml/regression/FMRegressor.scala   |  9 --
 .../apache/spark/ml/regression/GBTRegressor.scala  | 14 ++--
 .../regression/GeneralizedLinearRegression.scala   | 28
 .../spark/ml/regression/IsotonicRegression.scala   | 16 ++
 .../spark/ml/regression/LinearRegression.scala     | 16 ++
 .../ml/regression/RandomForestRegressor.scala      | 12 +--
 .../org/apache/spark/ml/util/DatasetUtils.scala    | 12 ---
 .../ml/regression/AFTSurvivalRegressionSuite.scala | 37 ++
 .../ml/regression/DecisionTreeRegressorSuite.scala |  6
 .../spark/ml/regression/FMRegressorSuite.scala     |  5 +++
 .../spark/ml/regression/GBTRegressorSuite.scala    |  6
 .../GeneralizedLinearRegressionSuite.scala         | 31 ++
 .../ml/regression/IsotonicRegressionSuite.scala    | 32 +++
 .../ml/regression/LinearRegressionSuite.scala      |  6
 .../ml/regression/RandomForestRegressorSuite.scala |  6
 .../scala/org/apache/spark/ml/util/MLTest.scala    | 29 +
 18 files changed, 258 insertions(+), 46 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index 117229b..c48fe68 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -35,6 +35,7 @@ import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.stat._
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.DatasetUtils._
 import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
@@ -210,14 +211,23 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
       s"then cached during training. Be careful of double caching!")
     }
 
-    val instances = dataset.select(col($(featuresCol)), col($(labelCol)).cast(DoubleType),
-      col($(censorCol)).cast(DoubleType))
-      .rdd.map { case Row(features: Vector, label: Double, censor: Double) =>
-        require(censor == 1.0 || censor == 0.0, "censor must be 1.0 or 0.0")
-        // AFT does not support instance weighting,
-        // here use Instance.weight to store censor for convenience
-        Instance(label, censor, features)
-      }.setName("training instances")
+    val validatedCensorCol = {
+      val casted = col($(censorCol)).cast(DoubleType)
+      when(casted.isNull || casted.isNaN, raise_error(lit("Censors MUST NOT be Null or NaN")))
+        .when(casted =!= 0 && casted =!= 1,
+          raise_error(concat(lit("Censors MUST be in {0, 1}, but got "), casted)))
+        .otherwise(casted)
+    }
+
+    val instances = dataset.select(
+      checkRegressionLabels($(labelCol)),
+      validatedCensorCol,
+      checkNonNanVectors($(featuresCol))
+    ).rdd.map { case Row(l: Double, c: Double, v: Vector) =>
+      // AFT does not support instance weighting,
+      // here use Instance.weight to store censor for convenience
+      Instance(l, c, v)
+    }.setName("training instances")
 
     val summarizer = instances.treeAggregate(
       Summarizer.createSummarizerBuffer("mean", "std", "count"))(
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
index 6913718..d9942f1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
@@ -22,16 +22,18 @@ import
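A hedged fail-fast example based on the AFTSurvivalRegression diff above (runnable in spark-shell; the data values are made up):

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.AFTSurvivalRegression

val df = spark.createDataFrame(Seq(
  (1.218, 1.0, Vectors.dense(1.560, -0.605)),
  (2.949, 2.0, Vectors.dense(0.346, 2.158)) // invalid: censor must be 0.0 or 1.0
)).toDF("label", "censor", "features")

// The censor check used to be a require() inside the training RDD map; it is
// now a raise_error column over the input, so fit() fails fast with
// "Censors MUST be in {0, 1}, but got 2.0" instead of failing mid-job.
new AFTSurvivalRegression().fit(df)
```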