[spark] branch master updated (b204710 -> dc153f5)

2022-02-25 Thread sunchao
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from b204710  [MINOR] Add git ignores for vscode and metals
 add dc153f5  [SPARK-38237][SQL][SS] Allow `ClusteredDistribution` to require full clustering keys

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/plans/physical/partitioning.scala | 44 +++
 .../org/apache/spark/sql/internal/SQLConf.scala    | 12 ++
 .../spark/sql/catalyst/DistributionSuite.scala     | 42 +--
 .../spark/sql/execution/adaptive/AQEUtils.scala    |  2 +-
 .../streaming/FlatMapGroupsWithStateExec.scala     |  6 ++-
 .../sql/execution/streaming/StreamExecution.scala  |  5 +++
 .../execution/streaming/statefulOperators.scala    | 15 ---
 .../spark/sql/DataFrameWindowFunctionsSuite.scala  | 49 +-
 .../apache/spark/sql/execution/PlannerSuite.scala  |  2 +-
 9 files changed, 156 insertions(+), 21 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (daa5f9d -> b204710)

2022-02-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from daa5f9d  [MINOR][DOCS] Fix missing field in query
 add b204710  [MINOR] Add git ignores for vscode and metals

No new revisions were added by this update.

Summary of changes:
 .gitignore | 5 +
 1 file changed, 5 insertions(+)




[spark] branch branch-3.1 updated (82765a8 -> b98dc38)

2022-02-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 82765a8  [SPARK-38305][CORE] Explicitly check if source exists in unpack() before calling FileUtil methods
 add b98dc38  [MINOR][DOCS] Fix missing field in query

No new revisions were added by this update.

Summary of changes:
 docs/sql-ref-syntax-qry-select-window.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)




[spark] branch branch-3.2 updated: [MINOR][DOCS] Fix missing field in query

2022-02-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new a4b3775  [MINOR][DOCS] Fix missing field in query
a4b3775 is described below

commit a4b37757d444182006369d2e4a0b7faaf1d38917
Author: Alfonso 
AuthorDate: Fri Feb 25 08:38:51 2022 -0600

[MINOR][DOCS] Fix missing field in query

### What changes were proposed in this pull request?

This PR fixes the SQL query in the docs so that the query conforms to the query result shown below it.

### Why are the changes needed?

Just a fix to the docs.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Verified with the project's existing tests.

Closes #35624 from redsnow1992/patch-1.

Authored-by: Alfonso 
Signed-off-by: Sean Owen 
(cherry picked from commit daa5f9df4a1c8b3cf5db7142e54b765272c1f24c)
Signed-off-by: Sean Owen 
---
 docs/sql-ref-syntax-qry-select-window.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/sql-ref-syntax-qry-select-window.md b/docs/sql-ref-syntax-qry-select-window.md
index 6e65778..9fbebcf 100644
--- a/docs/sql-ref-syntax-qry-select-window.md
+++ b/docs/sql-ref-syntax-qry-select-window.md
@@ -109,7 +109,7 @@ SELECT * FROM employees;
 | Alex|  Sales| 3|   33|
 +-+---+--+-+
 
-SELECT name, dept, RANK() OVER (PARTITION BY dept ORDER BY salary) AS rank FROM employees;
+SELECT name, dept, salary, RANK() OVER (PARTITION BY dept ORDER BY salary) AS rank FROM employees;
 +-+---+--++
 | name|   dept|salary|rank|
 +-+---+--++
@@ -125,7 +125,7 @@ SELECT name, dept, RANK() OVER (PARTITION BY dept ORDER BY salary) AS rank FROM
 | Jeff|  Marketing| 35000|   3|
 +-+---+--++
 
-SELECT name, dept, DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary ROWS BETWEEN
+SELECT name, dept, salary, DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary ROWS BETWEEN
 UNBOUNDED PRECEDING AND CURRENT ROW) AS dense_rank FROM employees;
 +-+---+--+--+
 | name|   dept|salary|dense_rank|
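
The corrected queries above can be sanity-checked outside Spark. The sketch below is a hypothetical illustration using Python's sqlite3 (window functions require SQLite 3.25+), with a small subset of the docs' employees table; it shows that adding salary to the SELECT list is what makes each output row carry the salary column displayed in the result tables.

```python
import sqlite3

# Hypothetical stand-in for the docs' employees table (subset of rows).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE employees (name TEXT, dept TEXT, salary INT, age INT)")
conn.executemany("INSERT INTO employees VALUES (?, ?, ?, ?)", [
    ("Lisa", "Sales", 10000, 35),
    ("Evan", "Sales", 32000, 38),
    ("Fred", "Engineering", 21000, 28),
    ("Alex", "Sales", 30000, 33),
])

# The fixed query: `salary` is now in the SELECT list, so each output row
# contains the salary column alongside the computed rank.
rows = conn.execute("""
    SELECT name, dept, salary,
           RANK() OVER (PARTITION BY dept ORDER BY salary) AS rank
    FROM employees
""").fetchall()
```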




[spark] branch master updated (64e1f28 -> daa5f9d)

2022-02-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 64e1f28  [SPARK-38305][CORE] Explicitly check if source exists in unpack() before calling FileUtil methods
 add daa5f9d  [MINOR][DOCS] Fix missing field in query

No new revisions were added by this update.

Summary of changes:
 docs/sql-ref-syntax-qry-select-window.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)




[spark] branch branch-3.1 updated: [SPARK-38305][CORE] Explicitly check if source exists in unpack() before calling FileUtil methods

2022-02-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 82765a8  [SPARK-38305][CORE] Explicitly check if source exists in unpack() before calling FileUtil methods
82765a8 is described below

commit 82765a826b41311bd3cea2bc454f89ebdc0a3aa1
Author: Sean Owen 
AuthorDate: Fri Feb 25 08:34:04 2022 -0600

[SPARK-38305][CORE] Explicitly check if source exists in unpack() before calling FileUtil methods

### What changes were proposed in this pull request?

Explicitly check existence of source file in Utils.unpack before calling 
Hadoop FileUtil methods

### Why are the changes needed?

A discussion from the Hadoop community raised a potential issue in calling 
these methods when a file doesn't exist.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

Closes #35632 from srowen/SPARK-38305.

Authored-by: Sean Owen 
Signed-off-by: Sean Owen 
(cherry picked from commit 64e1f28f1626247cc1361dcb395288227454ca8f)
Signed-off-by: Sean Owen 
---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1643aa6..efe9cc2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -577,6 +577,9 @@ private[spark] object Utils extends Logging {
    * basically copied from `org.apache.hadoop.yarn.util.FSDownload.unpack`.
    */
   def unpack(source: File, dest: File): Unit = {
+    if (!source.exists()) {
+      throw new FileNotFoundException(source.getAbsolutePath)
+    }
     val lowerSrc = StringUtils.toLowerCase(source.getName)
     if (lowerSrc.endsWith(".jar")) {
       RunJar.unJar(source, dest, RunJar.MATCH_ANY)




[spark] branch branch-3.2 updated: [SPARK-38305][CORE] Explicitly check if source exists in unpack() before calling FileUtil methods

2022-02-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 775a829  [SPARK-38305][CORE] Explicitly check if source exists in unpack() before calling FileUtil methods
775a829 is described below

commit 775a829c9de3717a8f298146dde0d57dd7c0ab11
Author: Sean Owen 
AuthorDate: Fri Feb 25 08:34:04 2022 -0600

[SPARK-38305][CORE] Explicitly check if source exists in unpack() before calling FileUtil methods

### What changes were proposed in this pull request?

Explicitly check existence of source file in Utils.unpack before calling 
Hadoop FileUtil methods

### Why are the changes needed?

A discussion from the Hadoop community raised a potential issue in calling 
these methods when a file doesn't exist.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

Closes #35632 from srowen/SPARK-38305.

Authored-by: Sean Owen 
Signed-off-by: Sean Owen 
(cherry picked from commit 64e1f28f1626247cc1361dcb395288227454ca8f)
Signed-off-by: Sean Owen 
---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 31514c8..776c8c4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -585,6 +585,9 @@ private[spark] object Utils extends Logging {
    * basically copied from `org.apache.hadoop.yarn.util.FSDownload.unpack`.
    */
   def unpack(source: File, dest: File): Unit = {
+    if (!source.exists()) {
+      throw new FileNotFoundException(source.getAbsolutePath)
+    }
     val lowerSrc = StringUtils.toLowerCase(source.getName)
     if (lowerSrc.endsWith(".jar")) {
       RunJar.unJar(source, dest, RunJar.MATCH_ANY)




[spark] branch master updated: [SPARK-38305][CORE] Explicitly check if source exists in unpack() before calling FileUtil methods

2022-02-25 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 64e1f28  [SPARK-38305][CORE] Explicitly check if source exists in unpack() before calling FileUtil methods
64e1f28 is described below

commit 64e1f28f1626247cc1361dcb395288227454ca8f
Author: Sean Owen 
AuthorDate: Fri Feb 25 08:34:04 2022 -0600

[SPARK-38305][CORE] Explicitly check if source exists in unpack() before calling FileUtil methods

### What changes were proposed in this pull request?

Explicitly check existence of source file in Utils.unpack before calling 
Hadoop FileUtil methods

### Why are the changes needed?

A discussion from the Hadoop community raised a potential issue in calling 
these methods when a file doesn't exist.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

Closes #35632 from srowen/SPARK-38305.

Authored-by: Sean Owen 
Signed-off-by: Sean Owen 
---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a9d6180..17bec9f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -593,6 +593,9 @@ private[spark] object Utils extends Logging {
    * basically copied from `org.apache.hadoop.yarn.util.FSDownload.unpack`.
    */
   def unpack(source: File, dest: File): Unit = {
+    if (!source.exists()) {
+      throw new FileNotFoundException(source.getAbsolutePath)
+    }
     val lowerSrc = StringUtils.toLowerCase(source.getName)
     if (lowerSrc.endsWith(".jar")) {
       RunJar.unJar(source, dest, RunJar.MATCH_ANY)
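
The guard is a one-line fail-fast check. As a rough illustration of the same pattern outside Spark, here is a hypothetical Python analogue of an unpack() helper (not Spark's Scala Utils.unpack): verify that the source exists up front and raise FileNotFoundError, instead of letting the downstream archive library fail with a less obvious error.

```python
import os
import tarfile
import zipfile

def unpack(source: str, dest: str) -> None:
    # Fail fast, mirroring the guard added in SPARK-38305: without this,
    # the archive libraries below may raise a confusing error when the
    # path does not exist.
    if not os.path.exists(source):
        raise FileNotFoundError(source)
    lower_src = os.path.basename(source).lower()
    if lower_src.endswith((".tar.gz", ".tgz", ".tar")):
        with tarfile.open(source) as archive:
            archive.extractall(dest)
    elif lower_src.endswith((".jar", ".zip")):
        with zipfile.ZipFile(source) as archive:
            archive.extractall(dest)
```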




[spark] branch branch-3.2 updated: [SPARK-38325][SQL] ANSI mode: avoid potential runtime error in HashJoin.extractKeyExprAt()

2022-02-25 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new bc27bc5  [SPARK-38325][SQL] ANSI mode: avoid potential runtime error in HashJoin.extractKeyExprAt()
bc27bc5 is described below

commit bc27bc5ec173da006d3642c3a958468bb91291fe
Author: Gengliang Wang 
AuthorDate: Fri Feb 25 17:11:15 2022 +0800

[SPARK-38325][SQL] ANSI mode: avoid potential runtime error in HashJoin.extractKeyExprAt()

### What changes were proposed in this pull request?

SubqueryBroadcastExec retrieves the partition key from the broadcast 
results based on the type of HashedRelation returned. If the key is packed 
inside a Long, we extract it through bitwise operations and cast it as 
Byte/Short/Int if necessary.

The casting here can cause a potential runtime error. This PR is to fix it.
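
The packing scheme described above can be sketched outside Spark. The following is a hypothetical Python illustration, not the Catalyst expressions the patch touches: integral keys whose sizes sum to at most 8 bytes are packed into one 64-bit value, and a single key is recovered with an unsigned shift and a mask, the same ShiftRightUnsigned/BitwiseAnd combination extractKeyExprAt() builds. The runtime error arises when the extracted value is cast back to Byte/Short/Int under ANSI mode; since the value is known to fit, the fix constructs the Cast with ansiEnabled = false.

```python
def pack(keys, sizes):
    """Pack integral keys (byte sizes summing to <= 8) into one long."""
    assert sum(sizes) <= 8
    packed = 0
    for key, size in zip(keys, sizes):
        # Shift previous keys left, then OR in this key's low bits.
        packed = (packed << (size * 8)) | (key & ((1 << (size * 8)) - 1))
    return packed

def extract(packed, sizes, index):
    """Recover key `index`: shift past the lower-order keys, then mask."""
    shifted_bits = sum(s * 8 for s in sizes[index + 1:])
    mask = (1 << (sizes[index] * 8)) - 1
    return (packed >> shifted_bits) & mask
```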

### Why are the changes needed?

Bug fix
### Does this PR introduce _any_ user-facing change?

Yes, avoid potential runtime error in dynamic pruning under ANSI mode

### How was this patch tested?

UT

Closes #35659 from gengliangwang/fixHashJoin.

Authored-by: Gengliang Wang 
Signed-off-by: Gengliang Wang 
(cherry picked from commit 29eca8c87f4e8c19c0380f7c30668fd88edee573)
Signed-off-by: Gengliang Wang 
---
 .../spark/sql/execution/joins/HashJoin.scala   | 27 +-
 .../sql/execution/joins/HashedRelationSuite.scala  | 22 +++---
 2 files changed, 35 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index f87acb8..58fc89c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -705,6 +705,13 @@ trait HashJoin extends JoinCodegenSupport {
 }
 
 object HashJoin extends CastSupport with SQLConfHelper {
+
+  private def canRewriteAsLongType(keys: Seq[Expression]): Boolean = {
+    // TODO: support BooleanType, DateType and TimestampType
+    keys.forall(_.dataType.isInstanceOf[IntegralType]) &&
+      keys.map(_.dataType.defaultSize).sum <= 8
+  }
+
   /**
    * Try to rewrite the key as LongType so we can use getLong(), if they key can fit with a long.
    *
@@ -712,9 +719,7 @@ object HashJoin extends CastSupport with SQLConfHelper {
    */
   def rewriteKeyExpr(keys: Seq[Expression]): Seq[Expression] = {
     assert(keys.nonEmpty)
-    // TODO: support BooleanType, DateType and TimestampType
-    if (keys.exists(!_.dataType.isInstanceOf[IntegralType])
-      || keys.map(_.dataType.defaultSize).sum > 8) {
+    if (!canRewriteAsLongType(keys)) {
       return keys
     }
 
@@ -736,18 +741,28 @@ object HashJoin extends CastSupport with SQLConfHelper {
    * determine the number of bits to shift
    */
   def extractKeyExprAt(keys: Seq[Expression], index: Int): Expression = {
+    assert(canRewriteAsLongType(keys))
     // jump over keys that have a higher index value than the required key
     if (keys.size == 1) {
       assert(index == 0)
-      cast(BoundReference(0, LongType, nullable = false), keys(index).dataType)
+      Cast(
+        child = BoundReference(0, LongType, nullable = false),
+        dataType = keys(index).dataType,
+        timeZoneId = Option(conf.sessionLocalTimeZone),
+        ansiEnabled = false)
     } else {
       val shiftedBits =
         keys.slice(index + 1, keys.size).map(_.dataType.defaultSize * 8).sum
       val mask = (1L << (keys(index).dataType.defaultSize * 8)) - 1
       // build the schema for unpacking the required key
-      cast(BitwiseAnd(
-        ShiftRightUnsigned(BoundReference(0, LongType, nullable = false), Literal(shiftedBits)),
-        Literal(mask)), keys(index).dataType)
+      val castChild = BitwiseAnd(
+        ShiftRightUnsigned(BoundReference(0, LongType, nullable = false), Literal(shiftedBits)),
+        Literal(mask))
+      Cast(
+        child = castChild,
+        dataType = keys(index).dataType,
+        timeZoneId = Option(conf.sessionLocalTimeZone),
+        ansiEnabled = false)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 84f6299..0a364ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.internal.SQLConf

[spark] branch master updated: [SPARK-38325][SQL] ANSI mode: avoid potential runtime error in HashJoin.extractKeyExprAt()

2022-02-25 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 29eca8c  [SPARK-38325][SQL] ANSI mode: avoid potential runtime error in HashJoin.extractKeyExprAt()
29eca8c is described below

commit 29eca8c87f4e8c19c0380f7c30668fd88edee573
Author: Gengliang Wang 
AuthorDate: Fri Feb 25 17:11:15 2022 +0800

[SPARK-38325][SQL] ANSI mode: avoid potential runtime error in HashJoin.extractKeyExprAt()

### What changes were proposed in this pull request?

SubqueryBroadcastExec retrieves the partition key from the broadcast 
results based on the type of HashedRelation returned. If the key is packed 
inside a Long, we extract it through bitwise operations and cast it as 
Byte/Short/Int if necessary.

The casting here can cause a potential runtime error. This PR is to fix it.

### Why are the changes needed?

Bug fix
### Does this PR introduce _any_ user-facing change?

Yes, avoid potential runtime error in dynamic pruning under ANSI mode

### How was this patch tested?

UT

Closes #35659 from gengliangwang/fixHashJoin.

Authored-by: Gengliang Wang 
Signed-off-by: Gengliang Wang 
---
 .../spark/sql/execution/joins/HashJoin.scala   | 27 +-
 .../sql/execution/joins/HashedRelationSuite.scala  | 22 +++---
 2 files changed, 35 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 0e8bb84..4595ea0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -705,6 +705,13 @@ trait HashJoin extends JoinCodegenSupport {
 }
 
 object HashJoin extends CastSupport with SQLConfHelper {
+
+  private def canRewriteAsLongType(keys: Seq[Expression]): Boolean = {
+    // TODO: support BooleanType, DateType and TimestampType
+    keys.forall(_.dataType.isInstanceOf[IntegralType]) &&
+      keys.map(_.dataType.defaultSize).sum <= 8
+  }
+
   /**
    * Try to rewrite the key as LongType so we can use getLong(), if they key can fit with a long.
    *
@@ -712,9 +719,7 @@ object HashJoin extends CastSupport with SQLConfHelper {
    */
   def rewriteKeyExpr(keys: Seq[Expression]): Seq[Expression] = {
     assert(keys.nonEmpty)
-    // TODO: support BooleanType, DateType and TimestampType
-    if (keys.exists(!_.dataType.isInstanceOf[IntegralType])
-      || keys.map(_.dataType.defaultSize).sum > 8) {
+    if (!canRewriteAsLongType(keys)) {
       return keys
     }
 
@@ -736,18 +741,28 @@ object HashJoin extends CastSupport with SQLConfHelper {
    * determine the number of bits to shift
    */
   def extractKeyExprAt(keys: Seq[Expression], index: Int): Expression = {
+    assert(canRewriteAsLongType(keys))
     // jump over keys that have a higher index value than the required key
     if (keys.size == 1) {
       assert(index == 0)
-      cast(BoundReference(0, LongType, nullable = false), keys(index).dataType)
+      Cast(
+        child = BoundReference(0, LongType, nullable = false),
+        dataType = keys(index).dataType,
+        timeZoneId = Option(conf.sessionLocalTimeZone),
+        ansiEnabled = false)
     } else {
       val shiftedBits =
         keys.slice(index + 1, keys.size).map(_.dataType.defaultSize * 8).sum
       val mask = (1L << (keys(index).dataType.defaultSize * 8)) - 1
       // build the schema for unpacking the required key
-      cast(BitwiseAnd(
-        ShiftRightUnsigned(BoundReference(0, LongType, nullable = false), Literal(shiftedBits)),
-        Literal(mask)), keys(index).dataType)
+      val castChild = BitwiseAnd(
+        ShiftRightUnsigned(BoundReference(0, LongType, nullable = false), Literal(shiftedBits)),
+        Literal(mask))
+      Cast(
+        child = castChild,
+        dataType = keys(index).dataType,
+        timeZoneId = Option(conf.sessionLocalTimeZone),
+        ansiEnabled = false)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index b8ffc47..d5b7ed6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.u