Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
jinchengchenghh merged PR #8914: URL: https://github.com/apache/incubator-gluten/pull/8914 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2050897070
##
gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution.ColumnarCollectLimitBaseExec
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}
+
+class GlutenSQLCollectLimitExecSuite extends GlutenSQLTestsTrait {
Review Comment:
It seems we may still need the 3.3 suite, since older versions do not support
the offset API. The tests differ slightly depending on offset support, which
was added in 3.4. Thanks!
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
zhztheplayer commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2050809795
##
gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution.ColumnarCollectLimitBaseExec
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}
+
+class GlutenSQLCollectLimitExecSuite extends GlutenSQLTestsTrait {
Review Comment:
This is a big test file. Would it be enough to add it only for the newest
Spark version (3.5)? That would make further maintenance easier.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2815668818
This should be fixed with the post-transform rule. Could you please take a look and help re-run the UTs? Thanks! @jinchengchenghh @zhztheplayer
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2815474381
Run Gluten Clickhouse CI on x86
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2800456943
> > removing the check seems to have broken tests.
>
> I suggest we figure out the reason of the test failures first. We'd make sure the operator outputs exactly the same data no matter it's offloaded or not. Otherwise it's a mismatch.
>
> What did the broken tests look like?

Yes. If we offload with an R2C (row-to-columnar) transition between collectLimit and its child, the number of jobs changes with Gluten. The operator outputs exactly the same data either way. However, the current implementation offloads only when the child is already columnar, which avoids both the R2C overhead and the failing UTs. Thanks!
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
jinchengchenghh commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2005277220
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -125,11 +199,21 @@ case class ColumnarCollectLimitExec(
if (childRDD.getNumPartitions == 1) childRDD
else shuffleLimitedPartitions(childRDD)
-processedRDD.mapPartitions(partition => collectLimitedRows(partition, limit))
+processedRDD.mapPartitions(
+ partition => {
+val droppedRows = dropLimitedRows(partition, offset)
Review Comment:
We could add an `offset` argument to `collectLimitedRows` and handle it inside
`fetchNext`. That would make the function much simpler, right?
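For illustration only, here is a minimal sketch of what folding the offset into a single `fetchNext` could look like. It is not the code from this PR: `Batch` is a made-up stand-in for a columnar batch, `OffsetLimitSketch` is a hypothetical name, and treating a negative `limit` as "no limit" is an assumption based on the `limit == -1` case in the assert elsewhere in this thread.

```scala
// Hypothetical stand-in for a columnar batch: just a vector of row ids.
// The real operator works on Spark's ColumnarBatch; this sketch does not.
final case class Batch(rows: Vector[Int]) {
  def numRows: Int = rows.length
  // Keep `length` rows starting at index `start`.
  def slice(start: Int, length: Int): Batch = Batch(rows.slice(start, start + length))
}

object OffsetLimitSketch {
  // Skips the first `offset` rows, then emits at most `limit` rows.
  // Assumption: a negative `limit` means "no limit".
  def collectLimitedRows(input: Iterator[Batch], limit: Int, offset: Int): Iterator[Batch] =
    new Iterator[Batch] {
      private var rowsToSkip = math.max(offset, 0)
      private var rowsToCollect = if (limit >= 0) limit else Int.MaxValue
      private var nextBatch: Option[Batch] = None

      // Pulls input batches until one still has rows after skipping,
      // or until the input or the limit is exhausted.
      private def fetchNext(): Option[Batch] = {
        while (rowsToCollect > 0 && input.hasNext) {
          val batch = input.next()
          if (batch.numRows <= rowsToSkip) {
            // The whole batch falls inside the offset: drop it.
            rowsToSkip -= batch.numRows
          } else {
            val start = rowsToSkip
            val take = math.min(batch.numRows - start, rowsToCollect)
            rowsToSkip = 0
            rowsToCollect -= take
            return Some(batch.slice(start, take))
          }
        }
        None
      }

      override def hasNext: Boolean = {
        if (nextBatch.isEmpty) nextBatch = fetchNext()
        nextBatch.isDefined
      }

      override def next(): Batch = {
        if (!hasNext) throw new NoSuchElementException("no more batches")
        val out = nextBatch.get
        nextBatch = None
        out
      }
    }
}
```

With three batches holding rows 0 to 11, `collectLimitedRows(batches, limit = 4, offset = 5)` drops the first batch entirely, slices the second, and takes a single row from the third, so no separate drop pass over the partition is needed.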
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2005547474
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -125,11 +199,21 @@ case class ColumnarCollectLimitExec(
if (childRDD.getNumPartitions == 1) childRDD
else shuffleLimitedPartitions(childRDD)
-processedRDD.mapPartitions(partition => collectLimitedRows(partition, limit))
+processedRDD.mapPartitions(
+ partition => {
+val droppedRows = dropLimitedRows(partition, offset)
Review Comment:
Updated
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
zhztheplayer commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2778041193
> removing the check seems to have broken tests.

I suggest we figure out the reason of the test failures first. We'd make sure the operator outputs exactly the same data no matter it's offloaded or not. Otherwise it's a mismatch.

What did the broken tests look like?
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2761953459
> > cc @zhztheplayer, removing the check seems to have broken tests. I have opened this #9166, and adding the check here so that we can move forward with the PR. Please let me know what you think thanks
>
> Do you mean you are incorporating a solution for #9166 in this PR? Would you help me locate the code? Thanks.

I meant allowing the use of the child to check for columnar execution, and avoiding it in this PR. We can take up the custom rule in the future.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
zhztheplayer commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2760743713
> cc @zhztheplayer, removing the check seems to have broken tests. I have opened this #9166, and adding the check here so that we can move forward with the PR. Please let me know what you think thanks

Do you mean you are incorporating a solution for https://github.com/apache/incubator-gluten/issues/9166 in this PR? Would you help me locate the code? Thanks.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2760212720
cc @zhztheplayer, removing the check seems to have broken tests. I have opened this https://github.com/apache/incubator-gluten/issues/9166, and adding the check here so that we can move forward with the PR. Please let me know what you think thanks
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2017931919
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,101 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+ assert(limit >= 0 || (limit == -1 && offset > 0))
Review Comment:
Sure, updated.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
jackylee-ch commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2017795655
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,101 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+ assert(limit >= 0 || (limit == -1 && offset > 0))
Review Comment:
BTW, this isn't a big problem and won't significantly increase execution
time; I just thought removing it would be better.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2016534126
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,101 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+ assert(limit >= 0 || (limit == -1 && offset > 0))
Review Comment:
Since the operator is replaced, this code is what actually executes, so the
check is needed here. Otherwise we would miss checking for bad limits.
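For reference, the condition in the `assert` from the diff above accepts any non-negative limit, and accepts `limit == -1` only when the offset is positive. A tiny sketch that evaluates it for a few inputs follows. The object and method names are made up for illustration, and reading `-1` as "no limit" is an assumption rather than something stated in this thread.

```scala
object LimitOffsetCheck {
  // Same boolean condition as the assert in the diff above.
  def isValid(limit: Int, offset: Int): Boolean =
    limit >= 0 || (limit == -1 && offset > 0)

  def main(args: Array[String]): Unit = {
    // A few (limit, offset) pairs and whether the condition accepts them.
    val cases = Seq((5, 0), (0, 0), (-1, 3), (-1, 0), (-2, 3))
    cases.foreach { case (limit, offset) =>
      println(s"limit=$limit offset=$offset valid=${isValid(limit, offset)}")
    }
  }
}
```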
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
jackylee-ch commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2016866987
##
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala:
##
@@ -344,9 +344,11 @@ object OffloadOthers {
child)
case plan: CollectLimitExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+ val offset = SparkShimLoader.getSparkShims.getCollectLimitOffset(plan)
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarCollectLimitExec(
plan.limit,
-plan.child
+plan.child,
+offset
Review Comment:
ok
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
jackylee-ch commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2016865890
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,101 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+ assert(limit >= 0 || (limit == -1 && offset > 0))
Review Comment:
It is a static method that completes checks during the initialization of
`CollectLimitExec`, so we won't miss anything.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2016535508
##
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala:
##
@@ -344,9 +344,11 @@ object OffloadOthers {
child)
case plan: CollectLimitExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+ val offset = SparkShimLoader.getSparkShims.getCollectLimitOffset(plan)
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarCollectLimitExec(
plan.limit,
-plan.child
+plan.child,
+offset
Review Comment:
`plan.offset` is not available in the Spark 3.2/3.3 APIs. The offset is
provided conditionally through the shims, depending on what the current
Spark version supports.
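A rough sketch of how such a version-dependent shim can be structured is shown below. Only the method name `getCollectLimitOffset` comes from the diff. The plan classes, the two shim objects, and `ShimLoaderSketch` are hypothetical stand-ins so the example compiles without Spark, and they do not reflect how Gluten's real shims are implemented or loaded.

```scala
// Hypothetical, simplified plan nodes; these are not Spark's CollectLimitExec.
sealed trait CollectLimitPlan { def limit: Int }
// Shape of the node before offset support existed: no offset field.
final case class LegacyCollectLimit(limit: Int) extends CollectLimitPlan
// Shape of the node once offset support exists: carries an offset.
final case class OffsetCollectLimit(limit: Int, offset: Int) extends CollectLimitPlan

trait SparkShims {
  def getCollectLimitOffset(plan: CollectLimitPlan): Int
}

// Shim for versions without offset support: always report 0.
object LegacyShims extends SparkShims {
  override def getCollectLimitOffset(plan: CollectLimitPlan): Int = 0
}

// Shim for versions with offset support: read it from the plan when present.
object OffsetAwareShims extends SparkShims {
  override def getCollectLimitOffset(plan: CollectLimitPlan): Int = plan match {
    case OffsetCollectLimit(_, offset) => offset
    case _ => 0
  }
}

object ShimLoaderSketch {
  // Picks a shim from a "major.minor[.patch]" version string.
  // The 3.4 threshold follows the thread's statement that offset was added in 3.4.
  def forVersion(sparkVersion: String): SparkShims = {
    val parts = sparkVersion.split("\\.").take(2).map(_.toInt)
    val (major, minor) = (parts(0), parts(1))
    if (major > 3 || (major == 3 && minor >= 4)) OffsetAwareShims else LegacyShims
  }
}
```

The caller then asks the loaded shim for the offset instead of touching `plan.offset` directly, so the same offload rule compiles against every supported Spark version.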
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
jackylee-ch commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2016395751
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,101 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+ assert(limit >= 0 || (limit == -1 && offset > 0))
Review Comment:
nit: This should already be checked in `CollectLimitExec`, so we can remove this.
##
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala:
##
@@ -344,9 +344,11 @@ object OffloadOthers {
child)
case plan: CollectLimitExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+ val offset = SparkShimLoader.getSparkShims.getCollectLimitOffset(plan)
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarCollectLimitExec(
plan.limit,
-plan.child
+plan.child,
+offset
Review Comment:
We can directly use `plan.offset` here.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
zhztheplayer commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2757445113
> Please resolve the red CI
>
> ```
> ColumnarCollectLimitExec - basic limit test *** FAILED ***
> 2025-03-25T20:14:08.3797544Z assertionCondition was false Operator ColumnarCollectLimitBaseExec not found in executed plan:
> 2025-03-25T20:14:08.3798124ZList(ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
> 2025-03-25T20:14:08.3798481Z , ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
> 2025-03-25T20:14:08.3798784Z , ColumnarToRow
> 2025-03-25T20:14:08.3799039Z +- ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
> 2025-03-25T20:14:08.3799350Z , CollectLimit 5
> 2025-03-25T20:14:08.3799876Z +- *(1) ColumnarToRow
> 2025-03-25T20:14:08.3800143Z +- ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
> 2025-03-25T20:14:08.3800494Z ) (GlutenSQLCollectLimitExecSuite.scala:55)
> ```

@ArnavBalyan I am opening https://github.com/apache/incubator-gluten/pull/9145 which may solve the CI error. Let's see if we can apply that one first.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
jinchengchenghh commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2755041276
Please resolve the red CI
```
ColumnarCollectLimitExec - basic limit test *** FAILED ***
2025-03-25T20:14:08.3797544Z assertionCondition was false Operator ColumnarCollectLimitBaseExec not found in executed plan:
2025-03-25T20:14:08.3798124ZList(ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
2025-03-25T20:14:08.3798481Z , ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
2025-03-25T20:14:08.3798784Z , ColumnarToRow
2025-03-25T20:14:08.3799039Z +- ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
2025-03-25T20:14:08.3799350Z , CollectLimit 5
2025-03-25T20:14:08.3799876Z +- *(1) ColumnarToRow
2025-03-25T20:14:08.3800143Z +- ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
2025-03-25T20:14:08.3800494Z ) (GlutenSQLCollectLimitExecSuite.scala:55)
```
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2755024613
cc @jinchengchenghh gentle reminder, if all looks good. Thanks!
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2752111427
Could you please re-trigger the failed test? It should be unrelated.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2750014868
Run Gluten Clickhouse CI on x86
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2748724970
Run Gluten Clickhouse CI on x86
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2010526842
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,102 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+ assert(limit >= 0 || (limit == -1 && offset > 0))
override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+ private lazy val readMetrics =
+
SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+ private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .useSortBasedShuffle(outputPartitioning, child.output)
+
+ @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .createColumnarBatchSerializer(child.schema, metrics,
useSortBasedShuffle)
+
+ @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+ .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+ readMetrics ++ writeMetrics
+
/**
- * Returns an iterator that yields up to `limit` rows in total from the
input partitionIter.
+ * Returns an iterator that gives offset to limit rows in total from the
input partitionIter.
* Either retain the entire batch if it fits within the remaining limit, or
prune it if it
- * partially exceeds the remaining limit.
+ * partially exceeds the remaining limit/offset.
*/
- private def collectLimitedRows(
- partitionIter: Iterator[ColumnarBatch],
- limit: Int
- ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
- return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+ private def collectWithOffsetAndLimit(
+ inputIter: Iterator[ColumnarBatch],
+ offset: Int,
+ limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
- private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
private var nextBatch: Option[ColumnarBatch] = None
override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
}
override def next(): ColumnarBatch = {
-if (!hasNext) {
- throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches
available.")
val batch = nextBatch.get
nextBatch = None
batch
}
/**
- * Attempt to fetch the next batch from the underlying iterator if we
haven't yet hit the
- * limit. Returns true if we found a new batch, false otherwise.
+ * Advance the iterator until we find a batch (possibly sliced) that we
can return, or exhaust
+ * the input.
*/
- private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
- return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
- rowsCollected += currentBatchRowCount
- ColumnarBatches.retain(currentBatch)
- nextBatch = Some(currentBatch)
-} else {
- val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0,
remaining)
- rowsCollected += remaining
- nextBatch = Some(prunedBatch)
+ private def fetchNextBatch(): Boolean = {
+
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+ val batch = inputIter.next()
+ val batchSize = batch.numRows()
+
+ if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+ } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)
+
+val prunedBatch =
+ if (startIndex == 0 && needed == batchSize) {
+ColumnarBatches.retain(batch)
+batch
+ } else {
+val sliced = VeloxColumnarBatches.slice(batch, startIndex,
needed)
Review Comment:
Updated
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
jinchengchenghh commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2010521849
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,102 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+ assert(limit >= 0 || (limit == -1 && offset > 0))
override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+ private lazy val readMetrics =
+
SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+ private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .useSortBasedShuffle(outputPartitioning, child.output)
+
+ @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .createColumnarBatchSerializer(child.schema, metrics,
useSortBasedShuffle)
+
+ @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+ .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+ readMetrics ++ writeMetrics
+
/**
- * Returns an iterator that yields up to `limit` rows in total from the
input partitionIter.
+ * Returns an iterator that gives offset to limit rows in total from the
input partitionIter.
* Either retain the entire batch if it fits within the remaining limit, or
prune it if it
- * partially exceeds the remaining limit.
+ * partially exceeds the remaining limit/offset.
*/
- private def collectLimitedRows(
- partitionIter: Iterator[ColumnarBatch],
- limit: Int
- ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
- return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+ private def collectWithOffsetAndLimit(
+ inputIter: Iterator[ColumnarBatch],
+ offset: Int,
+ limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
- private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
private var nextBatch: Option[ColumnarBatch] = None
override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
}
override def next(): ColumnarBatch = {
-if (!hasNext) {
- throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches
available.")
val batch = nextBatch.get
nextBatch = None
batch
}
/**
- * Attempt to fetch the next batch from the underlying iterator if we
haven't yet hit the
- * limit. Returns true if we found a new batch, false otherwise.
+ * Advance the iterator until we find a batch (possibly sliced) that we
can return, or exhaust
+ * the input.
*/
- private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
- return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
- rowsCollected += currentBatchRowCount
- ColumnarBatches.retain(currentBatch)
- nextBatch = Some(currentBatch)
-} else {
- val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0,
remaining)
- rowsCollected += remaining
- nextBatch = Some(prunedBatch)
+ private def fetchNextBatch(): Boolean = {
+
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+ val batch = inputIter.next()
+ val batchSize = batch.numRows()
+
+ if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+ } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)
+
+val prunedBatch =
+ if (startIndex == 0 && needed == batchSize) {
+ColumnarBatches.retain(batch)
+batch
+ } else {
+val sliced = VeloxColumnarBatches.slice(batch, startIndex,
needed)
Review Comment:
Don't need `val sliced`
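The suggestion relies on Scala's `if`/`else` being an expression: the last expression of each branch is that branch's value, so the slice does not need an intermediate binding. A minimal sketch of the idea, assuming a hypothetical stand-in where a batch is a plain `Vector[Int]` (the names `PruneSketch` and `prune` are illustrative and not part of the PR):

```scala
object PruneSketch {
  // Hypothetical stand-in: a "batch" is a Vector of row ids, not a ColumnarBatch.
  def prune(batch: Vector[Int], startIndex: Int, needed: Int): Vector[Int] =
    if (startIndex == 0 && needed == batch.length) {
      // The whole batch is requested, so it is passed through untouched.
      batch
    } else {
      // The slice is the last expression of the branch and therefore its value;
      // no intermediate `val sliced` is required.
      batch.slice(startIndex, startIndex + needed)
    }
}
```

In the real operator the two branches would call `ColumnarBatches.retain` and `VeloxColumnarBatches.slice` as quoted in the diff above; the stand-in only illustrates the expression shape.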
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2748709643
cc @jinchengchenghh addressed all comments, can you please take a look? Thanks!
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2009753260
##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -67,7 +67,9 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
 assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
 }
- testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with filter", Array("3.2", "3.3")) {
+ testWithSpecifiedSparkVersion(
updated
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2747773889
Run Gluten Clickhouse CI on x86
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2009974027
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,94 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+ assert(limit >= 0 || (limit == -1 && offset > 0))
override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+ private lazy val readMetrics =
+
SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+ private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .useSortBasedShuffle(outputPartitioning, child.output)
+
+ @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .createColumnarBatchSerializer(child.schema, metrics,
useSortBasedShuffle)
+
+ @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+ .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+ readMetrics ++ writeMetrics
+
/**
- * Returns an iterator that yields up to `limit` rows in total from the
input partitionIter.
+ * Returns an iterator that gives offset to limit rows in total from the
input partitionIter.
* Either retain the entire batch if it fits within the remaining limit, or
prune it if it
- * partially exceeds the remaining limit.
+ * partially exceeds the remaining limit/offset.
*/
- private def collectLimitedRows(
- partitionIter: Iterator[ColumnarBatch],
- limit: Int
- ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
- return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+ private def collectWithOffsetAndLimit(
+ inputIter: Iterator[ColumnarBatch],
+ offset: Int,
+ limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
- private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
private var nextBatch: Option[ColumnarBatch] = None
override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
}
override def next(): ColumnarBatch = {
-if (!hasNext) {
- throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches
available.")
val batch = nextBatch.get
nextBatch = None
batch
}
/**
- * Attempt to fetch the next batch from the underlying iterator if we
haven't yet hit the
- * limit. Returns true if we found a new batch, false otherwise.
+ * Advance the iterator until we find a batch (possibly sliced)
+ * that we can return, or exhaust the input.
*/
- private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
- return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
- rowsCollected += currentBatchRowCount
- ColumnarBatches.retain(currentBatch)
- nextBatch = Some(currentBatch)
-} else {
- val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0,
remaining)
- rowsCollected += remaining
- nextBatch = Some(prunedBatch)
+ private def fetchNextBatch(): Boolean = {
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+ val batch = inputIter.next()
+ val batchSize = batch.numRows()
+
+ if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+ } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)
Review Comment:
done
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2009916832
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,94 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+ assert(limit >= 0 || (limit == -1 && offset > 0))
override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+ private lazy val readMetrics =
+
SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+ private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .useSortBasedShuffle(outputPartitioning, child.output)
+
+ @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .createColumnarBatchSerializer(child.schema, metrics,
useSortBasedShuffle)
+
+ @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+ .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+ readMetrics ++ writeMetrics
+
/**
- * Returns an iterator that yields up to `limit` rows in total from the
input partitionIter.
+ * Returns an iterator that gives offset to limit rows in total from the
input partitionIter.
* Either retain the entire batch if it fits within the remaining limit, or
prune it if it
- * partially exceeds the remaining limit.
+ * partially exceeds the remaining limit/offset.
*/
- private def collectLimitedRows(
- partitionIter: Iterator[ColumnarBatch],
- limit: Int
- ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
- return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+ private def collectWithOffsetAndLimit(
+ inputIter: Iterator[ColumnarBatch],
+ offset: Int,
+ limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
- private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
private var nextBatch: Option[ColumnarBatch] = None
override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
}
override def next(): ColumnarBatch = {
-if (!hasNext) {
- throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches
available.")
val batch = nextBatch.get
nextBatch = None
batch
}
/**
- * Attempt to fetch the next batch from the underlying iterator if we
haven't yet hit the
- * limit. Returns true if we found a new batch, false otherwise.
+ * Advance the iterator until we find a batch (possibly sliced)
+ * that we can return, or exhaust the input.
*/
- private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
- return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
- rowsCollected += currentBatchRowCount
- ColumnarBatches.retain(currentBatch)
- nextBatch = Some(currentBatch)
-} else {
- val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0,
remaining)
- rowsCollected += remaining
- nextBatch = Some(prunedBatch)
+ private def fetchNextBatch(): Boolean = {
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+ val batch = inputIter.next()
+ val batchSize = batch.numRows()
+
+ if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+ } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)
Review Comment:
I see, you mean handling this case separately so that we don't slice. Let me do the refactor.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
jinchengchenghh commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2009805093
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,94 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+ assert(limit >= 0 || (limit == -1 && offset > 0))
override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+ private lazy val readMetrics =
+
SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+ private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .useSortBasedShuffle(outputPartitioning, child.output)
+
+ @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .createColumnarBatchSerializer(child.schema, metrics,
useSortBasedShuffle)
+
+ @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+ .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+ readMetrics ++ writeMetrics
+
/**
- * Returns an iterator that yields up to `limit` rows in total from the
input partitionIter.
+ * Returns an iterator that gives offset to limit rows in total from the
input partitionIter.
* Either retain the entire batch if it fits within the remaining limit, or
prune it if it
- * partially exceeds the remaining limit.
+ * partially exceeds the remaining limit/offset.
*/
- private def collectLimitedRows(
- partitionIter: Iterator[ColumnarBatch],
- limit: Int
- ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
- return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+ private def collectWithOffsetAndLimit(
+ inputIter: Iterator[ColumnarBatch],
+ offset: Int,
+ limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
- private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
private var nextBatch: Option[ColumnarBatch] = None
override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
}
override def next(): ColumnarBatch = {
-if (!hasNext) {
- throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches
available.")
val batch = nextBatch.get
nextBatch = None
batch
}
/**
- * Attempt to fetch the next batch from the underlying iterator if we
haven't yet hit the
- * limit. Returns true if we found a new batch, false otherwise.
+ * Advance the iterator until we find a batch (possibly sliced)
+ * that we can return, or exhaust the input.
*/
- private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
- return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
- rowsCollected += currentBatchRowCount
- ColumnarBatches.retain(currentBatch)
- nextBatch = Some(currentBatch)
-} else {
- val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0,
remaining)
- rowsCollected += remaining
- nextBatch = Some(prunedBatch)
+ private def fetchNextBatch(): Boolean = {
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+ val batch = inputIter.next()
+ val batchSize = batch.numRows()
+
+ if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+ } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)
Review Comment:
So we don't need to slice in that case: the sliced batch would be the whole batch.
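To illustrate the point, here is a hedged, self-contained sketch of the skip-then-collect walk from the quoted diff, with the pass-through case separated from the slicing case. It assumes a hypothetical model in which a batch is a `Vector[Int]` and the result records whether each emitted batch had to be sliced; `OffsetLimitSketch` and `Emitted` are illustrative names, not Gluten APIs.

```scala
object OffsetLimitSketch {
  // Hypothetical stand-in: a "batch" is a Vector of row ids, not a ColumnarBatch.
  type Batch = Vector[Int]

  /** One emitted batch; `sliced` records whether a slice was needed. */
  final case class Emitted(rows: Batch, sliced: Boolean)

  def collectWithOffsetAndLimit(
      input: Iterator[Batch],
      offset: Int,
      limit: Int): List[Emitted] = {
    var rowsToSkip = math.max(offset, 0)
    // A negative limit means "no limit", mirroring the diff above.
    var rowsToCollect = if (limit < 0) Int.MaxValue else limit
    val out = List.newBuilder[Emitted]

    while (rowsToCollect > 0 && input.hasNext) {
      val batch = input.next()
      val batchSize = batch.length
      if (rowsToSkip >= batchSize) {
        // The whole batch falls inside the offset: drop it.
        rowsToSkip -= batchSize
      } else {
        val startIndex = rowsToSkip
        rowsToSkip = 0
        val needed = math.min(rowsToCollect, batchSize - startIndex)
        val emitted =
          if (startIndex == 0 && needed == batchSize) {
            // The slice would equal the whole batch, so pass it through as-is.
            Emitted(batch, sliced = false)
          } else {
            Emitted(batch.slice(startIndex, startIndex + needed), sliced = true)
          }
        rowsToCollect -= needed
        out += emitted
      }
    }
    out.result()
  }
}
```

With three batches of three rows each (ids 0 to 8), `offset = 2` and `limit = 5`, only the first and last batches are sliced; the middle batch is consumed in full and passed through unchanged.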
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2009752841
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,94 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+ assert(limit >= 0 || (limit == -1 && offset > 0))
override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+ private lazy val readMetrics =
+
SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+ private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .useSortBasedShuffle(outputPartitioning, child.output)
+
+ @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .createColumnarBatchSerializer(child.schema, metrics,
useSortBasedShuffle)
+
+ @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+ .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+ readMetrics ++ writeMetrics
+
/**
- * Returns an iterator that yields up to `limit` rows in total from the
input partitionIter.
+ * Returns an iterator that gives offset to limit rows in total from the
input partitionIter.
* Either retain the entire batch if it fits within the remaining limit, or
prune it if it
- * partially exceeds the remaining limit.
+ * partially exceeds the remaining limit/offset.
*/
- private def collectLimitedRows(
- partitionIter: Iterator[ColumnarBatch],
- limit: Int
- ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
- return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+ private def collectWithOffsetAndLimit(
+ inputIter: Iterator[ColumnarBatch],
+ offset: Int,
+ limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
- private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
private var nextBatch: Option[ColumnarBatch] = None
override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
}
override def next(): ColumnarBatch = {
-if (!hasNext) {
- throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches
available.")
val batch = nextBatch.get
nextBatch = None
batch
}
/**
- * Attempt to fetch the next batch from the underlying iterator if we
haven't yet hit the
- * limit. Returns true if we found a new batch, false otherwise.
+ * Advance the iterator until we find a batch (possibly sliced)
+ * that we can return, or exhaust the input.
*/
- private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
- return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
- rowsCollected += currentBatchRowCount
- ColumnarBatches.retain(currentBatch)
- nextBatch = Some(currentBatch)
-} else {
- val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0,
remaining)
- rowsCollected += remaining
- nextBatch = Some(prunedBatch)
+ private def fetchNextBatch(): Boolean = {
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+ val batch = inputIter.next()
+ val batchSize = batch.numRows()
+
+ if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+ } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)
Review Comment:
@jinchengchenghh does this address the comment?
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2745109618
Run Gluten Clickhouse CI on x86
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2008698522
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,94 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+ assert(limit >= 0 || (limit == -1 && offset > 0))
override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+ private lazy val readMetrics =
+SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+ private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .useSortBasedShuffle(outputPartitioning, child.output)
+
+ @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)
+
+ @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+ .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+ readMetrics ++ writeMetrics
+
/**
- * Returns an iterator that yields up to `limit` rows in total from the input partitionIter.
+ * Returns an iterator that gives offset to limit rows in total from the input partitionIter.
* Either retain the entire batch if it fits within the remaining limit, or prune it if it
- * partially exceeds the remaining limit.
+ * partially exceeds the remaining limit/offset.
*/
- private def collectLimitedRows(
- partitionIter: Iterator[ColumnarBatch],
- limit: Int
- ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
- return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+ private def collectWithOffsetAndLimit(
+ inputIter: Iterator[ColumnarBatch],
+ offset: Int,
+ limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
- private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
private var nextBatch: Option[ColumnarBatch] = None
override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
}
override def next(): ColumnarBatch = {
-if (!hasNext) {
- throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches available.")
val batch = nextBatch.get
nextBatch = None
batch
}
/**
- * Attempt to fetch the next batch from the underlying iterator if we haven't yet hit the
- * limit. Returns true if we found a new batch, false otherwise.
+ * Advance the iterator until we find a batch (possibly sliced)
+ * that we can return, or exhaust the input.
*/
- private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
- return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
- rowsCollected += currentBatchRowCount
- ColumnarBatches.retain(currentBatch)
- nextBatch = Some(currentBatch)
-} else {
- val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0, remaining)
- rowsCollected += remaining
- nextBatch = Some(prunedBatch)
+ private def fetchNextBatch(): Boolean = {
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+ val batch = inputIter.next()
+ val batchSize = batch.numRows()
+
+ if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+ } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)
Review Comment:
In that case, `startIndex` would be 0 and `leftoverAfterSkip` would equal `batchSize`,
leading to `val prunedBatch = VeloxColumnarBatches.slice(batch, 0, batchSize)`.
Could you give an example of a batch size, limit, and offset for the above case?
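For a concrete case, the skip/collect bookkeeping in `fetchNextBatch` can be traced on batch sizes alone. The following is a minimal stand-alone sketch, not the PR's code: `OffsetLimitPlan`, `Slice`, and `plan` are hypothetical names, batches are modelled only by their row counts, `limit` is assumed to be the number of rows to emit after the offset has been skipped, and the `rowsToCollect -= needed` step is an assumption because the quoted diff stops at `needed`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model: a batch is represented only by its row count.
object OffsetLimitPlan {

  // One emitted slice: source batch, start row, row count, and whether it is the full batch.
  final case class Slice(batchIndex: Int, startIndex: Int, needed: Int, wholeBatch: Boolean)

  def plan(batchSizes: Seq[Int], offset: Int, limit: Int): Seq[Slice] = {
    var rowsToSkip = math.max(offset, 0)
    var rowsToCollect = if (limit < 0) Int.MaxValue else limit
    val slices = ArrayBuffer.empty[Slice]
    val batches = batchSizes.zipWithIndex.iterator

    while (rowsToCollect > 0 && batches.hasNext) {
      val (batchSize, batchIndex) = batches.next()
      if (rowsToSkip >= batchSize) {
        // The whole batch falls inside the offset: drop it.
        rowsToSkip -= batchSize
      } else {
        val startIndex = rowsToSkip
        val leftoverAfterSkip = batchSize - startIndex
        rowsToSkip = 0
        val needed = math.min(rowsToCollect, leftoverAfterSkip)
        // Assumption: the remaining budget shrinks by the rows just emitted.
        rowsToCollect -= needed
        // startIndex == 0 and needed == batchSize means no slicing is required.
        slices += Slice(batchIndex, startIndex, needed, startIndex == 0 && needed == batchSize)
      }
    }
    slices.toList
  }
}
```

With three batches of 4 rows, `plan(Seq(4, 4, 4), offset = 5, limit = 6)` drops the first batch, takes rows 1 to 3 of the second, and rows 0 to 2 of the third. With `offset = 0` and `limit = 10` on two 4-row batches, both slices have `startIndex == 0` and `needed == batchSize`, which is the case where the whole batch could be retained rather than sliced.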
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2740273391
Run Gluten Clickhouse CI on x86
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
jinchengchenghh commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2005650398
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,94 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
case class ColumnarCollectLimitExec(
limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+ assert(limit >= 0 || (limit == -1 && offset > 0))
override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
+ private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+ private lazy val readMetrics =
+SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+ private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .useSortBasedShuffle(outputPartitioning, child.output)
+
+ @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+ .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)
+
+ @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+ .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+ readMetrics ++ writeMetrics
+
/**
- * Returns an iterator that yields up to `limit` rows in total from the input partitionIter.
+ * Returns an iterator that gives offset to limit rows in total from the input partitionIter.
* Either retain the entire batch if it fits within the remaining limit, or prune it if it
- * partially exceeds the remaining limit.
+ * partially exceeds the remaining limit/offset.
*/
- private def collectLimitedRows(
- partitionIter: Iterator[ColumnarBatch],
- limit: Int
- ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
- return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+ private def collectWithOffsetAndLimit(
+ inputIter: Iterator[ColumnarBatch],
+ offset: Int,
+ limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
- private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
private var nextBatch: Option[ColumnarBatch] = None
override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
}
override def next(): ColumnarBatch = {
-if (!hasNext) {
- throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches available.")
val batch = nextBatch.get
nextBatch = None
batch
}
/**
- * Attempt to fetch the next batch from the underlying iterator if we haven't yet hit the
- * limit. Returns true if we found a new batch, false otherwise.
+ * Advance the iterator until we find a batch (possibly sliced)
+ * that we can return, or exhaust the input.
*/
- private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
- return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
- rowsCollected += currentBatchRowCount
- ColumnarBatches.retain(currentBatch)
- nextBatch = Some(currentBatch)
-} else {
- val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0, remaining)
- rowsCollected += remaining
- nextBatch = Some(prunedBatch)
+ private def fetchNextBatch(): Boolean = {
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+ val batch = inputIter.next()
+ val batchSize = batch.numRows()
+
+ if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+ } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)
Review Comment:
When the batch fits entirely within the remaining limit, we still need this logic, so that
the whole batch can be returned instead of a sliced batch:
```
if (currentBatchRowCount <= remaining) {
  rowsCollected += currentBatchRowCount
  ColumnarBatches.retain(currentBatch)
  nextBatch = Some(currentBatch)
}
```
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2738566222
cc @jinchengchenghh, I have addressed the comments. Could you please take a look? Thanks.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2737549144
Run Gluten Clickhouse CI on x86
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2736515534
Run Gluten Clickhouse CI on x86
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2733367485
Run Gluten Clickhouse CI on x86
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2727562264
Run Gluten Clickhouse CI on x86
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r1997651894
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -125,11 +199,20 @@ case class ColumnarCollectLimitExec(
if (childRDD.getNumPartitions == 1) childRDD
else shuffleLimitedPartitions(childRDD)
-processedRDD.mapPartitions(partition => collectLimitedRows(partition, limit))
+processedRDD.mapPartitions(
+ partition => {
+val droppedRows = dropLimitedRows(partition, offset)
+val adjustedLimit = Math.max(0, limit - offset)
+collectLimitedRows(droppedRows, adjustedLimit)
Review Comment:
Yes, but that would not preserve row order. The current implementation closely
matches Spark's, and changing it could give users unexpected ordering and cause
failures across UTs. Keeping it similar to the Spark implementation maintains the
same ordering as Spark. Thanks.
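The order argument can be checked on a plain Scala iterator. The sketch below is only an illustration under stated assumptions: `DropThenLimit`, `dropThenLimit`, and `takeThenDrop` are hypothetical names, rows are plain `Int`s instead of columnar batches, and `limit` is assumed to count the skipped rows as well, which is what `limit - offset` in the diff suggests. On a single ordered partition, dropping `offset` rows and then taking `limit - offset` yields the same rows, in the same order, as taking `limit` rows and then dropping `offset`.

```scala
object DropThenLimit {

  // Mirrors the shape of the diff: drop `offset` rows, then keep at most `limit - offset`.
  def dropThenLimit(rows: Iterator[Int], offset: Int, limit: Int): List[Int] = {
    val adjustedLimit = math.max(0, limit - offset)
    rows.drop(offset).take(adjustedLimit).toList
  }

  // Reference behaviour for comparison: take `limit` rows first, then drop `offset`.
  def takeThenDrop(rows: Iterator[Int], offset: Int, limit: Int): List[Int] =
    rows.take(limit).drop(offset).toList
}
```

For rows 1 to 20 with `offset = 3` and `limit = 10`, both functions return rows 4 to 10 in input order. When `offset` exceeds `limit`, both return nothing.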
##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -58,7 +58,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
testWithSpecifiedSparkVersion(
Review Comment:
Done, thanks.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r1997651674
##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -138,4 +140,34 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
}
+
+ testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - offset test", Array("3.4", "3.5")) {
Review Comment:
Added more tests to cover the above scenario; the Spark UTs should also help.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r1997651585
##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -138,4 +140,34 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
}
+
+ testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - offset test", Array("3.4", "3.5")) {
Review Comment:
For 3.3 it would fail at compile time, since the offset API is not available on
`CollectLimitExec` in older Spark versions.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2727517945
Run Gluten Clickhouse CI on x86
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
jinchengchenghh commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r1983013290
##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -138,4 +140,34 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
}
+
+ testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - offset test", Array("3.4", "3.5")) {
Review Comment:
What is the result for Spark 3.3? Is the result still correct, with only the
operator not matched?
##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -58,7 +58,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
testWithSpecifiedSparkVersion(
Review Comment:
Change `testWithSpecifiedSparkVersion` to `test`.
##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -125,11 +199,20 @@ case class ColumnarCollectLimitExec(
if (childRDD.getNumPartitions == 1) childRDD
else shuffleLimitedPartitions(childRDD)
-processedRDD.mapPartitions(partition => collectLimitedRows(partition, limit))
+processedRDD.mapPartitions(
+ partition => {
+val droppedRows = dropLimitedRows(partition, offset)
+val adjustedLimit = Math.max(0, limit - offset)
+collectLimitedRows(droppedRows, adjustedLimit)
Review Comment:
Can we enhance `collectLimitedRows` so that it slices the input RowVector from
`offset` to `adjustedLimit`?
##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -67,7 +67,9 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
}
- testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with filter", Array("3.2", "3.3")) {
+ testWithSpecifiedSparkVersion(
Review Comment:
Ditto, and likewise for the others.
##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -138,4 +140,34 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
}
+
+ testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - offset test", Array("3.4", "3.5")) {
Review Comment:
If so, please also add the result check for Spark 3.2 and Spark 3.3.
##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -138,4 +140,34 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
}
+
+ testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - offset test", Array("3.4", "3.5")) {
Review Comment:
Please add tests to cover more code paths, such as `limit(12)`.
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2701217641
Run Gluten Clickhouse CI on x86
Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]
github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2701216908
https://github.com/apache/incubator-gluten/issues/8912
