[spark] branch master updated (e9ccf4a -> e9af457)

2021-06-10 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from e9ccf4a  [SPARK-35640][SQL] Refactor Parquet vectorized reader to 
remove duplicated code paths
 add e9af457  [SPARK-35718][SQL] Support casting of Date to timestamp 
without time zone type

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/expressions/Cast.scala  | 24 +++---
 .../spark/sql/catalyst/expressions/CastSuite.scala |  9 
 2 files changed, 30 insertions(+), 3 deletions(-)




[spark] branch master updated (463daab -> e9ccf4a)

2021-06-10 Thread dbtsai
This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 463daab  [SPARK-34512][BUILD][SQL] Upgrade built-in Hive to 2.3.9
 add e9ccf4a  [SPARK-35640][SQL] Refactor Parquet vectorized reader to 
remove duplicated code paths

No new revisions were added by this update.

Summary of changes:
 .../datasources/parquet/ParquetVectorUpdater.java  |   86 ++
 .../parquet/ParquetVectorUpdaterFactory.java   | 1005 
 .../parquet/VectorizedColumnReader.java|  597 +---
 .../parquet/VectorizedRleValuesReader.java |  433 +
 .../datasources/parquet/ParquetIOSuite.scala   |   22 +
 5 files changed, 1144 insertions(+), 999 deletions(-)
 create mode 100644 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdater.java
 create mode 100644 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java




[spark] branch master updated (912d60b -> 463daab)

2021-06-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 912d60b  [SPARK-35709][DOCS] Remove the reference to third party Nomad 
integration project
 add 463daab  [SPARK-34512][BUILD][SQL] Upgrade built-in Hive to 2.3.9

No new revisions were added by this update.

Summary of changes:
 dev/deps/spark-deps-hadoop-2.7-hive-2.3| 26 +-
 dev/deps/spark-deps-hadoop-3.2-hive-2.3| 26 +-
 docs/building-spark.md |  4 +--
 docs/sql-data-sources-hive-tables.md   |  8 +++---
 docs/sql-migration-guide.md|  2 +-
 pom.xml|  4 +--
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |  2 +-
 .../org/apache/spark/sql/hive/HiveUtils.scala  |  2 +-
 .../org/apache/spark/sql/hive/client/package.scala |  2 +-
 .../hive/HiveExternalCatalogVersionsSuite.scala|  2 +-
 .../hive/execution/HiveSerDeReadWriteSuite.scala   | 31 ++
 11 files changed, 70 insertions(+), 39 deletions(-)




[spark] branch master updated (cf07036 -> 912d60b)

2021-06-10 Thread yamamuro
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from cf07036  [SPARK-35593][K8S][CORE] Support shuffle data recovery on the 
reused PVCs
 add 912d60b  [SPARK-35709][DOCS] Remove the reference to third party Nomad 
integration project

No new revisions were added by this update.

Summary of changes:
 docs/cluster-overview.md | 3 ---
 1 file changed, 3 deletions(-)




[spark] branch master updated (a97885b -> cf07036)

2021-06-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from a97885b  [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to 
create merge directory and to get the local shuffle merged data
 add cf07036  [SPARK-35593][K8S][CORE] Support shuffle data recovery on the 
reused PVCs

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/MapOutputTracker.scala  |  22 ++-
 .../shuffle/KubernetesLocalDiskShuffleDataIO.scala |  24 +--
 ...ernetesLocalDiskShuffleExecutorComponents.scala | 102 ++
 .../KubernetesLocalDiskShuffleDataIOSuite.scala| 219 +
 4 files changed, 353 insertions(+), 14 deletions(-)
 copy core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala => 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIO.scala
 (53%)
 create mode 100644 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
 create mode 100644 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala




[spark] branch master updated: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

2021-06-10 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a97885b  [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to 
create merge directory and to get the local shuffle merged data
a97885b is described below

commit a97885bb2c81563a18c99df233fb9e99ff368c9c
Author: Ye Zhou 
AuthorDate: Thu Jun 10 16:57:46 2021 -0500

[SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge 
directory and to get the local shuffle merged data

### What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 which is needed for 
push-based shuffle.

### Summary of changes:
The executor will create the merge directories under the application temp 
directory provided by YARN. The access control of the folder is set to 770 so 
that the Shuffle Service can create merged shuffle files and write merged 
shuffle data into those files.

Serve merged shuffle block fetch requests by reading the merged shuffle blocks.
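
The directory and permission setup can be pictured with a small hypothetical 
Scala sketch (not the actual Spark code; the base path is a placeholder, only 
the `merge_manager` directory name comes from the patch below):

```scala
// Hypothetical sketch, not the Spark implementation: create the merge directory
// and open it up as 770 (rwx for owner and group) so the external shuffle
// service, running as a group member, can write merged files into it.
import java.nio.file.{Files, Paths}
import java.nio.file.attribute.PosixFilePermissions

val appLocalDir = Paths.get(sys.props("java.io.tmpdir"), "app-temp") // placeholder for the YARN app temp dir
val mergeDir = appLocalDir.resolve("merge_manager")
Files.createDirectories(mergeDir)
Files.setPosixFilePermissions(mergeDir, PosixFilePermissions.fromString("rwxrwx---"))
```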

### Why are the changes needed?
Refer to the SPIP in SPARK-30602.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Added unit tests.
The reference PR with the consolidated changes covering the complete 
implementation is also provided in SPARK-30602.
We have already verified the functionality and the improved performance as 
documented in the SPIP doc.

Lead-authored-by: Min Shen mshenlinkedin.com
Co-authored-by: Chandni Singh chsinghlinkedin.com
Co-authored-by: Ye Zhou yezhoulinkedin.com

Closes #32007 from zhouyejoe/SPARK-33350.

Lead-authored-by: Ye Zhou 
Co-authored-by: Chandni Singh 
Co-authored-by: Min Shen 
Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../network/shuffle/RemoteBlockPushResolver.java   |   8 +-
 .../scala/org/apache/spark/MapOutputTracker.scala  |   2 +-
 .../spark/shuffle/IndexShuffleBlockResolver.scala  |  73 +-
 .../spark/shuffle/ShuffleBlockResolver.scala   |  13 ++-
 .../scala/org/apache/spark/storage/BlockId.scala   |  37 +++
 .../org/apache/spark/storage/BlockManager.scala|  26 -
 .../apache/spark/storage/DiskBlockManager.scala| 112 +
 .../main/scala/org/apache/spark/util/Utils.scala   |  28 +-
 .../shuffle/HostLocalShuffleReadingSuite.scala |   9 ++
 .../sort/IndexShuffleBlockResolverSuite.scala  |  93 -
 .../org/apache/spark/storage/BlockIdSuite.scala|  36 +++
 .../spark/storage/DiskBlockManagerSuite.scala  |  39 +++
 .../scala/org/apache/spark/util/UtilsSuite.scala   |  12 ++-
 13 files changed, 470 insertions(+), 18 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 1ac33cd..47d2547 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -75,6 +75,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   private static final Logger logger = 
LoggerFactory.getLogger(RemoteBlockPushResolver.class);
   @VisibleForTesting
   static final String MERGE_MANAGER_DIR = "merge_manager";
+  public static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged";
 
   private final ConcurrentMap appsPathInfo;
   private final ConcurrentMap> partitions;
@@ -211,7 +212,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
 
   /**
* The logic here is consistent with
-   * org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
+   * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
+   *  org.apache.spark.storage.BlockId, scala.Option)]]
*/
   private File getFile(String appId, String filename) {
 // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart
@@ -431,8 +433,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   executorInfo.subDirsPerLocalDir));
   }
   private static String generateFileName(AppShuffleId appShuffleId, int 
reduceId) {
-return String.format("mergedShuffle_%s_%d_%d", appShuffleId.appId, 
appShuffleId.shuffleId,
-  reduceId);
+return String.format("%s_%s_%d_%d", MERGED_SHUFFLE_FILE_NAME_PREFIX, 
appShuffleId.appId,
+  appShuffleId.shuffleId, reduceId);
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 9f2228b..003b10f 100644
--- 

[spark] branch master updated (b4b78ce -> bc1edba)

2021-06-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from b4b78ce  [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception
 add bc1edba  [SPARK-35692][K8S] Use AtomicInteger for executor id 
generating

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 7 +++
 .../spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 6 ++
 2 files changed, 9 insertions(+), 4 deletions(-)
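
A minimal sketch of the idea named in the title (purely illustrative, not the 
Spark code): hand out executor ids from an `AtomicInteger` so concurrent 
allocation rounds never reuse an id.

```scala
// Illustrative only: monotonically increasing executor ids, safe under concurrency.
import java.util.concurrent.atomic.AtomicInteger

val lastExecutorId = new AtomicInteger(0)
def nextExecutorId(): Int = lastExecutorId.incrementAndGet()

assert((1 to 3).map(_ => nextExecutorId()) == Seq(1, 2, 3))
```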




[spark] branch branch-3.1 updated: [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception

2021-06-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 0f3a251  [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception
0f3a251 is described below

commit 0f3a251af0795bfa4af75ce1efa6a845a31362fa
Author: Kent Yao 
AuthorDate: Thu Jun 10 13:39:39 2021 -0700

[SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception

### What changes were proposed in this pull request?

A follow-up for SPARK-32975 to avoid an unexpected `None.get` exception.

Running SparkPi with Docker Desktop, where the podName is an `Option` that is not set,
we get:
```logtalk
21/06/09 01:09:12 ERROR Utils: Uncaught exception in thread main
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:529)
at scala.None$.get(Option.scala:527)
at 
org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1(ExecutorPodsAllocator.scala:110)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1417)
at 
org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.start(ExecutorPodsAllocator.scala:111)
at 
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.start(KubernetesClusterSchedulerBackend.scala:99)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220)
at org.apache.spark.SparkContext.(SparkContext.scala:581)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2686)
at 
org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:948)
at scala.Option.getOrElse(Option.scala:189)
at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:942)
at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:30)
at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
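
The failure boils down to calling `.get` on an empty `Option`; a minimal 
illustration of the safer pattern (an assumption mirroring the fix, not the 
patched Spark code itself):

```scala
// podName is optional, so guard with foreach instead of .get.
val driverPodName: Option[String] = None     // e.g. not set when running with Docker Desktop

// driverPodName.get                         // throws java.util.NoSuchElementException: None.get
driverPodName.foreach { name =>
  // only runs when a driver pod name is actually configured
  println(s"waiting for driver pod $name to become ready")
}
```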

### Why are the changes needed?

fix a regression

### Does this PR introduce _any_ user-facing change?

no
### How was this patch tested?

Manual.

Closes #32830 from yaooqinn/SPARK-32975.

Authored-by: Kent Yao 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit b4b78ce26567ce7ab83d47ce3b6af87c866bcacb)
Signed-off-by: Dongjoon Hyun 
---
 .../scheduler/cluster/k8s/ExecutorPodsAllocator.scala| 16 +---
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 358058e..5429e36 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -102,13 +102,15 @@ private[spark] class ExecutorPodsAllocator(
   @volatile private var deletedExecutorIds = Set.empty[Long]
 
   def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
-// Wait until the driver pod is ready before starting executors, as the 
headless service won't
-// be resolvable by DNS until the driver pod is ready.
-Utils.tryLogNonFatalError {
-  kubernetesClient
-.pods()
-.withName(kubernetesDriverPodName.get)
-.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
+driverPod.foreach { pod =>
+  // Wait until the driver pod is ready before starting executors, as the 
headless service won't
+  // be resolvable by DNS until the driver pod is ready.
+  Utils.tryLogNonFatalError {
+kubernetesClient
+  .pods()
+  

[spark] branch master updated (d21ff13 -> b4b78ce)

2021-06-10 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from d21ff13  [SPARK-35716][SQL] Support casting of timestamp without time 
zone to date type
 add b4b78ce  [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception

No new revisions were added by this update.

Summary of changes:
 .../scheduler/cluster/k8s/ExecutorPodsAllocator.scala| 16 +---
 1 file changed, 9 insertions(+), 7 deletions(-)




[spark] branch master updated: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type

2021-06-10 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d21ff13  [SPARK-35716][SQL] Support casting of timestamp without time 
zone to date type
d21ff13 is described below

commit d21ff1318f614fc207d9cd3c485e4337faa8e878
Author: Gengliang Wang 
AuthorDate: Thu Jun 10 23:37:02 2021 +0300

[SPARK-35716][SQL] Support casting of timestamp without time zone to date 
type

### What changes were proposed in this pull request?

Extend the Cast expression and support TimestampWithoutTZType in casting to 
DateType.
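
As a hedged illustration of the conversion rule (not code from the patch): a 
timestamp-without-time-zone value is stored as microseconds since the epoch and 
is truncated to whole days in UTC, matching the `microsToDays(t, ZoneOffset.UTC)` 
call in the diff below.

```scala
// Illustrative only: mimic microsToDays(t, ZoneOffset.UTC) for one value.
import java.time.{LocalDate, LocalDateTime, ZoneOffset}

val ts = LocalDateTime.parse("2021-06-10T23:37:02")
val micros = ts.toInstant(ZoneOffset.UTC).toEpochMilli * 1000L      // microseconds since epoch
val epochDays = Math.floorDiv(micros, 24L * 60 * 60 * 1000 * 1000)  // whole days in UTC
assert(LocalDate.ofEpochDay(epochDays) == LocalDate.parse("2021-06-10"))
```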

### Why are the changes needed?

To conform the ANSI SQL standard which requires to support such casting.

### Does this PR introduce _any_ user-facing change?

No, the new timestamp type is not released yet.

### How was this patch tested?

Unit test

Closes #32869 from gengliangwang/castToDate.

Authored-by: Gengliang Wang 
Signed-off-by: Max Gekk 
---
 .../org/apache/spark/sql/catalyst/expressions/Cast.scala  |  9 +
 .../org/apache/spark/sql/catalyst/expressions/CastSuite.scala | 11 +--
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 8de19ba..fba17d3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -72,6 +72,7 @@ object Cast {
 
 case (StringType, DateType) => true
 case (TimestampType, DateType) => true
+case (TimestampWithoutTZType, DateType) => true
 
 case (StringType, CalendarIntervalType) => true
 case (StringType, DayTimeIntervalType) => true
@@ -534,6 +535,8 @@ abstract class CastBase extends UnaryExpression with 
TimeZoneAwareExpression wit
   // throw valid precision more than seconds, according to Hive.
   // Timestamp.nanos is in 0 to 999,999,999, no more than a second.
   buildCast[Long](_, t => microsToDays(t, zoneId))
+case TimestampWithoutTZType =>
+  buildCast[Long](_, t => microsToDays(t, ZoneOffset.UTC))
   }
 
   // IntervalConverter
@@ -1204,6 +1207,11 @@ abstract class CastBase extends UnaryExpression with 
TimeZoneAwareExpression wit
 (c, evPrim, evNull) =>
   code"""$evPrim =
 org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToDays($c, 
$zid);"""
+  case TimestampWithoutTZType =>
+(c, evPrim, evNull) =>
+  // scalastyle:off line.size.limit
+  code"$evPrim = 
org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToDays($c, 
java.time.ZoneOffset.UTC);"
+  // scalastyle:on line.size.limit
   case _ =>
 (c, evPrim, evNull) => code"$evNull = true;"
 }
@@ -1953,6 +1961,7 @@ object AnsiCast {
 
 case (StringType, DateType) => true
 case (TimestampType, DateType) => true
+case (TimestampWithoutTZType, DateType) => true
 
 case (_: NumericType, _: NumericType) => true
 case (StringType, _: NumericType) => true
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index a4e4257..51a7740 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import java.sql.{Date, Timestamp}
-import java.time.{DateTimeException, Duration, LocalDateTime, Period}
+import java.time.{DateTimeException, Duration, LocalDate, LocalDateTime, 
Period}
 import java.time.temporal.ChronoUnit
 import java.util.{Calendar, TimeZone}
 
@@ -1256,10 +1256,17 @@ abstract class AnsiCastSuiteBase extends CastSuiteBase {
 
   test("SPARK-35711: cast timestamp without time zone to timestamp with local 
time zone") {
 specialTs.foreach { s =>
-  val dt = LocalDateTime.parse(s.replace(" ", "T"))
+  val dt = LocalDateTime.parse(s)
   checkEvaluation(cast(dt, TimestampType), 
DateTimeUtils.localDateTimeToMicros(dt))
 }
   }
+
+  test("SPARK-35716: cast timestamp without time zone to date type") {
+specialTs.foreach { s =>
+  val dt = LocalDateTime.parse(s)
+  checkEvaluation(cast(dt, DateType), LocalDate.parse(s.split("T")(0)))
+}
+  }
 }
 
 /**




[spark] branch master updated: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

2021-06-10 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b5a1503  [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge 
with Push based shuffle and preparation step for the reduce stage
b5a1503 is described below

commit b5a15035851bfba12ef1c68d10103cec42cbac0c
Author: Venkata krishnan Sowrirajan 
AuthorDate: Thu Jun 10 13:06:15 2021 -0500

[SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based 
shuffle and preparation step for the reduce stage

### What changes were proposed in this pull request?

Summary of the changes made as part of this PR:

1. `DAGScheduler` changes to finalize a ShuffleMapStage which involves 
talking to all the shuffle mergers (`ExternalShuffleService`) and getting all 
the completed merge statuses.
2. Once the `ShuffleMapStage` finalization is complete, mark the 
`ShuffleMapStage` as finalized, which marks the stage as complete and 
subsequently lets the child stage start (see the toy sketch after this list).
3. Also added the relevant tests to `DAGSchedulerSuite` for changes made as 
part of [SPARK-32919](https://issues.apache.org/jira/browse/SPARK-32919)
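
A toy model of the gating described in item 2 (purely illustrative, not the 
`DAGScheduler` code):

```scala
// Toy model only: a map stage using push-based shuffle is not complete for
// scheduling purposes until merge finalization is done, which is what lets
// the child (reduce) stage start.
final class ToyShuffleMapStage(mergeEnabled: Boolean) {
  @volatile private var mergeFinalized = false
  def markMergeFinalized(): Unit = { mergeFinalized = true }
  def readyForChildStage: Boolean = !mergeEnabled || mergeFinalized
}

val stage = new ToyShuffleMapStage(mergeEnabled = true)
assert(!stage.readyForChildStage)  // map tasks finished, but merge not finalized yet
stage.markMergeFinalized()
assert(stage.readyForChildStage)   // now the reduce stage can be submitted
```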

Lead-authored-by: Min Shen mshenlinkedin.com
Co-authored-by: Venkata krishnan Sowrirajan vsowrirajanlinkedin.com
Co-authored-by: Chandni Singh chsinghlinkedin.com

### Why are the changes needed?

Refer to [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602)

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests to DAGSchedulerSuite

Closes #30691 from venkata91/SPARK-32920.

Lead-authored-by: Venkata krishnan Sowrirajan 
Co-authored-by: Min Shen 
Co-authored-by: Chandni Singh 
Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../main/scala/org/apache/spark/Dependency.scala   |  38 ++
 .../scala/org/apache/spark/MapOutputTracker.scala  |  44 +-
 .../org/apache/spark/internal/config/package.scala |  23 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 257 +---
 .../apache/spark/scheduler/DAGSchedulerEvent.scala |   6 +
 .../org/apache/spark/scheduler/StageInfo.scala |   2 +-
 ...g.apache.spark.scheduler.ExternalClusterManager |   1 +
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 448 -
 8 files changed, 747 insertions(+), 72 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala 
b/core/src/main/scala/org/apache/spark/Dependency.scala
index d21b9d9..0a9acf4 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor}
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -96,12 +97,31 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: 
ClassTag](
   val shuffleHandle: ShuffleHandle = 
_rdd.context.env.shuffleManager.registerShuffle(
 shuffleId, this)
 
+  // By default, shuffle merge is enabled for ShuffleDependency if push based 
shuffle
+  // is enabled
+  private[this] var _shuffleMergeEnabled =
+Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf) &&
+// TODO: SPARK-35547: Push based shuffle is currently unsupported for 
Barrier stages
+!rdd.isBarrier()
+
+  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): 
Unit = {
+_shuffleMergeEnabled = shuffleMergeEnabled
+  }
+
+  def shuffleMergeEnabled : Boolean = _shuffleMergeEnabled
+
   /**
* Stores the location of the list of chosen external shuffle services for 
handling the
* shuffle merge requests from mappers in this shuffle map stage.
*/
   private[spark] var mergerLocs: Seq[BlockManagerId] = Nil
 
+  /**
+   * Stores the information about whether the shuffle merge is finalized for 
the shuffle map stage
+   * associated with this shuffle dependency
+   */
+  private[this] var _shuffleMergedFinalized: Boolean = false
+
   def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
 if (mergerLocs != null) {
   this.mergerLocs = mergerLocs
@@ -110,6 +130,24 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: 
ClassTag](
 
   def getMergerLocs: Seq[BlockManagerId] = mergerLocs
 
+  private[spark] def markShuffleMergeFinalized(): Unit = {
+_shuffleMergedFinalized = true
+  }
+
+  /**
+   * Returns true if push-based shuffle is disabled for this stage or empty 
RDD,
+   * or if the shuffle merge for this stage is finalized, i.e. the shuffle 
merge
+   * results for all partitions are available.
+   */
+  def shuffleMergeFinalized: Boolean = {
+// Empty RDD 

[spark] branch branch-3.0 updated: [SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetricsExec in a task handles multiple partitions

2021-06-10 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 4fed690  [SPARK-35296][SQL] Allow Dataset.observe to work even if 
CollectMetricsExec in a task handles multiple partitions
4fed690 is described below

commit 4fed690e0190403200ea820887e93e0f7f0aa693
Author: Kousuke Saruta 
AuthorDate: Fri Jun 11 01:20:35 2021 +0800

[SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetricsExec 
in a task handles multiple partitions

### What changes were proposed in this pull request?

This PR fixes an issue where `Dataset.observe` doesn't work if 
`CollectMetricsExec` in a task handles multiple partitions.
If `coalesce` follows `observe` and the number of partitions shrinks after 
`coalesce`, `CollectMetricsExec` can handle multiple partitions in a task.

### Why are the changes needed?

The current implementation of `CollectMetricsExec` doesn't consider the 
case where it handles multiple partitions.
Because a new `updater` is created for each partition, even though those 
partitions belong to the same task, `collector.setState(updater)` raises an 
assertion error.
Here is a simple reproducible example.
```
$ bin/spark-shell --master "local[1]"
scala> spark.range(1, 4, 1, 3).observe("my_event", 
count($"id").as("count_val")).coalesce(2).collect
```
```
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208)
at 
org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
at 
org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125)
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
```
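
The assertion above can be pictured with a toy sketch (an illustration of the 
failing invariant, not `AggregatingAccumulator` itself): state may be installed 
only once, so installing a fresh updater for every partition trips the assert 
when one task covers several partitions.

```scala
// Toy model of the invariant, not the Spark class.
final class ToyAccumulator {
  private var state: Option[AnyRef] = None
  def setState(updater: AnyRef): Unit = {
    assert(state.isEmpty)        // a second call from the same task fails here
    state = Some(updater)
  }
}

val acc = new ToyAccumulator
acc.setState(new Object)         // first partition handled by the task: fine
// acc.setState(new Object)      // second partition in the same task: assertion failed
```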

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #32786 from sarutak/fix-collectmetricsexec.

Authored-by: Kousuke Saruta 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 44b695fbb06b0d89783b4838941c68543c5a5c8b)
Signed-off-by: Wenchen Fan 
---
 .../sql/execution/AggregatingAccumulator.scala | 16 +-
 .../spark/sql/execution/CollectMetricsExec.scala   |  6 +++-
 .../spark/sql/util/DataFrameCallbackSuite.scala| 34 ++
 3 files changed, 48 insertions(+), 8 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
index 9807b5d..68a3604 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
@@ -33,7 +33,7 @@ class AggregatingAccumulator private(
 bufferSchema: Seq[DataType],
 initialValues: Seq[Expression],
 updateExpressions: Seq[Expression],
-@transient private val mergeExpressions: Seq[Expression],
+mergeExpressions: Seq[Expression],
 @transient private val resultExpressions: Seq[Expression],
 imperatives: Array[ImperativeAggregate],
 typedImperatives: Array[TypedImperativeAggregate[_]],
@@ -95,13 +95,14 @@ class AggregatingAccumulator private(
 
   /**
* Driver side operations like `merge` and `value` are executed in the 
DAGScheduler thread. This
-   * thread does not have a SQL configuration so we attach our own here. Note 
that we can't (and
-   * shouldn't) call `merge` or `value` on an accumulator originating from an 
executor so we just
-   * return a default value here.
+   * thread does not have a SQL configuration so we attach our own here.
*/
-  private[this] def withSQLConf[T](default: => T)(body: => T): T = {
+  private[this] def withSQLConf[T](canRunOnExecutor: Boolean, default: => 
T)(body: => T): T = {
 if (conf != null) {
+  // When we can reach here, we are on the driver side.
   SQLConf.withExistingConf(conf)(body)
+} else if (canRunOnExecutor) {
+  body
 } else {
   default
 }
@@ -147,7 +148,8 @@ class AggregatingAccumulator private(
 }
   }
 
-  override def merge(other: AccumulatorV2[InternalRow, InternalRow]): Unit = 
withSQLConf(()) {

[spark] branch branch-3.1 updated: [SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetricsExec in a task handles multiple partitions

2021-06-10 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 0d4a2e6  [SPARK-35296][SQL] Allow Dataset.observe to work even if 
CollectMetricsExec in a task handles multiple partitions
0d4a2e6 is described below

commit 0d4a2e67677cea4e144ab14480f321068fc00961
Author: Kousuke Saruta 
AuthorDate: Fri Jun 11 01:20:35 2021 +0800

[SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetricsExec 
in a task handles multiple partitions

### What changes were proposed in this pull request?

This PR fixes an issue where `Dataset.observe` doesn't work if 
`CollectMetricsExec` in a task handles multiple partitions.
If `coalesce` follows `observe` and the number of partitions shrinks after 
`coalesce`, `CollectMetricsExec` can handle multiple partitions in a task.

### Why are the changes needed?

The current implementation of `CollectMetricsExec` doesn't consider the 
case where it handles multiple partitions.
Because a new `updater` is created for each partition, even though those 
partitions belong to the same task, `collector.setState(updater)` raises an 
assertion error.
Here is a simple reproducible example.
```
$ bin/spark-shell --master "local[1]"
scala> spark.range(1, 4, 1, 3).observe("my_event", 
count($"id").as("count_val")).coalesce(2).collect
```
```
java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:208)
at 
org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
at 
org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
at 
org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125)
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
at 
org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
at 
org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #32786 from sarutak/fix-collectmetricsexec.

Authored-by: Kousuke Saruta 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 44b695fbb06b0d89783b4838941c68543c5a5c8b)
Signed-off-by: Wenchen Fan 
---
 .../sql/execution/AggregatingAccumulator.scala | 16 +-
 .../spark/sql/execution/CollectMetricsExec.scala   |  6 +++-
 .../spark/sql/util/DataFrameCallbackSuite.scala| 34 ++
 3 files changed, 48 insertions(+), 8 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
index 94e159c..0fa4e6c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
@@ -33,7 +33,7 @@ class AggregatingAccumulator private(
 bufferSchema: Seq[DataType],
 initialValues: Seq[Expression],
 updateExpressions: Seq[Expression],
-@transient private val mergeExpressions: Seq[Expression],
+mergeExpressions: Seq[Expression],
 @transient private val resultExpressions: Seq[Expression],
 imperatives: Array[ImperativeAggregate],
 typedImperatives: Array[TypedImperativeAggregate[_]],
@@ -95,13 +95,14 @@ class AggregatingAccumulator private(
 
   /**
* Driver side operations like `merge` and `value` are executed in the 
DAGScheduler thread. This
-   * thread does not have a SQL configuration so we attach our own here. Note 
that we can't (and
-   * shouldn't) call `merge` or `value` on an accumulator originating from an 
executor so we just
-   * return a default value here.
+   * thread does not have a SQL configuration so we attach our own here.
*/
-  private[this] def withSQLConf[T](default: => T)(body: => T): T = {
+  private[this] def withSQLConf[T](canRunOnExecutor: Boolean, default: => 
T)(body: => T): T = {
 if (conf != null) {
+  // When we can reach here, we are on the driver side.
   SQLConf.withExistingConf(conf)(body)
+} else if (canRunOnExecutor) {
+  body
 } else {
   default
 }
@@ -147,7 +148,8 @@ class AggregatingAccumulator private(
 }
   }
 
-  override def merge(other: AccumulatorV2[InternalRow, InternalRow]): Unit = 
withSQLConf(()) {

[spark] branch master updated (e2e3fe7 -> 44b695f)

2021-06-10 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from e2e3fe7  [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path 
fails for Map with case classes as keys or values
 add 44b695f  [SPARK-35296][SQL] Allow Dataset.observe to work even if 
CollectMetricsExec in a task handles multiple partitions

No new revisions were added by this update.

Summary of changes:
 .../sql/execution/AggregatingAccumulator.scala | 16 +-
 .../spark/sql/execution/CollectMetricsExec.scala   |  6 +++-
 .../spark/sql/util/DataFrameCallbackSuite.scala| 34 ++
 3 files changed, 48 insertions(+), 8 deletions(-)




[spark] branch branch-3.0 updated: [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values

2021-06-10 Thread viirya
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new f3ba9d9  [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path 
fails for Map with case classes as keys or values
f3ba9d9 is described below

commit f3ba9d9408352e6b0ac6d1fc02d4d9c3a91b5952
Author: Emil Ejbyfeldt 
AuthorDate: Thu Jun 10 09:37:27 2021 -0700

[SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map 
with case classes as keys or values

### What changes were proposed in this pull request?
Use the key/value LambdaFunction to convert the elements instead of
using CatalystTypeConverters.createToScalaConverter. This is how it is
done in MapObjects and that correctly handles Arrays with case classes.

### Why are the changes needed?
Before these changes the added test cases would fail with the following:
```
[info] - encode/decode for map with case class as value: Map(1 -> 
IntAndString(1,a)) (interpreted path) *** FAILED *** (64 milliseconds)
[info]   Encoded/Decoded data does not match input data
[info]
[info]   in:  Map(1 -> IntAndString(1,a))
[info]   out: Map(1 -> [1,a])
[info]   types: scala.collection.immutable.Map$Map1 [info]
[info]   Encoded Data: 
[org.apache.spark.sql.catalyst.expressions.UnsafeMapData5ecf5d9e]
[info]   Schema: value#823
[info]   root
[info]   -- value: map (nullable = true)
[info]   |-- key: integer
[info]   |-- value: struct (valueContainsNull = true)
[info]   ||-- i: integer (nullable = false)
[info]   ||-- s: string (nullable = true)
[info]
[info]
[info]   fromRow Expressions:
[info]   catalysttoexternalmap(lambdavariable(CatalystToExternalMap_key, 
IntegerType, false, 178), lambdavariable(CatalystToExternalMap_key, 
IntegerType, false, 178), lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179), 
if (isnull(lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) 
null else newInstance(class org.apache.spark.sql.catalyst.encoders [...]
[info]   :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 
178)
[info]   :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 
178)
[info]   :- lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :- if (isnull(lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) 
null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
[info]   :  :- isnull(lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))
[info]   :  :  +- lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :  :- null
[info]   :  +- newInstance(class 
org.apache.spark.sql.catalyst.encoders.IntAndString)
[info]   : :- assertnotnull(lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i)
[info]   : :  +- lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i
[info]   : : +- lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   : +- lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 
179).s.toString
[info]   :+- lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s
[info]   :   +- lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   +- input[0, map>, true] 
(ExpressionEncoderSuite.scala:627)
```
So using a map with case classes for keys or values on the interpreted 
path would incorrectly deserialize data from the Catalyst representation.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes the bug.

### How was this patch tested?
Existing and new unit tests in the ExpressionEncoderSuite

Closes #32783 from 
eejbyfeldt/fix-interpreted-path-for-map-with-case-classes.

Authored-by: Emil Ejbyfeldt 
Signed-off-by: Liang-Chi Hsieh 
(cherry picked from commit e2e3fe77823387f6d4164eede05bf077b4235c87)
Signed-off-by: Liang-Chi Hsieh 
---
 .../spark/sql/catalyst/expressions/objects/objects.scala   | 14 

[spark] branch branch-3.1 updated: [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values

2021-06-10 Thread viirya
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 70c322a  [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path 
fails for Map with case classes as keys or values
70c322a is described below

commit 70c322ad041511ded6e531d92ffc64c11bfdc378
Author: Emil Ejbyfeldt 
AuthorDate: Thu Jun 10 09:37:27 2021 -0700

[SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map 
with case classes as keys or values

### What changes were proposed in this pull request?
Use the key/value LambdaFunction to convert the elements instead of
using CatalystTypeConverters.createToScalaConverter. This is how it is
done in MapObjects and that correctly handles Arrays with case classes.

### Why are the changes needed?
Before these changes the added test cases would fail with the following:
```
[info] - encode/decode for map with case class as value: Map(1 -> 
IntAndString(1,a)) (interpreted path) *** FAILED *** (64 milliseconds)
[info]   Encoded/Decoded data does not match input data
[info]
[info]   in:  Map(1 -> IntAndString(1,a))
[info]   out: Map(1 -> [1,a])
[info]   types: scala.collection.immutable.Map$Map1 [info]
[info]   Encoded Data: 
[org.apache.spark.sql.catalyst.expressions.UnsafeMapData5ecf5d9e]
[info]   Schema: value#823
[info]   root
[info]   -- value: map (nullable = true)
[info]   |-- key: integer
[info]   |-- value: struct (valueContainsNull = true)
[info]   ||-- i: integer (nullable = false)
[info]   ||-- s: string (nullable = true)
[info]
[info]
[info]   fromRow Expressions:
[info]   catalysttoexternalmap(lambdavariable(CatalystToExternalMap_key, 
IntegerType, false, 178), lambdavariable(CatalystToExternalMap_key, 
IntegerType, false, 178), lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179), 
if (isnull(lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) 
null else newInstance(class org.apache.spark.sql.catalyst.encoders [...]
[info]   :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 
178)
[info]   :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 
178)
[info]   :- lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :- if (isnull(lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) 
null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
[info]   :  :- isnull(lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))
[info]   :  :  +- lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :  :- null
[info]   :  +- newInstance(class 
org.apache.spark.sql.catalyst.encoders.IntAndString)
[info]   : :- assertnotnull(lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i)
[info]   : :  +- lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i
[info]   : : +- lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   : +- lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 
179).s.toString
[info]   :+- lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s
[info]   :   +- lambdavariable(CatalystToExternalMap_value, 
StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   +- input[0, map>, true] 
(ExpressionEncoderSuite.scala:627)
```
So using a map with case classes for keys or values on the interpreted 
path would incorrectly deserialize data from the Catalyst representation.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes the bug.

### How was this patch tested?
Existing and new unit tests in the ExpressionEncoderSuite

Closes #32783 from 
eejbyfeldt/fix-interpreted-path-for-map-with-case-classes.

Authored-by: Emil Ejbyfeldt 
Signed-off-by: Liang-Chi Hsieh 
(cherry picked from commit e2e3fe77823387f6d4164eede05bf077b4235c87)
Signed-off-by: Liang-Chi Hsieh 
---
 .../spark/sql/catalyst/expressions/objects/objects.scala   | 14 

[spark] branch master updated (4180692 -> e2e3fe7)

2021-06-10 Thread viirya
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 4180692  [SPARK-35711][SQL] Support casting of timestamp without time 
zone to timestamp type
 add e2e3fe7  [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path 
fails for Map with case classes as keys or values

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/expressions/objects/objects.scala   | 14 ++
 .../sql/catalyst/encoders/ExpressionEncoderSuite.scala |  5 +
 2 files changed, 11 insertions(+), 8 deletions(-)




[spark] branch master updated (88f1d82 -> 4180692)

2021-06-10 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 88f1d82  [SPARK-34524][SQL][FOLLOWUP] Remove unused 
checkAlterTablePartition in CheckAnalysis.scala
 add 4180692  [SPARK-35711][SQL] Support casting of timestamp without time 
zone to timestamp type

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/expressions/Cast.scala  |  5 
 .../spark/sql/catalyst/expressions/CastSuite.scala | 32 ++
 2 files changed, 26 insertions(+), 11 deletions(-)




[spark] branch master updated (5280f02 -> 88f1d82)

2021-06-10 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 5280f02  [SPARK-35673][SQL] Fix user-defined hint and unrecognized 
hint in subquery
 add 88f1d82  [SPARK-34524][SQL][FOLLOWUP] Remove unused 
checkAlterTablePartition in CheckAnalysis.scala

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/analysis/CheckAnalysis.scala  | 22 +-
 1 file changed, 1 insertion(+), 21 deletions(-)




[spark] branch branch-3.0 updated: [SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery

2021-06-10 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new c8d0bd0  [SPARK-35673][SQL] Fix user-defined hint and unrecognized 
hint in subquery
c8d0bd0 is described below

commit c8d0bd07fc34861a802f176c35a3beb8f8b8d375
Author: Fu Chen 
AuthorDate: Thu Jun 10 15:32:10 2021 +0800

[SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery

Use `UnresolvedHint.resolved = child.resolved` instead of 
`UnresolvedHint.resolved = false`, so that a plan containing an `UnresolvedHint` 
child can be optimized by the rules in the `Resolution` batch.

For instance, before this PR, the following plan couldn't be optimized by 
`ResolveReferences`.
```
!'Project [*]
 +- SubqueryAlias __auto_generated_subquery_name
+- UnresolvedHint use_hash
   +- Project [42 AS 42#10]
  +- OneRowRelation
```
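
A hedged spark-shell reproduction consistent with that plan (the exact query is 
an assumption inferred from the plan, not taken from the PR):

```scala
// Before the fix, the outer `Project [*]` could not be resolved because the
// UnresolvedHint node reported resolved = false and blocked the Resolution batch.
spark.sql("SELECT * FROM (SELECT /*+ use_hash */ 42)").show()
```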

fix hint in subquery bug

No.

New test.

Closes #32841 from cfmcgrady/SPARK-35673.

Authored-by: Fu Chen 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 5280f02747eed9849e4a64562d38aee11e21616f)
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/analysis/CheckAnalysis.scala  |  3 +++
 .../spark/sql/catalyst/plans/logical/hints.scala   |  5 -
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 16 +
 .../spark/sql/SparkSessionExtensionSuite.scala | 26 ++
 4 files changed, 49 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 2887967..fe12dd4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -105,6 +105,9 @@ trait CheckAnalysis extends PredicateHelper {
   case u: UnresolvedRelation =>
 u.failAnalysis(s"Table or view not found: 
${u.multipartIdentifier.quoted}")
 
+  case u: UnresolvedHint =>
+u.failAnalysis(s"Hint not found: ${u.name}")
+
   case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) =>
 failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
index a325b61..1202d7d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -30,7 +30,10 @@ import org.apache.spark.util.Utils
 case class UnresolvedHint(name: String, parameters: Seq[Any], child: 
LogicalPlan)
   extends UnaryNode {
 
-  override lazy val resolved: Boolean = false
+  // we need it to be resolved so that the analyzer can continue to analyze 
the rest of the query
+  // plan.
+  override lazy val resolved: Boolean = child.resolved
+
   override def output: Seq[Attribute] = child.output
 }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index afc9780..348c282 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -722,4 +722,20 @@ class AnalysisErrorSuite extends AnalysisTest {
   assertAnalysisError(plan, s"Correlated column is not allowed in 
predicate ($msg)" :: Nil)
 }
   }
+
+  test("SPARK-35673: fail if the plan still contains UnresolvedHint after 
analysis") {
+val hintName = "some_random_hint_that_does_not_exist"
+val plan = UnresolvedHint(hintName, Seq.empty,
+  Project(Alias(Literal(1), "x")() :: Nil, OneRowRelation())
+)
+assert(plan.resolved)
+
+val error = intercept[AnalysisException] {
+  SimpleAnalyzer.checkAnalysis(plan)
+}
+assert(error.message.contains(s"Hint not found: ${hintName}"))
+
+// UnresolvedHint be removed by batch `Remove Unresolved Hints`
+assertAnalysisSuccess(plan, true)
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index e5e8bc6..b30d579 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -349,6 +349,32 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
   

[spark] branch branch-3.1 updated: [SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery

2021-06-10 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new e0b074a  [SPARK-35673][SQL] Fix user-defined hint and unrecognized 
hint in subquery
e0b074a is described below

commit e0b074a915ed39f0e51a21936787943dea8ae39f
Author: Fu Chen 
AuthorDate: Thu Jun 10 15:32:10 2021 +0800

[SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery

Use `UnresolvedHint.resolved = child.resolved` instead of 
`UnresolvedHint.resolved = false`, so that a plan containing an `UnresolvedHint` 
child can be optimized by the rules in the `Resolution` batch.

For instance, before this PR, the following plan couldn't be optimized by 
`ResolveReferences`.
```
!'Project [*]
 +- SubqueryAlias __auto_generated_subquery_name
+- UnresolvedHint use_hash
   +- Project [42 AS 42#10]
  +- OneRowRelation
```

fix hint in subquery bug

No.

New test.

Closes #32841 from cfmcgrady/SPARK-35673.

Authored-by: Fu Chen 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 5280f02747eed9849e4a64562d38aee11e21616f)
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/analysis/CheckAnalysis.scala  |  3 +++
 .../spark/sql/catalyst/plans/logical/hints.scala   |  5 -
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 16 +
 .../spark/sql/SparkSessionExtensionSuite.scala | 26 ++
 4 files changed, 49 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 3dfe7f4..84de9b9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -112,6 +112,9 @@ trait CheckAnalysis extends PredicateHelper {
   case u: UnresolvedRelation =>
 u.failAnalysis(s"Table or view not found: 
${u.multipartIdentifier.quoted}")
 
+  case u: UnresolvedHint =>
+u.failAnalysis(s"Hint not found: ${u.name}")
+
   case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _) =>
 failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
index 4b5e278..1b72d21 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -29,7 +29,10 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 case class UnresolvedHint(name: String, parameters: Seq[Any], child: 
LogicalPlan)
   extends UnaryNode {
 
-  override lazy val resolved: Boolean = false
+  // we need it to be resolved so that the analyzer can continue to analyze 
the rest of the query
+  // plan.
+  override lazy val resolved: Boolean = child.resolved
+
   override def output: Seq[Attribute] = child.output
 }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 20ba9c5..d14c221 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -724,4 +724,20 @@ class AnalysisErrorSuite extends AnalysisTest {
   assertAnalysisError(plan, s"Correlated column is not allowed in 
predicate ($msg)" :: Nil)
 }
   }
+
+  test("SPARK-35673: fail if the plan still contains UnresolvedHint after 
analysis") {
+val hintName = "some_random_hint_that_does_not_exist"
+val plan = UnresolvedHint(hintName, Seq.empty,
+  Project(Alias(Literal(1), "x")() :: Nil, OneRowRelation())
+)
+assert(plan.resolved)
+
+val error = intercept[AnalysisException] {
+  SimpleAnalyzer.checkAnalysis(plan)
+}
+assert(error.message.contains(s"Hint not found: ${hintName}"))
+
+// UnresolvedHint be removed by batch `Remove Unresolved Hints`
+assertAnalysisSuccess(plan, true)
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 35d2513..03514d85 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -355,6 +355,32 @@ class SparkSessionExtensionSuite extends 

[spark] branch master updated (cadd3a0 -> 5280f02)

2021-06-10 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from cadd3a0  [SPARK-35474] Enable disallow_untyped_defs mypy check for 
pyspark.pandas.indexing
 add 5280f02  [SPARK-35673][SQL] Fix user-defined hint and unrecognized 
hint in subquery

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/analysis/CheckAnalysis.scala  |  3 +++
 .../spark/sql/catalyst/plans/logical/hints.scala   |  5 -
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 16 +
 .../spark/sql/SparkSessionExtensionSuite.scala | 26 ++
 4 files changed, 49 insertions(+), 1 deletion(-)
