[GitHub] [spark-website] HeartSaVioR commented on pull request #389: Promote Structured Streaming over Spark Streaming (DStream)

2022-05-17 Thread GitBox


HeartSaVioR commented on PR #389:
URL: https://github.com/apache/spark-website/pull/389#issuecomment-1129550667

   cc. @gengliangwang 





[GitHub] [spark-website] HeartSaVioR opened a new pull request, #389: Promote Structured Streaming over Spark Streaming (DStream)

2022-05-17 Thread GitBox


HeartSaVioR opened a new pull request, #389:
URL: https://github.com/apache/spark-website/pull/389

   This PR proposes to promote Structured Streaming over Spark Streaming, as the 
community's efforts are more focused on Structured Streaming (based on Spark SQL) 
than on Spark Streaming (DStream). We would like to encourage end users to use 
Structured Streaming rather than Spark Streaming for their workloads whenever 
possible.
   
   Here are screenshots of pages this PR changes.
   
   
![screenshot-spark-new-faq-page-20220518](https://user-images.githubusercontent.com/1317309/168956325-32750af1-23aa-4605-8ee7-f53d7b06e5ea.png)
   
![screenshot-spark-new-streaming-page-20220518](https://user-images.githubusercontent.com/1317309/168956337-2c032f5c-b9c9-488b-a2ce-5e14acbb0406.png)
   
   





[spark] branch master updated: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources

2022-05-17 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 888bf1b2ef4 [SPARK-39193][SQL] Fasten Timestamp type inference of 
JSON/CSV data sources
888bf1b2ef4 is described below

commit 888bf1b2ef44a27c3d4be716a72175bbaa8c6453
Author: Gengliang Wang 
AuthorDate: Wed May 18 10:59:52 2022 +0800

[SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources

### What changes were proposed in this pull request?

When reading JSON/CSV files with timestamp type inference 
(`.option("inferTimestamp", true)`), the timestamp conversion throws and 
catches exceptions.
Since we put a detailed error message into the exception:
```
  def cannotCastToDateTimeError(
      value: Any, from: DataType, to: DataType, errorContext: String): Throwable = {
    val valueString = toSQLValue(value, from)
    new SparkDateTimeException("INVALID_SYNTAX_FOR_CAST",
      Array(toSQLType(to), valueString, SQLConf.ANSI_ENABLED.key, errorContext))
  }
```
Throwing and catching the timestamp parsing exceptions is actually not 
cheap. It consumes more than 90% of the type inference time.

This PR improves the default timestamp parsing by returning optional 
results instead of throwing/catching the exceptions. With this PR, the schema 
inference time is reduced by 90% in a local benchmark.

Note this PR only covers the default timestamp parser. It doesn't cover the 
scenarios where:
* users provide a customized timestamp format via an option
* users enable the legacy timestamp formatter
We can address these in follow-ups.
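
For illustration, here is a minimal sketch of the optional-returning approach (a hypothetical helper, not Spark's actual `TimestampFormatter`): failure is reported through a `ParsePosition` instead of an exception, so the costly error message is never built for rows that are simply not timestamps.
```scala
import java.text.ParsePosition
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.control.NonFatal

// Return an Option instead of throwing for malformed input. parseUnresolved reports
// failure through the ParsePosition, so no exception (and no expensive error message)
// is constructed on the common failure path during schema inference.
def parseOptional(s: String, formatter: DateTimeFormatter): Option[LocalDateTime] = {
  val pos = new ParsePosition(0)
  if (formatter.parseUnresolved(s, pos) == null || pos.getErrorIndex >= 0) {
    None
  } else {
    // Resolve the matched fields; this rarely throws once the text fits the pattern.
    try Some(LocalDateTime.parse(s, formatter)) catch { case NonFatal(_) => None }
  }
}

// Caller side then mirrors the diff below:
//   parseOptional(field, fmt).isDefined
// replaces
//   (allCatch opt parser.parse(field)).isDefined
```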

### Why are the changes needed?

Performance improvement

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing UT
Also manually tested the runtime of inferring the schema of a 624MB JSON file with 
timestamp inference enabled:
```
spark.read.option("inferTimestamp", true).json(file)
```

Before the change, it takes 166 seconds.
After the change, it takes only 16 seconds.

Closes #36562 from gengliangwang/improveInferTS.

Lead-authored-by: Gengliang Wang 
Co-authored-by: Gengliang Wang 
Signed-off-by: Gengliang Wang 
---
 .../spark/sql/catalyst/csv/CSVInferSchema.scala|  4 +-
 .../spark/sql/catalyst/json/JsonInferSchema.scala  |  4 +-
 .../sql/catalyst/util/TimestampFormatter.scala | 51 ++
 .../catalyst/util/TimestampFormatterSuite.scala| 15 +++
 4 files changed, 70 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index f30fa8a0b5f..8b0c6c49b85 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -178,7 +178,7 @@ class CSVInferSchema(val options: CSVOptions) extends 
Serializable {
 // We can only parse the value as TimestampNTZType if it does not have 
zone-offset or
 // time-zone component and can be parsed with the timestamp formatter.
 // Otherwise, it is likely to be a timestamp with timezone.
-if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, 
false)).isDefined) {
+if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, 
false).isDefined) {
   SQLConf.get.timestampType
 } else {
   tryParseTimestamp(field)
@@ -187,7 +187,7 @@ class CSVInferSchema(val options: CSVOptions) extends 
Serializable {
 
   private def tryParseTimestamp(field: String): DataType = {
 // This case infers a custom `dataFormat` is set.
-if ((allCatch opt timestampParser.parse(field)).isDefined) {
+if (timestampParser.parseOptional(field).isDefined) {
   TimestampType
 } else {
   tryParseBoolean(field)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index d08773d8469..f6064bd7195 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -151,10 +151,10 @@ private[sql] class JsonInferSchema(options: JSONOptions) 
extends Serializable {
 if (options.prefersDecimal && decimalTry.isDefined) {
   decimalTry.get
 } else if (options.inferTimestamp &&
-(allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, 
false)).isDefined) {
+timestampNTZFormatter.parseWithoutTimeZoneOptional(field, 
false).isDefined) {
   

[spark] branch branch-3.3 updated: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources

2022-05-17 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 0e998d31234 [SPARK-39193][SQL] Fasten Timestamp type inference of 
JSON/CSV data sources
0e998d31234 is described below

commit 0e998d31234f08be956c5bd2dec0b086952c2e18
Author: Gengliang Wang 
AuthorDate: Wed May 18 10:59:52 2022 +0800

[SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources

### What changes were proposed in this pull request?

When reading JSON/CSV files with timestamp type inference 
(`.option("inferTimestamp", true)`), the timestamp conversion throws and 
catches exceptions.
Since we put a detailed error message into the exception:
```
  def cannotCastToDateTimeError(
      value: Any, from: DataType, to: DataType, errorContext: String): Throwable = {
    val valueString = toSQLValue(value, from)
    new SparkDateTimeException("INVALID_SYNTAX_FOR_CAST",
      Array(toSQLType(to), valueString, SQLConf.ANSI_ENABLED.key, errorContext))
  }
```
Throwing and catching the timestamp parsing exceptions is actually not 
cheap. It consumes more than 90% of the type inference time.

This PR improves the default timestamp parsing by returning optional 
results instead of throwing/catching the exceptions. With this PR, the schema 
inference time is reduced by 90% in a local benchmark.

Note this PR only covers the default timestamp parser. It doesn't cover the 
scenarios where:
* users provide a customized timestamp format via an option
* users enable the legacy timestamp formatter
We can address these in follow-ups.

### Why are the changes needed?

Performance improvement

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing UT
Also manually tested the runtime of inferring the schema of a 624MB JSON file with 
timestamp inference enabled:
```
spark.read.option("inferTimestamp", true).json(file)
```

Before the change, it takes 166 seconds.
After the change, it takes only 16 seconds.

Closes #36562 from gengliangwang/improveInferTS.

Lead-authored-by: Gengliang Wang 
Co-authored-by: Gengliang Wang 
Signed-off-by: Gengliang Wang 
(cherry picked from commit 888bf1b2ef44a27c3d4be716a72175bbaa8c6453)
Signed-off-by: Gengliang Wang 
---
 .../spark/sql/catalyst/csv/CSVInferSchema.scala|  4 +-
 .../spark/sql/catalyst/json/JsonInferSchema.scala  |  4 +-
 .../sql/catalyst/util/TimestampFormatter.scala | 51 ++
 .../catalyst/util/TimestampFormatterSuite.scala| 15 +++
 4 files changed, 70 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
index f30fa8a0b5f..8b0c6c49b85 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -178,7 +178,7 @@ class CSVInferSchema(val options: CSVOptions) extends 
Serializable {
 // We can only parse the value as TimestampNTZType if it does not have 
zone-offset or
 // time-zone component and can be parsed with the timestamp formatter.
 // Otherwise, it is likely to be a timestamp with timezone.
-if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, 
false)).isDefined) {
+if (timestampNTZFormatter.parseWithoutTimeZoneOptional(field, 
false).isDefined) {
   SQLConf.get.timestampType
 } else {
   tryParseTimestamp(field)
@@ -187,7 +187,7 @@ class CSVInferSchema(val options: CSVOptions) extends 
Serializable {
 
   private def tryParseTimestamp(field: String): DataType = {
 // This case infers a custom `dataFormat` is set.
-if ((allCatch opt timestampParser.parse(field)).isDefined) {
+if (timestampParser.parseOptional(field).isDefined) {
   TimestampType
 } else {
   tryParseBoolean(field)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index d08773d8469..f6064bd7195 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -151,10 +151,10 @@ private[sql] class JsonInferSchema(options: JSONOptions) 
extends Serializable {
 if (options.prefersDecimal && decimalTry.isDefined) {
   decimalTry.get
 } else if (options.inferTimestamp &&
-(allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, 

[spark] branch master updated (69b8655d529 -> a4902c08f8c)

2022-05-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 69b8655d529 [SPARK-39192][PS][SQL] Make pandas-on-spark's kurt 
consistent with pandas
 add a4902c08f8c [SPARK-39054][PYTHON][PS] Ensure infer schema accuracy in 
GroupBy.apply

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/groupby.py|  5 -
 python/pyspark/pandas/tests/test_groupby.py | 16 
 2 files changed, 20 insertions(+), 1 deletion(-)





[spark] branch master updated: [SPARK-39192][PS][SQL] Make pandas-on-spark's kurt consistent with pandas

2022-05-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 69b8655d529 [SPARK-39192][PS][SQL] Make pandas-on-spark's kurt 
consistent with pandas
69b8655d529 is described below

commit 69b8655d529a36d9b041a9e03d3349f5f2c4cdfc
Author: Ruifeng Zheng 
AuthorDate: Wed May 18 08:39:16 2022 +0900

[SPARK-39192][PS][SQL] Make pandas-on-spark's kurt consistent with pandas

### What changes were proposed in this pull request?
make pandas-on-spark's kurt consistent with pandas

### Why are the changes needed?
1. The formulas for kurtosis differ between Spark SQL and pandas (see the sketch of 
the two formulas below).
2. pandas zeros out a small `numerator` and `denominator` for better numerical 
stability.
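
For context, a hedged sketch of the two kurtosis definitions at play. These are the standard formulas, assumed rather than taken from this diff, but they reproduce the doctest values below: the population formula gives -1.5 on the old column a = [1, 2, 3], while the pandas-style formula gives 1.5 on the new a = [1, 2, 3, 6] and 2.703924 on b = [0.1, 0.2, 0.3, 0.8]. Helper names are illustrative, not Spark internals.
```scala
// Population excess kurtosis (what Spark SQL's kurtosis aggregate returned).
def populationExcessKurtosis(xs: Seq[Double]): Double = {
  val n = xs.length.toDouble
  val mean = xs.sum / n
  val m2 = xs.map(x => math.pow(x - mean, 2)).sum
  val m4 = xs.map(x => math.pow(x - mean, 4)).sum
  n * m4 / (m2 * m2) - 3.0
}

// Bias-adjusted (Fisher) sample kurtosis, as pandas' kurt computes it.
def pandasStyleKurtosis(xs: Seq[Double]): Double = {
  val n = xs.length.toDouble
  val mean = xs.sum / n
  val m2 = xs.map(x => math.pow(x - mean, 2)).sum
  val m4 = xs.map(x => math.pow(x - mean, 4)).sum
  val s4 = math.pow(m2 / (n - 1), 2)  // squared unbiased sample variance
  n * (n + 1) / ((n - 1) * (n - 2) * (n - 3)) * m4 / s4 -
    3.0 * (n - 1) * (n - 1) / ((n - 2) * (n - 3))
}

// populationExcessKurtosis(Seq(1.0, 2.0, 3.0))      ≈ -1.5      (old doctest value)
// pandasStyleKurtosis(Seq(1.0, 2.0, 3.0, 6.0))      ≈  1.5      (new doctest value for a)
// pandasStyleKurtosis(Seq(0.1, 0.2, 0.3, 0.8))      ≈  2.703924 (new doctest value for b)
```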

### Does this PR introduce _any_ user-facing change?
yes, the logic of kurt changed

### How was this patch tested?
added UT

Closes #36560 from zhengruifeng/impl_pandas_kurt.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/pandas/generic.py   | 12 
 .../pyspark/pandas/tests/test_generic_functions.py |  4 +--
 python/pyspark/pandas/tests/test_stats.py  |  9 +-
 .../expressions/aggregate/CentralMomentAgg.scala   | 33 ++
 .../spark/sql/api/python/PythonSQLUtils.scala  |  6 +++-
 5 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index f5073315164..ec38935ced8 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -1556,20 +1556,20 @@ class Frame(object, metaclass=ABCMeta):
 Examples
 
 
->>> df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, 
np.nan]},
+>>> df = ps.DataFrame({'a': [1, 2, 3, np.nan, 6], 'b': [0.1, 0.2, 0.3, 
np.nan, 0.8]},
 ...   columns=['a', 'b'])
 
 On a DataFrame:
 
 >>> df.kurtosis()
-a   -1.5
-b   -1.5
+a1.50
+b2.703924
 dtype: float64
 
 On a Series:
 
 >>> df['a'].kurtosis()
--1.5
+1.5
 """
 axis = validate_axis(axis)
 
@@ -1587,7 +1587,9 @@ class Frame(object, metaclass=ABCMeta):
 spark_type_to_pandas_dtype(spark_type), 
spark_type.simpleString()
 )
 )
-return F.kurtosis(spark_column)
+
+sql_utils = SparkContext._active_spark_context._jvm.PythonSQLUtils
+return Column(sql_utils.pandasKurtosis(spark_column._jc))
 
 return self._reduce_for_stat_function(
 kurtosis,
diff --git a/python/pyspark/pandas/tests/test_generic_functions.py 
b/python/pyspark/pandas/tests/test_generic_functions.py
index 5062daa77e2..2a83a038713 100644
--- a/python/pyspark/pandas/tests/test_generic_functions.py
+++ b/python/pyspark/pandas/tests/test_generic_functions.py
@@ -150,8 +150,8 @@ class GenericFunctionsTest(PandasOnSparkTestCase, 
TestUtils):
 self.assert_eq(pdf.a.kurtosis(skipna=False), 
psdf.a.kurtosis(skipna=False))
 self.assert_eq(pdf.a.kurtosis(), psdf.a.kurtosis())
 self.assert_eq(pdf.b.kurtosis(skipna=False), 
psdf.b.kurtosis(skipna=False))
-# self.assert_eq(pdf.b.kurtosis(), psdf.b.kurtosis())  AssertionError: 
nan != -2.0
-self.assert_eq(-1.5, psdf.c.kurtosis())
+self.assert_eq(pdf.b.kurtosis(), psdf.b.kurtosis())
+self.assert_eq(pdf.c.kurtosis(), psdf.c.kurtosis())
 
 
 if __name__ == "__main__":
diff --git a/python/pyspark/pandas/tests/test_stats.py 
b/python/pyspark/pandas/tests/test_stats.py
index ccce140a4ac..e8f5048033b 100644
--- a/python/pyspark/pandas/tests/test_stats.py
+++ b/python/pyspark/pandas/tests/test_stats.py
@@ -180,6 +180,7 @@ class StatsTest(PandasOnSparkTestCase, SQLTestUtils):
 self.assert_eq(psdf.min(axis=1), pdf.min(axis=1))
 self.assert_eq(psdf.sum(axis=1), pdf.sum(axis=1))
 self.assert_eq(psdf.product(axis=1), pdf.product(axis=1))
+self.assert_eq(psdf.kurtosis(axis=0), pdf.kurtosis(axis=0), 
almost=True)
 self.assert_eq(psdf.kurtosis(axis=1), pdf.kurtosis(axis=1))
 self.assert_eq(psdf.skew(axis=0), pdf.skew(axis=0), almost=True)
 self.assert_eq(psdf.skew(axis=1), pdf.skew(axis=1))
@@ -216,6 +217,11 @@ class StatsTest(PandasOnSparkTestCase, SQLTestUtils):
 psdf.product(axis=1, numeric_only=True),
 pdf.product(axis=1, numeric_only=True).astype(float),
 )
+self.assert_eq(
+psdf.kurtosis(axis=0, numeric_only=True),
+pdf.kurtosis(axis=0, numeric_only=True),
+almost=True,
+)
 

[spark] branch master updated: [SPARK-39143][SQL] Support CSV scans with DEFAULT values

2022-05-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f5f67b851e2 [SPARK-39143][SQL] Support CSV scans with DEFAULT values
f5f67b851e2 is described below

commit f5f67b851e28afd898a2e3844c088c3041a199fe
Author: Daniel Tenedorio 
AuthorDate: Wed May 18 08:38:43 2022 +0900

[SPARK-39143][SQL] Support CSV scans with DEFAULT values

### What changes were proposed in this pull request?

Support CSV scans when the table schema has associated DEFAULT column 
values.

Example:

```
create table t(i int) using csv;
insert into t values(42);
alter table t add column s string default concat('abc', 'def');
select * from t;
> 42, 'abcdef'
```
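
A hedged, simplified model of the read-side behavior (hypothetical names, not the actual `UnivocityParser` code): rows written before the column existed carry fewer tokens than the current schema, and the missing fields are filled from the column's DEFAULT value instead of null.
```scala
// Illustration only; ColumnSpec and fillMissing are made-up names for this sketch.
case class ColumnSpec(name: String, default: Option[String])

// Fill fields that are absent from an older CSV line with the column's DEFAULT
// value (when one exists) rather than null.
def fillMissing(tokens: Array[String], schema: Seq[ColumnSpec]): Seq[String] =
  schema.zipWithIndex.map { case (col, i) =>
    if (i < tokens.length) tokens(i) else col.default.orNull
  }

// fillMissing(Array("42"), Seq(ColumnSpec("i", None), ColumnSpec("s", Some("abcdef"))))
//   => Seq("42", "abcdef")   -- matching the SELECT result in the example above
```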

### Why are the changes needed?

This change makes it easier to build, query, and maintain tables backed by 
CSV data.

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

This PR includes new test coverage.

Closes #36501 from dtenedor/default-csv.

Authored-by: Daniel Tenedorio 
Signed-off-by: Hyukjin Kwon 
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   | 17 +++--
 .../spark/sql/errors/QueryCompilationErrors.scala  |  6 ++
 .../org/apache/spark/sql/types/StructField.scala   | 11 
 .../org/apache/spark/sql/types/StructType.scala| 28 -
 .../apache/spark/sql/types/StructTypeSuite.scala   | 63 +++
 .../org/apache/spark/sql/sources/InsertSuite.scala | 73 ++
 6 files changed, 192 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 56166950e67..ff46672e67f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, 
NoopFilters, OrderedFilters}
 import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, 
GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
@@ -67,9 +68,16 @@ class UnivocityParser(
   private val tokenIndexArr =
 requiredSchema.map(f => 
java.lang.Integer.valueOf(dataSchema.indexOf(f))).toArray
 
+  // True if we should inform the Univocity CSV parser to select which fields 
to read by their
+  // positions. Generally assigned by input configuration options, except when 
input column(s) have
+  // default values, in which case we omit the explicit indexes in order to 
know how many tokens
+  // were present in each line instead.
+  private def columnPruning: Boolean = options.columnPruning &&
+
!requiredSchema.exists(_.metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY))
+
   // When column pruning is enabled, the parser only parses the required 
columns based on
   // their positions in the data schema.
-  private val parsedSchema = if (options.columnPruning) requiredSchema else 
dataSchema
+  private val parsedSchema = if (columnPruning) requiredSchema else dataSchema
 
   val tokenizer: CsvParser = {
 val parserSetting = options.asParserSettings
@@ -266,7 +274,7 @@ class UnivocityParser(
*/
   val parse: String => Option[InternalRow] = {
 // This is intentionally a val to create a function once and reuse.
-if (options.columnPruning && requiredSchema.isEmpty) {
+if (columnPruning && requiredSchema.isEmpty) {
   // If `columnPruning` enabled and partition attributes scanned only,
   // `schema` gets empty.
   (_: String) => Some(InternalRow.empty)
@@ -276,7 +284,7 @@ class UnivocityParser(
 }
   }
 
-  private val getToken = if (options.columnPruning) {
+  private val getToken = if (columnPruning) {
 (tokens: Array[String], index: Int) => tokens(index)
   } else {
 (tokens: Array[String], index: Int) => tokens(tokenIndexArr(index))
@@ -318,7 +326,8 @@ class UnivocityParser(
 case e: SparkUpgradeException => throw e
 case NonFatal(e) =>
   badRecordException = badRecordException.orElse(Some(e))
-  row.setNullAt(i)
+  // Use the corresponding DEFAULT value associated with the column, 
if any.
+  row.update(i, requiredSchema.defaultValues(i))
   }
   i += 1
 }
diff --git 

[spark] branch branch-3.2 updated: [SPARK-39104][SQL] InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe

2022-05-17 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new cb85686a4b4 [SPARK-39104][SQL] 
InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe
cb85686a4b4 is described below

commit cb85686a4b493fa3ef4233ecd54b7360f1ddc102
Author: Cheng Pan 
AuthorDate: Tue May 17 18:26:55 2022 -0500

[SPARK-39104][SQL] InMemoryRelation#isCachedColumnBuffersLoaded should be 
thread-safe

### What changes were proposed in this pull request?

Add `synchronized` on method `isCachedColumnBuffersLoaded`

### Why are the changes needed?

`isCachedColumnBuffersLoaded` should be wrapped in `synchronized`; otherwise an 
NPE may occur when `_cachedColumnBuffers` is modified concurrently.

```
def isCachedColumnBuffersLoaded: Boolean = {
  _cachedColumnBuffers != null && isCachedRDDLoaded
}

def isCachedRDDLoaded: Boolean = {
  _cachedColumnBuffersAreLoaded || {
    val bmMaster = SparkEnv.get.blockManager.master
    val rddLoaded = _cachedColumnBuffers.partitions.forall { partition =>
      bmMaster.getBlockStatus(RDDBlockId(_cachedColumnBuffers.id, partition.index), false)
        .exists { case (_, blockStatus) => blockStatus.isCached }
    }
    if (rddLoaded) {
      _cachedColumnBuffersAreLoaded = rddLoaded
    }
    rddLoaded
  }
}
```

```
java.lang.NullPointerException
at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247)
at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241)
at 
org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189)
at 
org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176)
at 
scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at 
scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
at 
scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
at 
scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at 
org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219)
at 
org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176)
at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220)
at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3231)
```
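
A minimal, self-contained sketch of the pattern the fix applies (simplified, not the actual `CachedRDDBuilder`): re-check the field inside `synchronized` before dereferencing it, so a concurrent writer that nulls it out cannot trigger the NPE above.
```scala
// Illustration only: `buffers` stands in for `_cachedColumnBuffers`.
class CachedBuffers {
  private var buffers: Array[Int] = Array(1, 2, 3)

  def clear(): Unit = synchronized { buffers = null }

  def isLoaded: Boolean = {
    if (buffers != null) {        // cheap unsynchronized fast path
      synchronized {
        // Re-check under the lock: clear() may have run since the check above.
        return buffers != null && buffers.nonEmpty
      }
    }
    false
  }
}
```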

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UT.

Closes #36496 from pan3793/SPARK-39104.

Authored-by: Cheng Pan 
Signed-off-by: Sean Owen 
(cherry picked from commit 3c8d8d7a864281fbe080316ad8de9b8eac80fa71)
Signed-off-by: Sean Owen 
---
 .../sql/execution/columnar/InMemoryRelation.scala  |  9 +++-
 .../columnar/InMemoryColumnarQuerySuite.scala  | 53 ++
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 525653c3cfc..f6444ca5b9b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -238,10 +238,15 @@ case class CachedRDDBuilder(
   }
 
   def isCachedColumnBuffersLoaded: Boolean = {
-_cachedColumnBuffers != null && isCachedRDDLoaded
+if (_cachedColumnBuffers != null) {
+  synchronized {
+return _cachedColumnBuffers != null && isCachedRDDLoaded
+  }
+}
+false
   }
 
-  def isCachedRDDLoaded: Boolean = {
+  private def isCachedRDDLoaded: Boolean = {
   _cachedColumnBuffersAreLoaded || {
 val bmMaster = SparkEnv.get.blockManager.master
 val rddLoaded = _cachedColumnBuffers.partitions.forall { partition =>
diff --git 

[spark] branch branch-3.3 updated: [SPARK-39104][SQL] InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe

2022-05-17 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new e52b0487583 [SPARK-39104][SQL] 
InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe
e52b0487583 is described below

commit e52b0487583314ae159dab3496be3c28df3e56b7
Author: Cheng Pan 
AuthorDate: Tue May 17 18:26:55 2022 -0500

[SPARK-39104][SQL] InMemoryRelation#isCachedColumnBuffersLoaded should be 
thread-safe

### What changes were proposed in this pull request?

Add `synchronized` on method `isCachedColumnBuffersLoaded`

### Why are the changes needed?

`isCachedColumnBuffersLoaded` should be wrapped in `synchronized`; otherwise an 
NPE may occur when `_cachedColumnBuffers` is modified concurrently.

```
def isCachedColumnBuffersLoaded: Boolean = {
  _cachedColumnBuffers != null && isCachedRDDLoaded
}

def isCachedRDDLoaded: Boolean = {
  _cachedColumnBuffersAreLoaded || {
    val bmMaster = SparkEnv.get.blockManager.master
    val rddLoaded = _cachedColumnBuffers.partitions.forall { partition =>
      bmMaster.getBlockStatus(RDDBlockId(_cachedColumnBuffers.id, partition.index), false)
        .exists { case (_, blockStatus) => blockStatus.isCached }
    }
    if (rddLoaded) {
      _cachedColumnBuffersAreLoaded = rddLoaded
    }
    rddLoaded
  }
}
```

```
java.lang.NullPointerException
at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247)
at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241)
at 
org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189)
at 
org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176)
at 
scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at 
scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
at 
scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
at 
scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at 
org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219)
at 
org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176)
at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220)
at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3231)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UT.

Closes #36496 from pan3793/SPARK-39104.

Authored-by: Cheng Pan 
Signed-off-by: Sean Owen 
(cherry picked from commit 3c8d8d7a864281fbe080316ad8de9b8eac80fa71)
Signed-off-by: Sean Owen 
---
 .../sql/execution/columnar/InMemoryRelation.scala  |  9 +++-
 .../columnar/InMemoryColumnarQuerySuite.scala  | 53 ++
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 89323e7d1a4..0ace24777b7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -238,10 +238,15 @@ case class CachedRDDBuilder(
   }
 
   def isCachedColumnBuffersLoaded: Boolean = {
-_cachedColumnBuffers != null && isCachedRDDLoaded
+if (_cachedColumnBuffers != null) {
+  synchronized {
+return _cachedColumnBuffers != null && isCachedRDDLoaded
+  }
+}
+false
   }
 
-  def isCachedRDDLoaded: Boolean = {
+  private def isCachedRDDLoaded: Boolean = {
   _cachedColumnBuffersAreLoaded || {
 val bmMaster = SparkEnv.get.blockManager.master
 val rddLoaded = _cachedColumnBuffers.partitions.forall { partition =>
diff --git 

[spark] branch master updated: [SPARK-39104][SQL] InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe

2022-05-17 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3c8d8d7a864 [SPARK-39104][SQL] 
InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe
3c8d8d7a864 is described below

commit 3c8d8d7a864281fbe080316ad8de9b8eac80fa71
Author: Cheng Pan 
AuthorDate: Tue May 17 18:26:55 2022 -0500

[SPARK-39104][SQL] InMemoryRelation#isCachedColumnBuffersLoaded should be 
thread-safe

### What changes were proposed in this pull request?

Add `synchronized` on method `isCachedColumnBuffersLoaded`

### Why are the changes needed?

`isCachedColumnBuffersLoaded` should be wrapped in `synchronized`; otherwise an 
NPE may occur when `_cachedColumnBuffers` is modified concurrently.

```
def isCachedColumnBuffersLoaded: Boolean = {
  _cachedColumnBuffers != null && isCachedRDDLoaded
}

def isCachedRDDLoaded: Boolean = {
  _cachedColumnBuffersAreLoaded || {
    val bmMaster = SparkEnv.get.blockManager.master
    val rddLoaded = _cachedColumnBuffers.partitions.forall { partition =>
      bmMaster.getBlockStatus(RDDBlockId(_cachedColumnBuffers.id, partition.index), false)
        .exists { case (_, blockStatus) => blockStatus.isCached }
    }
    if (rddLoaded) {
      _cachedColumnBuffersAreLoaded = rddLoaded
    }
    rddLoaded
  }
}
```

```
java.lang.NullPointerException
at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247)
at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241)
at 
org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189)
at 
org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176)
at 
scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at 
scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
at 
scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
at 
scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at 
org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219)
at 
org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176)
at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220)
at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3231)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UT.

Closes #36496 from pan3793/SPARK-39104.

Authored-by: Cheng Pan 
Signed-off-by: Sean Owen 
---
 .../sql/execution/columnar/InMemoryRelation.scala  |  9 +++-
 .../columnar/InMemoryColumnarQuerySuite.scala  | 53 ++
 2 files changed, 60 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 89323e7d1a4..0ace24777b7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -238,10 +238,15 @@ case class CachedRDDBuilder(
   }
 
   def isCachedColumnBuffersLoaded: Boolean = {
-_cachedColumnBuffers != null && isCachedRDDLoaded
+if (_cachedColumnBuffers != null) {
+  synchronized {
+return _cachedColumnBuffers != null && isCachedRDDLoaded
+  }
+}
+false
   }
 
-  def isCachedRDDLoaded: Boolean = {
+  private def isCachedRDDLoaded: Boolean = {
   _cachedColumnBuffersAreLoaded || {
 val bmMaster = SparkEnv.get.blockManager.master
 val rddLoaded = _cachedColumnBuffers.partitions.forall { partition =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 

[spark] branch branch-3.3 updated: [SPARK-39208][SQL] Fix query context bugs in decimal overflow under codegen mode

2022-05-17 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 4fb7fe2a406 [SPARK-39208][SQL] Fix query context bugs in decimal 
overflow under codegen mode
4fb7fe2a406 is described below

commit 4fb7fe2a40623526ed22311eac16c937450031e5
Author: Gengliang Wang 
AuthorDate: Tue May 17 22:31:30 2022 +0800

[SPARK-39208][SQL] Fix query context bugs in decimal overflow under codegen 
mode

### What changes were proposed in this pull request?

1. Fix logical bugs in adding query contexts as references under codegen 
mode. 
https://github.com/apache/spark/pull/36040/files#diff-4a70d2f3a4b99f58796b87192143f9838f4c4cf469f3313eb30af79c4e07153aR145
The code
```
val errorContextCode = if (nullOnOverflow) {
  ctx.addReferenceObj("errCtx", queryContext)
} else {
  "\"\""
}
```
should be (the error context is only needed on the path that actually throws, i.e. 
when `nullOnOverflow` is false):
```
val errorContextCode = if (nullOnOverflow) {
  "\"\""
} else {
  ctx.addReferenceObj("errCtx", queryContext)
}
```

2. Similar to https://github.com/apache/spark/pull/36557, make 
`CheckOverflowInSum` support query context when WSCG is not available.

### Why are the changes needed?

Bugfix and enhancement in the query context of decimal expressions.
### Does this PR introduce _any_ user-facing change?

No, the query context is not released yet.

### How was this patch tested?

New UT

Closes #36577 from gengliangwang/fixDecimalSumOverflow.

Authored-by: Gengliang Wang 
Signed-off-by: Gengliang Wang 
(cherry picked from commit 191e535b975e5813719d3143797c9fcf86321368)
Signed-off-by: Gengliang Wang 
---
 .../sql/catalyst/expressions/aggregate/Sum.scala| 21 ++---
 .../catalyst/expressions/decimalExpressions.scala   | 15 ---
 .../expressions/DecimalExpressionSuite.scala| 19 +++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala  | 19 +++
 4 files changed, 52 insertions(+), 22 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
index f2c6925b837..fa43565d807 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala
@@ -143,10 +143,11 @@ abstract class SumBase(child: Expression) extends 
DeclarativeAggregate
* So now, if ansi is enabled, then throw exception, if not then return null.
* If sum is not null, then return the sum.
*/
-  protected def getEvaluateExpression: Expression = resultType match {
+  protected def getEvaluateExpression(queryContext: String): Expression = 
resultType match {
 case d: DecimalType =>
-  If(isEmpty, Literal.create(null, resultType),
-CheckOverflowInSum(sum, d, !useAnsiAdd))
+  val checkOverflowInSum =
+CheckOverflowInSum(sum, d, !useAnsiAdd, queryContext)
+  If(isEmpty, Literal.create(null, resultType), checkOverflowInSum)
 case _ if shouldTrackIsEmpty =>
   If(isEmpty, Literal.create(null, resultType), sum)
 case _ => sum
@@ -172,7 +173,7 @@ abstract class SumBase(child: Expression) extends 
DeclarativeAggregate
 case class Sum(
 child: Expression,
 useAnsiAdd: Boolean = SQLConf.get.ansiEnabled)
-  extends SumBase(child) {
+  extends SumBase(child) with SupportQueryContext {
   def this(child: Expression) = this(child, useAnsiAdd = 
SQLConf.get.ansiEnabled)
 
   override def shouldTrackIsEmpty: Boolean = resultType match {
@@ -186,7 +187,13 @@ case class Sum(
 
   override lazy val mergeExpressions: Seq[Expression] = getMergeExpressions
 
-  override lazy val evaluateExpression: Expression = getEvaluateExpression
+  override lazy val evaluateExpression: Expression = 
getEvaluateExpression(queryContext)
+
+  override def initQueryContext(): String = if (useAnsiAdd) {
+origin.context
+  } else {
+""
+  }
 }
 
 // scalastyle:off line.size.limit
@@ -243,9 +250,9 @@ case class TrySum(child: Expression) extends SumBase(child) 
{
 
   override lazy val evaluateExpression: Expression =
 if (useAnsiAdd) {
-  TryEval(getEvaluateExpression)
+  TryEval(getEvaluateExpression(""))
 } else {
-  getEvaluateExpression
+  getEvaluateExpression("")
 }
 
   override protected def withNewChildInternal(newChild: Expression): 
Expression =
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
index 

[spark] branch master updated (c5351f85dec -> 191e535b975)

2022-05-17 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from c5351f85dec [SPARK-32268][SQL][TESTS][FOLLOW-UP] Use function registry 
in the SparkSession
 add 191e535b975 [SPARK-39208][SQL] Fix query context bugs in decimal 
overflow under codegen mode

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/expressions/aggregate/Sum.scala| 21 ++---
 .../catalyst/expressions/decimalExpressions.scala   | 15 ---
 .../expressions/DecimalExpressionSuite.scala| 19 +++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala  | 19 +++
 4 files changed, 52 insertions(+), 22 deletions(-)





[spark] branch branch-3.3 updated: [SPARK-32268][SQL][TESTS][FOLLOW-UP] Use function registry in the SparkSession

2022-05-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new c07f65c5168 [SPARK-32268][SQL][TESTS][FOLLOW-UP] Use function registry 
in the SparkSession
c07f65c5168 is described below

commit c07f65c51681107e869d2ebb46aa546ac3871e3a
Author: Hyukjin Kwon 
AuthorDate: Tue May 17 23:05:48 2022 +0900

[SPARK-32268][SQL][TESTS][FOLLOW-UP] Use function registry in the 
SparkSession

### What changes were proposed in this pull request?

This PR proposes:
1. Use the function registry in the Spark Session being used
2. Move function registration into `beforeAll`

### Why are the changes needed?

Registering the function at the `builtin` registry, outside of `beforeAll`, can 
affect other tests. See also 
https://lists.apache.org/thread/jp0ccqv10ht716g9xldm2ohdv3mpmmz1.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Unittests fixed.

Closes #36576 from HyukjinKwon/SPARK-32268-followup.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit c5351f85dec628a5c806893aa66777cbd77a4d65)
Signed-off-by: Hyukjin Kwon 
---
 .../spark/sql/BloomFilterAggregateQuerySuite.scala | 34 --
 1 file changed, 18 insertions(+), 16 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/BloomFilterAggregateQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/BloomFilterAggregateQuerySuite.scala
index 7fc89ecc88b..05513cddccb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/BloomFilterAggregateQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/BloomFilterAggregateQuerySuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.FunctionIdentifier
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
@@ -35,23 +34,26 @@ class BloomFilterAggregateQuerySuite extends QueryTest with 
SharedSparkSession {
   val funcId_bloom_filter_agg = new FunctionIdentifier("bloom_filter_agg")
   val funcId_might_contain = new FunctionIdentifier("might_contain")
 
-  // Register 'bloom_filter_agg' to builtin.
-  FunctionRegistry.builtin.registerFunction(funcId_bloom_filter_agg,
-new ExpressionInfo(classOf[BloomFilterAggregate].getName, 
"bloom_filter_agg"),
-(children: Seq[Expression]) => children.size match {
-  case 1 => new BloomFilterAggregate(children.head)
-  case 2 => new BloomFilterAggregate(children.head, children(1))
-  case 3 => new BloomFilterAggregate(children.head, children(1), 
children(2))
-})
-
-  // Register 'might_contain' to builtin.
-  FunctionRegistry.builtin.registerFunction(funcId_might_contain,
-new ExpressionInfo(classOf[BloomFilterMightContain].getName, 
"might_contain"),
-(children: Seq[Expression]) => BloomFilterMightContain(children.head, 
children(1)))
+  override def beforeAll(): Unit = {
+super.beforeAll()
+// Register 'bloom_filter_agg' to builtin.
+
spark.sessionState.functionRegistry.registerFunction(funcId_bloom_filter_agg,
+  new ExpressionInfo(classOf[BloomFilterAggregate].getName, 
"bloom_filter_agg"),
+  (children: Seq[Expression]) => children.size match {
+case 1 => new BloomFilterAggregate(children.head)
+case 2 => new BloomFilterAggregate(children.head, children(1))
+case 3 => new BloomFilterAggregate(children.head, children(1), 
children(2))
+  })
+
+// Register 'might_contain' to builtin.
+spark.sessionState.functionRegistry.registerFunction(funcId_might_contain,
+  new ExpressionInfo(classOf[BloomFilterMightContain].getName, 
"might_contain"),
+  (children: Seq[Expression]) => BloomFilterMightContain(children.head, 
children(1)))
+  }
 
   override def afterAll(): Unit = {
-FunctionRegistry.builtin.dropFunction(funcId_bloom_filter_agg)
-FunctionRegistry.builtin.dropFunction(funcId_might_contain)
+spark.sessionState.functionRegistry.dropFunction(funcId_bloom_filter_agg)
+spark.sessionState.functionRegistry.dropFunction(funcId_might_contain)
 super.afterAll()
   }
 





[spark] branch master updated (6d74557c385 -> c5351f85dec)

2022-05-17 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 6d74557c385 [SPARK-39102][CORE][SQL][DSTREAM] Add checkstyle rules to 
disabled use of Guava's `Files.createTempDir()`
 add c5351f85dec [SPARK-32268][SQL][TESTS][FOLLOW-UP] Use function registry 
in the SparkSession

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/BloomFilterAggregateQuerySuite.scala | 34 --
 1 file changed, 18 insertions(+), 16 deletions(-)





[spark] branch master updated: [SPARK-39102][CORE][SQL][DSTREAM] Add checkstyle rules to disabled use of Guava's `Files.createTempDir()`

2022-05-17 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6d74557c385 [SPARK-39102][CORE][SQL][DSTREAM] Add checkstyle rules to 
disabled use of Guava's `Files.createTempDir()`
6d74557c385 is described below

commit 6d74557c385d487fb50b5603d9c322d0ae9d9b74
Author: yangjie01 
AuthorDate: Tue May 17 08:44:29 2022 -0500

[SPARK-39102][CORE][SQL][DSTREAM] Add checkstyle rules to disabled use of 
Guava's `Files.createTempDir()`

### What changes were proposed in this pull request?
The main changes of this PR are as follows:

- Add a checkstyle to `scalastyle-config.xml` to disabled use of Guava's 
`Files.createTempDir()` for Scala
- Add a checkstyle to `dev/checkstyle.xml` to disabled use of Guava's 
`Files.createTempDir()` for Java
- Introduce `JavaUtils.createTempDir()` method to replace the use of 
Guava's `Files.createTempDir()` in Java code
- Use `Utils.createTempDir()` to replace the use of Guava's 
`Files.createTempDir()` in Scala code

### Why are the changes needed?
Avoid the use of Guava's `Files.createTempDir()` in Spark code due to 
[CVE-2020-8908](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-8908)
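
A hedged usage sketch of the new helper (it assumes Spark's `network-common` module is on the classpath); it stands in for Guava's `com.google.common.io.Files.createTempDir()`, which the new style rules ban.
```scala
import java.io.File
import org.apache.spark.network.util.JavaUtils

object TempDirExample {
  def main(args: Array[String]): Unit = {
    // Created under java.io.tmpdir with the "spark" prefix; deleted when the JVM exits.
    val dir: File = JavaUtils.createTempDir()
    println(s"temp dir: ${dir.getAbsolutePath}")
  }
}
```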

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GA

Closes #36529 from LuciferYang/SPARK-39102.

Authored-by: yangjie01 
Signed-off-by: Sean Owen 
---
 .../org/apache/spark/network/util/JavaUtils.java   | 56 +
 .../org/apache/spark/network/StreamTestHelper.java |  4 +-
 .../apache/spark/network/util/JavaUtilsSuite.java  | 58 ++
 .../network/shuffle/ExternalBlockHandlerSuite.java |  4 +-
 .../network/shuffle/TestShuffleDataContext.java|  5 +-
 .../test/org/apache/spark/Java8RDDAPISuite.java|  7 +--
 .../java/test/org/apache/spark/JavaAPISuite.java   |  5 +-
 .../org/apache/spark/deploy/IvyTestUtils.scala |  3 +-
 .../apache/spark/deploy/RPackageUtilsSuite.scala   |  3 +-
 dev/checkstyle.xml |  6 +++
 scalastyle-config.xml  |  7 +++
 .../org/apache/spark/streaming/JavaAPISuite.java   |  9 ++--
 12 files changed, 147 insertions(+), 20 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index b5497087634..f699bdd95c3 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -21,7 +21,9 @@ import java.io.*;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.Locale;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -362,6 +364,60 @@ public class JavaUtils {
 }
   }
 
+  /**
+   * Create a temporary directory inside `java.io.tmpdir` with default 
namePrefix "spark".
+   * The directory will be automatically deleted when the VM shuts down.
+   */
+  public static File createTempDir() throws IOException {
+return createTempDir(System.getProperty("java.io.tmpdir"), "spark");
+  }
+
+  /**
+   * Create a temporary directory inside the given parent directory. The 
directory will be
+   * automatically deleted when the VM shuts down.
+   */
+  public static File createTempDir(String root, String namePrefix) throws 
IOException {
+if (root == null) root = System.getProperty("java.io.tmpdir");
+if (namePrefix == null) namePrefix = "spark";
+File dir = createDirectory(root, namePrefix);
+dir.deleteOnExit();
+return dir;
+  }
+
+  /**
+   * Create a directory inside the given parent directory with default 
namePrefix "spark".
+   * The directory is guaranteed to be newly created, and is not marked for 
automatic deletion.
+   */
+  public static File createDirectory(String root) throws IOException {
+return createDirectory(root, "spark");
+  }
+
+  /**
+   * Create a directory inside the given parent directory. The directory is 
guaranteed to be
+   * newly created, and is not marked for automatic deletion.
+   */
+  public static File createDirectory(String root, String namePrefix) throws 
IOException {
+if (namePrefix == null) namePrefix = "spark";
+int attempts = 0;
+int maxAttempts = 10;
+File dir = null;
+while (dir == null) {
+  attempts += 1;
+  if (attempts > maxAttempts) {
+throw new IOException("Failed to create a temp directory (under " + 
root + ") after " +
+  maxAttempts + " attempts!");
+  }
+  try {
+dir = new 

[spark] branch master updated: [SPARK-39196][CORE][SQL][K8S] replace `getOrElse(null)` with `orNull`

2022-05-17 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b4c019627b6 [SPARK-39196][CORE][SQL][K8S] replace `getOrElse(null)` 
with `orNull`
b4c019627b6 is described below

commit b4c019627b676edf850c00bb070377896b66fad2
Author: Qian.Sun 
AuthorDate: Tue May 17 08:43:02 2022 -0500

[SPARK-39196][CORE][SQL][K8S] replace `getOrElse(null)` with `orNull`

### What changes were proposed in this pull request?

This PR aims to replace `getOrElse(null)` with `orNull`.

### Why are the changes needed?

Code simplification.
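
A minimal illustration of the equivalence behind the simplification (standalone sketch, not from the diff):
```scala
// For an Option of a reference type, orNull yields the value or null,
// exactly like getOrElse(null), but reads more clearly.
val maybeName: Option[String] = None

val before: String = maybeName.getOrElse(null)  // old style
val after: String = maybeName.orNull            // new style, same result
assert(before == after)
```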

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the GA.

Closes #36567 from dcoliversun/SPARK-39196.

Authored-by: Qian.Sun 
Signed-off-by: Sean Owen 
---
 .../src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala| 2 +-
 core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala| 2 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala| 2 +-
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala  | 2 +-
 graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala | 2 +-
 .../org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala| 2 +-
 .../org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 4 ++--
 .../deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala  | 3 +--
 .../apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala  | 4 ++--
 .../main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala | 2 +-
 .../apache/spark/sql/catalyst/analysis/AnsiTypeCoercionSuite.scala| 2 +-
 .../org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala| 4 ++--
 .../spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala  | 2 +-
 .../org/apache/spark/streaming/receiver/ReceiverSupervisor.scala  | 2 +-
 14 files changed, 17 insertions(+), 18 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index c82fda85eb4..a7840ef1055 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -162,7 +162,7 @@ private[kafka010] class KafkaSource(
   }
 
   override def reportLatestOffset(): streaming.Offset = {
-latestPartitionOffsets.map(KafkaSourceOffset(_)).getOrElse(null)
+latestPartitionOffsets.map(KafkaSourceOffset(_)).orNull
   }
 
   override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): 
streaming.Offset = {
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 15707ab9157..efe31be897b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -556,7 +556,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   val obj = new Array[Byte](exLength)
   stream.readFully(obj)
   new PythonException(new String(obj, StandardCharsets.UTF_8),
-writerThread.exception.getOrElse(null))
+writerThread.exception.orNull)
 }
 
 protected def handleEndOfDataSection(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index c76b0d95d10..89397b8aa69 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1816,7 +1816,7 @@ abstract class RDD[T: ClassTag](
*/
   @Experimental
   @Since("3.1.0")
-  def getResourceProfile(): ResourceProfile = resourceProfile.getOrElse(null)
+  def getResourceProfile(): ResourceProfile = resourceProfile.orNull
 
   // ===
   // Other internal methods and fields
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index ef5a812e4b6..0c15b13d5a1 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -1272,7 +1272,7 @@ private[spark] object JsonProtocol {
   val properties = new Properties
   mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) }
   properties
-}.getOrElse(null)
+}.orNull
   }
 
   def UUIDFromJson(json: JValue): UUID = {
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala 

[spark] branch master updated: [SPARK-36718][SQL][FOLLOWUP] Improve the extract-only check in CollapseProject

2022-05-17 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 98fad57221d [SPARK-36718][SQL][FOLLOWUP] Improve the extract-only 
check in CollapseProject
98fad57221d is described below

commit 98fad57221d4dffc6f1fe28d9aca1093172ecf72
Author: Wenchen Fan 
AuthorDate: Tue May 17 15:56:47 2022 +0800

[SPARK-36718][SQL][FOLLOWUP] Improve the extract-only check in CollapseProject

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/36510, to fix a
corner case: if the `CreateStruct` is only referenced once in non-extract
expressions, we should still allow collapsing the projects.
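
As a rough illustration of the corner case (a hypothetical DataFrame-level sketch, not code from this patch), a query that selects the struct itself plus one extracted field can now be collapsed into a single projection:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

object CollapseCornerCase {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1, 2), (3, 4)).toDF("a", "b")

    // Two stacked projections: the first builds a struct, the second references
    // the struct once as a whole and once via field extraction.
    val query = df
      .select(struct($"a", $"b").as("s"))
      .select($"s", $"s.a")

    // With the improved check the two projections can be collapsed, because the
    // struct is referenced only once outside of field-extraction expressions.
    query.explain(true)

    spark.stop()
  }
}
```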

### Why are the changes needed?

It completely fixes the performance regression by also handling the corner case described above.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

A new unit test in `CollapseProjectSuite`.

Closes #36572 from cloud-fan/regression.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/sql/catalyst/optimizer/Optimizer.scala  | 16 +---
 .../sql/catalyst/optimizer/CollapseProjectSuite.scala| 11 +++
 2 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9215609f154..2f93cf2d8c3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1008,20 +1008,22 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper {
   val producer = producerMap.getOrElse(reference, reference)
   producer.deterministic && (count == 1 || alwaysInline || {
 val relatedConsumers = consumers.filter(_.references.contains(reference))
-val extractOnly = relatedConsumers.forall(isExtractOnly(_, reference))
+// It's still extract-only if there is only one reference in non-extract expressions,
+// as we won't duplicate the expensive CreateStruct-like expressions.
+val extractOnly = relatedConsumers.map(refCountInNonExtract(_, reference)).sum <= 1
 shouldInline(producer, extractOnly)
   })
   }
   }
 
-  private def isExtractOnly(expr: Expression, ref: Attribute): Boolean = {
-def hasRefInNonExtractValue(e: Expression): Boolean = e match {
-  case a: Attribute => a.semanticEquals(ref)
+  private def refCountInNonExtract(expr: Expression, ref: Attribute): Int = {
+def refCount(e: Expression): Int = e match {
+  case a: Attribute if a.semanticEquals(ref) => 1
   // The first child of `ExtractValue` is the complex type to be extracted.
-  case e: ExtractValue if e.children.head.semanticEquals(ref) => false
-  case _ => e.children.exists(hasRefInNonExtractValue)
+  case e: ExtractValue if e.children.head.semanticEquals(ref) => 0
+  case _ => e.children.map(refCount).sum
 }
-!hasRefInNonExtractValue(expr)
+refCount(expr)
   }
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
index dd075837d51..baa7c94069a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
@@ -143,6 +143,17 @@ class CollapseProjectSuite extends PlanTest {
   .select(($"a" + ($"a" + 1)).as("add"))
   .analyze
 comparePlans(optimized2, expected2)
+
+// referencing `CreateStruct` only once in non-extract expression is OK.
+val query3 = testRelation
+  .select(namedStruct("a", $"a", "a_plus_1", $"a" + 1).as("struct"))
+  .select($"struct", $"struct".getField("a"))
+  .analyze
+val optimized3 = Optimize.execute(query3)
+val expected3 = testRelation
+  .select(namedStruct("a", $"a", "a_plus_1", $"a" + 1).as("struct"), $"a".as("struct.a"))
+  .analyze
+comparePlans(optimized3, expected3)
   }
 
   test("preserve top-level alias metadata while collapsing projects") {


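
For readers skimming the patch, the essence of the new check is a recursive count of how many times the producer attribute is referenced outside of field-extraction expressions; the projects stay collapsible when that total is at most one. A simplified standalone sketch over a toy expression tree (hypothetical types, not Spark's actual Catalyst classes), mirroring the shape of refCountInNonExtract in the diff above:

```scala
object RefCountSketch {
  // A toy expression tree standing in for Catalyst expressions.
  sealed trait Expr { def children: Seq[Expr] }
  case class Attr(name: String) extends Expr { def children: Seq[Expr] = Nil }
  case class GetField(child: Expr, field: String) extends Expr { def children: Seq[Expr] = Seq(child) }
  case class Add(left: Expr, right: Expr) extends Expr { def children: Seq[Expr] = Seq(left, right) }

  // Count references to `ref` that are NOT the direct child of a field extraction.
  def refCountInNonExtract(expr: Expr, ref: Attr): Int = expr match {
    case a: Attr if a == ref => 1
    case GetField(child, _) if child == ref => 0
    case e => e.children.map(refCountInNonExtract(_, ref)).sum
  }

  def main(args: Array[String]): Unit = {
    val s = Attr("struct")
    // SELECT struct, struct.a, struct.a_plus_1 + b -> only one non-extract reference,
    // so the producing projection can still be inlined without duplicating the struct.
    val consumers = Seq(s, GetField(s, "a"), Add(GetField(s, "a_plus_1"), Attr("b")))
    val total = consumers.map(refCountInNonExtract(_, s)).sum
    println(s"non-extract references = $total, collapsible = ${total <= 1}")
  }
}
```
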



[spark] branch branch-3.3 updated: [SPARK-36718][SQL][FOLLOWUP] Improve the extract-only check in CollapseProject

2022-05-17 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new c25624b4d0c [SPARK-36718][SQL][FOLLOWUP] Improve the extract-only check in CollapseProject
c25624b4d0c is described below

commit c25624b4d0c2d77f0a6db7e70ecf750e9a1143f2
Author: Wenchen Fan 
AuthorDate: Tue May 17 15:56:47 2022 +0800

[SPARK-36718][SQL][FOLLOWUP] Improve the extract-only check in CollapseProject

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/36510, to fix a
corner case: if the `CreateStruct` is only referenced once in non-extract
expressions, we should still allow collapsing the projects.

### Why are the changes needed?

It completely fixes the performance regression by also handling the corner case described above.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

A new unit test in `CollapseProjectSuite`.

Closes #36572 from cloud-fan/regression.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 98fad57221d4dffc6f1fe28d9aca1093172ecf72)
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/sql/catalyst/optimizer/Optimizer.scala  | 16 +---
 .../sql/catalyst/optimizer/CollapseProjectSuite.scala| 11 +++
 2 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 759a7044f15..94e9d3cdd14 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -991,20 +991,22 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper {
   val producer = producerMap.getOrElse(reference, reference)
   producer.deterministic && (count == 1 || alwaysInline || {
 val relatedConsumers = consumers.filter(_.references.contains(reference))
-val extractOnly = relatedConsumers.forall(isExtractOnly(_, reference))
+// It's still extract-only if there is only one reference in non-extract expressions,
+// as we won't duplicate the expensive CreateStruct-like expressions.
+val extractOnly = relatedConsumers.map(refCountInNonExtract(_, reference)).sum <= 1
 shouldInline(producer, extractOnly)
   })
   }
   }
 
-  private def isExtractOnly(expr: Expression, ref: Attribute): Boolean = {
-def hasRefInNonExtractValue(e: Expression): Boolean = e match {
-  case a: Attribute => a.semanticEquals(ref)
+  private def refCountInNonExtract(expr: Expression, ref: Attribute): Int = {
+def refCount(e: Expression): Int = e match {
+  case a: Attribute if a.semanticEquals(ref) => 1
   // The first child of `ExtractValue` is the complex type to be extracted.
-  case e: ExtractValue if e.children.head.semanticEquals(ref) => false
-  case _ => e.children.exists(hasRefInNonExtractValue)
+  case e: ExtractValue if e.children.head.semanticEquals(ref) => 0
+  case _ => e.children.map(refCount).sum
 }
-!hasRefInNonExtractValue(expr)
+refCount(expr)
   }
 
   /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
index f6c3209726b..ba5c5572e24 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala
@@ -141,6 +141,17 @@ class CollapseProjectSuite extends PlanTest {
   .select(($"a" + ($"a" + 1)).as("add"))
   .analyze
 comparePlans(optimized2, expected2)
+
+// referencing `CreateStruct` only once in non-extract expression is OK.
+val query3 = testRelation
+  .select(namedStruct("a", $"a", "a_plus_1", $"a" + 1).as("struct"))
+  .select($"struct", $"struct".getField("a"))
+  .analyze
+val optimized3 = Optimize.execute(query3)
+val expected3 = testRelation
+  .select(namedStruct("a", $"a", "a_plus_1", $"a" + 1).as("struct"), $"a".as("struct.a"))
+  .analyze
+comparePlans(optimized3, expected3)
   }
 
   test("preserve top-level alias metadata while collapsing projects") {

