[spark] branch master updated: [SPARK-39394][DOCS][SS] Improve PySpark Structured Streaming page more readable

2022-06-06 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b30a080398b [SPARK-39394][DOCS][SS] Improve PySpark Structured 
Streaming page more readable
b30a080398b is described below

commit b30a080398ba1092093ea3bbd62bdb3a3ce8de03
Author: itholic 
AuthorDate: Tue Jun 7 14:27:20 2022 +0900

[SPARK-39394][DOCS][SS] Improve PySpark Structured Streaming page more 
readable

### What changes were proposed in this pull request?

This PR proposes to improve the PySpark Structured Streaming API reference 
page to be more readable.

So far, the PySpark Structured Streaming API reference page has not been well 
organized, which makes it a bit uncomfortable to read, as shown below:

![Screen Shot 2022-06-07 at 12 29 33 
PM](https://user-images.githubusercontent.com/44108233/172289683-0c130b6a-7716-40a3-b22b-42e38febe8c7.png)

### Why are the changes needed?

Improving the documentation's readability will also improve the usability of 
PySpark Structured Streaming.

### Does this PR introduce _any_ user-facing change?

Yes; the documentation is now categorized more clearly by class and purpose, 
as shown below:

![Screen Shot 2022-06-07 at 12 30 01 
PM](https://user-images.githubusercontent.com/44108233/172289737-bd6ebf0e-601c-4a80-a16a-cf885302e7b6.png)

### How was this patch tested?

The existing doc build in CI should cover this.

Closes #36782 from itholic/SPARK-39394.

Authored-by: itholic 
Signed-off-by: Hyukjin Kwon 
---
 python/docs/source/reference/index.rst |  2 +-
 .../{index.rst => pyspark.ss/core_classes.rst} | 27 +---
 .../source/reference/{ => pyspark.ss}/index.rst| 23 --
 .../{pyspark.ss.rst => pyspark.ss/io.rst}  | 50 ++
 .../query_management.rst}  | 50 +-
 5 files changed, 25 insertions(+), 127 deletions(-)

diff --git a/python/docs/source/reference/index.rst 
b/python/docs/source/reference/index.rst
index b16c614d34c..2f316924405 100644
--- a/python/docs/source/reference/index.rst
+++ b/python/docs/source/reference/index.rst
@@ -29,7 +29,7 @@ Pandas API on Spark follows the API specifications of latest 
pandas release.
 
pyspark.sql/index
pyspark.pandas/index
-   pyspark.ss
+   pyspark.ss/index
pyspark.ml
pyspark.streaming
pyspark.mllib
diff --git a/python/docs/source/reference/index.rst 
b/python/docs/source/reference/pyspark.ss/core_classes.rst
similarity index 68%
copy from python/docs/source/reference/index.rst
copy to python/docs/source/reference/pyspark.ss/core_classes.rst
index b16c614d34c..10c2211ef1d 100644
--- a/python/docs/source/reference/index.rst
+++ b/python/docs/source/reference/pyspark.ss/core_classes.rst
@@ -16,22 +16,17 @@
 under the License.
 
 
-=============
-API Reference
-=============
+============
+Core Classes
+============

-This page lists an overview of all public PySpark modules, classes, functions and methods.
+.. currentmodule:: pyspark.sql.streaming

-Pandas API on Spark follows the API specifications of latest pandas release.
+.. autosummary::
+    :toctree: api/

-.. toctree::
-   :maxdepth: 2
-
-   pyspark.sql/index
-   pyspark.pandas/index
-   pyspark.ss
-   pyspark.ml
-   pyspark.streaming
-   pyspark.mllib
-   pyspark
-   pyspark.resource
+    DataStreamReader
+    DataStreamWriter
+    StreamingQuery
+    StreamingQueryManager
+    StreamingQueryListener
diff --git a/python/docs/source/reference/index.rst 
b/python/docs/source/reference/pyspark.ss/index.rst
similarity index 69%
copy from python/docs/source/reference/index.rst
copy to python/docs/source/reference/pyspark.ss/index.rst
index b16c614d34c..2cb0b1216ef 100644
--- a/python/docs/source/reference/index.rst
+++ b/python/docs/source/reference/pyspark.ss/index.rst
@@ -16,22 +16,15 @@
 under the License.
 
 
-=============
-API Reference
-=============
+====================
+Structured Streaming
+====================

-This page lists an overview of all public PySpark modules, classes, functions and methods.
-
-Pandas API on Spark follows the API specifications of latest pandas release.
+This page gives an overview of all public Structured Streaming APIs.

 .. toctree::
-   :maxdepth: 2
+    :maxdepth: 2

-   pyspark.sql/index
-   pyspark.pandas/index
-   pyspark.ss
-   pyspark.ml
-   pyspark.streaming
-   pyspark.mllib
-   pyspark
-   pyspark.resource
+    core_classes
+    io
+    query_management
diff --git a/python/docs/source/reference/pyspark.ss.rst 
b/python/docs/source/reference/pyspark.ss/io.rst
similarity index 59%
copy from python/docs/source/reference/pyspark.ss.rst
copy to python/docs/source/reference/pyspark.ss/io.rst
index d55d46b9139..da476fb6fac 100644
--- 

[spark] branch master updated: [SPARK-39390][CORE] Hide and optimize `viewAcls`/`viewAclsGroups`/`modifyAcls`/`modifyAclsGroups` from INFO log

2022-06-06 Thread huaxingao
This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 63f0f91b3f5 [SPARK-39390][CORE] Hide and optimize 
`viewAcls`/`viewAclsGroups`/`modifyAcls`/`modifyAclsGroups` from INFO log
63f0f91b3f5 is described below

commit 63f0f91b3f5c5d1dee9236824027bd978192a9ff
Author: Qian.Sun 
AuthorDate: Mon Jun 6 21:21:45 2022 -0700

[SPARK-39390][CORE] Hide and optimize 
`viewAcls`/`viewAclsGroups`/`modifyAcls`/`modifyAclsGroups` from INFO log

### What changes were proposed in this pull request?

This PR aims to hide and optimize 
`viewAcls`/`viewAclsGroups`/`modifyAcls`/`modifyAclsGroups` from INFO log.

### Why are the changes needed?

* In the case of an empty set, `Set()` conveys little useful information to users.
* In the case of a non-empty set, `Set(root)` makes for a poor reading experience.
```scala
2022-06-02 22:02:48.328 - stderr> 22/06/03 05:02:48 INFO SecurityManager: 
SecurityManager: authentication
disabled; ui acls disabled; users  with view permissions: Set(root); groups 
with view permissions: Set();
users  with modify permissions: Set(root); groups with modify permissions: 
Set()
```
### Does this PR introduce _any_ user-facing change?

This is an INFO-log-only change.

### How was this patch tested?

Manually.

**BEFORE**

```scala
2022-06-02 22:02:48.328 - stderr> 22/06/03 05:02:48 INFO SecurityManager: 
SecurityManager: authentication
disabled; ui acls disabled; users  with view permissions: Set(root); groups 
with view permissions: Set();
users  with modify permissions: Set(root); groups with modify permissions: 
Set()
```
**AFTER**
```scala
2022-06-02 22:02:48.328 - stderr> 22/06/03 05:02:48 INFO SecurityManager: 
SecurityManager: authentication
disabled; ui acls disabled; users  with view permissions: root; groups with 
view permissions: EMPTY;
users  with modify permissions: root; groups with modify permissions: root, 
spark
```
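
A standalone sketch of the formatting change (plain Scala, not the 
SecurityManager code itself; `render` is a hypothetical helper that mirrors 
the inline conditionals in the diff below):

```scala
val viewAcls: Set[String] = Set("root")
val viewAclsGroups: Set[String] = Set()

// Before: Set.toString() leaks the collection wrapper into the log line.
println("users with view permissions: " + viewAcls.toString())        // Set(root)
println("groups with view permissions: " + viewAclsGroups.toString()) // Set()

// After: print the members, or a readable placeholder for an empty set.
def render(s: Set[String]): String = if (s.nonEmpty) s.mkString(", ") else "EMPTY"
println("users with view permissions: " + render(viewAcls))           // root
println("groups with view permissions: " + render(viewAclsGroups))    // EMPTY
```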

Closes #36777 from dcoliversun/SPARK-39390.

Authored-by: Qian.Sun 
Signed-off-by: huaxingao 
---
 core/src/main/scala/org/apache/spark/SecurityManager.scala | 12 
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala 
b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index f11176cc233..7e72ae8d89e 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -87,10 +87,14 @@ private[spark] class SecurityManager(
   private var secretKey: String = _
   logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else 
"disabled") +
 "; ui acls " + (if (aclsOn) "enabled" else "disabled") +
-"; users  with view permissions: " + viewAcls.toString() +
-"; groups with view permissions: " + viewAclsGroups.toString() +
-"; users  with modify permissions: " + modifyAcls.toString() +
-"; groups with modify permissions: " + modifyAclsGroups.toString())
+"; users with view permissions: " +
+(if (viewAcls.nonEmpty) viewAcls.mkString(", ") else "EMPTY") +
+"; groups with view permissions: " +
+(if (viewAclsGroups.nonEmpty) viewAclsGroups.mkString(", ") else "EMPTY") +
+"; users with modify permissions: " +
+(if (modifyAcls.nonEmpty) modifyAcls.mkString(", ") else "EMPTY") +
+"; groups with modify permissions: " +
+(if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else 
"EMPTY"))
 
   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
   // the default SSL configuration - it will be used by all communication 
layers unless overwritten





[GitHub] [spark-website] HeartSaVioR commented on pull request #391: [MINOR] Fix typo in streaming page

2022-06-06 Thread GitBox


HeartSaVioR commented on PR #391:
URL: https://github.com/apache/spark-website/pull/391#issuecomment-1148147340

   Thanks for the quick review and merge!





[spark-website] branch asf-site updated: [MINOR] Fix typo in streaming page

2022-06-06 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/spark-website.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 5b1de5b85 [MINOR] Fix typo in streaming page
5b1de5b85 is described below

commit 5b1de5b85aab4200c079bb7d677ce34bfa8c9429
Author: Jungtaek Lim 
AuthorDate: Mon Jun 6 22:05:41 2022 -0500

[MINOR] Fix typo in streaming page

Ease to use -> Easy to use

https://user-images.githubusercontent.com/1317309/172282889-40d0c007-5115-43a7-a23c-812388a96c9b.png

Author: Jungtaek Lim 

Closes #391 from HeartSaVioR/WIP-fix-typo-streaming.
---
 site/streaming/index.html | 2 +-
 streaming/index.md| 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/site/streaming/index.html b/site/streaming/index.html
index 2f0f9ad1c..3a176fa11 100644
--- a/site/streaming/index.html
+++ b/site/streaming/index.html
@@ -130,7 +130,7 @@
 
 
   
-Ease to use
+Easy to use
 
   Spark Structured Streaming abstracts away complex streaming concepts 
such as incremental processing, checkpointing, and watermarks 
   so that you can build streaming applications and pipelines without 
learning any new concepts or tools.
diff --git a/streaming/index.md b/streaming/index.md
index 9e1d63cf9..98d487969 100644
--- a/streaming/index.md
+++ b/streaming/index.md
@@ -15,7 +15,7 @@ subproject: Streaming
 
 
   
-Ease to use
+Easy to use
 
   Spark Structured Streaming abstracts away complex streaming concepts 
such as incremental processing, checkpointing, and watermarks 
   so that you can build streaming applications and pipelines without 
learning any new concepts or tools.





[GitHub] [spark-website] srowen closed pull request #391: [MINOR] Fix typo in streaming page

2022-06-06 Thread GitBox


srowen closed pull request #391: [MINOR] Fix typo in streaming page
URL: https://github.com/apache/spark-website/pull/391





[GitHub] [spark-website] HeartSaVioR opened a new pull request, #391: [MINOR] Fix typo in streaming page

2022-06-06 Thread GitBox


HeartSaVioR opened a new pull request, #391:
URL: https://github.com/apache/spark-website/pull/391

   Ease to use -> Easy to use
   
   https://user-images.githubusercontent.com/1317309/172282889-40d0c007-5115-43a7-a23c-812388a96c9b.png
   





[spark] branch master updated: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true

2022-06-06 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f80041fdfdd [SPARK-38987][SHUFFLE] Throw FetchFailedException when 
merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to 
true
f80041fdfdd is described below

commit f80041fdfddae66bead7a3950028ee04d1b60bd2
Author: Aravind Patnam 
AuthorDate: Mon Jun 6 17:07:36 2022 -0500

[SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle 
blocks are corrupted and spark.shuffle.detectCorrupt is set to true

### What changes were proposed in this pull request?
Adds corruption exception handling for merged shuffle chunks when 
spark.shuffle.detectCorrupt is set to true (the default).

### Why are the changes needed?
Prior to Spark 3.0, spark.shuffle.detectCorrupt was set to true by default, 
and this configuration was one of the knobs for early corruption detection, so 
the fallback could be triggered as expected.

After Spark 3.0, spark.shuffle.detectCorrupt is still set to true by default, 
but early corruption detection is controlled by a new configuration, 
spark.shuffle.detectCorrupt.useExtraMemory, which is set to false by default. 
Thus the default behavior, with only Magnet enabled after Spark 3.2.0 
(internal li-3.1.1), disables early corruption detection, so no fallback is 
triggered; instead, an exception is thrown only when the corrupted blocks 
start to be read.

In this case, we handle the corrupted stream for merged blocks by throwing a 
FetchFailedException. This triggers a retry based on the values of 
spark.shuffle.detectCorrupt.useExtraMemory and spark.shuffle.detectCorrupt.
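
A minimal sketch of the two knobs discussed above (real Spark configuration 
keys; the values shown are the defaults described in this PR):

```scala
import org.apache.spark.SparkConf

// Defaults as described above: corruption detection is on, but the post-3.0
// early-detection knob (useExtraMemory) is off, so corruption surfaces only
// once the corrupted blocks are actually read.
val conf = new SparkConf()
  .set("spark.shuffle.detectCorrupt", "true")
  .set("spark.shuffle.detectCorrupt.useExtraMemory", "false")
```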

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Tested on internal cluster
- Added UT

This PR tackles some of the build weirdness found in PR 36601 
(https://github.com/apache/spark/pull/36601); it contains the exact same diff. 
That PR was closed out and recreated here.

Closes #36734 from akpatnam25/SPARK-38987.

Authored-by: Aravind Patnam 
Signed-off-by: Mridul Muralidharan 
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  22 +++-
 .../storage/ShuffleBlockFetcherIterator.scala  |   3 +
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 115 -
 .../storage/ShuffleBlockFetcherIteratorSuite.scala |  28 +
 4 files changed, 163 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7d26d9e8d61..289296f6fdb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1885,6 +1885,16 @@ private[spark] class DAGScheduler(
           mapOutputTracker.
             unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex))
         }
+      } else {
+        // Unregister the merge result of <shuffleId, reduceId> if there is a FetchFailed event
+        // and is not a MetaDataFetchException which is signified by bmAddress being null
+        if (bmAddress != null &&
+          bmAddress.executorId.equals(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)) {
+          assert(pushBasedShuffleEnabled, "Push based shuffle expected to " +
+            "be enabled when handling merge block fetch failure.")
+          mapOutputTracker.
+            unregisterMergeResult(shuffleId, reduceId, bmAddress, None)
+        }
       }
 
   if (failedStage.rdd.isBarrier()) {
@@ -2449,7 +2459,15 @@ private[spark] class DAGScheduler(
 val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
 logDebug(s"Considering removal of executor $execId; " +
   s"fileLost: $fileLost, currentEpoch: $currentEpoch")
-    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
+    // Check if the execId is a shuffle push merger. We do not remove the executor if it is,
+    // and only remove the outputs on the host.
+    val isShuffleMerger = execId.equals(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    if (isShuffleMerger && pushBasedShuffleEnabled) {
+      hostToUnregisterOutputs.foreach(
+        host => blockManagerMaster.removeShufflePushMergerLocation(host))
+    }
+    if (!isShuffleMerger &&
+      (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch)) {
   executorFailureEpoch(execId) = currentEpoch
   logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
   if (pushBasedShuffleEnabled) {
@@ -2461,6 +2479,8 @@ private[spark] class DAGScheduler(
 

[spark] branch master updated: [SPARK-39391][CORE] Reuse Partitioner classes

2022-06-06 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c548e593019 [SPARK-39391][CORE] Reuse Partitioner classes
c548e593019 is described below

commit c548e59301941a40ff2d07590645bcd24280a550
Author: Herman van Hovell 
AuthorDate: Mon Jun 6 15:50:50 2022 -0400

[SPARK-39391][CORE] Reuse Partitioner classes

### What changes were proposed in this pull request?
This PR creates two new `Partitioner` classes:
- `ConstantPartitioner`: moves all tuples in an RDD into a single partition. 
It replaces two anonymous partitioners in `RDD` and `ShuffleExchangeExec`.
- `PartitionIdPassthrough`: a dummy partitioner that passes keys through when 
partition ids have already been computed. This is not actually a new class; it 
was moved from `ShuffledRowRDD.scala` to core. It replaces two anonymous 
partitioners in `BlockMatrix` and `RDD`. A usage sketch follows below.
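
As a usage sketch (our illustration: `PassThrough` is a hypothetical stand-in, 
since `PartitionIdPassthrough` itself is `private[spark]`):

```scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object PassthroughDemo {
  // Same shape as PartitionIdPassthrough: the key already *is* the partition
  // id, so getPartition simply casts it instead of hashing.
  class PassThrough(override val numPartitions: Int) extends Partitioner {
    override def getPartition(key: Any): Int = key.asInstanceOf[Int]
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))
    // Keys are pre-computed partition ids in [0, 2).
    val rdd = sc.parallelize(Seq((0, "a"), (1, "b"), (0, "c")))
    val routed = rdd.partitionBy(new PassThrough(2))
    routed.glom().collect().foreach(part => println(part.mkString(", ")))
    sc.stop()
  }
}
```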

### Why are the changes needed?
Less code.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.

Closes #36779 from hvanhovell/SPARK-39391.

Authored-by: Herman van Hovell 
Signed-off-by: Herman van Hovell 
---
 core/src/main/scala/org/apache/spark/Partitioner.scala   | 16 
 core/src/main/scala/org/apache/spark/rdd/RDD.scala   |  8 +---
 .../spark/mllib/linalg/distributed/BlockMatrix.scala |  8 +++-
 .../org/apache/spark/sql/execution/ShuffledRowRDD.scala  |  8 
 .../sql/execution/exchange/ShuffleExchangeExec.scala | 15 ---
 5 files changed, 24 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala 
b/core/src/main/scala/org/apache/spark/Partitioner.scala
index a0cba8ab13f..5dffba2ee8e 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -129,6 +129,22 @@ class HashPartitioner(partitions: Int) extends Partitioner 
{
   override def hashCode: Int = numPartitions
 }
 
+/**
+ * A dummy partitioner for use with records whose partition ids have been pre-computed (i.e. for
+ * use on RDDs of (Int, Row) pairs where the Int is a partition id in the expected range).
+ */
+private[spark] class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
+  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
+}
+
+/**
+ * A [[org.apache.spark.Partitioner]] that partitions all records into a single partition.
+ */
+private[spark] class ConstantPartitioner extends Partitioner {
+  override def numPartitions: Int = 1
+  override def getPartition(key: Any): Int = 0
+}
+
 /**
  * A [[org.apache.spark.Partitioner]] that partitions sortable records by 
range into roughly
  * equal ranges. The ranges are determined by sampling the content of the RDD 
passed in.
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 89397b8aa69..b7284d25122 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1249,18 +1249,12 @@ abstract class RDD[T: ClassTag](
         }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
       }
       if (finalAggregateOnExecutor && partiallyAggregated.partitions.length > 1) {
-// define a new partitioner that results in only 1 partition
-val constantPartitioner = new Partitioner {
-  override def numPartitions: Int = 1
-
-  override def getPartition(key: Any): Int = 0
-}
 // map the partially aggregated rdd into a key-value rdd
 // do the computation in the single executor with one partition
 // get the new RDD[U]
 partiallyAggregated = partiallyAggregated
   .map(v => (0.toByte, v))
-  .foldByKey(zeroValue, constantPartitioner)(cleanCombOp)
+  .foldByKey(zeroValue, new ConstantPartitioner)(cleanCombOp)
   .values
   }
       val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 452bbbe5f46..2b4333fe0fd 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.linalg.distributed
 import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM}
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{Partitioner, SparkException}
+import 

[spark] branch branch-3.2 updated: [SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery alias from NATURAL/USING JOIN

2022-06-06 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new d9477ddb1a8 [SPARK-39376][SQL] Hide duplicated columns in star 
expansion of subquery alias from NATURAL/USING JOIN
d9477ddb1a8 is described below

commit d9477ddb1a805e3ff7640d682348fd5e780b3a80
Author: Karen Feng 
AuthorDate: Mon Jun 6 20:58:23 2022 +0800

[SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery 
alias from NATURAL/USING JOIN

### What changes were proposed in this pull request?

Follows up from https://github.com/apache/spark/pull/31666, which introduced a 
bug where the qualified star expansion of a subquery alias containing a 
NATURAL/USING join output duplicated columns.

### Why are the changes needed?

Duplicated, hidden columns should not be output from a star expansion.

### Does this PR introduce _any_ user-facing change?

The query

```
val df1 = Seq((3, 8)).toDF("a", "b")
val df2 = Seq((8, 7)).toDF("b", "d")
val joinDF = df1.join(df2, "b")
joinDF.alias("r").select("r.*")
```

Now outputs a single column `b` instead of two (duplicate) `b` columns.

### How was this patch tested?

UTs

Closes #36763 from karenfeng/SPARK-39376.

Authored-by: Karen Feng 
Signed-off-by: Wenchen Fan 
---
 .../plans/logical/basicLogicalOperators.scala  |  3 ++-
 .../org/apache/spark/sql/DataFrameJoinSuite.scala  | 22 ++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index c4a7ea2dcf0..010722c0349 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1303,7 +1303,8 @@ case class SubqueryAlias(
 
   override def metadataOutput: Seq[Attribute] = {
 val qualifierList = identifier.qualifier :+ alias
-    child.metadataOutput.map(_.withQualifier(qualifierList))
+    val nonHiddenMetadataOutput = child.metadataOutput.filter(!_.supportsQualifiedStar)
+    nonHiddenMetadataOutput.map(_.withQualifier(qualifierList))
   }
 
   override def maxRows: Option[Long] = child.maxRows
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index a803fa88ed3..1fda13f996a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -499,4 +499,26 @@ class DataFrameJoinSuite extends QueryTest
   )
 }
   }
+
+  test("SPARK-39376: Hide duplicated columns in star expansion of subquery 
alias from USING JOIN") {
+val joinDf = testData2.as("testData2").join(
+  testData3.as("testData3"), usingColumns = Seq("a"), joinType = 
"fullouter")
+val equivalentQueries = Seq(
+  joinDf.select($"*"),
+  joinDf.as("r").select($"*"),
+  joinDf.as("r").select($"r.*")
+)
+equivalentQueries.foreach { query =>
+  checkAnswer(query,
+Seq(
+  Row(1, 1, null),
+  Row(1, 2, null),
+  Row(2, 1, 2),
+  Row(2, 2, 2),
+  Row(3, 1, null),
+  Row(3, 2, null)
+)
+  )
+}
+  }
 }





[spark] branch master updated: [SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery alias from NATURAL/USING JOIN

2022-06-06 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 18ca369f019 [SPARK-39376][SQL] Hide duplicated columns in star 
expansion of subquery alias from NATURAL/USING JOIN
18ca369f019 is described below

commit 18ca369f01905b421a658144e23b5a4e60702655
Author: Karen Feng 
AuthorDate: Mon Jun 6 20:58:23 2022 +0800

[SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery 
alias from NATURAL/USING JOIN

### What changes were proposed in this pull request?

Follows up from https://github.com/apache/spark/pull/31666, which introduced a 
bug where the qualified star expansion of a subquery alias containing a 
NATURAL/USING join output duplicated columns.

### Why are the changes needed?

Duplicated, hidden columns should not be output from a star expansion.

### Does this PR introduce _any_ user-facing change?

The query

```
val df1 = Seq((3, 8)).toDF("a", "b")
val df2 = Seq((8, 7)).toDF("b", "d")
val joinDF = df1.join(df2, "b")
joinDF.alias("r").select("r.*")
```

Now outputs a single column `b` instead of two (duplicate) `b` columns.

### How was this patch tested?

UTs

Closes #36763 from karenfeng/SPARK-39376.

Authored-by: Karen Feng 
Signed-off-by: Wenchen Fan 
---
 .../plans/logical/basicLogicalOperators.scala  |  3 ++-
 .../org/apache/spark/sql/DataFrameJoinSuite.scala  | 22 ++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index b4c6e19d0bc..677bdf27336 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1369,7 +1369,8 @@ case class SubqueryAlias(
 
   override def metadataOutput: Seq[Attribute] = {
 val qualifierList = identifier.qualifier :+ alias
-    child.metadataOutput.map(_.withQualifier(qualifierList))
+    val nonHiddenMetadataOutput = child.metadataOutput.filter(!_.supportsQualifiedStar)
+    nonHiddenMetadataOutput.map(_.withQualifier(qualifierList))
   }
 
   override def maxRows: Option[Long] = child.maxRows
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 5286a70674e..de900fffb34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -554,4 +554,26 @@ class DataFrameJoinSuite extends QueryTest
   )
 }
   }
+
+  test("SPARK-39376: Hide duplicated columns in star expansion of subquery 
alias from USING JOIN") {
+val joinDf = testData2.as("testData2").join(
+  testData3.as("testData3"), usingColumns = Seq("a"), joinType = 
"fullouter")
+val equivalentQueries = Seq(
+  joinDf.select($"*"),
+  joinDf.as("r").select($"*"),
+  joinDf.as("r").select($"r.*")
+)
+equivalentQueries.foreach { query =>
+  checkAnswer(query,
+Seq(
+  Row(1, 1, null),
+  Row(1, 2, null),
+  Row(2, 1, 2),
+  Row(2, 2, 2),
+  Row(3, 1, null),
+  Row(3, 2, null)
+)
+  )
+}
+  }
 }





[spark] branch branch-3.3 updated: [SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery alias from NATURAL/USING JOIN

2022-06-06 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 3b549f43094 [SPARK-39376][SQL] Hide duplicated columns in star 
expansion of subquery alias from NATURAL/USING JOIN
3b549f43094 is described below

commit 3b549f4309497ecbe9f0b7a20d22a9a4417abb8b
Author: Karen Feng 
AuthorDate: Mon Jun 6 20:58:23 2022 +0800

[SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery 
alias from NATURAL/USING JOIN

### What changes were proposed in this pull request?

Follows up from https://github.com/apache/spark/pull/31666, which introduced a 
bug where the qualified star expansion of a subquery alias containing a 
NATURAL/USING join output duplicated columns.

### Why are the changes needed?

Duplicated, hidden columns should not be output from a star expansion.

### Does this PR introduce _any_ user-facing change?

The query

```
val df1 = Seq((3, 8)).toDF("a", "b")
val df2 = Seq((8, 7)).toDF("b", "d")
val joinDF = df1.join(df2, "b")
joinDF.alias("r").select("r.*")
```

Now outputs a single column `b` instead of two (duplicate) `b` columns.

### How was this patch tested?

UTs

Closes #36763 from karenfeng/SPARK-39376.

Authored-by: Karen Feng 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 18ca369f01905b421a658144e23b5a4e60702655)
Signed-off-by: Wenchen Fan 
---
 .../plans/logical/basicLogicalOperators.scala  |  3 ++-
 .../org/apache/spark/sql/DataFrameJoinSuite.scala  | 22 ++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 692601be75d..774f6956162 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1328,7 +1328,8 @@ case class SubqueryAlias(
 
   override def metadataOutput: Seq[Attribute] = {
 val qualifierList = identifier.qualifier :+ alias
-    child.metadataOutput.map(_.withQualifier(qualifierList))
+    val nonHiddenMetadataOutput = child.metadataOutput.filter(!_.supportsQualifiedStar)
+    nonHiddenMetadataOutput.map(_.withQualifier(qualifierList))
   }
 
   override def maxRows: Option[Long] = child.maxRows
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index a803fa88ed3..1fda13f996a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -499,4 +499,26 @@ class DataFrameJoinSuite extends QueryTest
   )
 }
   }
+
+  test("SPARK-39376: Hide duplicated columns in star expansion of subquery 
alias from USING JOIN") {
+val joinDf = testData2.as("testData2").join(
+  testData3.as("testData3"), usingColumns = Seq("a"), joinType = 
"fullouter")
+val equivalentQueries = Seq(
+  joinDf.select($"*"),
+  joinDf.as("r").select($"*"),
+  joinDf.as("r").select($"r.*")
+)
+equivalentQueries.foreach { query =>
+  checkAnswer(query,
+Seq(
+  Row(1, 1, null),
+  Row(1, 2, null),
+  Row(2, 1, 2),
+  Row(2, 2, 2),
+  Row(3, 1, null),
+  Row(3, 2, null)
+)
+  )
+}
+  }
 }





[spark] branch master updated: [SPARK-39387][BUILD] Upgrade hive-storage-api to 2.7.3

2022-06-06 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9e920782fd3 [SPARK-39387][BUILD] Upgrade hive-storage-api to 2.7.3
9e920782fd3 is described below

commit 9e920782fd34396dbdf31246d3b4a3c86c16f8f1
Author: sychen 
AuthorDate: Mon Jun 6 19:57:35 2022 +0900

[SPARK-39387][BUILD] Upgrade hive-storage-api to 2.7.3

### What changes were proposed in this pull request?
This PR aims to upgrade the Apache Hive `hive-storage-api` library from 2.7.2 
to 2.7.3.

### Why are the changes needed?

[HIVE-25190](https://issues.apache.org/jira/browse/HIVE-25190): Fix many 
small allocations in BytesColumnVector

```scala
Caused by: java.lang.RuntimeException: Overflow of newLength. smallBuffer.length=1073741824, nextElemLength=408101
  at org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector.increaseBufferSpace(BytesColumnVector.java:311)
  at org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector.setVal(BytesColumnVector.java:182)
  at org.apache.hadoop.hive.ql.io.orc.WriterImpl.setColumn(WriterImpl.java:179)
  at org.apache.hadoop.hive.ql.io.orc.WriterImpl.setColumn(WriterImpl.java:268)
  at org.apache.hadoop.hive.ql.io.orc.WriterImpl.setColumn(WriterImpl.java:223)
  at org.apache.hadoop.hive.ql.io.orc.WriterImpl.addRow(WriterImpl.java:294)
  at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.write(OrcOutputFormat.java:105)
  at org.apache.spark.sql.hive.execution.HiveOutputWriter.write(HiveFileFormat.scala:157)
  at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:176)
  at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithMetrics(FileFormatDataWriter.scala:86)
  at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:93)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:312)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1534)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:319)
```
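
For context, the overflow in that trace is plain integer arithmetic: doubling 
a buffer that has already grown to 1 GiB exceeds `Int.MaxValue`. A minimal 
sketch (our illustration, not Hive code):

```scala
// smallBuffer.length in the trace above is 2^30 bytes (1 GiB).
val current = 1073741824
// A naive "double the buffer" growth step overflows a signed 32-bit Int,
// which is what the "Overflow of newLength" error guards against.
val doubled = current.toLong * 2 // 2147483648 > Int.MaxValue (2147483647)
assert(doubled > Int.MaxValue.toLong)
```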

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Verified in a production environment: SQL that previously failed to write ORC 
runs successfully after upgrading the version.

Closes #36772 from cxzl25/SPARK-39387.

Authored-by: sychen 
Signed-off-by: Hyukjin Kwon 
---
 dev/deps/spark-deps-hadoop-2-hive-2.3 | 2 +-
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 2 +-
 pom.xml   | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 
b/dev/deps/spark-deps-hadoop-2-hive-2.3
index 6f9b068180f..02819f1f6c5 100644
--- a/dev/deps/spark-deps-hadoop-2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -102,7 +102,7 @@ hive-shims-0.23/2.3.9//hive-shims-0.23-2.3.9.jar
 hive-shims-common/2.3.9//hive-shims-common-2.3.9.jar
 hive-shims-scheduler/2.3.9//hive-shims-scheduler-2.3.9.jar
 hive-shims/2.3.9//hive-shims-2.3.9.jar
-hive-storage-api/2.7.2//hive-storage-api-2.7.2.jar
+hive-storage-api/2.7.3//hive-storage-api-2.7.3.jar
 hive-vector-code-gen/2.3.9//hive-vector-code-gen-2.3.9.jar
 hk2-api/2.6.1//hk2-api-2.6.1.jar
 hk2-locator/2.6.1//hk2-locator-2.6.1.jar
diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 
b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 46559b1fa27..d8f8c2025fc 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -92,7 +92,7 @@ hive-shims-0.23/2.3.9//hive-shims-0.23-2.3.9.jar
 hive-shims-common/2.3.9//hive-shims-common-2.3.9.jar
 hive-shims-scheduler/2.3.9//hive-shims-scheduler-2.3.9.jar
 hive-shims/2.3.9//hive-shims-2.3.9.jar
-hive-storage-api/2.7.2//hive-storage-api-2.7.2.jar
+hive-storage-api/2.7.3//hive-storage-api-2.7.3.jar
 hive-vector-code-gen/2.3.9//hive-vector-code-gen-2.3.9.jar
 hk2-api/2.6.1//hk2-api-2.6.1.jar
 hk2-locator/2.6.1//hk2-locator-2.6.1.jar
diff --git a/pom.xml b/pom.xml
index ce7aa0d5d70..4bce557484b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -247,7 +247,7 @@
 -->
 compile
 compile
-2.7.2
+2.7.3
 compile
 compile
 compile

