(spark) branch master updated: [SPARK-46950][CORE][SQL] Align `not available codec` error-class

2024-02-02 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 88d4681dd6ab [SPARK-46950][CORE][SQL] Align `not available codec` 
error-class
88d4681dd6ab is described below

commit 88d4681dd6ab37d65d704b0119ba2a54801fffde
Author: panbingkun 
AuthorDate: Sat Feb 3 10:21:58 2024 +0300

[SPARK-46950][CORE][SQL] Align `not available codec` error-class

### What changes were proposed in this pull request?
The PR aims to align the `not available codec` error class. It includes:
- In `core`, convert `CODEC_NOT_AVAILABLE` to 
`CODEC_NOT_AVAILABLE.WITH_CONF_SUGGESTION`.
- In `sql`, use `CODEC_NOT_AVAILABLE.WITH_AVAILABLE_CODECS_SUGGESTION` to 
cover `IllegalArgumentException`.

### Why are the changes needed?
While I was working on a case where `the compression option` is `null` (screenshot below):
https://github.com/apache/spark/assets/15246973/0e2abdad-0c1c-4ade-9f48-014bc4372fc6
I found that we could first complete the extraction logic for `the codec error`. At the same time, I noticed that a `CODEC_NOT_AVAILABLE` error already exists in `error-classes.json`, but its message differs from the compression error raised in `SQL`. Therefore, I propose using `CODEC_NOT_AVAILABLE` as a parent error class and adding two subclass errors: `WITH_AVAILABLE_CODECS_SUGGESTION` and `WITH_CONF_SUGGESTION`.
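
A minimal sketch of how the aligned error class is expected to surface in `core` after this change, based on the partial diff further down; the exact message parameters are not shown above, so treat the details as an assumption:

```
import org.apache.spark.{SparkConf, SparkIllegalArgumentException}
import org.apache.spark.io.CompressionCodec

// Request a codec class that cannot be loaded; after this change the failure
// should surface as CODEC_NOT_AVAILABLE.WITH_CONF_SUGGESTION instead of a
// plain IllegalArgumentException (sketch only).
val conf = new SparkConf().set("spark.io.compression.codec", "some.missing.Codec")
try {
  CompressionCodec.createCodec(conf)
} catch {
  case e: SparkIllegalArgumentException =>
    assert(e.getErrorClass == "CODEC_NOT_AVAILABLE.WITH_CONF_SUGGESTION")
}
```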

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
- Update existing UTs.
- Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44992 from panbingkun/codec_improve.

Lead-authored-by: panbingkun 
Co-authored-by: Maxim Gekk 
Signed-off-by: Max Gekk 
---
 .../src/main/resources/error/error-classes.json| 14 +++-
 .../org/apache/spark/errors/SparkCoreErrors.scala  | 13 ++-
 .../org/apache/spark/io/CompressionCodec.scala |  9 ++---
 .../apache/spark/io/CompressionCodecSuite.scala|  2 +-
 ...r-conditions-codec-not-available-error-class.md | 41 ++
 docs/sql-error-conditions.md   |  6 ++--
 .../sql/catalyst/util/CompressionCodecs.scala  |  7 ++--
 .../spark/sql/errors/QueryExecutionErrors.scala|  8 +
 .../sql/execution/datasources/orc/OrcOptions.scala |  4 +--
 .../datasources/parquet/ParquetOptions.scala   |  5 +--
 .../ParquetCompressionCodecPrecedenceSuite.scala   | 14 +---
 .../sql/execution/datasources/text/TextSuite.scala | 18 ++
 12 files changed, 112 insertions(+), 29 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 6d88f5ee511c..8399311cbfc4 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -441,8 +441,20 @@
   },
   "CODEC_NOT_AVAILABLE" : {
 "message" : [
-  "The codec  is not available. Consider to set the config 
 to ."
+  "The codec  is not available."
 ],
+"subClass" : {
+  "WITH_AVAILABLE_CODECS_SUGGESTION" : {
+"message" : [
+  "Available codecs are ."
+]
+  },
+  "WITH_CONF_SUGGESTION" : {
+"message" : [
+  "Consider to set the config  to ."
+]
+  }
+},
 "sqlState" : "56038"
   },
   "CODEC_SHORT_NAME_NOT_FOUND" : {
diff --git a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala 
b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
index 641310f2a0c5..a131f8233b0d 100644
--- a/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
+++ b/core/src/main/scala/org/apache/spark/errors/SparkCoreErrors.scala
@@ -24,7 +24,9 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkException, SparkRuntimeException, 
SparkUnsupportedOperationException, TaskNotSerializableException}
+import org.apache.spark.{SparkException, SparkIllegalArgumentException, 
SparkRuntimeException, SparkUnsupportedOperationException, 
TaskNotSerializableException}
+import org.apache.spark.internal.config.IO_COMPRESSION_CODEC
+import org.apache.spark.io.CompressionCodec.FALLBACK_COMPRESSION_CODEC
 import org.apache.spark.memory.SparkOutOfMemoryError
 import 
org.apache.spark.scheduler.{BarrierJobRunWithDynamicAllocationException, 
BarrierJobSlotsNumberCheckFailed, BarrierJobUnsupportedRDDChainException}
 import org.apache.spark.shuffle.{FetchFailedException, ShuffleManager}
@@ -490,6 +492,15 @@ private[spark] object SparkCoreErrors {
   cause = null)
   }
 
+  def codecNotAvailableError(codecName: String): Throwable = {
+new SparkIllegalArgumentException(
+  errorClass = "CODEC_NOT_AVAILABLE.WITH_CONF_SUGG

(spark) branch master updated: [SPARK-46890][SQL] Fix CSV parsing bug with existence default values and column pruning

2024-02-02 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 16ac82092bb7 [SPARK-46890][SQL] Fix CSV parsing bug with existence 
default values and column pruning
16ac82092bb7 is described below

commit 16ac82092bb775aafd010e2fb02b7ddc1eceea73
Author: Daniel Tenedorio 
AuthorDate: Sat Feb 3 08:50:44 2024 +0300

[SPARK-46890][SQL] Fix CSV parsing bug with existence default values and 
column pruning

### What changes were proposed in this pull request?

This PR fixes a CSV parsing bug with existence default values and column 
pruning (https://issues.apache.org/jira/browse/SPARK-46890).

The bug fix disables column pruning specifically when checking the CSV header schema against the required schema expected by Catalyst. This makes the expected schema match what the CSV parser provides, since later we also instruct the CSV parser to disable column pruning and instead read each entire row in order to correctly assign the default value(s) during execution.
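
As an illustration of that interaction, here is a minimal sketch (not the actual `CSVOptions` code) of the condition this PR effectively enforces; the metadata key import is taken from the diff further down:

```
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.EXISTS_DEFAULT_COLUMN_METADATA_KEY
import org.apache.spark.sql.types.StructType

// Column pruning must stay off whenever any field carries an existence default,
// so the parser reads whole rows and defaults can be substituted afterwards.
def effectiveColumnPruning(
    schema: StructType,
    columnPruning: Boolean,
    multiLine: Boolean): Boolean = {
  val hasExistenceDefaults =
    schema.fields.exists(_.metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY))
  columnPruning && !multiLine && !hasExistenceDefaults
}
```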

### Why are the changes needed?

Before this change, queries from a subset of the columns in a CSV table 
whose `CREATE TABLE` statement contained default values would return an 
internal exception. For example:

```
CREATE TABLE IF NOT EXISTS products (
  product_id INT,
  name STRING,
  price FLOAT default 0.0,
  quantity INT default 0
)
USING CSV
OPTIONS (
  header 'true',
  inferSchema 'false',
  enforceSchema 'false',
  path '/Users/maximgekk/tmp/products.csv'
);
```

The CSV file products.csv:

```
product_id,name,price,quantity
1,Apple,0.50,100
2,Banana,0.25,200
3,Orange,0.75,50
```

The query fails:

```
spark-sql (default)> SELECT price FROM products;
24/01/28 11:43:09 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 6)
java.lang.IllegalArgumentException: Number of column in CSV header is not 
equal to number of fields in the schema:
 Header length: 4, schema size: 1
CSV file: file:///Users/Daniel.Tenedorio/tmp/products.csv
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR adds test coverage.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44939 from dtenedor/debug-csv-default.

Authored-by: Daniel Tenedorio 
Signed-off-by: Max Gekk 
---
 .../apache/spark/sql/catalyst/csv/CSVOptions.scala | 15 -
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  4 +--
 .../execution/datasources/csv/CSVFileFormat.scala  |  5 ++-
 .../v2/csv/CSVPartitionReaderFactory.scala |  6 +++-
 .../sql/execution/datasources/csv/CSVSuite.scala   | 38 ++
 5 files changed, 62 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index c5a6bf5076de..f4ade722791c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -26,8 +26,10 @@ import com.univocity.parsers.csv.{CsvParserSettings, 
CsvWriterSettings, Unescape
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{DataSourceOptions, FileSourceOptions}
 import org.apache.spark.sql.catalyst.util._
+import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils.EXISTS_DEFAULT_COLUMN_METADATA_KEY
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.types.StructType
 
 class CSVOptions(
 @transient val parameters: CaseInsensitiveMap[String],
@@ -278,13 +280,24 @@ class CSVOptions(
 .getOrElse(UNESCAPED_QUOTE_HANDLING, 
"STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))
 
   /**
+   * Returns true if column pruning is enabled and there are no existence 
column default values in
+   * the [[schema]].
+   *
* The column pruning feature can be enabled either via the CSV option 
`columnPruning` or
* in non-multiline mode via initialization of CSV options by the SQL config:
* `spark.sql.csv.parser.columnPruning.enabled`.
* The feature is disabled in the `multiLine` mode because of the issue:
* https://github.com/uniVocity/univocity-parsers/issues/529
+   *
+   * We disable column pruning when there are any column defaults, instead 
preferring to reach in
+   * each row and then post-process it to substitute the default values after.
*/
-  val isColumnPruningEnabled: Boolean = getBool(COLUMN_PRUNING, !multiLine && 

(spark) branch master updated: [SPARK-46965][CORE] Check `logType` in `Utils.getLog`

2024-02-02 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 84387394c387 [SPARK-46965][CORE] Check `logType` in `Utils.getLog`
84387394c387 is described below

commit 84387394c387c7a6c171714f5d45d517b6bec7af
Author: Dongjoon Hyun 
AuthorDate: Fri Feb 2 17:22:32 2024 -0800

[SPARK-46965][CORE] Check `logType` in `Utils.getLog`

### What changes were proposed in this pull request?

This PR aims to check `logType` in `Utils.getLog`.

### Why are the changes needed?

To prevent a security vulnerability.
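
The fix (shown in the diff below) uses a simple allow-list rather than trying to sanitize the input; a minimal sketch of that pattern:

```
// Only a fixed set of log names is accepted, so path segments such as "../"
// can never be used to read arbitrary files.
val supportedLogTypes = Set("stderr", "stdout", "out")

def validateLogType(logType: String): Either[String, String] =
  if (supportedLogTypes.contains(logType)) Right(logType)
  else Left("Error: Log type must be one of " + supportedLogTypes.mkString(", "))
```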

### Does this PR introduce _any_ user-facing change?

No. This is a new module which is not released yet.

### How was this patch tested?

Manually.

**BEFORE**
```
$ sbin/start-master.sh
$ curl -s 
'http://localhost:8080/logPage/self?logType=../../../../../../etc/nfs.conf' | 
grep NFS
# nfs.conf: the NFS configuration file
```

**AFTER**
```
$ sbin/start-master.sh
$ curl -s 
'http://localhost:8080/logPage/self?logType=../../../../../../etc/nfs.conf' | 
grep NFS
```

For `Spark History Server`, the same check with 18080 port.
```
$ curl -s 
'http://localhost:18080/logPage/self?logType=../../../../../../../etc/nfs.conf' 
| grep NFS
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45006 from dongjoon-hyun/SPARK-46965.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/scala/org/apache/spark/deploy/Utils.scala | 4 
 1 file changed, 4 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/Utils.scala 
b/core/src/main/scala/org/apache/spark/deploy/Utils.scala
index 9bbcc9f314b2..32328ae1e07a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Utils.scala
@@ -32,6 +32,7 @@ import org.apache.spark.util.logging.RollingFileAppender
  */
 private[deploy] object Utils extends Logging {
   val DEFAULT_BYTES = 100 * 1024
+  val SUPPORTED_LOG_TYPES = Set("stderr", "stdout", "out")
 
   def addRenderLogHandler(page: WebUI, conf: SparkConf): Unit = {
 page.attachHandler(createServletHandler("/log",
@@ -58,6 +59,9 @@ private[deploy] object Utils extends Logging {
   logType: String,
   offsetOption: Option[Long],
   byteLength: Int): (String, Long, Long, Long) = {
+if (!SUPPORTED_LOG_TYPES.contains(logType)) {
+  return ("Error: Log type must be one of " + 
SUPPORTED_LOG_TYPES.mkString(", "), 0, 0, 0)
+}
 try {
   // Find a log file name
   val fileName = if (logType.equals("out")) {





(spark) branch master updated: [SPARK-46964][SQL] Change the signature of the hllInvalidLgK query execution error to take an integer as 4th argument

2024-02-02 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0965412d5174 [SPARK-46964][SQL] Change the signature of the 
hllInvalidLgK query execution error to take an integer as 4th argument
0965412d5174 is described below

commit 0965412d517441a15d4da0b5fc8fe34a9b5ec40f
Author: Menelaos Karavelas 
AuthorDate: Fri Feb 2 11:55:21 2024 -0800

[SPARK-46964][SQL] Change the signature of the hllInvalidLgK query 
execution error to take an integer as 4th argument

### What changes were proposed in this pull request?

The current signature of the `hllInvalidLgK` query execution error takes 
four arguments:
1. The SQL function (a string).
2. The minimum possible `lgk` value (an integer).
3. The maximum possible `lgk` value (an integer).
4. The actual invalid `lgk` value (a string).

There is no meaningful reason for the 4th argument to be a string. In this 
PR we change it to be an integer, just like the minimum and maximum valid 
values.

### Why are the changes needed?

Seeking to make the signature of the `hllInvalidLgK` error more meaningful 
and self-consistent.

### Does this PR introduce _any_ user-facing change?

No, there are no user-facing changes because of this PR. This is just an internal change.

### How was this patch tested?

Existing tests suffice.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44995 from mkaravel/hll-invalid-lgk-error-arg.

Authored-by: Menelaos Karavelas 
Signed-off-by: Gengliang Wang 
---
 .../sql/catalyst/expressions/aggregate/datasketchesAggregates.scala   | 2 +-
 .../main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
index 595ae32d77b9..02925f3625d2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
@@ -196,7 +196,7 @@ object HllSketchAgg {
   def checkLgK(lgConfigK: Int): Unit = {
 if (lgConfigK < minLgConfigK || lgConfigK > maxLgConfigK) {
   throw QueryExecutionErrors.hllInvalidLgK(function = "hll_sketch_agg",
-min = minLgConfigK, max = maxLgConfigK, value = lgConfigK.toString)
+min = minLgConfigK, max = maxLgConfigK, value = lgConfigK)
 }
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 9ff076c5fd50..af5cafdc8a3a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2601,14 +2601,14 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase with ExecutionE
   cause = e)
   }
 
-  def hllInvalidLgK(function: String, min: Int, max: Int, value: String): 
Throwable = {
+  def hllInvalidLgK(function: String, min: Int, max: Int, value: Int): 
Throwable = {
 new SparkRuntimeException(
   errorClass = "HLL_INVALID_LG_K",
   messageParameters = Map(
 "function" -> toSQLId(function),
 "min" -> toSQLValue(min, IntegerType),
 "max" -> toSQLValue(max, IntegerType),
-"value" -> value))
+"value" -> toSQLValue(value, IntegerType)))
   }
 
   def hllInvalidInputSketchBuffer(function: String): Throwable = {





(spark) branch master updated: [SPARK-46915][SQL] Simplify `UnaryMinus` `Abs` and align error class

2024-02-02 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e35e29a0517d [SPARK-46915][SQL] Simplify `UnaryMinus` `Abs` and align 
error class
e35e29a0517d is described below

commit e35e29a0517db930e12fe801f0f0ab1a31c3b23e
Author: panbingkun 
AuthorDate: Fri Feb 2 20:33:31 2024 +0300

[SPARK-46915][SQL] Simplify `UnaryMinus` `Abs` and align error class

### What changes were proposed in this pull request?
The PR aims to:
- simplify `UnaryMinus` & `Abs`
- convert the error class `_LEGACY_ERROR_TEMP_2043` to `ARITHMETIC_OVERFLOW` and remove the legacy entry.

### Why are the changes needed?
1. When the data type in `UnaryMinus` and `Abs` is `ByteType` or `ShortType` and an overflow occurs, the corresponding error class is `_LEGACY_ERROR_TEMP_2043`. But when the data type is `IntegerType` or `LongType`, an overflow raises `ARITHMETIC_OVERFLOW`. We should unify them.

2. In the `codegen` logic of `UnaryMinus` and `Abs`, the code generated when the data type is `ByteType` or `ShortType` differs from the code generated when it is `IntegerType` or `LongType`. We can unify it and simplify the code.
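
For item 1 above, a small illustration of why the narrow integral types need an explicit overflow check under ANSI mode (plain Scala, independent of Spark):

```
// Negating the minimum value of a narrow integral type is not representable:
// -(-128) does not fit in a Byte and silently wraps back to -128.
val b: Byte = Byte.MinValue            // -128
val wrapped = (-b).toByte              // wraps to -128 instead of +128
assert(wrapped == Byte.MinValue)       // ANSI mode must report ARITHMETIC_OVERFLOW here
```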

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
- Update existing UTs.
- Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44942 from panbingkun/UnaryMinus_improve.

Authored-by: panbingkun 
Signed-off-by: Max Gekk 
---
 .../src/main/resources/error/error-classes.json|  5 ---
 .../sql/catalyst/expressions/arithmetic.scala  | 45 --
 .../spark/sql/errors/QueryExecutionErrors.scala|  8 
 .../org/apache/spark/sql/types/numerics.scala  |  6 +--
 .../expressions/ArithmeticExpressionSuite.scala| 27 -
 5 files changed, 36 insertions(+), 55 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 136825ab374d..6d88f5ee511c 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -5747,11 +5747,6 @@
   ". If necessary set  to false to bypass this error."
 ]
   },
-  "_LEGACY_ERROR_TEMP_2043" : {
-"message" : [
-  "-  caused overflow."
-]
-  },
   "_LEGACY_ERROR_TEMP_2045" : {
 "message" : [
   "Unsupported table change: "
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 9f1b42ad84d3..0f95ae821ab0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -60,23 +60,15 @@ case class UnaryMinus(
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = 
dataType match {
 case _: DecimalType => defineCodeGen(ctx, ev, c => s"$c.unary_$$minus()")
-case ByteType | ShortType if failOnError =>
+case ByteType | ShortType | IntegerType | LongType if failOnError =>
+  val typeUtils = TypeUtils.getClass.getCanonicalName.stripSuffix("$")
+  val refDataType = ctx.addReferenceObj("refDataType", dataType, 
dataType.getClass.getName)
   nullSafeCodeGen(ctx, ev, eval => {
 val javaBoxedType = CodeGenerator.boxedType(dataType)
-val javaType = CodeGenerator.javaType(dataType)
-val originValue = ctx.freshName("origin")
 s"""
-   |$javaType $originValue = ($javaType)($eval);
-   |if ($originValue == $javaBoxedType.MIN_VALUE) {
-   |  throw 
QueryExecutionErrors.unaryMinusCauseOverflowError($originValue);
-   |}
-   |${ev.value} = ($javaType)(-($originValue));
-   """.stripMargin
-  })
-case IntegerType | LongType if failOnError =>
-  val mathUtils = MathUtils.getClass.getCanonicalName.stripSuffix("$")
-  nullSafeCodeGen(ctx, ev, eval => {
-s"${ev.value} = $mathUtils.negateExact($eval);"
+   |${ev.value} = ($javaBoxedType)$typeUtils.getNumeric(
+   |  $refDataType, $failOnError).negate($eval);
+ """.stripMargin
   })
 case dt: NumericType => nullSafeCodeGen(ctx, ev, eval => {
   val originValue = ctx.freshName("origin")
@@ -181,23 +173,16 @@ case class Abs(child: Expression, failOnError: Boolean = 
SQLConf.get.ansiEnabled
 case _: DecimalType =>
   defineCodeGen(ctx, ev, c => s"$c.abs()")
 
-case ByteType | ShortType if failOnError =>
-  val javaBoxedType = CodeGenerator.boxedTy

(spark) branch master updated: [SPARK-46911][SS] Adding deleteIfExists operator to StatefulProcessorHandleImpl

2024-02-02 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 25d96f7bacb4 [SPARK-46911][SS] Adding deleteIfExists operator to 
StatefulProcessorHandleImpl
25d96f7bacb4 is described below

commit 25d96f7bacb43a7d5a835454ecc075e40d4f3c93
Author: Eric Marnadi 
AuthorDate: Fri Feb 2 22:32:42 2024 +0900

[SPARK-46911][SS] Adding deleteIfExists operator to 
StatefulProcessorHandleImpl

### What changes were proposed in this pull request?

Adding the `deleteIfExists` method to the `StatefulProcessorHandle` in 
order to remove state variables from the State Store. Implemented only for 
RocksDBStateStoreProvider, as we do not currently support multiple column 
families for HDFS.
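
A hedged sketch of how the new method might be used from user code; the exact signature (a single state-variable name argument) is an assumption based on this description:

```
import org.apache.spark.sql.streaming.StatefulProcessorHandle

// Drop a state variable (backed by a RocksDB column family) that is no longer needed.
def dropObsoleteState(handle: StatefulProcessorHandle): Unit = {
  handle.deleteIfExists("obsoleteCounterState")
}
```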

### Why are the changes needed?

This functionality is needed so that users can remove state from the state store via the `StatefulProcessorHandleImpl`.

### Does this PR introduce _any_ user-facing change?

Yes - this functionality (removing column families) was previously not 
supported from our RocksDB client.

### How was this patch tested?

Added a unit test that creates two streams with the same checkpoint 
directory. The second stream removes state that was created in the first stream 
upon initialization. We ensure that the state from the previous stream isn't 
kept.

### Was this patch authored or co-authored using generative AI tooling?

Closes #44903 from ericm-db/deleteIfExists.

Authored-by: Eric Marnadi 
Signed-off-by: Jungtaek Lim 
---
 .../src/main/resources/error/error-classes.json|  11 +++
 ...r-conditions-unsupported-feature-error-class.md |   4 +
 docs/sql-error-conditions.md   |   6 ++
 .../sql/streaming/StatefulProcessorHandle.scala|   6 ++
 .../streaming/StatefulProcessorHandleImpl.scala|  12 +++
 .../state/HDFSBackedStateStoreProvider.scala   |   6 ++
 .../sql/execution/streaming/state/RocksDB.scala|  16 
 .../state/RocksDBStateStoreProvider.scala  |   5 +
 .../sql/execution/streaming/state/StateStore.scala |   6 ++
 .../streaming/state/StateStoreErrors.scala |  22 +
 .../streaming/state/MemoryStateStore.scala |   4 +
 .../sql/streaming/TransformWithStateSuite.scala| 104 +
 12 files changed, 202 insertions(+)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index baefb05a7070..136825ab374d 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3241,6 +3241,12 @@
 ],
 "sqlState" : "0A000"
   },
+  "STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY" : {
+"message" : [
+  "Failed to remove default column family with reserved 
name=."
+],
+"sqlState" : "42802"
+  },
   "STATE_STORE_MULTIPLE_VALUES_PER_KEY" : {
 "message" : [
   "Store does not support multiple values per key"
@@ -3950,6 +3956,11 @@
   "Creating multiple column families with  is not 
supported."
 ]
   },
+  "STATE_STORE_REMOVING_COLUMN_FAMILIES" : {
+"message" : [
+  "Removing column families with  is not 
supported."
+]
+  },
   "TABLE_OPERATION" : {
 "message" : [
   "Table  does not support . Please check the 
current catalog and namespace to make sure the qualified table name is 
expected, and also check the catalog implementation which is configured by 
\"spark.sql.catalog\"."
diff --git a/docs/sql-error-conditions-unsupported-feature-error-class.md 
b/docs/sql-error-conditions-unsupported-feature-error-class.md
index 1b12c4bfc1b3..8d42ecdce790 100644
--- a/docs/sql-error-conditions-unsupported-feature-error-class.md
+++ b/docs/sql-error-conditions-unsupported-feature-error-class.md
@@ -194,6 +194,10 @@ set PROPERTIES and DBPROPERTIES at the same time.
 
 Creating multiple column families with `` is not supported.
 
+## STATE_STORE_REMOVING_COLUMN_FAMILIES
+
+Removing column families with `` is not supported.
+
 ## TABLE_OPERATION
 
 Table `` does not support ``. Please check the current 
catalog and namespace to make sure the qualified table name is expected, and 
also check the catalog implementation which is configured by 
"spark.sql.catalog".
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 3a2c4d261352..c704b1c10c46 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -2025,6 +2025,12 @@ The SQL config `` cannot be found. Please 
verify that the config exists
 
 Star (*) is not allowed in a select list when GROUP BY an ordinal position is 
used.
 
+### STATE_STORE_CANNOT_REMOVE_DEFAULT_COLUMN_FAMILY
+
+[SQLSTATE: 
42802](sql-error-conditions-sqlstates.html

(spark) branch master updated: [SPARK-44815][CONNECT] Cache df.schema to avoid extra RPC

2024-02-02 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6f87fe2f513d [SPARK-44815][CONNECT] Cache df.schema to avoid extra RPC
6f87fe2f513d is described below

commit 6f87fe2f513d1b1a022f0d03b6c81d73d7cfb228
Author: Martin Grund 
AuthorDate: Fri Feb 2 08:49:06 2024 -0400

[SPARK-44815][CONNECT] Cache df.schema to avoid extra RPC

### What changes were proposed in this pull request?
This patch caches the result of the `df.schema` call in the DataFrame to avoid the extra round trip to the Spark Connect service to retrieve the columns or the schema. Since the DataFrame is immutable, the schema will not change.
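
A short illustration of the caveat documented in the diff below (assuming an existing Spark Connect session `spark`): the first `df.schema` call issues one analysis RPC and later calls reuse the cached result, even if an input view changed in between.

```
val df = spark.sql("SELECT * FROM v")
val first = df.schema     // one round trip to the Spark Connect service, result cached
spark.sql("CREATE OR REPLACE TEMP VIEW v AS SELECT 1 AS extra_col")
val second = df.schema    // served from the cache; may no longer match the redefined view
```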

### Why are the changes needed?
Performance / Stability

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UT

Closes #42499 from grundprinzip/SPARK-44815.

Lead-authored-by: Martin Grund 
Co-authored-by: Herman van Hovell 
Co-authored-by: Martin Grund 
Signed-off-by: Herman van Hovell 
---
 .../jvm/src/main/scala/org/apache/spark/sql/Dataset.scala   | 13 -
 python/pyspark/sql/connect/dataframe.py | 12 ++--
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 949f53409386..9a42afebf8f2 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -244,7 +244,18 @@ class Dataset[T] private[sql] (
* @group basic
* @since 3.4.0
*/
-  def schema: StructType = {
+  def schema: StructType = cachedSchema
+
+  /**
+   * The cached schema.
+   *
+   * Schema caching is correct in most cases. Connect is lazy by nature. This 
means that we only
+   * resolve the plan when it is submitted for execution or analysis. We do 
not cache intermediate
+   * resolved plans. If the input (changes table, view redefinition, etc...) 
of the plan changes
+   * between the schema() call, and a subsequent action, the cached schema 
might be inconsistent
+   * with the end schema.
+   */
+  private lazy val cachedSchema: StructType = {
 DataTypeProtoConverter
   .toCatalystType(
 sparkSession
diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index 6d37158142a6..4091016e0d59 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -120,6 +120,7 @@ class DataFrame:
 # Check whether _repr_html is supported or not, we use it to avoid 
calling RPC twice
 # by __repr__ and _repr_html_ while eager evaluation opens.
 self._support_repr_html = False
+self._cached_schema: Optional[StructType] = None
 
 def __repr__(self) -> str:
 if not self._support_repr_html:
@@ -1782,8 +1783,15 @@ class DataFrame:
 
 @property
 def schema(self) -> StructType:
-query = self._plan.to_proto(self._session.client)
-return self._session.client.schema(query)
+# Schema caching is correct in most cases. Connect is lazy by nature. 
This means that
+# we only resolve the plan when it is submitted for execution or 
analysis. We do not
+# cache intermediate resolved plan. If the input (changes table, view 
redefinition,
+# etc...) of the plan changes between the schema() call, and a 
subsequent action, the
+# cached schema might be inconsistent with the end schema.
+if self._cached_schema is None:
+query = self._plan.to_proto(self._session.client)
+self._cached_schema = self._session.client.schema(query)
+return self._cached_schema
 
 schema.__doc__ = PySparkDataFrame.schema.__doc__
 





(spark) branch master updated: [SPARK-46946][SQL] Supporting broadcast of multiple filtering keys in DynamicPruning

2024-02-02 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 22e5938aefc7 [SPARK-46946][SQL] Supporting broadcast of multiple 
filtering keys in DynamicPruning
22e5938aefc7 is described below

commit 22e5938aefc784f50218a86e013e4c2247271072
Author: Thang Long VU 
AuthorDate: Fri Feb 2 19:55:34 2024 +0800

[SPARK-46946][SQL] Supporting broadcast of multiple filtering keys in 
DynamicPruning

### What changes were proposed in this pull request?

This PR extends `DynamicPruningSubquery` to support broadcasting of multiple filtering keys (instead of one as before). The majority of the PR simply generalizes the existing single-key handling to multiple keys.

**Note:** We do not actually use the multiple-filtering-keys `DynamicPruningSubquery` in this PR; we are doing this to make it easier to support DPP for Null Safe Equality or multiple Equality predicates in the future.

In a Null Safe Equality JOIN, the JOIN condition `a <=> b` is transformed to `Coalesce(key1, Literal(key1.dataType)) = Coalesce(key2, Literal(key2.dataType)) AND IsNull(key1) = IsNull(key2)`. In order to have the highest pruning efficiency, we broadcast the two keys `Coalesce(key, Literal(key.dataType))` and `IsNull(key)` and use them to prune the other side at the same time.
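
For reference, a sketch of that null-safe-equality rewrite written with Catalyst expression constructors; `key1` and `key2` stand for the join key expressions, and the rewrite itself is not part of this PR:

```
import org.apache.spark.sql.catalyst.expressions.{And, Coalesce, EqualTo, Expression, IsNull, Literal}

// a <=> b becomes: coalesce(a, default) = coalesce(b, default) AND isnull(a) = isnull(b)
def nullSafeEqualRewrite(key1: Expression, key2: Expression): Expression = {
  And(
    EqualTo(
      Coalesce(Seq(key1, Literal.default(key1.dataType))),
      Coalesce(Seq(key2, Literal.default(key2.dataType)))),
    EqualTo(IsNull(key1), IsNull(key2)))
}
```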

Before, `DynamicPruningSubquery` had only one broadcasting key and we supported DPP for only one `EqualTo` JOIN predicate; now we are extending the subquery to multiple broadcasting keys. Please note that DPP is still not supported for multiple JOIN predicates.

Put another way, at the moment we do not insert a DPP Filter for multiple JOIN predicates at the same time; we only potentially insert a DPP Filter for a given Equality JOIN predicate.

### Why are the changes needed?

To make supporting DPP Null Safe Equality or DPP multiple Equality 
predicates easier in the future.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44988 from longvu-db/multiple-broadcast-filtering-keys.

Authored-by: Thang Long VU 
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/expressions/DynamicPruning.scala  | 12 +--
 .../expressions/DynamicPruningSubquerySuite.scala  | 89 ++
 .../execution/SubqueryAdaptiveBroadcastExec.scala  |  2 +-
 .../sql/execution/SubqueryBroadcastExec.scala  | 37 -
 .../PlanAdaptiveDynamicPruningFilters.scala|  8 +-
 .../adaptive/PlanAdaptiveSubqueries.scala  |  4 +-
 .../dynamicpruning/PartitionPruning.scala  | 15 ++--
 .../dynamicpruning/PlanDynamicPruningFilters.scala |  9 ++-
 .../spark/sql/DynamicPartitionPruningSuite.scala   |  2 +-
 9 files changed, 138 insertions(+), 40 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala
index ec6925eaa984..cc24a982d5d8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala
@@ -37,13 +37,13 @@ trait DynamicPruning extends Predicate
  *  beneficial and so it should be executed even if it cannot reuse the 
results of the
  *  broadcast through ReuseExchange; otherwise, it will use the filter only if 
it
  *  can reuse the results of the broadcast through ReuseExchange
- * @param broadcastKeyIndex the index of the filtering key collected from the 
broadcast
+ * @param broadcastKeyIndices the indices of the filtering keys collected from 
the broadcast
  */
 case class DynamicPruningSubquery(
 pruningKey: Expression,
 buildQuery: LogicalPlan,
 buildKeys: Seq[Expression],
-broadcastKeyIndex: Int,
+broadcastKeyIndices: Seq[Int],
 onlyInBroadcast: Boolean,
 exprId: ExprId = NamedExpression.newExprId,
 hint: Option[HintInfo] = None)
@@ -67,10 +67,12 @@ case class DynamicPruningSubquery(
   buildQuery.resolved &&
   buildKeys.nonEmpty &&
   buildKeys.forall(_.resolved) &&
-  broadcastKeyIndex >= 0 &&
-  broadcastKeyIndex < buildKeys.size &&
+  broadcastKeyIndices.forall(idx => idx >= 0 && idx < buildKeys.size) &&
   buildKeys.forall(_.references.subsetOf(buildQuery.outputSet)) &&
-  pruningKey.dataType == buildKeys(broadcastKeyIndex).dataType
+  // DynamicPruningSubquery should only have a single broadcasting key 
since
+  // there are no usage for multiple broadcasting keys at the moment.
+  broadcastKeyIndices.size == 1 &&
+  child.dataTyp

(spark) branch master updated: [SPARK-46949][SQL] Support CHAR/VARCHAR through ResolveDefaultColumns

2024-02-02 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 362a4d43159d [SPARK-46949][SQL] Support CHAR/VARCHAR through 
ResolveDefaultColumns
362a4d43159d is described below

commit 362a4d43159d73ef2e4c4f9267b572046220680f
Author: Kent Yao 
AuthorDate: Fri Feb 2 17:39:12 2024 +0800

[SPARK-46949][SQL] Support CHAR/VARCHAR through ResolveDefaultColumns

### What changes were proposed in this pull request?

We have issues resolving column definitions with default values, e.g., `c CHAR(5) DEFAULT 'foo'` or `v VARCHAR(10) DEFAULT 'bar'`. The reason is that CHAR/VARCHAR types in schemas are not replaced with the STRING type to align with the value expression.

This PR fixes these issues in `ResolveDefaultColumns`, which seems to only cover the v1 tables. When I applied some related tests to v2 tables, they had the same issues, but beyond that there is other prerequisite work that needs to be addressed first. For that reason, I'd like to separate the v2 fix from this v1 PR.
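
The fix relies on the existing `CharVarcharUtils.replaceCharVarcharWithString` helper (visible in the diff below); a minimal sketch of what that mapping does:

```
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types.{CharType, StringType}

// CHAR(5) is stored as STRING internally, so the default-value expression is
// analyzed against STRING rather than CHAR(5).
assert(CharVarcharUtils.replaceCharVarcharWithString(CharType(5)) == StringType)
```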

### Why are the changes needed?

bugfix

```
spark-sql (default)> CREATE TABLE t( c CHAR(5) DEFAULT 'spark') USING 
parquet;
[INVALID_DEFAULT_VALUE.DATA_TYPE] Failed to execute CREATE TABLE command 
because the destination table column `c` has a DEFAULT value 'spark', which 
requires "CHAR(5)" type, but the statement provided a value of incompatible 
"STRING" type.
```
### Does this PR introduce _any_ user-facing change?

no, bugfix
### How was this patch tested?

new tests

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44991 from yaooqinn/SPARK-46949.

Authored-by: Kent Yao 
Signed-off-by: Kent Yao 
---
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  | 26 +-
 .../spark/sql/ResolveDefaultColumnsSuite.scala | 41 ++
 2 files changed, 58 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index e782e017d1ef..da03de73557f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, 
InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
@@ -42,7 +42,9 @@ import org.apache.spark.util.ArrayImplicits._
 /**
  * This object contains fields to help process DEFAULT columns.
  */
-object ResolveDefaultColumns extends QueryErrorsBase with 
ResolveDefaultColumnsUtils {
+object ResolveDefaultColumns extends QueryErrorsBase
+  with ResolveDefaultColumnsUtils
+  with SQLConfHelper {
   // Name of attributes representing explicit references to the value stored 
in the above
   // CURRENT_DEFAULT_COLUMN_METADATA.
   val CURRENT_DEFAULT_COLUMN_NAME = "DEFAULT"
@@ -307,25 +309,26 @@ object ResolveDefaultColumns extends QueryErrorsBase with 
ResolveDefaultColumnsU
   statementType: String,
   colName: String,
   defaultSQL: String): Expression = {
+val supplanted = CharVarcharUtils.replaceCharVarcharWithString(dataType)
 // Perform implicit coercion from the provided expression type to the 
required column type.
-if (dataType == analyzed.dataType) {
+val ret = if (supplanted == analyzed.dataType) {
   analyzed
-} else if (Cast.canUpCast(analyzed.dataType, dataType)) {
-  Cast(analyzed, dataType)
+} else if (Cast.canUpCast(analyzed.dataType, supplanted)) {
+  Cast(analyzed, supplanted)
 } else {
   // If the provided default value is a literal of a wider type than the 
target column, but the
   // literal value fits within the narrower type, just coerce it for 
convenience. Exclude
   // boolean/array/struct/map types from consideration for this type 
coercion to avoid
   // surprising behavior like interpreting "false" as integer zero.
   val result = if (analyzed.isInstanceOf[Literal] &&
-!Seq(dataType, analyzed.dataType).exists(_ match {
+!Seq(supplanted, analyzed.dataType).exists(_ match {
   case _: BooleanType | _: ArrayType | _: StructType | _: MapType => 
true
   case _ => false
 })) {
 try {
-