[spark] branch master updated: [SPARK-39394][DOCS][SS] Improve PySpark Structured Streaming page more readable
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new b30a080398b [SPARK-39394][DOCS][SS] Improve PySpark Structured Streaming page more readable
b30a080398b is described below

commit b30a080398ba1092093ea3bbd62bdb3a3ce8de03
Author: itholic
AuthorDate: Tue Jun 7 14:27:20 2022 +0900

    [SPARK-39394][DOCS][SS] Improve PySpark Structured Streaming page more readable

    ### What changes were proposed in this pull request?

    This PR proposes to make the PySpark Structured Streaming API reference page more readable. So far, the page has not been well organized, so it is somewhat hard to read, as shown below:

    ![Screen Shot 2022-06-07 at 12 29 33 PM](https://user-images.githubusercontent.com/44108233/172289683-0c130b6a-7716-40a3-b22b-42e38febe8c7.png)

    ### Why are the changes needed?

    Improving the document's readability will also improve the usability of PySpark Structured Streaming.

    ### Does this PR introduce _any_ user-facing change?

    Yes; the documentation is now categorized more clearly by class and purpose, as shown below:

    ![Screen Shot 2022-06-07 at 12 30 01 PM](https://user-images.githubusercontent.com/44108233/172289737-bd6ebf0e-601c-4a80-a16a-cf885302e7b6.png)

    ### How was this patch tested?

    The existing doc build in CI should cover this.

    Closes #36782 from itholic/SPARK-39394.

    Authored-by: itholic
    Signed-off-by: Hyukjin Kwon
---
 python/docs/source/reference/index.rst             |  2 +-
 .../{index.rst => pyspark.ss/core_classes.rst}     | 27 +---
 .../source/reference/{ => pyspark.ss}/index.rst    | 23 --
 .../{pyspark.ss.rst => pyspark.ss/io.rst}          | 50 ++
 .../query_management.rst}                          | 50 +-
 5 files changed, 25 insertions(+), 127 deletions(-)

diff --git a/python/docs/source/reference/index.rst b/python/docs/source/reference/index.rst
index b16c614d34c..2f316924405 100644
--- a/python/docs/source/reference/index.rst
+++ b/python/docs/source/reference/index.rst
@@ -29,7 +29,7 @@ Pandas API on Spark follows the API specifications of latest pandas release.

     pyspark.sql/index
     pyspark.pandas/index
-    pyspark.ss
+    pyspark.ss/index
     pyspark.ml
     pyspark.streaming
     pyspark.mllib

diff --git a/python/docs/source/reference/index.rst b/python/docs/source/reference/pyspark.ss/core_classes.rst
similarity index 68%
copy from python/docs/source/reference/index.rst
copy to python/docs/source/reference/pyspark.ss/core_classes.rst
index b16c614d34c..10c2211ef1d 100644
--- a/python/docs/source/reference/index.rst
+++ b/python/docs/source/reference/pyspark.ss/core_classes.rst
@@ -16,22 +16,17 @@ under the License.

-=============
-API Reference
-=============
+============
+Core Classes
+============

-This page lists an overview of all public PySpark modules, classes, functions and methods.
+.. currentmodule:: pyspark.sql.streaming

-Pandas API on Spark follows the API specifications of latest pandas release.
+.. autosummary::
+    :toctree: api/

-.. toctree::
-   :maxdepth: 2
-
-   pyspark.sql/index
-   pyspark.pandas/index
-   pyspark.ss
-   pyspark.ml
-   pyspark.streaming
-   pyspark.mllib
-   pyspark
-   pyspark.resource
+    DataStreamReader
+    DataStreamWriter
+    StreamingQuery
+    StreamingQueryManager
+    StreamingQueryListener

diff --git a/python/docs/source/reference/index.rst b/python/docs/source/reference/pyspark.ss/index.rst
similarity index 69%
copy from python/docs/source/reference/index.rst
copy to python/docs/source/reference/pyspark.ss/index.rst
index b16c614d34c..2cb0b1216ef 100644
--- a/python/docs/source/reference/index.rst
+++ b/python/docs/source/reference/pyspark.ss/index.rst
@@ -16,22 +16,15 @@ under the License.

-=============
-API Reference
-=============
+====================
+Structured Streaming
+====================

-This page lists an overview of all public PySpark modules, classes, functions and methods.
-
-Pandas API on Spark follows the API specifications of latest pandas release.
+This page gives an overview of all public Structed Streaming API.

 .. toctree::
-   :maxdepth: 2
+    :maxdepth: 2

-   pyspark.sql/index
-   pyspark.pandas/index
-   pyspark.ss
-   pyspark.ml
-   pyspark.streaming
-   pyspark.mllib
-   pyspark
-   pyspark.resource
+    core_classes
+    io
+    query_management

diff --git a/python/docs/source/reference/pyspark.ss.rst b/python/docs/source/reference/pyspark.ss/io.rst
similarity index 59%
copy from python/docs/source/reference/pyspark.ss.rst
copy to python/docs/source/reference/pyspark.ss/io.rst
index d55d46b9139..da476fb6fac 100644
---
[spark] branch master updated: [SPARK-39390][CORE] Hide and optimize `viewAcls`/`viewAclsGroups`/`modifyAcls`/`modifyAclsGroups` from INFO log
This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 63f0f91b3f5 [SPARK-39390][CORE] Hide and optimize `viewAcls`/`viewAclsGroups`/`modifyAcls`/`modifyAclsGroups` from INFO log
63f0f91b3f5 is described below

commit 63f0f91b3f5c5d1dee9236824027bd978192a9ff
Author: Qian.Sun
AuthorDate: Mon Jun 6 21:21:45 2022 -0700

    [SPARK-39390][CORE] Hide and optimize `viewAcls`/`viewAclsGroups`/`modifyAcls`/`modifyAclsGroups` from INFO log

    ### What changes were proposed in this pull request?

    This PR aims to hide and optimize `viewAcls`/`viewAclsGroups`/`modifyAcls`/`modifyAclsGroups` in the INFO log.

    ### Why are the changes needed?

    * In the case of an empty set, `Set()` carries no useful information for users.
    * In the case of a non-empty set, `Set(root)` reads poorly.

    ```scala
    2022-06-02 22:02:48.328 - stderr> 22/06/03 05:02:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
    ```

    ### Does this PR introduce _any_ user-facing change?

    No; this changes the INFO log only.

    ### How was this patch tested?

    Manually.

    **BEFORE**
    ```scala
    2022-06-02 22:02:48.328 - stderr> 22/06/03 05:02:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
    ```

    **AFTER**
    ```scala
    2022-06-02 22:02:48.328 - stderr> 22/06/03 05:02:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: root; groups with view permissions: EMPTY; users with modify permissions: root; groups with modify permissions: root, spark
    ```

    Closes #36777 from dcoliversun/SPARK-39390.
    Authored-by: Qian.Sun
    Signed-off-by: huaxingao
---
 core/src/main/scala/org/apache/spark/SecurityManager.scala | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index f11176cc233..7e72ae8d89e 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -87,10 +87,14 @@ private[spark] class SecurityManager(
   private var secretKey: String = _

   logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
     "; ui acls " + (if (aclsOn) "enabled" else "disabled") +
-    "; users with view permissions: " + viewAcls.toString() +
-    "; groups with view permissions: " + viewAclsGroups.toString() +
-    "; users with modify permissions: " + modifyAcls.toString() +
-    "; groups with modify permissions: " + modifyAclsGroups.toString())
+    "; users with view permissions: " +
+    (if (viewAcls.nonEmpty) viewAcls.mkString(", ") else "EMPTY") +
+    "; groups with view permissions: " +
+    (if (viewAclsGroups.nonEmpty) viewAclsGroups.mkString(", ") else "EMPTY") +
+    "; users with modify permissions: " +
+    (if (modifyAcls.nonEmpty) modifyAcls.mkString(", ") else "EMPTY") +
+    "; groups with modify permissions: " +
+    (if (modifyAclsGroups.nonEmpty) modifyAclsGroups.mkString(", ") else "EMPTY"))

   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)

   // the default SSL configuration - it will be used by all communication layers unless overwritten
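For illustration, here is a minimal standalone sketch of the formatting rule the patch applies. The `formatAcl` helper is hypothetical; the actual change inlines the `if`/`mkString` expressions shown in the diff above:

```scala
object AclLogFormatSketch {
  // Non-empty ACL sets render as a comma-separated list; empty ones as "EMPTY".
  def formatAcl(acl: Set[String]): String =
    if (acl.nonEmpty) acl.mkString(", ") else "EMPTY"

  def main(args: Array[String]): Unit = {
    println("users with view permissions: " + formatAcl(Set("root"))) // root
    println("groups with view permissions: " + formatAcl(Set.empty))  // EMPTY
  }
}
```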
[GitHub] [spark-website] HeartSaVioR commented on pull request #391: [MINOR] Fix typo in streaming page
HeartSaVioR commented on PR #391:
URL: https://github.com/apache/spark-website/pull/391#issuecomment-1148147340

    Thanks for the quick review and merge!
[spark-website] branch asf-site updated: [MINOR] Fix typo in streaming page
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/spark-website.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new 5b1de5b85 [MINOR] Fix typo in streaming page
5b1de5b85 is described below

commit 5b1de5b85aab4200c079bb7d677ce34bfa8c9429
Author: Jungtaek Lim
AuthorDate: Mon Jun 6 22:05:41 2022 -0500

    [MINOR] Fix typo in streaming page

    Ease to use -> Easy to use

    Screenshot: https://user-images.githubusercontent.com/1317309/172282889-40d0c007-5115-43a7-a23c-812388a96c9b.png

    Author: Jungtaek Lim

    Closes #391 from HeartSaVioR/WIP-fix-typo-streaming.
---
 site/streaming/index.html | 2 +-
 streaming/index.md        | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/site/streaming/index.html b/site/streaming/index.html
index 2f0f9ad1c..3a176fa11 100644
--- a/site/streaming/index.html
+++ b/site/streaming/index.html
@@ -130,7 +130,7 @@

-      Ease to use
+      Easy to use

       Spark Structured Streaming abstracts away complex streaming concepts such as incremental processing, checkpointing, and watermarks so that you can build streaming applications and pipelines without learning any new concepts or tools.

diff --git a/streaming/index.md b/streaming/index.md
index 9e1d63cf9..98d487969 100644
--- a/streaming/index.md
+++ b/streaming/index.md
@@ -15,7 +15,7 @@ subproject: Streaming

-      Ease to use
+      Easy to use

       Spark Structured Streaming abstracts away complex streaming concepts such as incremental processing, checkpointing, and watermarks so that you can build streaming applications and pipelines without learning any new concepts or tools.
[GitHub] [spark-website] srowen closed pull request #391: [MINOR] Fix typo in streaming page
srowen closed pull request #391: [MINOR] Fix typo in streaming page
URL: https://github.com/apache/spark-website/pull/391
[GitHub] [spark-website] HeartSaVioR opened a new pull request, #391: [MINOR] Fix typo in streaming page
HeartSaVioR opened a new pull request, #391:
URL: https://github.com/apache/spark-website/pull/391

    Ease to use -> Easy to use

    Screenshot: https://user-images.githubusercontent.com/1317309/172282889-40d0c007-5115-43a7-a23c-812388a96c9b.png
[spark] branch master updated: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f80041fdfdd [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true
f80041fdfdd is described below

commit f80041fdfddae66bead7a3950028ee04d1b60bd2
Author: Aravind Patnam
AuthorDate: Mon Jun 6 17:07:36 2022 -0500

    [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true

    ### What changes were proposed in this pull request?

    Adds corruption exception handling for merged shuffle chunks when spark.shuffle.detectCorrupt is set to true (its default value).

    ### Why are the changes needed?

    Prior to Spark 3.0, spark.shuffle.detectCorrupt was set to true by default, and this configuration was one of the knobs for early corruption detection, so the fallback could be triggered as expected. Since Spark 3.0, spark.shuffle.detectCorrupt is still true by default, but early corruption detection is controlled by a new configuration, spark.shuffle.detectCorrupt.useExtraMemory, which defaults to false. Thus the default behavior, with only Magnet enabled after Spark 3.2.0 (internal li-3.1.1), disables early corruption detection, so no fallback is triggered; instead, an exception is thrown when Spark starts to read the corrupted blocks. We handle the corrupted stream for merged blocks by throwing a FetchFailedException in this case. This triggers a retry based on the values of spark.shuffle.detectCorrupt.useExtraMemory and spark.shuffle.detectCorrupt.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    - Tested on an internal cluster
    - Added UTs

    This is a PR to tackle some of the build weirdness found in PR 36601 (https://github.com/apache/spark/pull/36601). It contains the exact same diff; that PR was closed out and recreated here.

    Closes #36734 from akpatnam25/SPARK-38987.

    Authored-by: Aravind Patnam
    Signed-off-by: Mridul Muralidharan
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  22 +++-
 .../storage/ShuffleBlockFetcherIterator.scala      |   3 +
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 115 -
 .../storage/ShuffleBlockFetcherIteratorSuite.scala |  28 +
 4 files changed, 163 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7d26d9e8d61..289296f6fdb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1885,6 +1885,16 @@ private[spark] class DAGScheduler(
                 mapOutputTracker.
                   unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex))
               }
+            } else {
+              // Unregister the merge result of if there is a FetchFailed event
+              // and is not a MetaDataFetchException which is signified by bmAddress being null
+              if (bmAddress != null &&
+                bmAddress.executorId.equals(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)) {
+                assert(pushBasedShuffleEnabled, "Push based shuffle expected to " +
+                  "be enabled when handling merge block fetch failure.")
+                mapOutputTracker.
+                  unregisterMergeResult(shuffleId, reduceId, bmAddress, None)
+              }
             }

             if (failedStage.rdd.isBarrier()) {
@@ -2449,7 +2459,15 @@ private[spark] class DAGScheduler(
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     logDebug(s"Considering removal of executor $execId; " +
       s"fileLost: $fileLost, currentEpoch: $currentEpoch")
-    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
+    // Check if the execId is a shuffle push merger. We do not remove the executor if it is,
+    // and only remove the outputs on the host.
+    val isShuffleMerger = execId.equals(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    if (isShuffleMerger && pushBasedShuffleEnabled) {
+      hostToUnregisterOutputs.foreach(
+        host => blockManagerMaster.removeShufflePushMergerLocation(host))
+    }
+    if (!isShuffleMerger &&
+      (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch)) {
       executorFailureEpoch(execId) = currentEpoch
       logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
       if (pushBasedShuffleEnabled) {
@@ -2461,6 +2479,8 @@ private[spark] class DAGScheduler(
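For reference, a minimal sketch of how the two configuration knobs discussed above fit together. Both keys are real Spark settings; the values shown are the defaults described in this PR:

```scala
import org.apache.spark.sql.SparkSession

// A sketch only: builds a session with the corruption-detection settings
// at their defaults, as described in the PR body above.
val spark = SparkSession.builder()
  .appName("shuffle-corruption-detection")
  // Detect corruption in fetched shuffle blocks and fail the fetch (default: true).
  .config("spark.shuffle.detectCorrupt", "true")
  // Detect corruption early by decompressing part of each block using extra
  // memory (default: false, which is why early detection is off by default).
  .config("spark.shuffle.detectCorrupt.useExtraMemory", "false")
  .getOrCreate()
```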
[spark] branch master updated: [SPARK-39391][CORE] Reuse Partitioner classes
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new c548e593019 [SPARK-39391][CORE] Reuse Partitioner classes
c548e593019 is described below

commit c548e59301941a40ff2d07590645bcd24280a550
Author: Herman van Hovell
AuthorDate: Mon Jun 6 15:50:50 2022 -0400

    [SPARK-39391][CORE] Reuse Partitioner classes

    ### What changes were proposed in this pull request?

    This PR creates two new `Partitioner` classes:
    - `ConstantPartitioner`: This moves all tuples in an RDD into a single partition. It replaces two anonymous partitioners in `RDD` and `ShuffleExchangeExec`.
    - `PartitionIdPassthrough`: This is a dummy partitioner that passes keys through when they have already been computed. It is not actually a new class; it was moved from `ShuffledRowRDD.scala` to core. It replaces two anonymous partitioners in `BlockMatrix` and `RDD`.

    ### Why are the changes needed?

    Less code.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing tests.

    Closes #36779 from hvanhovell/SPARK-39391.

    Authored-by: Herman van Hovell
    Signed-off-by: Herman van Hovell
---
 core/src/main/scala/org/apache/spark/Partitioner.scala   | 16
 core/src/main/scala/org/apache/spark/rdd/RDD.scala       |  8 +---
 .../spark/mllib/linalg/distributed/BlockMatrix.scala     |  8 +++-
 .../org/apache/spark/sql/execution/ShuffledRowRDD.scala  |  8
 .../sql/execution/exchange/ShuffleExchangeExec.scala     | 15 ---
 5 files changed, 24 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index a0cba8ab13f..5dffba2ee8e 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -129,6 +129,22 @@ class HashPartitioner(partitions: Int) extends Partitioner {
   override def hashCode: Int = numPartitions
 }

+/**
+ * A dummy partitioner for use with records whose partition ids have been pre-computed (i.e. for
+ * use on RDDs of (Int, Row) pairs where the Int is a partition id in the expected range).
+ */
+private[spark] class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {
+  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
+}
+
+/**
+ * A [[org.apache.spark.Partitioner]] that partitions all records into a single partition.
+ */
+private[spark] class ConstantPartitioner extends Partitioner {
+  override def numPartitions: Int = 1
+  override def getPartition(key: Any): Int = 0
+}
+
 /**
  * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
  * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 89397b8aa69..b7284d25122 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1249,18 +1249,12 @@ abstract class RDD[T: ClassTag](
         }.foldByKey(zeroValue, new HashPartitioner(curNumPartitions))(cleanCombOp).values
       }
       if (finalAggregateOnExecutor && partiallyAggregated.partitions.length > 1) {
-        // define a new partitioner that results in only 1 partition
-        val constantPartitioner = new Partitioner {
-          override def numPartitions: Int = 1
-
-          override def getPartition(key: Any): Int = 0
-        }
         // map the partially aggregated rdd into a key-value rdd
         // do the computation in the single executor with one partition
         // get the new RDD[U]
         partiallyAggregated = partiallyAggregated
           .map(v => (0.toByte, v))
-          .foldByKey(zeroValue, constantPartitioner)(cleanCombOp)
+          .foldByKey(zeroValue, new ConstantPartitioner)(cleanCombOp)
           .values
       }
       val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 452bbbe5f46..2b4333fe0fd 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.linalg.distributed
 import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Matrix => BM}
 import scala.collection.mutable.ArrayBuffer

-import org.apache.spark.{Partitioner, SparkException}
+import
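As a usage sketch, the two partitioners behave like the following stand-ins written against the public `Partitioner` API. The real classes are `private[spark]`, so the names `SinglePartition` and `IdPassthrough` below are illustrative, not Spark API:

```scala
import org.apache.spark.Partitioner

// Stand-in for ConstantPartitioner: every record lands in partition 0.
class SinglePartition extends Partitioner {
  override def numPartitions: Int = 1
  override def getPartition(key: Any): Int = 0
}

// Stand-in for PartitionIdPassthrough: the key *is* the target partition id,
// so no hashing is needed when partition ids were pre-computed.
class IdPassthrough(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

// e.g. rddOfIdValuePairs.partitionBy(new IdPassthrough(numParts)) moves each
// (partitionId, value) pair directly to its target partition.
```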
[spark] branch branch-3.2 updated: [SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery alias from NATURAL/USING JOIN
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d9477ddb1a8 [SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery alias from NATURAL/USING JOIN
d9477ddb1a8 is described below

commit d9477ddb1a805e3ff7640d682348fd5e780b3a80
Author: Karen Feng
AuthorDate: Mon Jun 6 20:58:23 2022 +0800

    [SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery alias from NATURAL/USING JOIN

    ### What changes were proposed in this pull request?

    Follows up from https://github.com/apache/spark/pull/31666. That PR introduced a bug where the qualified star expansion of a subquery alias containing a NATURAL/USING join output duplicated columns.

    ### Why are the changes needed?

    Duplicated, hidden columns should not be output from a star expansion.

    ### Does this PR introduce _any_ user-facing change?

    The query
    ```
    val df1 = Seq((3, 8)).toDF("a", "b")
    val df2 = Seq((8, 7)).toDF("b", "d")
    val joinDF = df1.join(df2, "b")
    joinDF.alias("r").select("r.*")
    ```
    now outputs a single column `b`, instead of two (duplicate) columns for `b`.

    ### How was this patch tested?

    UTs

    Closes #36763 from karenfeng/SPARK-39376.

    Authored-by: Karen Feng
    Signed-off-by: Wenchen Fan
---
 .../plans/logical/basicLogicalOperators.scala      |  3 ++-
 .../org/apache/spark/sql/DataFrameJoinSuite.scala  | 22 ++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index c4a7ea2dcf0..010722c0349 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1303,7 +1303,8 @@ case class SubqueryAlias(
   override def metadataOutput: Seq[Attribute] = {
     val qualifierList = identifier.qualifier :+ alias
-    child.metadataOutput.map(_.withQualifier(qualifierList))
+    val nonHiddenMetadataOutput = child.metadataOutput.filter(!_.supportsQualifiedStar)
+    nonHiddenMetadataOutput.map(_.withQualifier(qualifierList))
   }

   override def maxRows: Option[Long] = child.maxRows
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index a803fa88ed3..1fda13f996a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -499,4 +499,26 @@ class DataFrameJoinSuite extends QueryTest
       )
     }
   }
+
+  test("SPARK-39376: Hide duplicated columns in star expansion of subquery alias from USING JOIN") {
+    val joinDf = testData2.as("testData2").join(
+      testData3.as("testData3"), usingColumns = Seq("a"), joinType = "fullouter")
+    val equivalentQueries = Seq(
+      joinDf.select($"*"),
+      joinDf.as("r").select($"*"),
+      joinDf.as("r").select($"r.*")
+    )
+    equivalentQueries.foreach { query =>
+      checkAnswer(query,
+        Seq(
+          Row(1, 1, null),
+          Row(1, 2, null),
+          Row(2, 1, 2),
+          Row(2, 2, 2),
+          Row(3, 1, null),
+          Row(3, 2, null)
+        )
+      )
+    }
+  }
 }
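For convenience, a self-contained version of the reproducer from the PR description above. The session setup (including the `local[*]` master) is added here for illustration; the DataFrame lines come from the PR body:

```scala
import org.apache.spark.sql.SparkSession

object Spark39376Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("SPARK-39376").getOrCreate()
    import spark.implicits._

    val df1 = Seq((3, 8)).toDF("a", "b")
    val df2 = Seq((8, 7)).toDF("b", "d")
    val joinDF = df1.join(df2, "b")

    // With the fix, the USING-join key `b` appears once in the star expansion:
    // columns b, a, d rather than b, a, b, d.
    joinDF.alias("r").select("r.*").printSchema()

    spark.stop()
  }
}
```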
[spark] branch master updated: [SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery alias from NATURAL/USING JOIN
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 18ca369f019 [SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery alias from NATURAL/USING JOIN
18ca369f019 is described below

commit 18ca369f01905b421a658144e23b5a4e60702655
Author: Karen Feng
AuthorDate: Mon Jun 6 20:58:23 2022 +0800

    [SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery alias from NATURAL/USING JOIN

    ### What changes were proposed in this pull request?

    Follows up from https://github.com/apache/spark/pull/31666. That PR introduced a bug where the qualified star expansion of a subquery alias containing a NATURAL/USING join output duplicated columns.

    ### Why are the changes needed?

    Duplicated, hidden columns should not be output from a star expansion.

    ### Does this PR introduce _any_ user-facing change?

    The query
    ```
    val df1 = Seq((3, 8)).toDF("a", "b")
    val df2 = Seq((8, 7)).toDF("b", "d")
    val joinDF = df1.join(df2, "b")
    joinDF.alias("r").select("r.*")
    ```
    now outputs a single column `b`, instead of two (duplicate) columns for `b`.

    ### How was this patch tested?

    UTs

    Closes #36763 from karenfeng/SPARK-39376.

    Authored-by: Karen Feng
    Signed-off-by: Wenchen Fan
---
 .../plans/logical/basicLogicalOperators.scala      |  3 ++-
 .../org/apache/spark/sql/DataFrameJoinSuite.scala  | 22 ++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index b4c6e19d0bc..677bdf27336 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1369,7 +1369,8 @@ case class SubqueryAlias(
   override def metadataOutput: Seq[Attribute] = {
     val qualifierList = identifier.qualifier :+ alias
-    child.metadataOutput.map(_.withQualifier(qualifierList))
+    val nonHiddenMetadataOutput = child.metadataOutput.filter(!_.supportsQualifiedStar)
+    nonHiddenMetadataOutput.map(_.withQualifier(qualifierList))
   }

   override def maxRows: Option[Long] = child.maxRows
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 5286a70674e..de900fffb34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -554,4 +554,26 @@ class DataFrameJoinSuite extends QueryTest
       )
     }
   }
+
+  test("SPARK-39376: Hide duplicated columns in star expansion of subquery alias from USING JOIN") {
+    val joinDf = testData2.as("testData2").join(
+      testData3.as("testData3"), usingColumns = Seq("a"), joinType = "fullouter")
+    val equivalentQueries = Seq(
+      joinDf.select($"*"),
+      joinDf.as("r").select($"*"),
+      joinDf.as("r").select($"r.*")
+    )
+    equivalentQueries.foreach { query =>
+      checkAnswer(query,
+        Seq(
+          Row(1, 1, null),
+          Row(1, 2, null),
+          Row(2, 1, 2),
+          Row(2, 2, 2),
+          Row(3, 1, null),
+          Row(3, 2, null)
+        )
+      )
+    }
+  }
 }
[spark] branch branch-3.3 updated: [SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery alias from NATURAL/USING JOIN
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 3b549f43094 [SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery alias from NATURAL/USING JOIN
3b549f43094 is described below

commit 3b549f4309497ecbe9f0b7a20d22a9a4417abb8b
Author: Karen Feng
AuthorDate: Mon Jun 6 20:58:23 2022 +0800

    [SPARK-39376][SQL] Hide duplicated columns in star expansion of subquery alias from NATURAL/USING JOIN

    ### What changes were proposed in this pull request?

    Follows up from https://github.com/apache/spark/pull/31666. That PR introduced a bug where the qualified star expansion of a subquery alias containing a NATURAL/USING join output duplicated columns.

    ### Why are the changes needed?

    Duplicated, hidden columns should not be output from a star expansion.

    ### Does this PR introduce _any_ user-facing change?

    The query
    ```
    val df1 = Seq((3, 8)).toDF("a", "b")
    val df2 = Seq((8, 7)).toDF("b", "d")
    val joinDF = df1.join(df2, "b")
    joinDF.alias("r").select("r.*")
    ```
    now outputs a single column `b`, instead of two (duplicate) columns for `b`.

    ### How was this patch tested?

    UTs

    Closes #36763 from karenfeng/SPARK-39376.

    Authored-by: Karen Feng
    Signed-off-by: Wenchen Fan
    (cherry picked from commit 18ca369f01905b421a658144e23b5a4e60702655)
    Signed-off-by: Wenchen Fan
---
 .../plans/logical/basicLogicalOperators.scala      |  3 ++-
 .../org/apache/spark/sql/DataFrameJoinSuite.scala  | 22 ++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 692601be75d..774f6956162 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1328,7 +1328,8 @@ case class SubqueryAlias(
   override def metadataOutput: Seq[Attribute] = {
     val qualifierList = identifier.qualifier :+ alias
-    child.metadataOutput.map(_.withQualifier(qualifierList))
+    val nonHiddenMetadataOutput = child.metadataOutput.filter(!_.supportsQualifiedStar)
+    nonHiddenMetadataOutput.map(_.withQualifier(qualifierList))
   }

   override def maxRows: Option[Long] = child.maxRows
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index a803fa88ed3..1fda13f996a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -499,4 +499,26 @@ class DataFrameJoinSuite extends QueryTest
       )
     }
   }
+
+  test("SPARK-39376: Hide duplicated columns in star expansion of subquery alias from USING JOIN") {
+    val joinDf = testData2.as("testData2").join(
+      testData3.as("testData3"), usingColumns = Seq("a"), joinType = "fullouter")
+    val equivalentQueries = Seq(
+      joinDf.select($"*"),
+      joinDf.as("r").select($"*"),
+      joinDf.as("r").select($"r.*")
+    )
+    equivalentQueries.foreach { query =>
+      checkAnswer(query,
+        Seq(
+          Row(1, 1, null),
+          Row(1, 2, null),
+          Row(2, 1, 2),
+          Row(2, 2, 2),
+          Row(3, 1, null),
+          Row(3, 2, null)
+        )
+      )
+    }
+  }
 }
[spark] branch master updated: [SPARK-39387][BUILD] Upgrade hive-storage-api to 2.7.3
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 9e920782fd3 [SPARK-39387][BUILD] Upgrade hive-storage-api to 2.7.3
9e920782fd3 is described below

commit 9e920782fd34396dbdf31246d3b4a3c86c16f8f1
Author: sychen
AuthorDate: Mon Jun 6 19:57:35 2022 +0900

    [SPARK-39387][BUILD] Upgrade hive-storage-api to 2.7.3

    ### What changes were proposed in this pull request?

    This PR aims to upgrade the Apache Hive `hive-storage-api` library from 2.7.2 to 2.7.3.

    ### Why are the changes needed?

    [HIVE-25190](https://issues.apache.org/jira/browse/HIVE-25190): Fix many small allocations in BytesColumnVector

    ```scala
    Caused by: java.lang.RuntimeException: Overflow of newLength. smallBuffer.length=1073741824, nextElemLength=408101
      at org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector.increaseBufferSpace(BytesColumnVector.java:311)
      at org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector.setVal(BytesColumnVector.java:182)
      at org.apache.hadoop.hive.ql.io.orc.WriterImpl.setColumn(WriterImpl.java:179)
      at org.apache.hadoop.hive.ql.io.orc.WriterImpl.setColumn(WriterImpl.java:268)
      at org.apache.hadoop.hive.ql.io.orc.WriterImpl.setColumn(WriterImpl.java:223)
      at org.apache.hadoop.hive.ql.io.orc.WriterImpl.addRow(WriterImpl.java:294)
      at org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat$OrcRecordWriter.write(OrcOutputFormat.java:105)
      at org.apache.spark.sql.hive.execution.HiveOutputWriter.write(HiveFileFormat.scala:157)
      at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:176)
      at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithMetrics(FileFormatDataWriter.scala:86)
      at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:93)
      at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:312)
      at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1534)
      at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:319)
    ```

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Verified in a production environment: SQL that previously failed to write ORC runs successfully after upgrading the version.

    Closes #36772 from cxzl25/SPARK-39387.
    Authored-by: sychen
    Signed-off-by: Hyukjin Kwon
---
 dev/deps/spark-deps-hadoop-2-hive-2.3 | 2 +-
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 2 +-
 pom.xml                               | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3
index 6f9b068180f..02819f1f6c5 100644
--- a/dev/deps/spark-deps-hadoop-2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -102,7 +102,7 @@ hive-shims-0.23/2.3.9//hive-shims-0.23-2.3.9.jar
 hive-shims-common/2.3.9//hive-shims-common-2.3.9.jar
 hive-shims-scheduler/2.3.9//hive-shims-scheduler-2.3.9.jar
 hive-shims/2.3.9//hive-shims-2.3.9.jar
-hive-storage-api/2.7.2//hive-storage-api-2.7.2.jar
+hive-storage-api/2.7.3//hive-storage-api-2.7.3.jar
 hive-vector-code-gen/2.3.9//hive-vector-code-gen-2.3.9.jar
 hk2-api/2.6.1//hk2-api-2.6.1.jar
 hk2-locator/2.6.1//hk2-locator-2.6.1.jar
diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 46559b1fa27..d8f8c2025fc 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -92,7 +92,7 @@ hive-shims-0.23/2.3.9//hive-shims-0.23-2.3.9.jar
 hive-shims-common/2.3.9//hive-shims-common-2.3.9.jar
 hive-shims-scheduler/2.3.9//hive-shims-scheduler-2.3.9.jar
 hive-shims/2.3.9//hive-shims-2.3.9.jar
-hive-storage-api/2.7.2//hive-storage-api-2.7.2.jar
+hive-storage-api/2.7.3//hive-storage-api-2.7.3.jar
 hive-vector-code-gen/2.3.9//hive-vector-code-gen-2.3.9.jar
 hk2-api/2.6.1//hk2-api-2.6.1.jar
 hk2-locator/2.6.1//hk2-locator-2.6.1.jar
diff --git a/pom.xml b/pom.xml
index ce7aa0d5d70..4bce557484b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -247,7 +247,7 @@
     -->
     compile
     compile
-    2.7.2
+    2.7.3
     compile
     compile
     compile
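For context, the `Overflow of newLength` in the stack trace is 32-bit integer arithmetic: growing a buffer that is already 2^30 bytes exceeds `Int.MaxValue`. A quick sketch of the arithmetic (not Hive's actual code; growth by doubling is an assumption based on HIVE-25190):

```scala
val smallBufferLength = 1073741824 // 2^30, from the stack trace
val nextElemLength = 408101        // from the stack trace

// Doubling a 2^30-byte buffer overflows a signed 32-bit Int:
val doubled = smallBufferLength * 2                    // wraps to -2147483648
val needed = smallBufferLength.toLong + nextElemLength // 1074149925, fits in a Long

println(s"doubled=$doubled, needed=$needed")
```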