This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ea3061beedf [SPARK-44340][SPARK-44341][SQL][PYTHON][FOLLOWUP] Set partition index correctly for WindowGroupLimitExec,WindowExec and WindowInPandasExec ea3061beedf is described below commit ea3061beedf7dc10f14e8de27d540dbcc5894fe7 Author: Jiaan Geng <belie...@163.com> AuthorDate: Mon Jul 31 13:53:32 2023 +0800 [SPARK-44340][SPARK-44341][SQL][PYTHON][FOLLOWUP] Set partition index correctly for WindowGroupLimitExec,WindowExec and WindowInPandasExec ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/41899 and https://github.com/apache/spark/pull/41939, to set the partition index correctly even if it's not used for now. It also contains a few code cleanups. ### Why are the changes needed? future-proof ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? existing tests Closes #42208 from beliefer/SPARK-44340_followup. Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/execution/python/WindowInPandasExec.scala | 8 +++----- .../scala/org/apache/spark/sql/execution/window/WindowExec.scala | 8 +++----- .../apache/spark/sql/execution/window/WindowGroupLimitExec.scala | 8 +++----- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala index ba1f2c132ff..ee0044162b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala @@ -80,24 +80,22 @@ case class WindowInPandasExec( ) protected override def doExecute(): RDD[InternalRow] = { - val spillSize = longMetric("spillSize") - val evaluatorFactory = new 
WindowInPandasEvaluatorFactory( windowExpression, partitionSpec, orderSpec, child.output, - spillSize, + longMetric("spillSize"), pythonMetrics) // Start processing. if (conf.usePartitionEvaluator) { child.execute().mapPartitionsWithEvaluator(evaluatorFactory) } else { - child.execute().mapPartitions { iter => + child.execute().mapPartitionsWithIndex { (index, rowIterator) => val evaluator = evaluatorFactory.createEvaluator() - evaluator.eval(0, iter) + evaluator.eval(index, rowIterator) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala index 35e59aef94f..9ecd1c587a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala @@ -95,23 +95,21 @@ case class WindowExec( ) protected override def doExecute(): RDD[InternalRow] = { - val spillSize = longMetric("spillSize") - val evaluatorFactory = new WindowEvaluatorFactory( windowExpression, partitionSpec, orderSpec, child.output, - spillSize) + longMetric("spillSize")) // Start processing. 
if (conf.usePartitionEvaluator) { child.execute().mapPartitionsWithEvaluator(evaluatorFactory) } else { - child.execute().mapPartitions { iter => + child.execute().mapPartitionsWithIndex { (index, rowIterator) => val evaluator = evaluatorFactory.createEvaluator() - evaluator.eval(0, iter) + evaluator.eval(index, rowIterator) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala index 98969f60c2b..e975f3b219a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExec.scala @@ -72,8 +72,6 @@ case class WindowGroupLimitExec( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) protected override def doExecute(): RDD[InternalRow] = { - val numOutputRows = longMetric("numOutputRows") - val evaluatorFactory = new WindowGroupLimitEvaluatorFactory( partitionSpec, @@ -81,14 +79,14 @@ case class WindowGroupLimitExec( rankLikeFunction, limit, child.output, - numOutputRows) + longMetric("numOutputRows")) if (conf.usePartitionEvaluator) { child.execute().mapPartitionsWithEvaluator(evaluatorFactory) } else { - child.execute().mapPartitionsInternal { iter => + child.execute().mapPartitionsWithIndexInternal { (index, rowIterator) => val evaluator = evaluatorFactory.createEvaluator() - evaluator.eval(0, iter) + evaluator.eval(index, rowIterator) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org