(spark) branch master updated (ea2bca74923e -> 78fd4e3301ff)

2024-06-13 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from ea2bca74923e [SPARK-48602][SQL] Make csv generator support different 
output style with spark.sql.binaryOutputStyle
 add 78fd4e3301ff [SPARK-48584][SQL][FOLLOWUP] Improve the unescapePathName

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala| 2 +-
 .../apache/spark/sql/catalyst/catalog/ExternalCatalogUtilsSuite.scala   | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46937][SQL] Improve concurrency performance for FunctionRegistry

2024-06-04 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8cebb9b56cc3 [SPARK-46937][SQL] Improve concurrency performance for 
FunctionRegistry
8cebb9b56cc3 is described below

commit 8cebb9b56cc3716fb5afaafa317751924f0f8062
Author: beliefer 
AuthorDate: Tue Jun 4 16:23:49 2024 +0800

[SPARK-46937][SQL] Improve concurrency performance for FunctionRegistry

### What changes were proposed in this pull request?
This PR proposes to improve the concurrency performance of `FunctionRegistry`.

### Why are the changes needed?
Currently, `SimpleFunctionRegistryBase` caches function infos in a `mutable.Map` and is guarded by `this` to ensure thread safety under multithreading.
Because all the mutable state is related to `functionBuilders`, we can delegate thread safety to `ConcurrentHashMap`.
`ConcurrentHashMap` offers higher concurrency and better responsiveness.
After this change, `FunctionRegistry` performs better than before.
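As a rough sketch of the idea (illustrative only, not the patched `SimpleFunctionRegistryBase` itself; the class and method names below are made up for the example), the change swaps a lock-guarded `mutable.Map` for a `ConcurrentHashMap`:
```
import java.util.concurrent.ConcurrentHashMap

import scala.jdk.CollectionConverters._

// Before (simplified): every read and write synchronizes on the registry.
class LockedRegistry[T] {
  private val entries = scala.collection.mutable.Map.empty[String, T]
  def register(name: String, value: T): Unit = synchronized { entries(name) = value }
  def lookup(name: String): Option[T] = synchronized { entries.get(name) }
}

// After (simplified): the map itself guarantees thread safety, so concurrent
// readers no longer contend on a single lock.
class ConcurrentRegistry[T] {
  private val entries = new ConcurrentHashMap[String, T]()
  def register(name: String, value: T): Unit = entries.put(name, value)
  def lookup(name: String): Option[T] = Option(entries.get(name))
  def list(): Seq[String] = entries.keySet().asScala.toSeq
}
```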

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
GA.
The benchmark test.
```
object FunctionRegistryBenchmark extends BenchmarkBase {

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("FunctionRegistry") {
      val iters = 100
      val threadNum = 4
      val functionRegistry = FunctionRegistry.builtin
      val names = FunctionRegistry.expressions.keys.toSeq
      val barrier = new CyclicBarrier(threadNum + 1)
      val threadPool = ThreadUtils.newDaemonFixedThreadPool(threadNum, "test-function-registry")
      val benchmark = new Benchmark("SimpleFunctionRegistry", iters, output = output)

      benchmark.addCase("only read") { _ =>
        for (_ <- 1 to threadNum) {
          threadPool.execute(new Runnable {
            val random = new Random()
            override def run(): Unit = {
              barrier.await()
              for (_ <- 1 to iters) {
                val name = names(random.nextInt(names.size))
                val fun = functionRegistry.lookupFunction(new FunctionIdentifier(name))
                assert(fun.map(_.getName).get == name)
                functionRegistry.listFunction()
              }
              barrier.await()
            }
          })
        }
        barrier.await()
        barrier.await()
      }

      benchmark.run()
    }
  }
}
```
The benchmark output before this PR.
```
Java HotSpot(TM) 64-Bit Server VM 17.0.9+11-LTS-201 on Mac OS X 10.14.6
Intel(R) Core(TM) i5-5350U CPU @ 1.80GHz
SimpleFunctionRegistry:        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------
only read                              54858          55043         261          0.0       54858.1       1.0X
```
The benchmark output after this PR.
```
Java HotSpot(TM) 64-Bit Server VM 17.0.9+11-LTS-201 on Mac OS X 10.14.6
Intel(R) Core(TM) i5-5350U CPU @ 1.80GHz
SimpleFunctionRegistry:        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------
only read                              20202          20264          88          0.0       20202.1       1.0X
```

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #44976 from beliefer/SPARK-46937.
    
Authored-by: beliefer 
Signed-off-by: beliefer 
---
 .../sql/catalyst/analysis/FunctionRegistry.scala   | 54 +++---
 1 file changed, 26 insertions(+), 28 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 3a418497fa53..a52feaa41acf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import java.util.Locale
-import javax.annotation.concurrent.GuardedBy
+import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.mutable
+import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
 import or

(spark) branch master updated: [SPARK-46207][SQL] Support MergeInto in DataFrameWriterV2

2023-12-20 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 56dc7f8c2199 [SPARK-46207][SQL] Support MergeInto in DataFrameWriterV2
56dc7f8c2199 is described below

commit 56dc7f8c2199bd3f2822004a1d9853188a9db465
Author: Huaxin Gao 
AuthorDate: Thu Dec 21 10:13:55 2023 +0800

[SPARK-46207][SQL] Support MergeInto in DataFrameWriterV2

### What changes were proposed in this pull request?
Add `MergeInto` support in `DataFrameWriterV2`

### Why are the changes needed?
Spark currently supports the MERGE INTO SQL statement. We want the DataFrame API to have the same support.

### Does this PR introduce _any_ user-facing change?
Yes. This PR introduces a new API like the following:

```
  spark.table("source")
    .mergeInto("target", $"source.id" === $"target.id")
    .whenNotMatched()
    .insertAll()
    .merge()
```
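For reference, a slightly fuller sketch of how the new builder reads (illustrative only; the clause conditions and column names here are examples, not part of the PR description above):
```
  spark.table("source")
    .mergeInto("target", $"source.id" === $"target.id")
    .whenMatched($"source.deleted" === true)
    .delete()
    .whenMatched()
    .updateAll()
    .whenNotMatched()
    .insertAll()
    .merge()
```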

### How was this patch tested?
new tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44119 from huaxingao/mergeinto.

Authored-by: Huaxin Gao 
Signed-off-by: Jiaan Geng 
---
 .../src/main/resources/error/error-classes.json|   6 +
 .../CheckConnectJvmClientCompatibility.scala   |  11 +-
 docs/sql-error-conditions.md   |   6 +
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  32 +
 .../org/apache/spark/sql/MergeIntoWriter.scala | 329 +++
 .../sql/connector/MergeIntoDataFrameSuite.scala| 946 +
 6 files changed, 1329 insertions(+), 1 deletion(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 930042505379..df223f3298ef 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2846,6 +2846,12 @@
 ],
 "sqlState" : "42000"
   },
+  "NO_MERGE_ACTION_SPECIFIED" : {
+"message" : [
+  "df.mergeInto needs to be followed by at least one of 
whenMatched/whenNotMatched/whenNotMatchedBySource."
+],
+"sqlState" : "42K0E"
+  },
   "NO_SQL_TYPE_IN_PROTOBUF_SCHEMA" : {
 "message" : [
   "Cannot find  in Protobuf schema."
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index a9b6f102a512..bd5ff6af7464 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -298,7 +298,16 @@ object CheckConnectJvmClientCompatibility {
   ProblemFilters.exclude[MissingClassProblem](
 "org.apache.spark.sql.artifact.util.ArtifactUtils"),
   ProblemFilters.exclude[MissingClassProblem](
-"org.apache.spark.sql.artifact.util.ArtifactUtils$"))
+"org.apache.spark.sql.artifact.util.ArtifactUtils$"),
+
+  // MergeIntoWriter
+  
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.MergeIntoWriter"),
+  
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenMatched"),
+  
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenMatched$"),
+  
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatched"),
+  
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatched$"),
+  
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatchedBySource"),
+  
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatchedBySource$"))
 checkMiMaCompatibility(clientJar, sqlJar, includedRules, excludeRules)
   }
 
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 94c7c167e392..a1af6863913e 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1640,6 +1640,12 @@ Can't determine the default value for `` since 
it is not nullable and i
 
 No handler for UDAF '``'. Use sparkSession.udf.register(...) 
instead.
 
+### NO_MERGE_ACTION_SPECIFIED
+
+[SQLSTATE: 
42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+df.mergeInto needs to be followed by at least one of 
whenMatched/whenNotMatched/whenNotMatchedBySource.
+
 ### NO_SQL

(spark) branch master updated: [SPARK-46406][SQL] Assign a name to the error class _LEGACY_ERROR_TEMP_1023

2023-12-16 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e955a5979cd3 [SPARK-46406][SQL] Assign a name to the error class 
_LEGACY_ERROR_TEMP_1023
e955a5979cd3 is described below

commit e955a5979cd3623c92e57df0c1bfc341043ee754
Author: Jiaan Geng 
AuthorDate: Sun Dec 17 09:19:46 2023 +0800

[SPARK-46406][SQL] Assign a name to the error class _LEGACY_ERROR_TEMP_1023

### What changes were proposed in this pull request?
Based on the suggestion at https://github.com/apache/spark/pull/43910#discussion_r1412089938, this PR wants to assign a name to the error class `_LEGACY_ERROR_TEMP_1023`.

### Why are the changes needed?
Assign a name to the error class `_LEGACY_ERROR_TEMP_1023`.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
N/A

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #44355 from beliefer/SPARK-46406.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../src/main/resources/error/error-classes.json|  10 +-
 ...or-conditions-invalid-sql-syntax-error-class.md |   4 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |   4 +-
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 126 ++---
 .../sql-tests/analyzer-results/percentiles.sql.out |  20 ++--
 .../sql-tests/results/percentiles.sql.out  |  20 ++--
 .../sql-tests/results/udaf/udaf-group-by.sql.out   |   5 +-
 7 files changed, 126 insertions(+), 63 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 2aa5420eb22c..b4a3031c06c9 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2229,6 +2229,11 @@
   "Partition key  must set value."
 ]
   },
+  "FUNCTION_WITH_UNSUPPORTED_SYNTAX" : {
+"message" : [
+  "The function  does not support ."
+]
+  },
   "INVALID_COLUMN_REFERENCE" : {
 "message" : [
   "Expected a column reference for transform : ."
@@ -4320,11 +4325,6 @@
   "count(.*) is not allowed. Please use count(*) or expand 
the columns manually, e.g. count(col1, col2)."
 ]
   },
-  "_LEGACY_ERROR_TEMP_1023" : {
-"message" : [
-  "Function  does not support ."
-]
-  },
   "_LEGACY_ERROR_TEMP_1024" : {
 "message" : [
   "FILTER expression is non-deterministic, it cannot be used in aggregate 
functions."
diff --git a/docs/sql-error-conditions-invalid-sql-syntax-error-class.md 
b/docs/sql-error-conditions-invalid-sql-syntax-error-class.md
index d9be7bad1032..93bd5c24c9d3 100644
--- a/docs/sql-error-conditions-invalid-sql-syntax-error-class.md
+++ b/docs/sql-error-conditions-invalid-sql-syntax-error-class.md
@@ -45,6 +45,10 @@ CREATE TEMPORARY FUNCTION with IF NOT EXISTS is not allowed.
 
 Partition key `` must set value.
 
+## FUNCTION_WITH_UNSUPPORTED_SYNTAX
+
+The function `` does not support ``.
+
 ## INVALID_COLUMN_REFERENCE
 
 Expected a column reference for transform ``: ``.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 5f49fe03cba7..a2ce6cc16393 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -611,8 +611,8 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
 
   def functionWithUnsupportedSyntaxError(prettyName: String, syntax: String): 
Throwable = {
 new AnalysisException(
-  errorClass = "_LEGACY_ERROR_TEMP_1023",
-  messageParameters = Map("prettyName" -> prettyName, "syntax" -> syntax))
+  errorClass = "INVALID_SQL_SYNTAX.FUNCTION_WITH_UNSUPPORTED_SYNTAX",
+  messageParameters = Map("prettyName" -> toSQLId(prettyName), "syntax" -> 
toSQLStmt(syntax)))
   }
 
   def nonDeterministicFilterInAggregateError(): Throwable = {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 0676d3834794..ac263230f127 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/an

(spark) branch master updated (ac935f5074d -> f69f791ef87)

2023-12-16 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from ac935f5074d [SPARK-46427][PYTHON][SQL] Change Python Data Source's 
description to be pretty in explain
 add f69f791ef87 [SPARK-45795][SQL] DS V2 supports push down Mode

No new revisions were added by this update.

Summary of changes:
 .../aggregate/GeneralAggregateFunc.java|  1 +
 .../sql/catalyst/util/V2ExpressionBuilder.scala|  4 ++
 .../org/apache/spark/sql/jdbc/H2Dialect.scala  | 16 +-
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala| 64 ++
 4 files changed, 83 insertions(+), 2 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46403][SQL] Decode parquet binary with getBytesUnsafe method

2023-12-15 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f350ddba682 [SPARK-46403][SQL] Decode parquet binary with 
getBytesUnsafe method
f350ddba682 is described below

commit f350ddba68264f05c1029ffb5cbebbeead7e275f
Author: Kun Wan 
AuthorDate: Sat Dec 16 10:28:51 2023 +0800

[SPARK-46403][SQL] Decode parquet binary with getBytesUnsafe method

### What changes were proposed in this pull request?

Currently, Spark reads a Parquet binary object with the getBytes() method.

The **Binary.getBytes()** method always makes a new copy of the internal bytes.

We can use the **Binary.getBytesUnsafe()** method in https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java#L55-L62 to reuse the cached bytes when getBytes() has already been called and the bytes are cached.
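A minimal sketch of the difference (illustrative; not the patched reader code):
```
import org.apache.parquet.io.api.Binary

val binary: Binary = Binary.fromString("example")

// getBytes() always allocates and returns a fresh copy of the backing bytes.
val copied: Array[Byte] = binary.getBytes

// getBytesUnsafe() may return the internally cached array without copying,
// so callers must not mutate the result.
val notCopied: Array[Byte] = binary.getBytesUnsafe
```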

Local benchmark, before this PR:
```
OpenJDK 64-Bit Server VM 17.0.6+10-LTS on Mac OS X 13.6
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Parquet dictionary:            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------
Read binary dictionary                 18919          19449         393          5.5         180.4       1.0X
```
after this PR:
```
OpenJDK 64-Bit Server VM 17.0.6+10-LTS on Mac OS X 13.6
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Parquet dictionary:            Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------
Read binary dictionary                 10135          10602         337         10.3          96.7       1.0X

```

### Why are the changes needed?

Optimize parquet reader.
Before this PR:

![image](https://github.com/apache/spark/assets/3626747/c61b86f7-8745-46d6-a5f7-34339f57f11a)

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Local test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44351 from wankunde/binary.

Authored-by: Kun Wan 
Signed-off-by: Jiaan Geng 
---
 .../spark/sql/execution/datasources/parquet/ParquetDictionary.java  | 2 +-
 .../sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java
index 6626f3fee98..ffdb94ee376 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionary.java
@@ -70,7 +70,7 @@ public final class ParquetDictionary implements Dictionary {
   long signed = dictionary.decodeToLong(id);
   return new BigInteger(Long.toUnsignedString(signed)).toByteArray();
 } else {
-  return dictionary.decodeToBinary(id).getBytes();
+  return dictionary.decodeToBinary(id).getBytesUnsafe();
 }
   }
 }
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 1ed6c4329eb..918f21716f4 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -969,7 +969,7 @@ public class ParquetVectorUpdaterFactory {
 int offset,
 WritableColumnVector values,
 VectorizedValuesReader valuesReader) {
-  values.putByteArray(offset, 
valuesReader.readBinary(arrayLen).getBytes());
+  values.putByteArray(offset, 
valuesReader.readBinary(arrayLen).getBytesUnsafe());
 }
 
 @Override


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-45796][SQL] Support MODE() WITHIN GROUP (ORDER BY col)

2023-12-14 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e3f46ed57dc0 [SPARK-45796][SQL] Support MODE() WITHIN GROUP (ORDER BY 
col)
e3f46ed57dc0 is described below

commit e3f46ed57dc063566cdb9425b4d5e02c65332df1
Author: Jiaan Geng 
AuthorDate: Thu Dec 14 19:20:48 2023 +0800

[SPARK-45796][SQL] Support MODE() WITHIN GROUP (ORDER BY col)

### What changes were proposed in this pull request?
Mainstream databases support this syntax.
**H2**
http://www.h2database.com/html/functions-aggregate.html#mode
**Postgres**
https://www.postgresql.org/docs/16/functions-aggregate.html

**Syntax**:
Aggregate function
`MODE() WITHIN GROUP (ORDER BY sortSpecification)`

Window function
```
MODE() WITHIN GROUP (ORDER BY sortSpecification)
[FILTER (WHERE expression)] [OVER windowNameOrSpecification]
```

**Examples**:
```
SELECT
  mode() WITHIN GROUP (ORDER BY v),
  mode() WITHIN GROUP (ORDER BY v) FILTER (WHERE k > 0)
FROM aggr;
```

```
SELECT
employee_name,
department,
salary,
mode() WITHIN GROUP (ORDER BY salary) OVER (PARTITION BY department),
mode() WITHIN GROUP (ORDER BY salary) FILTER (WHERE department = 'Accounting') OVER (PARTITION BY department)
FROM basic_pays
ORDER BY salary;
```

### Why are the changes needed?
Support `MODE() WITHIN GROUP (ORDER BY col)`

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #44184 from beliefer/SPARK-45796_new.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../explain-results/function_mode.explain  |   2 +-
 python/pyspark/sql/functions/builtin.py|  22 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala |  10 +-
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   2 +-
 .../sql/catalyst/expressions/aggregate/Mode.scala  | 185 +---
 .../aggregate/SupportsOrderingWithinGroup.scala|   1 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |   9 +
 .../sql-functions/sql-expression-schema.md |   2 +-
 .../sql-tests/analyzer-results/group-by.sql.out| 141 --
 .../sql-tests/analyzer-results/mode.sql.out| 511 +
 .../test/resources/sql-tests/inputs/group-by.sql   |  16 -
 .../src/test/resources/sql-tests/inputs/mode.sql   | 139 ++
 .../resources/sql-tests/results/group-by.sql.out   | 147 --
 .../test/resources/sql-tests/results/mode.sql.out  | 495 
 14 files changed, 1290 insertions(+), 392 deletions(-)

diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_mode.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mode.explain
index 28bbb44b0fda..0952c9a14ef3 100644
--- 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_mode.explain
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mode.explain
@@ -1,2 +1,2 @@
-Aggregate [mode(a#0, 0, 0, false) AS mode(a, false)#0]
+Aggregate [mode(a#0, 0, 0, None) AS mode(a)#0]
 +- LocalRelation , [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/python/pyspark/sql/functions/builtin.py 
b/python/pyspark/sql/functions/builtin.py
index e1c01018..17a64b1a4f1d 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -798,12 +798,12 @@ def mode(col: "ColumnOrName", deterministic: bool = 
False) -> Column:
 ... ("dotNET", 2013, 48000), ("Java", 2013, 3)],
 ... schema=("course", "year", "earnings"))
 >>> df.groupby("course").agg(mode("year")).show()
-+--+-+
-|course|mode(year, false)|
-+--+-+
-|  Java| 2012|
-|dotNET| 2012|
-+--+-+
++--+--+
+|course|mode(year)|
++--+--+
+|  Java|  2012|
+|dotNET|  2012|
++--+--+
 
 When multiple values have the same greatest frequency then either any of 
values is returned if
 deterministic is false or is not defined, or the lowest value is returned 
if deterministic is
@@ -811,11 +811,11 @@ def mode(col: "ColumnOrName", deterministic: bool = 
False) -> Column:
 
 >>> df2 = spark.createDataFrame([(-10,), (0,), (10,)], ["col"])
 >>>

(spark) branch master updated: [SPARK-45649][SQL] Unify the prepare framework for OffsetWindowFunctionFrame

2023-12-11 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6a197efeb3c [SPARK-45649][SQL] Unify the prepare framework for 
OffsetWindowFunctionFrame
6a197efeb3c is described below

commit 6a197efeb3c1cca156cd615e990e35e82ce22ee3
Author: Jiaan Geng 
AuthorDate: Mon Dec 11 19:48:14 2023 +0800

[SPARK-45649][SQL] Unify the prepare framework for OffsetWindowFunctionFrame

### What changes were proposed in this pull request?
Currently, the `prepare` implementations of all the `OffsetWindowFunctionFrame` subclasses share the same code logic, shown below.
```
  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
    if (offset > rows.length) {
      fillDefaultValue(EmptyRow)
    } else {
      resetStates(rows)
      if (ignoreNulls) {
        ...
      } else {
        ...
      }
    }
  }
```
This PR wants to unify the prepare framework for `OffsetWindowFunctionFrame`.

**Why did https://github.com/apache/spark/pull/43507 introduce the NPE bug?**
For example, consider a window group with offset 5 that has only 4 elements.
First, we don't call `resetStates`, because the offset is greater than 4.
After that, we iterate over the elements of the window group by visiting `input`, but `input` is null.

This PR also adds two test cases where the absolute value of the offset is greater than the window group size.
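A hedged sketch of that scenario (not the added test cases themselves; assumes an active SparkSession named `spark`): a lead offset larger than the window group size.
```
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lead}

import spark.implicits._

// A single 4-row window group, queried with lead offset 5.
val df = Seq(("a", 1), ("a", 2), ("a", 3), ("a", 4)).toDF("k", "v")
val w = Window.partitionBy("k").orderBy("v")

// The offset exceeds the group size, so every row should get null (the default
// value) rather than hitting the NPE path described above.
df.select(col("k"), col("v"), lead(col("v"), 5).over(w)).show()
```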

### Why are the changes needed?
Unify the prepare framework for `OffsetWindowFunctionFrame`

### Does this PR introduce _any_ user-facing change?
'No'.
Internal change only.

### How was this patch tested?
Existing test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43958 from beliefer/SPARK-45649.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../sql/execution/window/WindowFunctionFrame.scala | 114 ++---
 .../spark/sql/DataFrameWindowFramesSuite.scala |  24 +
 2 files changed, 76 insertions(+), 62 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
index 6cea838311a..4aa7444c407 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -87,7 +87,8 @@ abstract class OffsetWindowFunctionFrameBase(
 expressions: Array[OffsetWindowFunction],
 inputSchema: Seq[Attribute],
 newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-offset: Int)
+offset: Int,
+ignoreNulls: Boolean)
   extends WindowFunctionFrame {
 
   /** Rows of the partition currently being processed. */
@@ -141,6 +142,8 @@ abstract class OffsetWindowFunctionFrameBase(
   // is not null.
   protected var skippedNonNullCount = 0
 
+  protected val absOffset = Math.abs(offset)
+
   // Reset the states by the data of the new partition.
   protected def resetStates(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
 input = rows
@@ -176,6 +179,33 @@ abstract class OffsetWindowFunctionFrameBase(
 }
   }
 
+  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
+resetStates(rows)
+if (absOffset > rows.length) {
+  fillDefaultValue(EmptyRow)
+} else {
+  if (ignoreNulls) {
+prepareForIgnoreNulls()
+  } else {
+prepareForRespectNulls()
+  }
+}
+  }
+
+  protected def prepareForIgnoreNulls(): Unit = findNextRowWithNonNullInput()
+
+  protected def prepareForRespectNulls(): Unit = {
+// drain the first few rows if offset is larger than one
+while (inputIndex < offset) {
+  nextSelectedRow = WindowFunctionFrame.getNextOrNull(inputIterator)
+  inputIndex += 1
+}
+// `inputIndex` starts as 0, but the `offset` can be negative and we may 
not enter the
+// while loop at all. We need to make sure `inputIndex` ends up as 
`offset` to meet the
+// assumption of the write path.
+inputIndex = offset
+  }
+
   override def currentLowerBound(): Int = throw new 
UnsupportedOperationException()
 
   override def currentUpperBound(): Int = throw new 
UnsupportedOperationException()
@@ -197,25 +227,7 @@ class FrameLessOffsetWindowFunctionFrame(
 offset: Int,
 ignoreNulls: Boolean = false)
   extends OffsetWindowFunctionFrameBase(
-target, ordinal, expressions, inputSchema, newMutableProjection, offset) {
-
-  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
-resetStates(rows)
-if (ignoreNulls) {
-  if (Math.abs(offset) &

(spark) branch master updated: [SPARK-45515][CORE][SQL][FOLLOWUP] Use enhanced switch expressions to replace the regular switch statement

2023-12-06 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b3c56cffdb6 [SPARK-45515][CORE][SQL][FOLLOWUP] Use enhanced switch 
expressions to replace the regular switch statement
b3c56cffdb6 is described below

commit b3c56cffdb6f731a1c8677a6bc896be0144ac0fc
Author: Jiaan Geng 
AuthorDate: Thu Dec 7 13:50:44 2023 +0800

[SPARK-45515][CORE][SQL][FOLLOWUP] Use enhanced switch expressions to 
replace the regular switch statement

### What changes were proposed in this pull request?
This PR follows up https://github.com/apache/spark/pull/43349.

This PR also does not include parts of the hive and hive-thriftserver modules.

### Why are the changes needed?
Please see https://github.com/apache/spark/pull/43349.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
GA

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #44183 from beliefer/SPARK-45515_followup.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../org/apache/spark/network/util/DBProvider.java  |  16 +--
 .../apache/spark/network/RpcIntegrationSuite.java  |  10 +-
 .../java/org/apache/spark/network/StreamSuite.java |  19 ++-
 .../shuffle/checksum/ShuffleChecksumHelper.java|  15 +-
 .../apache/spark/unsafe/UnsafeAlignedOffset.java   |  10 +-
 .../apache/spark/launcher/SparkLauncherSuite.java  |  13 +-
 .../apache/spark/launcher/CommandBuilderUtils.java |  98 +++--
 .../spark/launcher/SparkClassCommandBuilder.java   |  45 +++---
 .../spark/launcher/SparkSubmitCommandBuilder.java  |  55 +++-
 .../spark/launcher/InProcessLauncherSuite.java |  43 +++---
 .../sql/connector/util/V2ExpressionSQLBuilder.java | 154 +
 .../parquet/ParquetVectorUpdaterFactory.java   |  39 +++---
 .../parquet/VectorizedColumnReader.java|  50 +++
 .../parquet/VectorizedRleValuesReader.java |  89 +---
 14 files changed, 251 insertions(+), 405 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
index 1adb9cfe5d3..5a25bdda233 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
@@ -38,17 +38,17 @@ public class DBProvider {
 StoreVersion version,
 ObjectMapper mapper) throws IOException {
   if (dbFile != null) {
-switch (dbBackend) {
-  case LEVELDB:
+return switch (dbBackend) {
+  case LEVELDB -> {
 org.iq80.leveldb.DB levelDB = LevelDBProvider.initLevelDB(dbFile, 
version, mapper);
 logger.warn("The LEVELDB is deprecated. Please use ROCKSDB 
instead.");
-return levelDB != null ? new LevelDB(levelDB) : null;
-  case ROCKSDB:
+yield levelDB != null ? new LevelDB(levelDB) : null;
+  }
+  case ROCKSDB -> {
 org.rocksdb.RocksDB rocksDB = RocksDBProvider.initRockDB(dbFile, 
version, mapper);
-return rocksDB != null ? new RocksDB(rocksDB) : null;
-  default:
-throw new IllegalArgumentException("Unsupported DBBackend: " + 
dbBackend);
-}
+yield rocksDB != null ? new RocksDB(rocksDB) : null;
+  }
+};
   }
   return null;
 }
diff --git 
a/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
 
b/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
index 55a0cc73f8b..40495d6912c 100644
--- 
a/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
+++ 
b/common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
@@ -67,14 +67,10 @@ public class RpcIntegrationSuite {
 String msg = JavaUtils.bytesToString(message);
 String[] parts = msg.split("/");
 switch (parts[0]) {
-  case "hello":
+  case "hello" ->
 callback.onSuccess(JavaUtils.stringToBytes("Hello, " + parts[1] + 
"!"));
-break;
-  case "return error":
-callback.onFailure(new RuntimeException("Returned: " + parts[1]));
-break;
-  case "throw error":
-throw new RuntimeException("Thrown: " + parts[1]);
+  case "return error" -> callback.onFailure(new 
RuntimeException("Returned: " + parts[1]));
+  case "throw error"

(spark) branch master updated: [SPARK-46009][SQL][CONNECT] Merge the parse rule of PercentileCont and PercentileDisc into functionCall

2023-12-04 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f1283c126878 [SPARK-46009][SQL][CONNECT] Merge the parse rule of 
PercentileCont and PercentileDisc into functionCall
f1283c126878 is described below

commit f1283c12687853f9cd190f8db69d97abe16a2d88
Author: Jiaan Geng 
AuthorDate: Tue Dec 5 09:28:30 2023 +0800

[SPARK-46009][SQL][CONNECT] Merge the parse rule of PercentileCont and 
PercentileDisc into functionCall

### What changes were proposed in this pull request?
The Spark SQL parser has a special rule to parse `[percentile_cont|percentile_disc](percentage) WITHIN GROUP (ORDER BY v)`.
We should merge this rule into `functionCall`.
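For reference, a hedged example of the syntax that the unified `functionCall` rule parses (assuming a table `aggr` with a numeric column `v`):
```
spark.sql("""
  SELECT
    percentile_cont(0.25) WITHIN GROUP (ORDER BY v),
    percentile_disc(0.25) WITHIN GROUP (ORDER BY v)
  FROM aggr
""").show()
```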

### Why are the changes needed?
Merge the parse rule of `PercentileCont` and `PercentileDisc` into 
`functionCall`.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
New test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43910 from beliefer/SPARK-46009.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 common/utils/src/main/resources/error/README.md|   1 +
 .../src/main/resources/error/error-classes.json|  23 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  |   7 +-
 ...id-inverse-distribution-function-error-class.md |  40 +++
 docs/sql-error-conditions.md   |   8 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4 |   4 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala |  29 +-
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   2 +
 .../spark/sql/catalyst/analysis/unresolved.scala   |  32 ++-
 .../aggregate/SupportsOrderingWithinGroup.scala|  27 ++
 .../expressions/aggregate/percentiles.scala|  81 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala |  34 +--
 .../spark/sql/errors/QueryCompilationErrors.scala  |  24 ++
 .../sql/catalyst/parser/PlanParserSuite.scala  |  49 +++-
 .../sql-functions/sql-expression-schema.md |   2 +
 .../sql-tests/analyzer-results/percentiles.sql.out | 275 +++
 .../resources/sql-tests/inputs/percentiles.sql |  48 
 .../sql-tests/results/percentiles.sql.out  | 299 +
 18 files changed, 922 insertions(+), 63 deletions(-)

diff --git a/common/utils/src/main/resources/error/README.md 
b/common/utils/src/main/resources/error/README.md
index c9fdd84e7442..556a634e9927 100644
--- a/common/utils/src/main/resources/error/README.md
+++ b/common/utils/src/main/resources/error/README.md
@@ -1309,6 +1309,7 @@ The following SQLSTATEs are collated from:
 |HZ320|HZ   |RDA-specific condition|320 
|version not supported   |RDA/SQL|Y 
  |RDA/SQL  
   |
 |HZ321|HZ   |RDA-specific condition|321 
|TCP/IP error|RDA/SQL|Y 
  |RDA/SQL  
   |
 |HZ322|HZ   |RDA-specific condition|322 
|TLS alert   |RDA/SQL|Y 
  |RDA/SQL  
   |
+|ID001|IM   |Invalid inverse distribution function |001 
|Invalid inverse distribution function   |SQL/Foundation |N 
  |SQL/Foundation PostgreSQL Oracle Snowflake Redshift H2   
   |
 |IM001|IM   |ODBC driver   |001 
|Driver does not support this function   |SQL Server |N 
  |SQL Server   
   |
 |IM002|IM   |ODBC driver   |002 
|Data source name not found and no default driver specified  |SQL Server |N 
  |SQL Server   
   |
 |IM003|IM   |ODBC driver   |003 
|Specified driver could not be loaded|SQL Server |N 
  |SQL Server   
   |
diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 6795ebcb0bd0..a808be9510cf 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1858,6 +1858,29 @@
 },
 "sqlState" : "42000"
   },
+  "INVALID_INVERSE_DISTRIBUTION_FU

(spark) branch master updated: [SPARK-46055][SQL][FOLLOWUP] Respect code style for scala.

2023-11-30 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9c6782fff9ec [SPARK-46055][SQL][FOLLOWUP] Respect code style for scala.
9c6782fff9ec is described below

commit 9c6782fff9ec7073281035f1dd0f1273f931d337
Author: Jiaan Geng 
AuthorDate: Fri Dec 1 15:02:30 2023 +0800

[SPARK-46055][SQL][FOLLOWUP] Respect code style for scala.

### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/43959 gets metadata when resolving catalogs instead of directly calling the catalog.

This PR fixes one minor issue where some code didn't follow the Scala code style.

### Why are the changes needed?
Respect code style for scala.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Existing test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #44079 from beliefer/SPARK-46055_followup.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala| 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 1bddac7f8f1f..664b68008080 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -59,9 +59,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
   }
 
   private def resolveNamespace(
-catalog: CatalogPlugin,
-ns: Seq[String],
-fetchMetadata: Boolean): ResolvedNamespace = {
+  catalog: CatalogPlugin,
+  ns: Seq[String],
+  fetchMetadata: Boolean): ResolvedNamespace = {
 catalog match {
   case supportsNS: SupportsNamespaces if fetchMetadata =>
 ResolvedNamespace(


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch branch-3.3 updated: [SPARK-46029][SQL][3.3] Escape the single quote, _ and % for DS V2 pushdown

2023-11-30 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 6ad00b7b2e34 [SPARK-46029][SQL][3.3] Escape the single quote, _ and % 
for DS V2 pushdown
6ad00b7b2e34 is described below

commit 6ad00b7b2e34c8b6868b4d5042c6972a8fa9f869
Author: Jiaan Geng 
AuthorDate: Fri Dec 1 14:58:52 2023 +0800

[SPARK-46029][SQL][3.3] Escape the single quote, _ and % for DS V2 pushdown

### What changes were proposed in this pull request?
This PR backports https://github.com/apache/spark/pull/43801 to branch-3.3.

### Why are the changes needed?
Escape the single quote, _ and % for DS V2 pushdown
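A hedged illustration of the problem this addresses (the catalog and table names below are hypothetical, not from the patch):
```
// `people` stands for a table exposed through a DS V2 JDBC catalog, so string
// predicates are compiled into SQL by V2ExpressionSQLBuilder.
val people = spark.table("h2.test.people")  // hypothetical catalog/table names

// Intended to match names that literally start with "a_b". Before this fix the
// pushed-down predicate was `name LIKE 'a_b%'`, where '_' is a single-character
// wildcard, so it also matched values like "axb...". With the fix it becomes
// `name LIKE 'a\_b%' ESCAPE '\'`.
people.filter(org.apache.spark.sql.functions.col("name").startsWith("a_b")).show()
```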

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
New test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #44089 from beliefer/SPARK-46029_3.3.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../sql/connector/util/V2ExpressionSQLBuilder.java |  35 ++-
 .../org/apache/spark/sql/jdbc/H2Dialect.scala  |  24 +
 .../datasources/v2/V2PredicateSuite.scala  |   6 +-
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala| 112 -
 4 files changed, 166 insertions(+), 11 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
index 3fa648002879..29414377f668 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
@@ -35,6 +35,35 @@ import org.apache.spark.sql.types.DataType;
  */
 public class V2ExpressionSQLBuilder {
 
+  /**
+   * Escape the special chars for like pattern.
+   *
+   * Note: This method adopts the escape representation within Spark and is 
not bound to any JDBC
+   * dialect. JDBC dialect should overwrite this API if the underlying 
database have more special
+   * chars other than _ and %.
+   */
+  protected String escapeSpecialCharsForLikePattern(String str) {
+StringBuilder builder = new StringBuilder();
+
+for (char c : str.toCharArray()) {
+  switch (c) {
+case '_':
+  builder.append("\\_");
+  break;
+case '%':
+  builder.append("\\%");
+  break;
+case '\'':
+  builder.append("\\\'");
+  break;
+default:
+  builder.append(c);
+  }
+}
+
+return builder.toString();
+  }
+
   public String build(Expression expr) {
 if (expr instanceof Literal) {
   return visitLiteral((Literal) expr);
@@ -147,21 +176,21 @@ public class V2ExpressionSQLBuilder {
 // Remove quotes at the beginning and end.
 // e.g. converts "'str'" to "str".
 String value = r.substring(1, r.length() - 1);
-return l + " LIKE '" + value + "%'";
+return l + " LIKE '" + escapeSpecialCharsForLikePattern(value) + "%' 
ESCAPE '\\'";
   }
 
   protected String visitEndsWith(String l, String r) {
 // Remove quotes at the beginning and end.
 // e.g. converts "'str'" to "str".
 String value = r.substring(1, r.length() - 1);
-return l + " LIKE '%" + value + "'";
+return l + " LIKE '%" + escapeSpecialCharsForLikePattern(value) + "' 
ESCAPE '\\'";
   }
 
   protected String visitContains(String l, String r) {
 // Remove quotes at the beginning and end.
 // e.g. converts "'str'" to "str".
 String value = r.substring(1, r.length() - 1);
-return l + " LIKE '%" + value + "%'";
+return l + " LIKE '%" + escapeSpecialCharsForLikePattern(value) + "%' 
ESCAPE '\\'";
   }
 
   private String inputToSQL(Expression input) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 967df112af22..eb759144b6c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -20,8 +20,11 @@ package org.apache.spark.sql.jdbc
 import java.sql.{SQLException, Types}
 import java.util.Locale
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, 
NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.connector.expressions.Expression
 import org.apache.spark.sql.connector.expressio

(spark) branch branch-3.4 updated: [SPARK-46029][SQL][3.4] Escape the single quote, _ and % for DS V2 pushdown

2023-11-29 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 639f836bfa1 [SPARK-46029][SQL][3.4] Escape the single quote, _ and % 
for DS V2 pushdown
639f836bfa1 is described below

commit 639f836bfa1536dacded8b1186c6b0f7d268aacf
Author: Jiaan Geng 
AuthorDate: Thu Nov 30 09:51:46 2023 +0800

[SPARK-46029][SQL][3.4] Escape the single quote, _ and % for DS V2 pushdown

### What changes were proposed in this pull request?
This PR backports https://github.com/apache/spark/pull/43801 to branch-3.4.

### Why are the changes needed?
Escape the single quote, _ and % for DS V2 pushdown

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
New test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #44066 from beliefer/SPARK-46029_backport.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../sql/connector/util/V2ExpressionSQLBuilder.java |  35 ++-
 .../org/apache/spark/sql/jdbc/H2Dialect.scala  |   8 ++
 .../datasources/v2/V2PredicateSuite.scala  |   6 +-
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala| 113 -
 4 files changed, 151 insertions(+), 11 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
index 9ca0fe4787f..dcb3c706946 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java
@@ -48,6 +48,35 @@ import org.apache.spark.sql.types.DataType;
  */
 public class V2ExpressionSQLBuilder {
 
+  /**
+   * Escape the special chars for like pattern.
+   *
+   * Note: This method adopts the escape representation within Spark and is 
not bound to any JDBC
+   * dialect. JDBC dialect should overwrite this API if the underlying 
database have more special
+   * chars other than _ and %.
+   */
+  protected String escapeSpecialCharsForLikePattern(String str) {
+StringBuilder builder = new StringBuilder();
+
+for (char c : str.toCharArray()) {
+  switch (c) {
+case '_':
+  builder.append("\\_");
+  break;
+case '%':
+  builder.append("\\%");
+  break;
+case '\'':
+  builder.append("\\\'");
+  break;
+default:
+  builder.append(c);
+  }
+}
+
+return builder.toString();
+  }
+
   public String build(Expression expr) {
 if (expr instanceof Literal) {
   return visitLiteral((Literal) expr);
@@ -247,21 +276,21 @@ public class V2ExpressionSQLBuilder {
 // Remove quotes at the beginning and end.
 // e.g. converts "'str'" to "str".
 String value = r.substring(1, r.length() - 1);
-return l + " LIKE '" + value + "%'";
+return l + " LIKE '" + escapeSpecialCharsForLikePattern(value) + "%' 
ESCAPE '\\'";
   }
 
   protected String visitEndsWith(String l, String r) {
 // Remove quotes at the beginning and end.
 // e.g. converts "'str'" to "str".
 String value = r.substring(1, r.length() - 1);
-return l + " LIKE '%" + value + "'";
+return l + " LIKE '%" + escapeSpecialCharsForLikePattern(value) + "' 
ESCAPE '\\'";
   }
 
   protected String visitContains(String l, String r) {
 // Remove quotes at the beginning and end.
 // e.g. converts "'str'" to "str".
 String value = r.substring(1, r.length() - 1);
-return l + " LIKE '%" + value + "%'";
+return l + " LIKE '%" + escapeSpecialCharsForLikePattern(value) + "%' 
ESCAPE '\\'";
   }
 
   private String inputToSQL(Expression input) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 5ede793f6d1..4ae3260f3d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -240,6 +240,14 @@ private[sql] object H2Dialect extends JdbcDialect {
   }
 
   class H2SQLBuilder extends JDBCSQLBuilder {
+override def escapeSpecialCharsForLikePattern(str: String): String = {
+  str.map {
+case '_' => "\\_"
+case '%' => "\\%"
+case c => c.toString
+  }.mkString
+}
+
 override def visitAggregateFunction(
 funcName: String, isDistinct:

(spark) branch master updated: [SPARK-45752][SQL] Simplify the code for check unreferenced CTE relations

2023-11-10 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6851cb96ec6 [SPARK-45752][SQL] Simplify the code for check 
unreferenced CTE relations
6851cb96ec6 is described below

commit 6851cb96ec651b25a8103f7681e8528ff7d625ff
Author: Jiaan Geng 
AuthorDate: Fri Nov 10 22:00:51 2023 +0800

[SPARK-45752][SQL] Simplify the code for check unreferenced CTE relations

### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/43614 lets unreferenced CTE relations be checked by `CheckAnalysis0`.
This PR follows up https://github.com/apache/spark/pull/43614 to simplify the code that checks unreferenced CTE relations.

### Why are the changes needed?
Simplify the code that checks unreferenced CTE relations.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Existing test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43727 from beliefer/SPARK-45752_followup.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../spark/sql/catalyst/analysis/CheckAnalysis.scala| 12 
 .../scala/org/apache/spark/sql/CTEInlineSuite.scala| 18 --
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 29d60ae0f41..f9010d47508 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -167,25 +167,21 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
 val inlineCTE = InlineCTE(alwaysInline = true)
 val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int, 
mutable.Map[Long, Int])]
 inlineCTE.buildCTEMap(plan, cteMap)
-cteMap.values.foreach { case (relation, _, _) =>
+val visited: mutable.Map[Long, Boolean] = 
mutable.Map.empty.withDefaultValue(false)
+cteMap.foreach { case (cteId, (relation, refCount, _)) =>
   // If a CTE relation is never used, it will disappear after inline. Here 
we explicitly check
   // analysis for it, to make sure the entire query plan is valid.
   try {
 // If a CTE relation ref count is 0, the other CTE relations that 
reference it
 // should also be checked by checkAnalysis0. This code will also 
guarantee the leaf
 // relations that do not reference any others are checked first.
-val visited: mutable.Map[Long, Boolean] = 
mutable.Map.empty.withDefaultValue(false)
-cteMap.foreach { case (cteId, _) =>
-  val (_, refCount, _) = cteMap(cteId)
-  if (refCount == 0) {
-checkUnreferencedCTERelations(cteMap, visited, cteId)
-  }
+if (refCount == 0) {
+  checkUnreferencedCTERelations(cteMap, visited, cteId)
 }
   } catch {
 case e: AnalysisException =>
   throw new ExtendedAnalysisException(e, relation.child)
   }
-
 }
 // Inline all CTEs in the plan to help check query plan structures in 
subqueries.
 var inlinedPlan: Option[LogicalPlan] = None
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index 055c04992c0..a06b50d175f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -683,11 +683,25 @@ abstract class CTEInlineSuiteBase
 val e = intercept[AnalysisException](sql(
   s"""
 |with
-|a as (select * from non_exist),
+|a as (select * from tab_non_exists),
 |b as (select * from a)
 |select 2
 |""".stripMargin))
-checkErrorTableNotFound(e, "`non_exist`", ExpectedContext("non_exist", 26, 
34))
+checkErrorTableNotFound(e, "`tab_non_exists`", 
ExpectedContext("tab_non_exists", 26, 39))
+
+withTable("tab_exists") {
+  spark.sql("CREATE TABLE tab_exists(id INT) using parquet")
+  val e = intercept[AnalysisException](sql(
+s"""
+   |with
+   |a as (select * from tab_exists),
+   |b as (select * from a),
+   |c as (select * from tab_non_exists),
+   |d as (select * from c)
+   |select 2
+   |""".stripMargin))
+  checkErrorTableNotFound(e, "`tab_non_exists`", 
ExpectedContext("tab_non_

(spark) branch master updated (1d8df4f6b99b -> f866549a5aa8)

2023-11-08 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 1d8df4f6b99b [SPARK-45606][SQL] Release restrictions on multi-layer 
runtime filter
 add f866549a5aa8 [SPARK-45816][SQL] Return `NULL` when overflowing during 
casting from timestamp to integers

No new revisions were added by this update.

Summary of changes:
 docs/sql-migration-guide.md|  1 +
 .../spark/sql/catalyst/expressions/Cast.scala  | 51 --
 .../expressions/CastWithAnsiOffSuite.scala |  6 +--
 3 files changed, 33 insertions(+), 25 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-45606][SQL] Release restrictions on multi-layer runtime filter

2023-11-08 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1d8df4f6b99b [SPARK-45606][SQL] Release restrictions on multi-layer 
runtime filter
1d8df4f6b99b is described below

commit 1d8df4f6b99b836f4267b888e81d67c75b4dfdcd
Author: Jiaan Geng 
AuthorDate: Wed Nov 8 19:43:33 2023 +0800

[SPARK-45606][SQL] Release restrictions on multi-layer runtime filter

### What changes were proposed in this pull request?
Before https://github.com/apache/spark/pull/39170, Spark only supported inserting a runtime filter for the application side of a shuffle join on a single layer. Considering it is not worth inserting more runtime filters if the column already has one, Spark restricts it at
https://github.com/apache/spark/blob/7057952f6bc2c5cf97dd408effd1b18bee1cb8f4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala#L346
For example,
`select * from bf1 join bf2 on bf1.c1 = bf2.c2 and bf1.c1 = bf2.b2 where bf2.a2 = 62`
This SQL has two join conditions. Without the restriction mentioned above, two runtime filters would be inserted on `bf1.c1`.
At that time, the restriction was reasonable.

After https://github.com/apache/spark/pull/39170, Spark supports inserting a runtime filter for one side of any shuffle join on multiple layers. But the restriction on multi-layer runtime filters mentioned above now looks outdated.
For example,
`select * from bf1 join bf2 join bf3 on bf1.c1 = bf2.c2 and bf3.c3 = bf1.c1 where bf2.a2 = 5`
Assume bf2 is the build side and we insert a runtime filter for bf1. We can't insert the same runtime filter for bf3 because there is already a runtime filter on `bf1.c1`.
This behavior differs from the original behavior and is unexpected.

The change in this PR doesn't affect the restriction mentioned above.

### Why are the changes needed?
Release restrictions on multi-layer runtime filter.
Expand optimization surface.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
Test cases updated.

Micro benchmark for q9 in TPC-H.
**TPC-H 100**
Query | Master(ms) | PR(ms) | Difference(ms) | Percent
-- | -- | -- | -- | --
q9 | 26491 | 20725 | 5766 | 27.82%

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43449 from beliefer/SPARK-45606.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../catalyst/optimizer/InjectRuntimeFilter.scala   | 33 ++
 .../spark/sql/InjectRuntimeFilterSuite.scala   |  8 ++
 2 files changed, 18 insertions(+), 23 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 5f5508d6b22c..9c150f1f3308 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -247,15 +247,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
 }
   }
 
-  private def hasBloomFilter(
-  left: LogicalPlan,
-  right: LogicalPlan,
-  leftKey: Expression,
-  rightKey: Expression): Boolean = {
-findBloomFilterWithKey(left, leftKey) || findBloomFilterWithKey(right, 
rightKey)
-  }
-
-  private def findBloomFilterWithKey(plan: LogicalPlan, key: Expression): 
Boolean = {
+  private def hasBloomFilter(plan: LogicalPlan, key: Expression): Boolean = {
 plan.exists {
   case Filter(condition, _) =>
 splitConjunctivePredicates(condition).exists {
@@ -277,28 +269,33 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
 leftKeys.lazyZip(rightKeys).foreach((l, r) => {
   // Check if:
   // 1. There is already a DPP filter on the key
-  // 2. There is already a bloom filter on the key
-  // 3. The keys are simple cheap expressions
+  // 2. The keys are simple cheap expressions
   if (filterCounter < numFilterThreshold &&
 !hasDynamicPruningSubquery(left, right, l, r) &&
-!hasBloomFilter(newLeft, newRight, l, r) &&
 isSimpleExpression(l) && isSimpleExpression(r)) {
 val oldLeft = newLeft
 val oldRight = newRight
-// Check if the current join is a shuffle join or a broadcast join 
that
-// has a shuffle below it
+// Check if:
+// 1. The current join type supports prune the left side with 
runtime filter
+   

(spark) branch master updated: [SPARK-45793][CORE] Improve the built-in compression codecs

2023-11-06 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f6038302dd6 [SPARK-45793][CORE] Improve the built-in compression codecs
f6038302dd6 is described below

commit f6038302dd615f4bf9bed9c4af3d04426f7e5c5e
Author: Jiaan Geng 
AuthorDate: Mon Nov 6 20:06:39 2023 +0800

[SPARK-45793][CORE] Improve the built-in compression codecs

### What changes were proposed in this pull request?
Currently, Spark supports many built-in compression codecs used for I/O and storage.
There are a lot of magic strings copied from the built-in compression codec names. Developers have to maintain their consistency manually, which is error-prone and reduces development efficiency.
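
For illustration, a hedged sketch of what the change enables (assumed to be Spark-internal code, since the `CompressionCodec` companion object is not a public API):
```scala
import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec

// Reference codecs through the named constants instead of copying "lz4"/"zstd"
// magic strings around; createCodec resolves the short name to an implementation.
val conf = new SparkConf()
  .set("spark.io.compression.codec", CompressionCodec.LZ4)
val codec = CompressionCodec.createCodec(conf)
```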

### Why are the changes needed?
Improve the code around the storage compression codecs.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
N/A

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43659 from beliefer/improve_storage_code.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../deploy/history/HistoryServerMemoryManager.scala |  3 ++-
 .../org/apache/spark/internal/config/package.scala  |  7 ---
 .../org/apache/spark/io/CompressionCodec.scala  | 21 -
 .../deploy/history/EventLogFileWritersSuite.scala   |  6 +++---
 .../deploy/history/FsHistoryProviderSuite.scala |  5 +++--
 .../org/apache/spark/io/CompressionCodecSuite.scala |  8 
 .../apache/spark/storage/FallbackStorageSuite.scala |  3 ++-
 .../collection/ExternalAppendOnlyMapSuite.scala |  3 +--
 .../k8s/integrationtest/BasicTestsSuite.scala   |  3 ++-
 .../org/apache/spark/sql/internal/SQLConf.scala |  3 ++-
 .../spark/sql/execution/streaming/OffsetSeq.scala   |  3 ++-
 .../streaming/state/RocksDBFileManager.scala|  2 +-
 12 files changed, 38 insertions(+), 29 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
index 00e58cbdc57..b95f1ed24f3 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerMemoryManager.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.HashMap
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.History._
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.Utils
 
 /**
@@ -75,7 +76,7 @@ private class HistoryServerMemoryManager(
 
   private def approximateMemoryUsage(eventLogSize: Long, codec: 
Option[String]): Long = {
 codec match {
-  case Some("zstd") =>
+  case Some(CompressionCodec.ZSTD) =>
 eventLogSize * 10
   case Some(_) =>
 eventLogSize * 4
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 93a42eec832..bbadf91fc41 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -21,6 +21,7 @@ import java.util.Locale
 import java.util.concurrent.TimeUnit
 
 import org.apache.spark.SparkContext
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.metrics.GarbageCollectionMetrics
 import org.apache.spark.network.shuffle.Constants
@@ -1530,7 +1531,7 @@ package object config {
 "use fully qualified class names to specify the codec.")
   .version("3.0.0")
   .stringConf
-  .createWithDefault("zstd")
+  .createWithDefault(CompressionCodec.ZSTD)
 
   private[spark] val SHUFFLE_SPILL_INITIAL_MEM_THRESHOLD =
 ConfigBuilder("spark.shuffle.spill.initialMemoryThreshold")
@@ -1871,7 +1872,7 @@ package object config {
 "the codec")
   .version("0.8.0")
   .stringConf
-  .createWithDefaultString("lz4")
+  .createWithDefaultString(CompressionCodec.LZ4)
 
   private[spark] val IO_COMPRESSION_ZSTD_BUFFERSIZE =
 ConfigBuilder("spark.io.compression.zstd.bufferSize")
@@ -1914,7 +1915,7 @@ package object config {
 "the codec.")
   .version("3.0.0")
   .stringConf
-  .createWithDefault("zstd")
+  .createWithDefault(CompressionCodec.ZSTD)
 
   private[spark] val BUFFER_SIZE =
 ConfigBuilder("spark.buffer.size")
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 

(spark) branch master updated: [SPARK-45758][SQL] Introduce a mapper for hadoop compression codecs

2023-11-06 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fc867266f08 [SPARK-45758][SQL] Introduce a mapper for hadoop 
compression codecs
fc867266f08 is described below

commit fc867266f0898866ab5ff7ed82b0c7c5fbaccefc
Author: Jiaan Geng 
AuthorDate: Mon Nov 6 18:01:11 2023 +0800

[SPARK-45758][SQL] Introduce a mapper for hadoop compression codecs

### What changes were proposed in this pull request?
Currently, Spark supports a subset of the Hadoop compression codecs, but the codecs Hadoop supports and the ones Spark supports are not a one-to-one match, because Spark introduces two fake compression codecs, none and uncompressed.
There are a lot of magic strings copied from the Hadoop compression codec names. Developers have to maintain their consistency manually, which is error-prone and reduces development efficiency.
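
A hedged usage sketch of the new mapper from Spark's own Scala code (the full enum is shown in the diff below):
```scala
import org.apache.spark.sql.catalyst.util.HadoopCompressionCodec

// The lower-cased name is the value users pass as the `compression` option of the
// text-based data sources; NONE and UNCOMPRESSED intentionally carry no Hadoop codec.
val gzip = HadoopCompressionCodec.GZIP
val optionValue = gzip.lowerCaseName()                                // "gzip"
val hadoopCodec = Option(gzip.getCompressionCodec)                    // Some(GzipCodec instance)
val noCodec = Option(HadoopCompressionCodec.NONE.getCompressionCodec) // None
```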

### Why are the changes needed?
Make it easier for developers to use the Hadoop compression codecs.

### Does this PR introduce _any_ user-facing change?
'No'.
Introduce a new class.

### How was this patch tested?
Existing test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43620 from beliefer/SPARK-45758.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../sql/catalyst/util/HadoopCompressionCodec.java  | 63 ++
 .../sql/catalyst/util/CompressionCodecs.scala  | 12 ++---
 .../org/apache/spark/sql/DataFrameSuite.scala  |  4 +-
 .../benchmark/DataSourceReadBenchmark.scala|  8 ++-
 .../sql/execution/datasources/csv/CSVSuite.scala   |  4 +-
 .../sql/execution/datasources/json/JsonSuite.scala |  4 +-
 .../sql/execution/datasources/text/TextSuite.scala | 10 ++--
 .../datasources/text/WholeTextFileSuite.scala  |  3 +-
 8 files changed, 87 insertions(+), 21 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/HadoopCompressionCodec.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/HadoopCompressionCodec.java
new file mode 100644
index 000..ee4cb4da322
--- /dev/null
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/HadoopCompressionCodec.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+
+/**
+ * A mapper class from Spark supported hadoop compression codecs to hadoop 
compression codecs.
+ */
+public enum HadoopCompressionCodec {
+  NONE(null),
+  UNCOMPRESSED(null),
+  BZIP2(new BZip2Codec()),
+  DEFLATE(new DeflateCodec()),
+  GZIP(new GzipCodec()),
+  LZ4(new Lz4Codec()),
+  SNAPPY(new SnappyCodec());
+
+  // TODO supports ZStandardCodec
+
+  private final CompressionCodec compressionCodec;
+
+  HadoopCompressionCodec(CompressionCodec compressionCodec) {
+this.compressionCodec = compressionCodec;
+  }
+
+  public CompressionCodec getCompressionCodec() {
+return this.compressionCodec;
+  }
+
+  private static final Map codecNameMap =
+Arrays.stream(HadoopCompressionCodec.values()).collect(
+  Collectors.toMap(Enum::name, codec -> 
codec.name().toLowerCase(Locale.ROOT)));
+
+  public String lowerCaseName() {
+return codecNameMap.get(this.name());
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
index 1377a03d93b..a1d6446cc10 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Compression

(spark) branch master updated: [SPARK-45755][SQL] Improve `Dataset.isEmpty()` by applying global limit `1`

2023-11-01 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c7bba9bfcc3 [SPARK-45755][SQL] Improve `Dataset.isEmpty()` by applying 
global limit `1`
c7bba9bfcc3 is described below

commit c7bba9bfcc350bd3508dd6bb41da6f0c1fef63c6
Author: Yuming Wang 
AuthorDate: Wed Nov 1 19:24:57 2023 +0800

[SPARK-45755][SQL] Improve `Dataset.isEmpty()` by applying global limit `1`

### What changes were proposed in this pull request?

This PR makes `Dataset.isEmpty()` execute a global limit 1 first. `LimitPushDown` may push that global limit 1 down to lower nodes and improve query performance.

Note that we use a global limit 1 here because a local limit cannot be pushed down in the group-only case: 
https://github.com/apache/spark/blob/89ca8b6065e9f690a492c778262080741d50d94d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L766-L770
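
A hedged sketch of the effect at the API level (assumes a local SparkSession `spark`):
```scala
// The new isEmpty is roughly "select().limit(1)" plus taking a single row, so the
// limit can be pushed towards the sources instead of materializing both children.
val ds = spark.range(1000000).union(spark.range(1000000))
ds.select().limit(1).explain(true) // the optimized plan should show limits pushed below the Union
println(ds.isEmpty)                // false, decided after producing at most one row
```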

### Why are the changes needed?

Improve query performance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual testing:
```scala
spark.range(3).selectExpr("id", "array(id, id % 10, id % 100) as 
eo").write.saveAsTable("t1")
spark.range(1).selectExpr("id", "array(id, id % 10, id % 1000) as 
eo").write.saveAsTable("t2")
println(spark.sql("SELECT * FROM t1 LATERAL VIEW explode_outer(eo) AS e 
UNION SELECT * FROM t2 LATERAL VIEW explode_outer(eo) AS e").isEmpty)
```

Before this PR | After this PR
-- | --
[screenshot](https://github.com/apache/spark/assets/5399861/417adc05-4160-4470-b63c-125faac08c9c) | [screenshot](https://github.com/apache/spark/assets/5399861/bdeff231-e725-4c55-9da2-1b4cd59ec8c8)

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43617 from wangyum/SPARK-45755.

Lead-authored-by: Yuming Wang 
Co-authored-by: Yuming Wang 
Signed-off-by: Jiaan Geng 
---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ba5eb790cea..a567a915daf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -652,7 +652,7 @@ class Dataset[T] private[sql](
* @group basic
* @since 2.4.0
*/
-  def isEmpty: Boolean = withAction("isEmpty", select().queryExecution) { plan 
=>
+  def isEmpty: Boolean = withAction("isEmpty", 
select().limit(1).queryExecution) { plan =>
 plan.executeTake(1).isEmpty
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-43380][SQL][FOLLOWUP] Deprecate toSqlType(avroSchema: Schema, …useStableIdForUnionType: Boolean): SchemaType

2023-10-30 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new dc02991fa662 [SPARK-43380][SQL][FOLLOWUP] Deprecate 
toSqlType(avroSchema: Schema, …useStableIdForUnionType: Boolean): SchemaType
dc02991fa662 is described below

commit dc02991fa662c2f760315f190893cf09545e1b83
Author: Jiaan Geng 
AuthorDate: Mon Oct 30 21:34:30 2023 +0800

[SPARK-43380][SQL][FOLLOWUP] Deprecate toSqlType(avroSchema: Schema, 
…useStableIdForUnionType: Boolean): SchemaType

### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/43530 provides a new method:
```
  /**
   * Converts an Avro schema to a corresponding Spark SQL schema.
   *
   * since 4.0.0
   */
  def toSqlType(avroSchema: Schema, useStableIdForUnionType: Boolean): 
SchemaType = {
toSqlTypeHelper(avroSchema, Set.empty, useStableIdForUnionType)
  }
```
Because taking `AvroOptions` as a parameter caused the performance regression, the old `toSqlType` overload is no longer useful.

This PR also improves some callers of `toSqlType` by passing `useStableIdForUnionType` directly.
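
A hedged sketch of the preferred call after this follow-up (assumes an `org.apache.avro.Schema` value in scope):
```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.DataType

// Preferred since 4.0.0: pass the already-parsed flag directly instead of a
// Map[String, String] that has to be re-parsed into AvroOptions on every call.
def avroToCatalyst(avroSchema: Schema): DataType =
  SchemaConverters.toSqlType(avroSchema, useStableIdForUnionType = false).dataType
```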

### Why are the changes needed?
Deprecate the old `toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType` overload in favor of `toSqlType(avroSchema: Schema, useStableIdForUnionType: Boolean): SchemaType`.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Existing test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43557 from beliefer/SPARK-43380_followup.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala  | 3 ++-
 .../avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala  | 2 +-
 .../src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala| 2 ++
 .../src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala  | 2 +-
 4 files changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
index 2c2a45fc3f14..06388409284a 100644
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
+++ 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
@@ -39,7 +39,8 @@ private[sql] case class AvroDataToCatalyst(
   override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
 
   override lazy val dataType: DataType = {
-val dt = SchemaConverters.toSqlType(expectedSchema, options).dataType
+val dt = SchemaConverters.toSqlType(
+  expectedSchema, avroOptions.useStableIdForUnionType).dataType
 parseMode match {
   // With PermissiveMode, the output Catalyst row might contain columns of 
null values for
   // corrupt records, even if some of the columns are not nullable in the 
user-provided schema.
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index e738f541ca79..0e27e4a604c4 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -61,7 +61,7 @@ private[sql] object AvroUtils extends Logging {
   new 
FileSourceOptions(CaseInsensitiveMap(options)).ignoreCorruptFiles)
   }
 
-SchemaConverters.toSqlType(avroSchema, options).dataType match {
+SchemaConverters.toSqlType(avroSchema, 
parsedOptions.useStableIdForUnionType).dataType match {
   case t: StructType => Some(t)
   case _ => throw new RuntimeException(
 s"""Avro schema cannot be converted to a Spark SQL StructType:
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index ba01a18d76f7..00fb32794e3a 100644
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -62,6 +62,8 @@ object SchemaConverters {
   def toSqlType(avroSchema: Schema): SchemaType = {
 toSqlType(avroSchema, false)
   }
+
+  @deprecated("using toSqlType(..., useStableIdForUnionType: Boolean) 
instead", "4.0.0")
   def toSqlType(avroSchema: Schema, options: Map[String, String]): SchemaType 
= {
 toSqlTypeHelper(avroSchema, Set.empty, 
AvroOptions(options).useStableIdForUnionType)
   }
diff --git 
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSuite.scala
 
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroFunctionsSui

(spark) branch master updated (57e73dafeb9 -> e46c8ae84e8)

2023-10-28 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 57e73dafeb9 [SPARK-45678][CORE] Cover 
BufferReleasingInputStream.available/reset under tryOrFetchFailedException
 add e46c8ae84e8 [SPARK-45711][SQL] Introduce a mapper for avro compression 
codecs

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/avro/AvroCompressionCodec.java   | 59 ++
 .../org/apache/spark/sql/avro/AvroUtils.scala  | 15 +++---
 .../org/apache/spark/sql/avro/AvroCodecSuite.scala |  4 +-
 .../org/apache/spark/sql/avro/AvroSuite.scala  | 18 ---
 4 files changed, 81 insertions(+), 15 deletions(-)
 create mode 100644 
connector/avro/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-45481][SQL] Introduce a mapper for parquet compression codecs

2023-10-26 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 62a3868b93a [SPARK-45481][SQL] Introduce a mapper for parquet 
compression codecs
62a3868b93a is described below

commit 62a3868b93a51a4dc424e87d6fd06df914158e1b
Author: Jiaan Geng 
AuthorDate: Fri Oct 27 10:48:44 2023 +0800

[SPARK-45481][SQL] Introduce a mapper for parquet compression codecs

### What changes were proposed in this pull request?
Currently, Spark supports all the parquet compression codecs, but the codecs parquet supports and the ones Spark supports are not a one-to-one match, because Spark introduces a fake compression codec, none.
On the other hand, there are a lot of magic strings copied from the parquet compression codec names. Developers have to maintain their consistency manually, which is error-prone and reduces development efficiency.

For `CompressionCodecName`, refer to: 
https://github.com/apache/parquet-mr/blob/master/parquet-common/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
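
A hedged usage sketch of the new mapper (Spark-internal code; the full enum is shown in the diff below):
```scala
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec

// Case-insensitive lookup of a user-supplied name; NONE and UNCOMPRESSED both
// map to CompressionCodecName.UNCOMPRESSED.
val codec = ParquetCompressionCodec.fromString("snappy")
val parquetName = codec.getCompressionCodec // CompressionCodecName.SNAPPY
```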

### Why are the changes needed?
Make it easier for developers to use the parquet compression codecs.

### Does this PR introduce _any_ user-facing change?
'No'.
Introduce a new class.

### How was this patch tested?
Existing test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43308 from beliefer/SPARK-45481.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../parquet/ParquetCompressionCodec.java   | 62 ++
 .../datasources/parquet/ParquetOptions.scala   | 15 ++
 .../BuiltInDataSourceWriteBenchmark.scala  |  6 ++-
 .../benchmark/DataSourceReadBenchmark.scala|  9 ++--
 .../benchmark/FilterPushdownBenchmark.scala|  5 +-
 .../execution/benchmark/TPCDSQueryBenchmark.scala  |  6 ++-
 .../datasources/FileSourceCodecSuite.scala | 12 +++--
 .../ParquetCompressionCodecPrecedenceSuite.scala   | 41 +++---
 .../datasources/parquet/ParquetIOSuite.scala   | 22 
 .../spark/sql/hive/CompressionCodecSuite.scala | 23 ++--
 .../apache/spark/sql/hive/HiveParquetSuite.scala   |  6 ++-
 .../spark/sql/hive/execution/HiveDDLSuite.scala| 10 ++--
 .../sql/sources/ParquetHadoopFsRelationSuite.scala |  3 +-
 13 files changed, 155 insertions(+), 65 deletions(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
new file mode 100644
index 000..1a37c7a33f2
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodec.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * A mapper class from Spark supported parquet compression codecs to parquet 
compression codecs.
+ */
+public enum ParquetCompressionCodec {
+  NONE(CompressionCodecName.UNCOMPRESSED),
+  UNCOMPRESSED(CompressionCodecName.UNCOMPRESSED),
+  SNAPPY(CompressionCodecName.SNAPPY),
+  GZIP(CompressionCodecName.GZIP),
+  LZO(CompressionCodecName.LZO),
+  BROTLI(CompressionCodecName.BROTLI),
+  LZ4(CompressionCodecName.LZ4),
+  LZ4_RAW(CompressionCodecName.LZ4_RAW),
+  ZSTD(CompressionCodecName.ZSTD);
+
+  private final CompressionCodecName compressionCodec;
+
+  ParquetCompressionCodec(CompressionCodecName compressionCodec) {
+this.compressionCodec = compressionCodec;
+  }
+
+  public CompressionCodecName getCompressionCodec() {
+return this.compressionCodec;
+  }
+
+  public static ParquetCompressionCodec fromString(String s) {
+return ParquetCompressionCodec.valueOf(s.toUpperCase(Locale.ROOT

[spark] branch master updated: [SPARK-45484][SQL] Fix the bug that uses incorrect parquet compression codec lz4raw

2023-10-19 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b9ac7d38ea7 [SPARK-45484][SQL] Fix the bug that uses incorrect parquet 
compression codec lz4raw
b9ac7d38ea7 is described below

commit b9ac7d38ea7f4f7e57c4bbb7b5e5bc836b2b1c58
Author: Jiaan Geng 
AuthorDate: Fri Oct 20 09:43:11 2023 +0800

[SPARK-45484][SQL] Fix the bug that uses incorrect parquet compression 
codec lz4raw

### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/41507 added support for the new parquet compression codec `lz4raw`. But `lz4raw` is not a correct parquet compression codec name.

This mistake causes errors. Please refer to 
https://github.com/apache/spark/pull/43310/files#r1352405312

The root cause is that parquet uses `lz4_raw` as the codec name and stores it into the metadata of the parquet file. Please refer to 
https://github.com/apache/spark/blob/6373f19f537f69c6460b2e4097f19903c01a608f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompressionCodecPrecedenceSuite.scala#L65

We should use `lz4_raw` as its name.
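
A hedged user-side sketch of the corrected spelling (assumes a SparkSession `spark`; the output path is only an illustration):
```scala
// After this fix, the accepted codec name matches parquet's own LZ4_RAW spelling.
spark.conf.set("spark.sql.parquet.compression.codec", "lz4_raw")
spark.range(10).write.mode("overwrite").parquet("/tmp/spark-lz4-raw-demo")
```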

### Why are the changes needed?
Fix the bug that uses incorrect parquet compression codec `lz4raw`.

### Does this PR introduce _any_ user-facing change?
'Yes'.
Fix a bug.

### How was this patch tested?
New test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43310 from beliefer/SPARK-45484.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 docs/sql-migration-guide.md |  1 +
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala  |  4 ++--
 .../sql/execution/datasources/parquet/ParquetOptions.scala  |  2 +-
 .../sql/execution/datasources/FileSourceCodecSuite.scala|  2 +-
 .../parquet/ParquetCompressionCodecPrecedenceSuite.scala| 13 +++--
 5 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index c5d09c19b24..b0dc49ed476 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -27,6 +27,7 @@ license: |
 - Since Spark 4.0, the default value of `spark.sql.maxSinglePartitionBytes` is 
changed from `Long.MaxValue` to `128m`. To restore the previous behavior, set 
`spark.sql.maxSinglePartitionBytes` to `9223372036854775807`(`Long.MaxValue`).
 - Since Spark 4.0, any read of SQL tables takes into consideration the SQL 
configs 
`spark.sql.files.ignoreCorruptFiles`/`spark.sql.files.ignoreMissingFiles` 
instead of the core config 
`spark.files.ignoreCorruptFiles`/`spark.files.ignoreMissingFiles`.
 - Since Spark 4.0, `spark.sql.hive.metastore` drops the support of Hive prior 
to 2.0.0 as they require JDK 8 that Spark does not support anymore. Users 
should migrate to higher versions.
+- Since Spark 4.0, `spark.sql.parquet.compression.codec` drops the support of 
codec name `lz4raw`, please use `lz4_raw` instead.
 
 ## Upgrading from Spark SQL 3.4 to 3.5
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e66eadaa914..1e759b6266c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1015,12 +1015,12 @@ object SQLConf {
   "`parquet.compression` is specified in the table-specific 
options/properties, the " +
   "precedence would be `compression`, `parquet.compression`, " +
   "`spark.sql.parquet.compression.codec`. Acceptable values include: none, 
uncompressed, " +
-  "snappy, gzip, lzo, brotli, lz4, lz4raw, zstd.")
+  "snappy, gzip, lzo, brotli, lz4, lz4_raw, zstd.")
 .version("1.1.1")
 .stringConf
 .transform(_.toLowerCase(Locale.ROOT))
 .checkValues(
-  Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", 
"lz4raw", "zstd"))
+  Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", 
"lz4_raw", "zstd"))
 .createWithDefault("snappy")
 
   val PARQUET_FILTER_PUSHDOWN_ENABLED = 
buildConf("spark.sql.parquet.filterPushdown")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 023d2460959..559a994319d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/

[spark] branch branch-3.5 updated: [SPARK-45543][SQL] `InferWindowGroupLimit` causes bug if the other window functions haven't the same window frame as the rank-like functions

2023-10-19 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new feb48dc146d [SPARK-45543][SQL] `InferWindowGroupLimit` causes bug if 
the other window functions haven't the same window frame as the rank-like 
functions
feb48dc146d is described below

commit feb48dc146d8a89882875f25115af52e8295dfcc
Author: Jiaan Geng 
AuthorDate: Thu Oct 19 20:16:21 2023 +0800

[SPARK-45543][SQL] `InferWindowGroupLimit` causes bug if the other window 
functions haven't the same window frame as the rank-like functions

### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/38799 introduced the group limit of Window for rank-based filters to optimize top-k computation.
But it causes a bug if the window expressions contain a non-rank function whose window frame is not `(UnboundedPreceding, CurrentRow)`.
Please see the details at https://issues.apache.org/jira/browse/SPARK-45543.
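
A hedged sketch of a query shape that hit the bug (assumes a DataFrame `df` with `name`, `year` and `value` columns):
```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// rn uses the rank-like (UnboundedPreceding, CurrentRow) frame, while total is
// computed over the whole partition; the early stop inserted for rn <= 2 must
// not change total's result.
val w = Window.partitionBy(col("name")).orderBy(col("year"))
val wholePartition = w.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val result = df
  .withColumn("rn", row_number().over(w))
  .withColumn("total", sum(col("value")).over(wholePartition))
  .filter(col("rn") <= 2)
```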

### Why are the changes needed?
Fix the bug.

### Does this PR introduce _any_ user-facing change?
'Yes'.

### How was this patch tested?
New test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43385 from beliefer/SPARK-45543.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
(cherry picked from commit d6d4e52ecc3015b41c51bc7e4e122696c76b06ee)
Signed-off-by: Jiaan Geng 
---
 .../catalyst/optimizer/InferWindowGroupLimit.scala |  18 +++-
 .../spark/sql/DataFrameWindowFunctionsSuite.scala  | 112 +
 2 files changed, 126 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
index 261be291463..04204c6a2e1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
@@ -52,23 +52,33 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with 
PredicateHelper {
 if (limits.nonEmpty) Some(limits.min) else None
   }
 
-  private def support(
+  /**
+   * All window expressions should use the same expanding window, so that
+   * we can safely do the early stop.
+   */
+  private def isExpandingWindow(
   windowExpression: NamedExpression): Boolean = windowExpression match {
-case Alias(WindowExpression(_: Rank | _: DenseRank | _: RowNumber, 
WindowSpecDefinition(_, _,
+case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
 SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => 
true
 case _ => false
   }
 
+  private def support(windowFunction: Expression): Boolean = windowFunction 
match {
+case _: Rank | _: DenseRank | _: RowNumber => true
+case _ => false
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = {
 if (conf.windowGroupLimitThreshold == -1) return plan
 
 plan.transformWithPruning(_.containsAllPatterns(FILTER, WINDOW), ruleId) {
   case filter @ Filter(condition,
 window @ Window(windowExpressions, partitionSpec, orderSpec, child))
-if !child.isInstanceOf[WindowGroupLimit] && 
windowExpressions.exists(support) &&
+if !child.isInstanceOf[WindowGroupLimit] && 
windowExpressions.forall(isExpandingWindow) &&
   orderSpec.nonEmpty =>
 val limits = windowExpressions.collect {
-  case alias @ Alias(WindowExpression(rankLikeFunction, _), _) if 
support(alias) =>
+  case alias @ Alias(WindowExpression(rankLikeFunction, _), _)
+if support(rankLikeFunction) =>
 extractLimits(condition, alias.toAttribute).map((_, 
rankLikeFunction))
 }.flatten
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index a57e927ba84..47380db4217 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -1521,4 +1521,116 @@ class DataFrameWindowFunctionsSuite extends QueryTest
   assert(windows.size === 1)
 }
   }
+
+  test("SPARK-45543: InferWindowGroupLimit causes bug " +
+"if the other window functions haven't the same window frame as the 
rank-like functions") {
+val df = Seq(
+  (1, "Dave", 1, 2020),
+  (2, "Dave", 1, 2021),
+  (3, "Dave", 2, 2022),
+  (4, "Dave", 3, 2023),
+  (5, &

[spark] branch master updated: [SPARK-45543][SQL] `InferWindowGroupLimit` causes bug if the other window functions haven't the same window frame as the rank-like functions

2023-10-19 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d6d4e52ecc3 [SPARK-45543][SQL] `InferWindowGroupLimit` causes bug if 
the other window functions haven't the same window frame as the rank-like 
functions
d6d4e52ecc3 is described below

commit d6d4e52ecc3015b41c51bc7e4e122696c76b06ee
Author: Jiaan Geng 
AuthorDate: Thu Oct 19 20:16:21 2023 +0800

[SPARK-45543][SQL] `InferWindowGroupLimit` causes bug if the other window 
functions haven't the same window frame as the rank-like functions

### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/38799 introduced the group limit of Window for rank-based filters to optimize top-k computation.
But it causes a bug if the window expressions contain a non-rank function whose window frame is not `(UnboundedPreceding, CurrentRow)`.
Please see the details at https://issues.apache.org/jira/browse/SPARK-45543.

### Why are the changes needed?
Fix the bug.

### Does this PR introduce _any_ user-facing change?
'Yes'.

### How was this patch tested?
New test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43385 from beliefer/SPARK-45543.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../catalyst/optimizer/InferWindowGroupLimit.scala |  18 +++-
 .../spark/sql/DataFrameWindowFunctionsSuite.scala  | 112 +
 2 files changed, 126 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
index 261be291463..04204c6a2e1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InferWindowGroupLimit.scala
@@ -52,23 +52,33 @@ object InferWindowGroupLimit extends Rule[LogicalPlan] with 
PredicateHelper {
 if (limits.nonEmpty) Some(limits.min) else None
   }
 
-  private def support(
+  /**
+   * All window expressions should use the same expanding window, so that
+   * we can safely do the early stop.
+   */
+  private def isExpandingWindow(
   windowExpression: NamedExpression): Boolean = windowExpression match {
-case Alias(WindowExpression(_: Rank | _: DenseRank | _: RowNumber, 
WindowSpecDefinition(_, _,
+case Alias(WindowExpression(_, WindowSpecDefinition(_, _,
 SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => 
true
 case _ => false
   }
 
+  private def support(windowFunction: Expression): Boolean = windowFunction 
match {
+case _: Rank | _: DenseRank | _: RowNumber => true
+case _ => false
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = {
 if (conf.windowGroupLimitThreshold == -1) return plan
 
 plan.transformWithPruning(_.containsAllPatterns(FILTER, WINDOW), ruleId) {
   case filter @ Filter(condition,
 window @ Window(windowExpressions, partitionSpec, orderSpec, child))
-if !child.isInstanceOf[WindowGroupLimit] && 
windowExpressions.exists(support) &&
+if !child.isInstanceOf[WindowGroupLimit] && 
windowExpressions.forall(isExpandingWindow) &&
   orderSpec.nonEmpty =>
 val limits = windowExpressions.collect {
-  case alias @ Alias(WindowExpression(rankLikeFunction, _), _) if 
support(alias) =>
+  case alias @ Alias(WindowExpression(rankLikeFunction, _), _)
+if support(rankLikeFunction) =>
 extractLimits(condition, alias.toAttribute).map((_, 
rankLikeFunction))
 }.flatten
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index df3f3eaf7ef..6dcc0334376 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -1521,4 +1521,116 @@ class DataFrameWindowFunctionsSuite extends QueryTest
   assert(windows.size === 1)
 }
   }
+
+  test("SPARK-45543: InferWindowGroupLimit causes bug " +
+"if the other window functions haven't the same window frame as the 
rank-like functions") {
+val df = Seq(
+  (1, "Dave", 1, 2020),
+  (2, "Dave", 1, 2021),
+  (3, "Dave", 2, 2022),
+  (4, "Dave", 3, 2023),
+  (5, "Dave", 3, 2024),
+  (6, "Mark", 2, 2022),
+  (7, "Mark", 3, 2023)

[spark] branch branch-3.5 updated: [SPARK-45484][SQL][3.5] Deprecated the incorrect parquet compression codec lz4raw

2023-10-16 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new b2103731bcf [SPARK-45484][SQL][3.5] Deprecated the incorrect parquet 
compression codec lz4raw
b2103731bcf is described below

commit b2103731bcfe7e0bee3b1302c773e46f80badcc9
Author: Jiaan Geng 
AuthorDate: Tue Oct 17 09:50:39 2023 +0800

[SPARK-45484][SQL][3.5] Deprecated the incorrect parquet compression codec 
lz4raw

### What changes were proposed in this pull request?
According to the discussion at https://github.com/apache/spark/pull/43310#issuecomment-1757139681, this PR deprecates the incorrect parquet compression codec `lz4raw` in Spark 3.5.1 and adds a warning log.

The warning log prompts users that `lz4raw` will be removed in Apache Spark 4.0.0.

### Why are the changes needed?
Deprecated the incorrect parquet compression codec `lz4raw`.

### Does this PR introduce _any_ user-facing change?
'Yes'.
Users will see the warning log below.
`Parquet compression codec 'lz4raw' is deprecated, please use 'lz4_raw'`

### How was this patch tested?
Existing test cases and new test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43330 from beliefer/SPARK-45484_3.5.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../org/apache/spark/sql/internal/SQLConf.scala| 14 ++--
 .../datasources/parquet/ParquetOptions.scala   |  8 ++-
 .../datasources/FileSourceCodecSuite.scala |  2 +-
 .../ParquetCompressionCodecPrecedenceSuite.scala   | 25 ++
 4 files changed, 41 insertions(+), 8 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 73d3756ef6b..427d0480190 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -995,12 +995,22 @@ object SQLConf {
   "`parquet.compression` is specified in the table-specific 
options/properties, the " +
   "precedence would be `compression`, `parquet.compression`, " +
   "`spark.sql.parquet.compression.codec`. Acceptable values include: none, 
uncompressed, " +
-  "snappy, gzip, lzo, brotli, lz4, lz4raw, zstd.")
+  "snappy, gzip, lzo, brotli, lz4, lz4raw, lz4_raw, zstd.")
 .version("1.1.1")
 .stringConf
 .transform(_.toLowerCase(Locale.ROOT))
 .checkValues(
-  Set("none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", 
"lz4raw", "zstd"))
+  Set(
+"none",
+"uncompressed",
+"snappy",
+"gzip",
+"lzo",
+"brotli",
+"lz4",
+"lz4raw",
+"lz4_raw",
+"zstd"))
 .createWithDefault("snappy")
 
   val PARQUET_FILTER_PUSHDOWN_ENABLED = 
buildConf("spark.sql.parquet.filterPushdown")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 023d2460959..95869b6fbb9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -22,6 +22,7 @@ import java.util.Locale
 import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.internal.SQLConf
@@ -32,7 +33,7 @@ import org.apache.spark.sql.internal.SQLConf
 class ParquetOptions(
 @transient private val parameters: CaseInsensitiveMap[String],
 @transient private val sqlConf: SQLConf)
-  extends FileSourceOptions(parameters) {
+  extends FileSourceOptions(parameters) with Logging {
 
   import ParquetOptions._
 
@@ -59,6 +60,9 @@ class ParquetOptions(
   throw new IllegalArgumentException(s"Codec [$codecName] " +
 s"is not available. Available codecs are ${availableCodecs.mkString(", 
")}.")
 }
+if (codecName == "lz4raw") {
+  log.warn("Parquet compression codec 'lz4raw' is deprecated, please use 
'lz4_raw'&q

[spark] branch master updated: [SPARK-45514][SQL][MLLIB] Replace `scala.runtime.Tuple3Zipped` to `scala.collection.LazyZip3`

2023-10-15 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d8dbb6674d0 [SPARK-45514][SQL][MLLIB] Replace 
`scala.runtime.Tuple3Zipped` to `scala.collection.LazyZip3`
d8dbb6674d0 is described below

commit d8dbb6674d0f3087f072dc08263f86e02a263693
Author: Jiaan Geng 
AuthorDate: Mon Oct 16 10:00:26 2023 +0800

[SPARK-45514][SQL][MLLIB] Replace `scala.runtime.Tuple3Zipped` to 
`scala.collection.LazyZip3`

### What changes were proposed in this pull request?
Since Scala 2.13.0, `scala.runtime.Tuple3Zipped` has been deprecated and `scala.collection.LazyZip3` is recommended instead.
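
A hedged before/after sketch of the API swap (plain Scala, no Spark required):
```scala
val xs = Seq(1, 2, 3)
val ys = Seq(10, 20, 30)
val zs = Seq(100, 200, 300)

// Deprecated since Scala 2.13:
//   (xs, ys, zs).zipped.forall((x, y, z) => x < y && y < z)

// Replacement used in this patch:
val allOrdered = xs.lazyZip(ys).lazyZip(zs).forall((x, y, z) => x < y && y < z)
```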

### Why are the changes needed?
Replace `scala.runtime.Tuple3Zipped` with `scala.collection.LazyZip3`.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Existing test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43363 from beliefer/SPARK-45514.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../spark/mllib/feature/ElementwiseProductSuite.scala|  2 +-
 .../org/apache/spark/mllib/feature/NormalizerSuite.scala |  6 +++---
 .../apache/spark/mllib/feature/StandardScalerSuite.scala | 16 
 .../spark/sql/execution/arrow/ArrowConvertersSuite.scala |  2 +-
 .../spark/sql/hive/execution/HiveComparisonTest.scala|  2 +-
 5 files changed, 14 insertions(+), 14 deletions(-)

diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/feature/ElementwiseProductSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/feature/ElementwiseProductSuite.scala
index 9eca2d682d6..a764ac8551f 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/feature/ElementwiseProductSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/feature/ElementwiseProductSuite.scala
@@ -48,7 +48,7 @@ class ElementwiseProductSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 val data2 = sparseData.map(transformer.transform)
 val data2RDD = transformer.transform(dataRDD)
 
-assert((sparseData, data2, data2RDD.collect()).zipped.forall {
+assert(sparseData.lazyZip(data2).lazyZip(data2RDD.collect()).forall {
   case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
   case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
   case _ => false
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala
index 71ce26360b8..77eade64db9 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/NormalizerSuite.scala
@@ -43,7 +43,7 @@ class NormalizerSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 val data1 = data.map(l1Normalizer.transform)
 val data1RDD = l1Normalizer.transform(dataRDD)
 
-assert((data, data1, data1RDD.collect()).zipped.forall {
+assert(data.lazyZip(data1).lazyZip(data1RDD.collect()).forall {
   case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
   case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
   case _ => false
@@ -70,7 +70,7 @@ class NormalizerSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 val data2 = data.map(l2Normalizer.transform)
 val data2RDD = l2Normalizer.transform(dataRDD)
 
-assert((data, data2, data2RDD.collect()).zipped.forall {
+assert(data.lazyZip(data2).lazyZip(data2RDD.collect()).forall {
   case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
   case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
   case _ => false
@@ -97,7 +97,7 @@ class NormalizerSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 val dataInf = data.map(lInfNormalizer.transform)
 val dataInfRDD = lInfNormalizer.transform(dataRDD)
 
-assert((data, dataInf, dataInfRDD.collect()).zipped.forall {
+assert(data.lazyZip(dataInf).lazyZip(dataInfRDD.collect()).forall {
   case (v1: DenseVector, v2: DenseVector, v3: DenseVector) => true
   case (v1: SparseVector, v2: SparseVector, v3: SparseVector) => true
   case _ => false
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
index a2c72de4231..2fad944344b 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
@@ -87,19 +87,19 @@ class StandardScalerSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 val summary2 = computeSummary(data2RDD

[spark] branch master updated: [SPARK-45513][CORE][SQL][MLLIB][CONNECT] Replace `scala.runtime.Tuple2Zipped` to `scala.collection.LazyZip2`

2023-10-13 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6f46ea2f9bb [SPARK-45513][CORE][SQL][MLLIB][CONNECT] Replace 
`scala.runtime.Tuple2Zipped` to `scala.collection.LazyZip2`
6f46ea2f9bb is described below

commit 6f46ea2f9bbad71077f9f2dbf72fa4e6906ef29a
Author: Jiaan Geng 
AuthorDate: Sat Oct 14 11:22:40 2023 +0800

[SPARK-45513][CORE][SQL][MLLIB][CONNECT] Replace 
`scala.runtime.Tuple2Zipped` to `scala.collection.LazyZip2`

### What changes were proposed in this pull request?
Since Scala 2.13.0, `scala.runtime.Tuple2Zipped` has been deprecated and `scala.collection.LazyZip2` is recommended instead.
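
A hedged sketch of the two-argument variant replaced in this patch (plain Scala):
```scala
val keys = Seq("k1", "k2")
val values = Seq("v1", "v2")

// Deprecated since Scala 2.13:
//   (keys, values).zipped.foreach((k, v) => println(s"$k=$v"))

// Replacement used in this patch:
keys.lazyZip(values).foreach((k, v) => println(s"$k=$v"))
```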

### Why are the changes needed?
Replace `scala.runtime.Tuple2Zipped` with `scala.collection.LazyZip2`.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Existing test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #43351 from beliefer/SPARK-45513.

Authored-by: Jiaan Geng 
Signed-off-by: Jiaan Geng 
---
 .../org/apache/spark/sql/test/SQLHelper.scala  |  2 +-
 .../scheduler/EventLoggingListenerSuite.scala  |  2 +-
 .../spark/shuffle/ShuffleBlockPusherSuite.scala|  6 +++---
 .../mllib/feature/ElementwiseProductSuite.scala|  2 +-
 .../spark/mllib/feature/NormalizerSuite.scala  |  6 +++---
 .../spark/mllib/feature/StandardScalerSuite.scala  | 24 +++---
 .../spark/mllib/optimization/LBFGSSuite.scala  |  2 +-
 .../catalyst/optimizer/InjectRuntimeFilter.scala   |  2 +-
 .../spark/sql/catalyst/plans/SQLHelper.scala   |  2 +-
 .../columnar/compression/IntegralDeltaSuite.scala  |  4 ++--
 .../datasources/parquet/ParquetSchemaSuite.scala   |  2 +-
 .../sql/execution/joins/HashedRelationSuite.scala  | 24 --
 12 files changed, 45 insertions(+), 33 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala
index 12212492e37..727e2a4f420 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala
@@ -41,7 +41,7 @@ trait SQLHelper {
 None
   }
 }
-(keys, values).zipped.foreach { (k, v) =>
+keys.lazyZip(values).foreach { (k, v) =>
   if (spark.conf.isModifiable(k)) {
 spark.conf.set(k, v)
   } else {
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index edc54e60654..bd659363e53 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -385,7 +385,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with 
LocalSparkContext wit
   8000L, 5000L, 7000L, 4000L, 6000L, 3000L, 10L, 90L, 2L, 20L, 110L)
 
 def max(a: Array[Long], b: Array[Long]): Array[Long] =
-  (a, b).zipped.map(Math.max).toArray
+  a.lazyZip(b).map(Math.max).toArray
 
 // calculated metric peaks per stage per executor
 // metrics sent during stage 0 for each executor
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala 
b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
index c8d89625dd8..18c27ff1269 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
@@ -82,7 +82,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite {
 pushedBlocks ++= blocks
 val managedBuffers = 
invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]]
 val blockPushListener = 
invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
-(blocks, managedBuffers).zipped.foreach((blockId, buffer) => {
+blocks.lazyZip(managedBuffers).foreach((blockId, buffer) => {
   blockPushListener.onBlockPushSuccess(blockId, buffer)
 })
   })
@@ -91,7 +91,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite {
   private def verifyPushRequests(
   pushRequests: Seq[PushRequest],
   expectedSizes: Seq[Int]): Unit = {
-(pushRequests, expectedSizes).zipped.foreach((req, size) => {
+pushRequests.lazyZip(expectedSizes).foreach((req, size) => {
   assert(req.size == size)
 })
   }
@@ -256,7 +256,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite {
   // blocks to be deferred
   blockPushListener.

[spark] branch master updated: [SPARK-45412][PYTHON][CONNECT][FOLLOW-UP] Remove unnecessary check

2023-10-09 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f5cf3101991 [SPARK-45412][PYTHON][CONNECT][FOLLOW-UP] Remove 
unnecessary check
f5cf3101991 is described below

commit f5cf310199132f62b779e0244d15f7680e2ba856
Author: Ruifeng Zheng 
AuthorDate: Mon Oct 9 18:07:59 2023 +0800

[SPARK-45412][PYTHON][CONNECT][FOLLOW-UP] Remove unnecessary check

### What changes were proposed in this pull request?
Remove unnecessary check

### Why are the changes needed?
https://github.com/apache/spark/pull/43215 already validates the plan in 
`__init__`

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #43287 from zhengruifeng/SPARK-45412-followup.

Authored-by: Ruifeng Zheng 
Signed-off-by: Jiaan Geng 
---
 python/pyspark/sql/connect/dataframe.py | 16 +---
 1 file changed, 1 insertion(+), 15 deletions(-)

diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index 2c0a75fad46..4044fab3bb3 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -169,7 +169,6 @@ class DataFrame:
 
 @property
 def write(self) -> "DataFrameWriter":
-assert self._plan is not None
 return DataFrameWriter(self._plan, self._session)
 
 write.__doc__ = PySparkDataFrame.write.__doc__
@@ -1096,11 +1095,6 @@ class DataFrame:
 union.__doc__ = PySparkDataFrame.union.__doc__
 
 def unionAll(self, other: "DataFrame") -> "DataFrame":
-if other._plan is None:
-raise PySparkValueError(
-error_class="MISSING_VALID_PLAN",
-message_parameters={"operator": "Union"},
-)
 self._check_same_session(other)
 return DataFrame.withPlan(
 plan.SetOperation(self._plan, other._plan, "union", is_all=True), 
session=self._session
@@ -2030,8 +2024,6 @@ class DataFrame:
 mapInArrow.__doc__ = PySparkDataFrame.mapInArrow.__doc__
 
 def foreach(self, f: Callable[[Row], None]) -> None:
-assert self._plan is not None
-
 def foreach_func(row: Any) -> None:
 f(row)
 
@@ -2042,8 +2034,6 @@ class DataFrame:
 foreach.__doc__ = PySparkDataFrame.foreach.__doc__
 
 def foreachPartition(self, f: Callable[[Iterator[Row]], None]) -> None:
-assert self._plan is not None
-
 schema = self.schema
 field_converters = [
 ArrowTableToRowsConversion._create_converter(f.dataType) for f in 
schema.fields
@@ -2069,14 +2059,12 @@ class DataFrame:
 
 @property
 def writeStream(self) -> DataStreamWriter:
-assert self._plan is not None
 return DataStreamWriter(plan=self._plan, session=self._session)
 
 writeStream.__doc__ = PySparkDataFrame.writeStream.__doc__
 
 def sameSemantics(self, other: "DataFrame") -> bool:
-assert self._plan is not None
-assert other._plan is not None
+self._check_same_session(other)
 return self._session.client.same_semantics(
 plan=self._plan.to_proto(self._session.client),
 other=other._plan.to_proto(other._session.client),
@@ -2085,7 +2073,6 @@ class DataFrame:
 sameSemantics.__doc__ = PySparkDataFrame.sameSemantics.__doc__
 
 def semanticHash(self) -> int:
-assert self._plan is not None
 return self._session.client.semantic_hash(
 plan=self._plan.to_proto(self._session.client),
 )
@@ -2093,7 +2080,6 @@ class DataFrame:
 semanticHash.__doc__ = PySparkDataFrame.semanticHash.__doc__
 
 def writeTo(self, table: str) -> "DataFrameWriterV2":
-assert self._plan is not None
 return DataFrameWriterV2(self._plan, self._session, table)
 
 writeTo.__doc__ = PySparkDataFrame.writeTo.__doc__


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org