[spark] branch branch-3.0 updated: [SPARK-38570][SQL][3.0] Incorrect DynamicPartitionPruning caused by Literal

2022-03-25 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 4fc718fb [SPARK-38570][SQL][3.0] Incorrect DynamicPartitionPruning 
caused by Literal
4fc718fb is described below

commit 4fc718fbc7e70784c250ea9315ccfc56cfaa5893
Author: mcdull-zhang 
AuthorDate: Sat Mar 26 12:48:08 2022 +0800

[SPARK-38570][SQL][3.0] Incorrect DynamicPartitionPruning caused by Literal

This is a backport of #35878  to branch 3.0.

The return value of Literal.references is an empty AttributeSet, so Literal 
is mistaken for a partition column.
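
A minimal standalone sketch (not part of the commit) of the described cause, using catalyst classes directly: an empty AttributeSet is trivially a subset of the partition columns, so a subset check alone cannot tell a literal apart from a real partition column.

```scala
// Sketch only: why Literal slips through a "references are all partition columns" check.
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Literal}
import org.apache.spark.sql.types.IntegerType

val storeId = AttributeReference("store_id", IntegerType)()
val partitionCols = AttributeSet(Seq(storeId))

// Literal.references is an empty AttributeSet, and an empty set is a subset of anything:
println(Literal(4).references.subsetOf(partitionCols)) // true
// Guarding with references.nonEmpty (or rejecting literal pruning keys outright)
// avoids misclassifying the literal as a partition column.
println(Literal(4).references.nonEmpty)                // false
```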

For example, the SQL in the test case generates the following physical plan
when adaptive query execution is disabled:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, 
BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 
1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
   :  : :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, 
[store_id#5291], [id=#336]
   :  : +- *(1) ColumnarToRow
   :  :+- FileScan parquet 
default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: 
[isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: 
CatalogFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
 PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], 
PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], 
ReadSchema: struct
   :  :  +- SubqueryBroadcast dynamicpruning#5300, 0, 
[store_id#5291], [id=#336]
   :  : +- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), 
[id=#335]
   :  :+- *(1) Project [store_id#5291, 
state_province#5292]
   :  :   +- *(1) Filter (((isnotnull(country#5293) AND 
(country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND 
isnotnull(store_id#5291))
   :  :  +- *(1) ColumnarToRow
   :  : +- FileScan parquet 
default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: 
true, DataFilters: [isnotnull(country#5293), (country#5293 = US), 
((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, 
Location: InMemoryFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache,
 PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), 
Or(EqualNullSafe(store_id,4), [...]
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   : +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 
1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
   ::  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, 
[store_id#5291], [id=#336]
   :+- *(2) ColumnarToRow
   :   +- FileScan parquet 
default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: 
[isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: 
CatalogFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
 PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], 
PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: 
struct
   : +- ReusedSubquery SubqueryBroadcast 
dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   +- ReusedExchange [store_id#5291, state_province#5292], 
BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#335]
```
After this PR:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, 
BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
   :  : +- *(1) ColumnarToRow
   :  :+- FileScan parquet 
default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: 
[isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: 
CatalogFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
 PartitionFilters: [], PushedFilters: [IsNotNull(date_id), 
GreaterThanOrEqual(date_id,1300)], ReadSchema: struct
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   : +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
   :+- *(2) ColumnarToRow
   

[spark] branch branch-3.1 updated: [SPARK-38570][SQL][3.1] Incorrect DynamicPartitionPruning caused by Literal

2022-03-25 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 789ec13  [SPARK-38570][SQL][3.1] Incorrect DynamicPartitionPruning 
caused by Literal
789ec13 is described below

commit 789ec137c1e240de58152a06746a7defa001343c
Author: mcdull-zhang 
AuthorDate: Sat Mar 26 12:48:08 2022 +0800

[SPARK-38570][SQL][3.1] Incorrect DynamicPartitionPruning caused by Literal

This is a backport of #35878  to branch 3.1.

The return value of Literal.references is an empty AttributeSet, so Literal 
is mistaken for a partition column.

For example, the SQL in the test case generates the following physical plan
when adaptive query execution is disabled:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, 
BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 
1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
   :  : :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, 
[store_id#5291], [id=#336]
   :  : +- *(1) ColumnarToRow
   :  :+- FileScan parquet 
default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: 
[isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: 
CatalogFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
 PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], 
PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], 
ReadSchema: struct
   :  :  +- SubqueryBroadcast dynamicpruning#5300, 0, 
[store_id#5291], [id=#336]
   :  : +- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), 
[id=#335]
   :  :+- *(1) Project [store_id#5291, 
state_province#5292]
   :  :   +- *(1) Filter (((isnotnull(country#5293) AND 
(country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND 
isnotnull(store_id#5291))
   :  :  +- *(1) ColumnarToRow
   :  : +- FileScan parquet 
default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: 
true, DataFilters: [isnotnull(country#5293), (country#5293 = US), 
((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, 
Location: InMemoryFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache,
 PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), 
Or(EqualNullSafe(store_id,4), [...]
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   : +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 
1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
   ::  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, 
[store_id#5291], [id=#336]
   :+- *(2) ColumnarToRow
   :   +- FileScan parquet 
default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: 
[isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: 
CatalogFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
 PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], 
PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: 
struct
   : +- ReusedSubquery SubqueryBroadcast 
dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   +- ReusedExchange [store_id#5291, state_province#5292], 
BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#335]
```
After this PR:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, 
BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
   :  : +- *(1) ColumnarToRow
   :  :+- FileScan parquet 
default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: 
[isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: 
CatalogFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
 PartitionFilters: [], PushedFilters: [IsNotNull(date_id), 
GreaterThanOrEqual(date_id,1300)], ReadSchema: struct
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   : +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 <= 1000))
   :+- *(2) ColumnarToRow
   : 

[spark] branch branch-3.2 updated: [SPARK-38570][SQL][3.2] Incorrect DynamicPartitionPruning caused by Literal

2022-03-25 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 8621914  [SPARK-38570][SQL][3.2] Incorrect DynamicPartitionPruning 
caused by Literal
8621914 is described below

commit 8621914e2052eeab25e6ac4e7d5f48b5570c71f7
Author: mcdull-zhang 
AuthorDate: Sat Mar 26 12:48:08 2022 +0800

[SPARK-38570][SQL][3.2] Incorrect DynamicPartitionPruning caused by Literal

This is a backport of #35878  to branch 3.2.

### What changes were proposed in this pull request?
The return value of Literal.references is an empty AttributeSet, so Literal 
is mistaken for a partition column.

For example, the SQL in the test case generates the following physical plan
when adaptive query execution is disabled:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, 
BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter ((isnotnull(date_id#5283) AND (date_id#5283 >= 
1300)) AND dynamicpruningexpression(4 IN dynamicpruning#5300))
   :  : :  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, 
[store_id#5291], [id=#336]
   :  : +- *(1) ColumnarToRow
   :  :+- FileScan parquet 
default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: 
[isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: 
CatalogFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
 PartitionFilters: [dynamicpruningexpression(4 IN dynamicpruning#5300)], 
PushedFilters: [IsNotNull(date_id), GreaterThanOrEqual(date_id,1300)], 
ReadSchema: struct
   :  :  +- SubqueryBroadcast dynamicpruning#5300, 0, 
[store_id#5291], [id=#336]
   :  : +- BroadcastExchange 
HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), 
[id=#335]
   :  :+- *(1) Project [store_id#5291, 
state_province#5292]
   :  :   +- *(1) Filter (((isnotnull(country#5293) AND 
(country#5293 = US)) AND ((store_id#5291 <=> 4) OR (store_id#5291 <=> 5))) AND 
isnotnull(store_id#5291))
   :  :  +- *(1) ColumnarToRow
   :  : +- FileScan parquet 
default.dim_store[store_id#5291,state_province#5292,country#5293] Batched: 
true, DataFilters: [isnotnull(country#5293), (country#5293 = US), 
((store_id#5291 <=> 4) OR (store_id#5291 <=> 5)), ..., Format: Parquet, 
Location: InMemoryFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache,
 PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US), 
Or(EqualNullSafe(store_id,4), [...]
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   : +- *(2) Filter ((isnotnull(date_id#5287) AND (date_id#5287 <= 
1000)) AND dynamicpruningexpression(5 IN dynamicpruning#5300))
   ::  +- ReusedSubquery SubqueryBroadcast dynamicpruning#5300, 0, 
[store_id#5291], [id=#336]
   :+- *(2) ColumnarToRow
   :   +- FileScan parquet 
default.fact_stats[date_id#5287,store_id#5290] Batched: true, DataFilters: 
[isnotnull(date_id#5287), (date_id#5287 <= 1000)], Format: Parquet, Location: 
CatalogFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
 PartitionFilters: [dynamicpruningexpression(5 IN dynamicpruning#5300)], 
PushedFilters: [IsNotNull(date_id), LessThanOrEqual(date_id,1000)], ReadSchema: 
struct
   : +- ReusedSubquery SubqueryBroadcast 
dynamicpruning#5300, 0, [store_id#5291], [id=#336]
   +- ReusedExchange [store_id#5291, state_province#5292], 
BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#335]
```
After this PR:
```tex
*(4) Project [store_id#5281, date_id#5283, state_province#5292]
+- *(4) BroadcastHashJoin [store_id#5281], [store_id#5291], Inner, 
BuildRight, false
   :- Union
   :  :- *(1) Project [4 AS store_id#5281, date_id#5283]
   :  :  +- *(1) Filter (isnotnull(date_id#5283) AND (date_id#5283 >= 1300))
   :  : +- *(1) ColumnarToRow
   :  :+- FileScan parquet 
default.fact_sk[date_id#5283,store_id#5286] Batched: true, DataFilters: 
[isnotnull(date_id#5283), (date_id#5283 >= 1300)], Format: Parquet, Location: 
CatalogFileIndex(1 
paths)[file:/Users/dongdongzhang/code/study/spark/spark-warehouse/org.apache.s...,
 PartitionFilters: [], PushedFilters: [IsNotNull(date_id), 
GreaterThanOrEqual(date_id,1300)], ReadSchema: struct
   :  +- *(2) Project [5 AS store_id#5282, date_id#5287]
   : +- *(2) Filter (isnotnull(date_id#5287) AND (date_id#5287 

[spark] branch master updated: [SPARK-38336][SQL] Support DEFAULT column values in CREATE/REPLACE TABLE statements

2022-03-25 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4e95738  [SPARK-38336][SQL] Support DEFAULT column values in 
CREATE/REPLACE TABLE statements
4e95738 is described below

commit 4e95738fdfc334c25f44689ff8c2db5aa7c726f2
Author: Daniel Tenedorio 
AuthorDate: Sat Mar 26 12:09:56 2022 +0800

[SPARK-38336][SQL] Support DEFAULT column values in CREATE/REPLACE TABLE 
statements

### What changes were proposed in this pull request?

Extend CREATE TABLE and REPLACE TABLE statements to support columns with 
DEFAULT values. This information will be stored in the column metadata.
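
A minimal sketch (not the PR's code) of what "stored in the column metadata" can look like using Spark's public StructField metadata API; the key name "CURRENT_DEFAULT" is an assumption for illustration, while the PR itself uses the constants defined in ResolveDefaultColumns (see the diff below).

```scala
// Sketch only: attach the literal text of a DEFAULT clause to a column's metadata.
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructField, StructType}

val defaultMeta = new MetadataBuilder()
  .putString("CURRENT_DEFAULT", "4") // assumed key name; the text of `DEFAULT 4`
  .build()

val schema = StructType(Seq(
  StructField("a", IntegerType, nullable = true, metadata = defaultMeta),
  StructField("b", IntegerType, nullable = false)))

// A later resolution phase (e.g. the planned INSERT INTO support) could read it back:
println(schema("a").metadata.getString("CURRENT_DEFAULT")) // 4
```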

### Why are the changes needed?

This builds the foundation for future work (not included in this PR) to
support INSERT INTO statements, which may then omit default values or refer to
them explicitly with the DEFAULT keyword; in those cases the Spark analyzer
will automatically substitute the corresponding default values in the right
places.

Example:
```
CREATE TABLE T(a INT DEFAULT 4, b INT NOT NULL DEFAULT 5);
INSERT INTO T VALUES (1);
INSERT INTO T VALUES (1, DEFAULT);
INSERT INTO T VALUES (DEFAULT, 6);
SELECT * FROM T;
(1, 5)
(1, 5)
(4, 6)
```

### How was this patch tested?

This change is covered by new and existing unit tests, as well as new
INSERT INTO query test cases covering a variety of positive and negative
scenarios.

Closes #35855 from dtenedor/default-cols-create-table.

Authored-by: Daniel Tenedorio 
Signed-off-by: Gengliang Wang 
---
 .../spark/sql/catalyst/parser/AstBuilder.scala |  34 -
 .../sql/catalyst/util/ResolveDefaultColumns.scala  | 153 +
 .../spark/sql/errors/QueryParsingErrors.scala  |   4 +
 .../org/apache/spark/sql/internal/SQLConf.scala|  13 ++
 .../catalyst/catalog/ExternalCatalogSuite.scala|  44 --
 .../sql/catalyst/catalog/SessionCatalogSuite.scala |  62 +
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |  56 ++--
 .../datasources/v2/DataSourceV2Strategy.scala  |  15 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |  14 +-
 .../org/apache/spark/sql/hive/InsertSuite.scala|  22 +--
 10 files changed, 369 insertions(+), 48 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 3fcd8d8..01e627f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, Set}
 
 import org.antlr.v4.runtime.{ParserRuleContext, Token}
+import org.antlr.v4.runtime.misc.Interval
 import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
 import org.apache.commons.codec.DecoderException
 import org.apache.commons.codec.binary.Hex
@@ -39,7 +40,7 @@ import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, 
IntervalUtils}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils, 
IntervalUtils, ResolveDefaultColumns}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{convertSpecialDate, 
convertSpecialTimestamp, convertSpecialTimestampNTZ, getZoneId, stringToDate, 
stringToTimestamp, stringToTimestampWithoutTimeZone}
 import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, 
TableCatalog}
 import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
@@ -2788,13 +2789,18 @@ class AstBuilder extends 
SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
 Option(commentSpec()).map(visitCommentSpec).foreach {
   builder.putString("comment", _)
 }
+// Add the 'DEFAULT expression' clause in the column definition, if any, 
to the column metadata.
+Option(ctx.defaultExpression()).map(visitDefaultExpression).foreach { 
field =>
+  if (conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) {
+// Add default to metadata
+
builder.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, 
field)
+
builder.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, 
field)
+  } else {
+throw QueryParsingErrors.defaultColumnNotEnabledError(ctx)
+  }
+}
 
-// Process the 'DEFAULT expression' clause in the column definition, if 
any.
 val name: String = colName.getText
-val defaultExpr = 

[spark] branch master updated (9a7596e -> 8262a7b)

2022-03-25 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 9a7596e  [SPARK-37618][CORE] Remove shuffle blocks using the shuffle 
service for released executors
 add 8262a7b  [SPARK-38219][SQL] Support ANSI aggregation function 
`percentile_cont` as window function

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/parser/SqlBaseParser.g4 |   2 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala  |   6 +-
 .../expressions/aggregate/PercentileCont.scala |  41 +++
 .../spark/sql/catalyst/parser/AstBuilder.scala |  17 ++-
 .../sql/catalyst/parser/PlanParserSuite.scala  |   6 +-
 .../src/test/resources/sql-tests/inputs/window.sql |  57 +
 .../resources/sql-tests/results/group-by.sql.out   |   4 +-
 .../resources/sql-tests/results/window.sql.out | 131 -
 8 files changed, 251 insertions(+), 13 deletions(-)
 create mode 100644 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileCont.scala
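
For reference, a hedged usage sketch of the feature named above, assuming the ANSI `WITHIN GROUP (ORDER BY ...)` syntax combined with an `OVER` clause as exercised by the new window.sql tests; the table and column names are made up, and `spark` is assumed to be an existing SparkSession.

```scala
// Sketch: percentile_cont used as a window function over each department.
spark.sql(
  """SELECT dept, salary,
    |       percentile_cont(0.5) WITHIN GROUP (ORDER BY salary)
    |         OVER (PARTITION BY dept) AS median_salary
    |FROM employees""".stripMargin).show()
```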




[spark] branch branch-3.3 updated: [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors

2022-03-25 Thread tgraves
This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 2d47076  [SPARK-37618][CORE] Remove shuffle blocks using the shuffle 
service for released executors
2d47076 is described below

commit 2d470763ecbcccde418956b03e503461352ab4c2
Author: Adam Binford 
AuthorDate: Fri Mar 25 13:00:17 2022 -0500

[SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for 
released executors

### What changes were proposed in this pull request?

Add support for removing shuffle files on released executors via the 
external shuffle service. The shuffle service already supports removing shuffle 
service cached RDD blocks, so I reused this mechanism to remove shuffle blocks 
as well, so as not to require updating the shuffle service itself.

To support this change functioning in a secure Yarn environment, I updated 
permissions on some of the block manager folders and files. Specifically:
- Block manager sub directories have the group write posix permission added 
to them. This gives the shuffle service permission to delete files from within 
these folders.
- Shuffle files have the world readable posix permission added to them. 
This is because when the sub directories are marked group writable, they lose 
the setgid bit that gets set in a secure Yarn environment. Without this, the 
permissions on the files would be `rw-r-`, and since the group running Yarn 
(and therefore the shuffle service) is no longer the group owner of the file, 
it does not have access to read the file. The sub directories still do not have 
world execute permissio [...]

Both of these changes are done after creating a file so that umasks don't 
affect the resulting permissions.
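
An illustrative sketch (not the PR's code) of that post-creation permission adjustment, using plain java.nio APIs; the block-manager paths shown are hypothetical.

```scala
// Add a single POSIX permission bit to an already-created file or directory,
// after creation so the process umask cannot strip it.
import java.nio.file.{Files, Path, Paths}
import java.nio.file.attribute.PosixFilePermission

def addPermission(path: Path, perm: PosixFilePermission): Unit = {
  val perms = new java.util.HashSet(Files.getPosixFilePermissions(path))
  perms.add(perm)
  Files.setPosixFilePermissions(path, perms)
}

// Hypothetical layout, for illustration only: group-write on the sub-directory,
// world-read on the shuffle file.
addPermission(Paths.get("/tmp/blockmgr-example/0c"), PosixFilePermission.GROUP_WRITE)
addPermission(Paths.get("/tmp/blockmgr-example/0c/shuffle_0_0_0.data"), PosixFilePermission.OTHERS_READ)
```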

### Why are the changes needed?

External shuffle services are very useful for long running jobs and dynamic 
allocation. However, currently if an executor is removed (either through 
dynamic deallocation or through some error), the shuffle files created by that 
executor will live until the application finishes. This results in local disks 
slowly filling up over time, eventually causing problems for long running 
applications.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test. Not sure if there's a better way I could have tested for the 
files being deleted or any other tests I should add.

Closes #35085 from Kimahriman/shuffle-service-remove-shuffle-blocks.

Authored-by: Adam Binford 
Signed-off-by: Thomas Graves 
(cherry picked from commit 9a7596e1dde0f1dd596aa6d3b2efbcb5d1ef70ea)
Signed-off-by: Thomas Graves 
---
 .../network/shuffle/ExternalBlockStoreClient.java  |   4 +-
 .../sort/io/LocalDiskShuffleMapOutputWriter.java   |   3 +-
 .../scala/org/apache/spark/ContextCleaner.scala|   4 +-
 .../src/main/scala/org/apache/spark/SparkEnv.scala |   6 +-
 .../org/apache/spark/internal/config/package.scala |  10 ++
 .../spark/shuffle/IndexShuffleBlockResolver.scala  |  18 ++-
 .../spark/shuffle/ShuffleBlockResolver.scala   |   8 ++
 .../spark/storage/BlockManagerMasterEndpoint.scala |  89 +++
 .../apache/spark/storage/DiskBlockManager.scala|  61 +-
 .../scala/org/apache/spark/storage/DiskStore.scala |  10 ++
 .../shuffle/sort/UnsafeShuffleWriterSuite.java |   8 ++
 .../apache/spark/ExternalShuffleServiceSuite.scala | 127 -
 .../sort/BypassMergeSortShuffleWriterSuite.scala   |  11 ++
 .../sort/IndexShuffleBlockResolverSuite.scala  |   5 +
 .../io/LocalDiskShuffleMapOutputWriterSuite.scala  |   5 +
 .../storage/BlockManagerReplicationSuite.scala |   3 +-
 .../apache/spark/storage/BlockManagerSuite.scala   |   3 +-
 .../spark/storage/DiskBlockManagerSuite.scala  |  26 -
 docs/configuration.md  |  11 ++
 .../streaming/ReceivedBlockHandlerSuite.scala  |   3 +-
 20 files changed, 372 insertions(+), 43 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index d2df776..b066d99 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -299,7 +299,7 @@ public class ExternalBlockStoreClient extends 
BlockStoreClient {
   BlockTransferMessage msgObj = 
BlockTransferMessage.Decoder.fromByteBuffer(response);
   numRemovedBlocksFuture.complete(((BlocksRemoved) 
msgObj).numRemovedBlocks);
 } catch (Throwable t) {
-  

[spark] branch master updated: [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors

2022-03-25 Thread tgraves
This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9a7596e  [SPARK-37618][CORE] Remove shuffle blocks using the shuffle 
service for released executors
9a7596e is described below

commit 9a7596e1dde0f1dd596aa6d3b2efbcb5d1ef70ea
Author: Adam Binford 
AuthorDate: Fri Mar 25 13:00:17 2022 -0500

[SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for 
released executors

### What changes were proposed in this pull request?

Add support for removing shuffle files on released executors via the 
external shuffle service. The shuffle service already supports removing shuffle 
service cached RDD blocks, so I reused this mechanism to remove shuffle blocks 
as well, so as not to require updating the shuffle service itself.

To support this change functioning in a secure Yarn environment, I updated 
permissions on some of the block manager folders and files. Specifically:
- Block manager sub directories have the group write posix permission added 
to them. This gives the shuffle service permission to delete files from within 
these folders.
- Shuffle files have the world readable posix permission added to them. 
This is because when the sub directories are marked group writable, they lose 
the setgid bit that gets set in a secure Yarn environment. Without this, the 
permissions on the files would be `rw-r-`, and since the group running Yarn 
(and therefore the shuffle service) is no longer the group owner of the file, 
it does not have access to read the file. The sub directories still do not have 
world execute permissio [...]

Both of these changes are done after creating a file so that umasks don't 
affect the resulting permissions.

### Why are the changes needed?

External shuffle services are very useful for long running jobs and dynamic 
allocation. However, currently if an executor is removed (either through 
dynamic deallocation or through some error), the shuffle files created by that 
executor will live until the application finishes. This results in local disks 
slowly filling up over time, eventually causing problems for long running 
applications.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test. Not sure if there's a better way I could have tested for the 
files being deleted or any other tests I should add.

Closes #35085 from Kimahriman/shuffle-service-remove-shuffle-blocks.

Authored-by: Adam Binford 
Signed-off-by: Thomas Graves 
---
 .../network/shuffle/ExternalBlockStoreClient.java  |   4 +-
 .../sort/io/LocalDiskShuffleMapOutputWriter.java   |   3 +-
 .../scala/org/apache/spark/ContextCleaner.scala|   4 +-
 .../src/main/scala/org/apache/spark/SparkEnv.scala |   6 +-
 .../org/apache/spark/internal/config/package.scala |  10 ++
 .../spark/shuffle/IndexShuffleBlockResolver.scala  |  18 ++-
 .../spark/shuffle/ShuffleBlockResolver.scala   |   8 ++
 .../spark/storage/BlockManagerMasterEndpoint.scala |  89 +++
 .../apache/spark/storage/DiskBlockManager.scala|  61 +-
 .../scala/org/apache/spark/storage/DiskStore.scala |  10 ++
 .../shuffle/sort/UnsafeShuffleWriterSuite.java |   8 ++
 .../apache/spark/ExternalShuffleServiceSuite.scala | 127 -
 .../sort/BypassMergeSortShuffleWriterSuite.scala   |  11 ++
 .../sort/IndexShuffleBlockResolverSuite.scala  |   5 +
 .../io/LocalDiskShuffleMapOutputWriterSuite.scala  |   5 +
 .../storage/BlockManagerReplicationSuite.scala |   3 +-
 .../apache/spark/storage/BlockManagerSuite.scala   |   3 +-
 .../spark/storage/DiskBlockManagerSuite.scala  |  26 -
 docs/configuration.md  |  11 ++
 .../streaming/ReceivedBlockHandlerSuite.scala  |   3 +-
 20 files changed, 372 insertions(+), 43 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index d2df776..b066d99 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -299,7 +299,7 @@ public class ExternalBlockStoreClient extends 
BlockStoreClient {
   BlockTransferMessage msgObj = 
BlockTransferMessage.Decoder.fromByteBuffer(response);
   numRemovedBlocksFuture.complete(((BlocksRemoved) 
msgObj).numRemovedBlocks);
 } catch (Throwable t) {
-  logger.warn("Error trying to remove RDD blocks " + 
Arrays.toString(blockIds) +
+  logger.warn("Error trying to remove 

[spark] branch master updated (b112528 -> 8ef0159)

2022-03-25 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from b112528  [SPARK-38569][BUILD] Rename `external` top level dir to 
`connector`
 add 8ef0159  [SPARK-38654][SQL][PYTHON] Show default index type in SQL 
plans for pandas API on Spark

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/internal.py  | 6 +-
 .../spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala | 2 +-
 .../spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala  | 7 +++
 .../spark/sql/execution/python/AttachDistributedSequenceExec.scala | 7 +++
 4 files changed, 20 insertions(+), 2 deletions(-)




[spark] branch branch-3.3 updated: [SPARK-38654][SQL][PYTHON] Show default index type in SQL plans for pandas API on Spark

2022-03-25 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 3328615  [SPARK-38654][SQL][PYTHON] Show default index type in SQL 
plans for pandas API on Spark
3328615 is described below

commit 332861569d09d404da48b63846c0fa5920da0a6e
Author: Hyukjin Kwon 
AuthorDate: Fri Mar 25 22:00:48 2022 +0900

[SPARK-38654][SQL][PYTHON] Show default index type in SQL plans for pandas 
API on Spark

### What changes were proposed in this pull request?

This PR proposes to show the default index type in SQL plans for pandas API 
on Spark.

Note that this PR does not handle the `sequence` case because it is
discouraged in production and it is tricky to insert an alias for it.

### Why are the changes needed?

When users set `compute.default_index_type`, it is difficult to know which
DataFrame uses which index. We should at least note it in the Spark SQL plan so
users can tell which plans are for the default index.

### Does this PR introduce _any_ user-facing change?

Yes, when users call `pyspark.pandas.DataFrame.spark.explain(True)`:

**distributed**

```python
import pyspark.pandas as ps
ps.set_option("compute.default_index_type", "distributed")
ps.range(1).spark.explain()
```

```
== Physical Plan ==
*(1) Project [distributed_index() AS __index_level_0__#15L, id#13L]
+- *(1) Range (0, 1, step=1, splits=16)
```

**distributed-sequence**

```python
import pyspark.pandas as ps
ps.set_option("compute.default_index_type", "distributed-sequence")
ps.range(1).spark.explain()
```

```
== Physical Plan ==
AttachDistributedSequence[__index_level_0__#16L, id#13L] Index: 
__index_level_0__#16L
+- *(1) Range (0, 1, step=1, splits=16)
```

### How was this patch tested?

Manually tested.

Closes #35968 from HyukjinKwon/SPARK-38654.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 8ef0159550c143e07fa79b120b2d1fdf9d535fdc)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/pandas/internal.py  | 6 +-
 .../spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala | 2 +-
 .../spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala  | 7 +++
 .../spark/sql/execution/python/AttachDistributedSequenceExec.scala | 7 +++
 4 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/pandas/internal.py 
b/python/pyspark/pandas/internal.py
index f79f0ad..b2e6749 100644
--- a/python/pyspark/pandas/internal.py
+++ b/python/pyspark/pandas/internal.py
@@ -887,7 +887,11 @@ class InternalFrame:
 @staticmethod
 def attach_distributed_column(sdf: SparkDataFrame, column_name: str) -> 
SparkDataFrame:
 scols = [scol_for(sdf, column) for column in sdf.columns]
-return sdf.select(F.monotonically_increasing_id().alias(column_name), 
*scols)
+jvm = sdf.sparkSession._jvm
+tag = 
jvm.org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FUNC_ALIAS()
+jexpr = F.monotonically_increasing_id()._jc.expr()
+jexpr.setTagValue(tag, "distributed_index")
+return sdf.select(Column(jvm.Column(jexpr)).alias(column_name), *scols)
 
 @staticmethod
 def attach_distributed_sequence_column(sdf: SparkDataFrame, column_name: 
str) -> SparkDataFrame:
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
index f228b36..ecf254f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala
@@ -85,7 +85,7 @@ case class MonotonicallyIncreasingID() extends LeafExpression 
with Stateful {
   $countTerm++;""", isNull = FalseLiteral)
   }
 
-  override def prettyName: String = "monotonically_increasing_id"
+  override def nodeName: String = "monotonically_increasing_id"
 
   override def sql: String = s"$prettyName()"
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
index 13a40db..c2f74b3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
 import 

[spark] branch master updated (53908be -> b112528)

2022-03-25 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 53908be  [SPARK-38644][SQL] DS V2 topN push-down supports project with 
alias
 add b112528  [SPARK-38569][BUILD] Rename `external` top level dir to 
`connector`

No new revisions were added by this update.

Summary of changes:
 R/run-tests.sh|   2 +-
 .../benchmarks/AvroReadBenchmark-jdk11-results.txt|   0
 .../benchmarks/AvroReadBenchmark-jdk17-results.txt|   0
 .../avro/benchmarks/AvroReadBenchmark-results.txt |   0
 .../benchmarks/AvroWriteBenchmark-jdk11-results.txt   |   0
 .../benchmarks/AvroWriteBenchmark-jdk17-results.txt   |   0
 .../avro/benchmarks/AvroWriteBenchmark-results.txt|   0
 {external => connector}/avro/pom.xml  |   0
 .../spark/sql/avro/SparkAvroKeyOutputFormat.java  |   0
 .../org.apache.spark.sql.sources.DataSourceRegister   |   0
 .../apache/spark/sql/avro/AvroDataToCatalyst.scala|   0
 .../org/apache/spark/sql/avro/AvroDeserializer.scala  |   0
 .../org/apache/spark/sql/avro/AvroFileFormat.scala|   0
 .../scala/org/apache/spark/sql/avro/AvroOptions.scala |   0
 .../org/apache/spark/sql/avro/AvroOutputWriter.scala  |   0
 .../spark/sql/avro/AvroOutputWriterFactory.scala  |   0
 .../org/apache/spark/sql/avro/AvroSerializer.scala|   0
 .../scala/org/apache/spark/sql/avro/AvroUtils.scala   |   0
 .../apache/spark/sql/avro/CatalystDataToAvro.scala|   0
 .../org/apache/spark/sql/avro/SchemaConverters.scala  |   0
 .../scala/org/apache/spark/sql/avro/functions.scala   |   0
 .../scala/org/apache/spark/sql/avro/package.scala |   0
 .../apache/spark/sql/v2/avro/AvroDataSourceV2.scala   |   0
 .../sql/v2/avro/AvroPartitionReaderFactory.scala  |   0
 .../scala/org/apache/spark/sql/v2/avro/AvroScan.scala |   0
 .../apache/spark/sql/v2/avro/AvroScanBuilder.scala|   0
 .../org/apache/spark/sql/v2/avro/AvroTable.scala  |   0
 .../org/apache/spark/sql/v2/avro/AvroWrite.scala  |   0
 .../apache/spark/sql/avro/JavaAvroFunctionsSuite.java |   0
 .../src/test/resources/before_1582_date_v2_4_5.avro   | Bin
 .../src/test/resources/before_1582_date_v2_4_6.avro   | Bin
 .../src/test/resources/before_1582_date_v3_2_0.avro   | Bin
 .../before_1582_timestamp_micros_v2_4_5.avro  | Bin
 .../before_1582_timestamp_micros_v2_4_6.avro  | Bin
 .../before_1582_timestamp_micros_v3_2_0.avro  | Bin
 .../before_1582_timestamp_millis_v2_4_5.avro  | Bin
 .../before_1582_timestamp_millis_v2_4_6.avro  | Bin
 .../before_1582_timestamp_millis_v3_2_0.avro  | Bin
 .../avro/src/test/resources/episodes.avro | Bin
 .../avro}/src/test/resources/log4j2.properties|   0
 .../test-random-partitioned/part-r-0.avro | Bin
 .../test-random-partitioned/part-r-1.avro | Bin
 .../test-random-partitioned/part-r-2.avro | Bin
 .../test-random-partitioned/part-r-3.avro | Bin
 .../test-random-partitioned/part-r-4.avro | Bin
 .../test-random-partitioned/part-r-5.avro | Bin
 .../test-random-partitioned/part-r-6.avro | Bin
 .../test-random-partitioned/part-r-7.avro | Bin
 .../test-random-partitioned/part-r-8.avro | Bin
 .../test-random-partitioned/part-r-9.avro | Bin
 .../test-random-partitioned/part-r-00010.avro | Bin
 .../avro/src/test/resources/test.avro | Bin
 .../avro/src/test/resources/test.avsc |   0
 .../avro/src/test/resources/test.json |   0
 .../avro/src/test/resources/test_sub.avsc |   0
 .../sql/avro/AvroCatalystDataConversionSuite.scala|   0
 .../org/apache/spark/sql/avro/AvroCodecSuite.scala|   0
 .../apache/spark/sql/avro/AvroFunctionsSuite.scala|   0
 .../apache/spark/sql/avro/AvroLogicalTypeSuite.scala  |   0
 .../apache/spark/sql/avro/AvroRowReaderSuite.scala|   0
 .../org/apache/spark/sql/avro/AvroScanSuite.scala |   0
 .../apache/spark/sql/avro/AvroSchemaHelperSuite.scala |   0
 .../org/apache/spark/sql/avro/AvroSerdeSuite.scala|   0
 .../scala/org/apache/spark/sql/avro/AvroSuite.scala   |   0
 .../spark/sql/avro/DeprecatedAvroFunctionsSuite.scala |   0
 .../sql/execution/benchmark/AvroReadBenchmark.scala   |   0
 .../sql/execution/benchmark/AvroWriteBenchmark.scala  |   0
 .../execution/datasources/AvroReadSchemaSuite.scala   |   0
 .../docker-integration-tests/pom.xml  |   0
 .../src/test/resources/db2_krb_setup.sh   |   0
 .../src/test/resources/log4j2.properties  |   0
 .../src/test/resources/mariadb_docker_entrypoint.sh   |   0
 .../src/test/resources/mariadb_krb_setup.sh   |   0
 .../src/test/resources/postgres_krb_setup.sh  |   0
 .../apache/spark/sql/jdbc/DB2IntegrationSuite.scala   |   0
 

[spark] branch branch-3.3 updated: [SPARK-38644][SQL] DS V2 topN push-down supports project with alias

2022-03-25 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 9277353  [SPARK-38644][SQL] DS V2 topN push-down supports project with 
alias
9277353 is described below

commit 9277353b23df4b54dfb65e948e1b3d001806929b
Author: Jiaan Geng 
AuthorDate: Fri Mar 25 20:00:39 2022 +0800

[SPARK-38644][SQL] DS V2 topN push-down supports project with alias

### What changes were proposed in this pull request?
Currently, Spark DS V2 topN push-down doesn't support project with alias.

This PR makes it work well with aliases.

**Example**:
the origin plan show below:
```
Sort [mySalary#10 ASC NULLS FIRST], true
+- Project [NAME#1, SALARY#2 AS mySalary#10]
   +- ScanBuilderHolder [DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4], 
RelationV2[DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4] test.employee, 
JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession7fd4b9ec,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true),StructField(IS_MANAGER,BooleanType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions3c8e4a82)
```
The `pushedLimit` and `sortOrders` of `JDBCScanBuilder` are empty.

If we can push down the top n, then the plan will be:
```
Project [NAME#1, SALARY#2 AS mySalary#10]
+- ScanBuilderHolder [DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4], 
RelationV2[DEPT#0, NAME#1, SALARY#2, BONUS#3, IS_MANAGER#4] test.employee, 
JDBCScanBuilder(org.apache.spark.sql.test.TestSparkSession7fd4b9ec,StructType(StructField(DEPT,IntegerType,true),StructField(NAME,StringType,true),StructField(SALARY,DecimalType(20,2),true),StructField(BONUS,DoubleType,true),StructField(IS_MANAGER,BooleanType,true)),org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions3c8e4a82)
```
The `pushedLimit` of `JDBCScanBuilder` will be `1` and `sortOrders` of 
`JDBCScanBuilder` will be `SALARY ASC NULLS FIRST`.
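
A hedged sketch of the kind of query these plans come from (catalog and table names are assumptions for illustration, in the spirit of the JDBCV2Suite tests; `spark` is an existing SparkSession):

```scala
// With this change, ORDER BY mySalary LIMIT 1 can still be pushed down
// (pushedLimit = 1, sortOrders = SALARY ASC NULLS FIRST) even though the
// sort key is an alias introduced by the projection.
val df = spark.sql(
  """SELECT NAME, SALARY AS mySalary
    |FROM h2.test.employee
    |ORDER BY mySalary
    |LIMIT 1""".stripMargin)
df.explain()
```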

### Why are the changes needed?
Supporting aliases makes topN push-down applicable to more queries.

### Does this PR introduce _any_ user-facing change?
Yes.
Users can see that DS V2 topN push-down supports project with alias.

### How was this patch tested?
New tests.

Closes #35961 from beliefer/SPARK-38644.

Authored-by: Jiaan Geng 
Signed-off-by: Wenchen Fan 
---
 .../datasources/v2/V2ScanRelationPushDown.scala| 15 --
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala| 24 ++
 2 files changed, 33 insertions(+), 6 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index c699e92..eaa30f9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -19,14 +19,14 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, And, 
Attribute, AttributeReference, Cast, Divide, DivideDTInterval, 
DivideYMInterval, EqualTo, Expression, If, IntegerLiteral, Literal, 
NamedExpression, PredicateHelper, ProjectionOverSchema, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AliasHelper, And, 
Attribute, AttributeReference, Cast, Divide, DivideDTInterval, 
DivideYMInterval, EqualTo, Expression, If, IntegerLiteral, Literal, 
NamedExpression, PredicateHelper, ProjectionOverSchema, SortOrder, 
SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.CollapseProject
 import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, 
LeafNode, Limit, LocalLimit, LogicalPlan, Project, Sample, Sort}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.expressions.SortOrder
+import org.apache.spark.sql.connector.expressions.{SortOrder => V2SortOrder}
 import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Avg, 
Count, GeneralAggregateFunc, Sum}
 import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, 
SupportsPushDownAggregates, SupportsPushDownFilters, V1Scan}
@@ -374,9 +374,12 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] 
with PredicateHelper wit
 sHolder.pushedLimit = Some(limit)
 

[spark] branch master updated (6d3149a -> 53908be)

2022-03-25 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 6d3149a  [SPARK-38643][ML] Validate input dataset of ml.regression
 add 53908be  [SPARK-38644][SQL] DS V2 topN push-down supports project with 
alias

No new revisions were added by this update.

Summary of changes:
 .../datasources/v2/V2ScanRelationPushDown.scala| 15 --
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala| 24 ++
 2 files changed, 33 insertions(+), 6 deletions(-)




[spark] branch master updated: [SPARK-38643][ML] Validate input dataset of ml.regression

2022-03-25 Thread huaxingao
This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6d3149a  [SPARK-38643][ML] Validate input dataset of ml.regression
6d3149a is described below

commit 6d3149a0d5fe0652197841a589bbeb8654471e58
Author: Ruifeng Zheng 
AuthorDate: Thu Mar 24 23:46:31 2022 -0700

[SPARK-38643][ML] Validate input dataset of ml.regression

### What changes were proposed in this pull request?
Validate the input dataset and fail fast when it contains invalid values.
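
A minimal sketch of the fail-fast pattern (assumed column names; the real checks live in DatasetUtils and the regressors touched in the diff below):

```scala
// Wrap the label column so that any null/NaN value raises an error eagerly
// instead of silently flowing into training.
import org.apache.spark.sql.functions.{col, lit, raise_error, when}

val casted = col("label").cast("double")
val checkedLabel = when(casted.isNull || casted.isNaN,
    raise_error(lit("Labels MUST NOT be Null or NaN")))
  .otherwise(casted)
  .as("label")

// dataset.select(checkedLabel, col("features")) then throws on the first invalid row.
```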

### Why are the changes needed?
To avoid silently returning a bad model.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added test suites.

Closes #35958 from zhengruifeng/regression_validate_training_dataset.

Authored-by: Ruifeng Zheng 
Signed-off-by: huaxingao 
---
 .../ml/regression/AFTSurvivalRegression.scala  | 26 ++-
 .../ml/regression/DecisionTreeRegressor.scala  | 13 ++--
 .../apache/spark/ml/regression/FMRegressor.scala   |  9 --
 .../apache/spark/ml/regression/GBTRegressor.scala  | 14 ++--
 .../regression/GeneralizedLinearRegression.scala   | 28 
 .../spark/ml/regression/IsotonicRegression.scala   | 16 ++
 .../spark/ml/regression/LinearRegression.scala | 16 ++
 .../ml/regression/RandomForestRegressor.scala  | 12 +--
 .../org/apache/spark/ml/util/DatasetUtils.scala| 12 ---
 .../ml/regression/AFTSurvivalRegressionSuite.scala | 37 ++
 .../ml/regression/DecisionTreeRegressorSuite.scala |  6 
 .../spark/ml/regression/FMRegressorSuite.scala |  5 +++
 .../spark/ml/regression/GBTRegressorSuite.scala|  6 
 .../GeneralizedLinearRegressionSuite.scala | 31 ++
 .../ml/regression/IsotonicRegressionSuite.scala| 32 +++
 .../ml/regression/LinearRegressionSuite.scala  |  6 
 .../ml/regression/RandomForestRegressorSuite.scala |  6 
 .../scala/org/apache/spark/ml/util/MLTest.scala| 29 +
 18 files changed, 258 insertions(+), 46 deletions(-)

diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index 117229b..c48fe68 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -35,6 +35,7 @@ import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.stat._
 import org.apache.spark.ml.util._
+import org.apache.spark.ml.util.DatasetUtils._
 import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
@@ -210,14 +211,23 @@ class AFTSurvivalRegression @Since("1.6.0") 
(@Since("1.6.0") override val uid: S
 s"then cached during training. Be careful of double caching!")
 }
 
-val instances = dataset.select(col($(featuresCol)), 
col($(labelCol)).cast(DoubleType),
-  col($(censorCol)).cast(DoubleType))
-  .rdd.map { case Row(features: Vector, label: Double, censor: Double) =>
-require(censor == 1.0 || censor == 0.0, "censor must be 1.0 or 0.0")
-// AFT does not support instance weighting,
-// here use Instance.weight to store censor for convenience
-Instance(label, censor, features)
-  }.setName("training instances")
+val validatedCensorCol = {
+  val casted = col($(censorCol)).cast(DoubleType)
+  when(casted.isNull || casted.isNaN, raise_error(lit("Censors MUST NOT be 
Null or NaN")))
+.when(casted =!= 0 && casted =!= 1,
+  raise_error(concat(lit("Censors MUST be in {0, 1}, but got "), 
casted)))
+.otherwise(casted)
+}
+
+val instances = dataset.select(
+  checkRegressionLabels($(labelCol)),
+  validatedCensorCol,
+  checkNonNanVectors($(featuresCol))
+).rdd.map { case Row(l: Double, c: Double, v: Vector) =>
+  // AFT does not support instance weighting,
+  // here use Instance.weight to store censor for convenience
+  Instance(l, c, v)
+}.setName("training instances")
 
 val summarizer = instances.treeAggregate(
   Summarizer.createSummarizerBuffer("mean", "std", "count"))(
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
index 6913718..d9942f1 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
@@ -22,16 +22,18 @@ import