Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-18 Thread via GitHub


jinchengchenghh merged PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-18 Thread via GitHub


ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2050897070


##
gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution.ColumnarCollectLimitBaseExec
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}
+
+class GlutenSQLCollectLimitExecSuite extends GlutenSQLTestsTrait {

Review Comment:
   It seems we may also need the 3.3 suite, since older versions do not support 
the offset API; the tests differ slightly depending on offset support, which was 
added in Spark 3.4. Thanks!






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-18 Thread via GitHub


zhztheplayer commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2050809795


##
gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution.ColumnarCollectLimitBaseExec
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}
+
+class GlutenSQLCollectLimitExecSuite extends GlutenSQLTestsTrait {

Review Comment:
   This is a big test file. Is it enough to add it only for the newest 
Spark version (3.5)? That would make further maintenance easier.






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-18 Thread via GitHub


ArnavBalyan commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2815668818

   This should be fixed by the post-transform rule. Could you please take a 
look and help re-run the UTs? Thanks! @jinchengchenghh @zhztheplayer 





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-18 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2815474381

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-18 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2815402825

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-18 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2815328308

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-18 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2814855467

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-17 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2812841638

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-17 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2812831707

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-17 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2812177965

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-13 Thread via GitHub


ArnavBalyan commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2800456943

   > > removing the check seems to have broken tests.
   > 
   > I suggest we figure out the cause of the test failures first. We should make 
sure the operator outputs exactly the same data whether or not it's offloaded. 
Otherwise it's a mismatch.
   > 
   > What did the broken tests look like?
   
   Yes. If we offload with an R2C between CollectLimit and its child, the number 
of jobs changes with Gluten, although the operator outputs exactly the same data 
either way. So the current implementation only offloads when the child is 
columnar, which avoids both the R2C overhead and the failing UTs. Thanks!
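A minimal sketch of the interim rule described above, under a toy plan model. `Plan`, `Scan`, `CollectLimit`, `ColumnarCollectLimit` and `OffloadCollectLimit` are hypothetical names for illustration only, not Gluten's classes; the real decision is made inside Gluten's offload rules on `SparkPlan` nodes.

```scala
// Hypothetical, heavily simplified plan model (not Spark's SparkPlan).
sealed trait Plan { def supportsColumnar: Boolean }

final case class Scan(supportsColumnar: Boolean) extends Plan

final case class CollectLimit(limit: Int, offset: Int, child: Plan) extends Plan {
  val supportsColumnar: Boolean = false
}

final case class ColumnarCollectLimit(limit: Int, offset: Int, child: Plan) extends Plan {
  val supportsColumnar: Boolean = true
}

object OffloadCollectLimit {
  // Offload only when the child already produces columnar batches. Otherwise
  // a row-to-columnar (R2C) transition would be needed, which is the case the
  // thread reports as changing job counts in existing tests.
  def apply(plan: Plan): Plan = plan match {
    case CollectLimit(limit, offset, child) if child.supportsColumnar =>
      ColumnarCollectLimit(limit, offset, child)
    case other => other
  }
}
```

With a columnar child the node is replaced; with a row-based child it is left as the vanilla operator, so no R2C is introduced.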





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-05 Thread via GitHub


jinchengchenghh commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2005277220


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -125,11 +199,21 @@ case class ColumnarCollectLimitExec(
   if (childRDD.getNumPartitions == 1) childRDD
   else shuffleLimitedPartitions(childRDD)
 
-processedRDD.mapPartitions(partition => collectLimitedRows(partition, limit))
+processedRDD.mapPartitions(
+  partition => {
+val droppedRows = dropLimitedRows(partition, offset)

Review Comment:
   We can add an `offset` argument to `collectLimitedRows` and just handle it in 
function `fetchNext`; that would make the function much simpler, right?
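A minimal sketch of the suggestion above: skip `offset` rows and cap the output at `limit` inside one `fetchNext` step, instead of a separate drop pass. This is an assumption-laden illustration, not the PR's code: each `Seq[T]` stands in for a `ColumnarBatch`, `LimitOffsetIterator` is a hypothetical name, and a negative `limit` is treated as "no limit" (the PR's assertion only admits `-1` for that case).

```scala
// Hypothetical batch iterator that applies OFFSET and LIMIT in one pass.
// Each Seq[T] stands in for one columnar batch.
final class LimitOffsetIterator[T](
    batches: Iterator[Seq[T]],
    limit: Int,
    offset: Int)
  extends Iterator[Seq[T]] {

  private var toSkip = offset // rows still to be dropped
  private var emitted = 0     // rows already returned
  private var pending: Option[Seq[T]] = None

  private def limitReached: Boolean = limit >= 0 && emitted >= limit

  // Pull input batches until a non-empty slice is ready, or input/limit runs out.
  private def fetchNext(): Unit = {
    while (pending.isEmpty && !limitReached && batches.hasNext) {
      val batch = batches.next()
      // Consume as much of the remaining offset as this batch can absorb.
      val skipped = math.min(toSkip, batch.size)
      toSkip -= skipped
      val remaining = batch.drop(skipped)
      // Never let the running total exceed `limit`.
      val slice = if (limit < 0) remaining else remaining.take(limit - emitted)
      if (slice.nonEmpty) {
        emitted += slice.size
        pending = Some(slice)
      }
    }
  }

  override def hasNext: Boolean = {
    fetchNext()
    pending.isDefined
  }

  override def next(): Seq[T] = {
    if (!hasNext) throw new NoSuchElementException("no more batches")
    val out = pending.get
    pending = None
    out
  }
}
```

For input batches `[1,2,3]`, `[4,5]`, `[6,7,8,9]` with `limit = 4, offset = 2`, the flattened output is `3, 4, 5, 6`; once the limit is reached, no further input batches are pulled.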






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-04 Thread via GitHub


ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2005547474


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -125,11 +199,21 @@ case class ColumnarCollectLimitExec(
   if (childRDD.getNumPartitions == 1) childRDD
   else shuffleLimitedPartitions(childRDD)
 
-processedRDD.mapPartitions(partition => collectLimitedRows(partition, limit))
+processedRDD.mapPartitions(
+  partition => {
+val droppedRows = dropLimitedRows(partition, offset)

Review Comment:
   Updated






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-04-04 Thread via GitHub


zhztheplayer commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2778041193

   > removing the check seems to have broken tests. 
   
   I suggest we figure out the cause of the test failures first. We should make 
sure the operator outputs exactly the same data whether or not it's offloaded. 
Otherwise it's a mismatch.
   
   What did the broken tests look like?





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-31 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2765999570

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-28 Thread via GitHub


ArnavBalyan commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2761953459

   > > cc @zhztheplayer, removing the check seems to have broken tests. I have 
opened #9166 and am adding the check here so that we can move forward with 
the PR. Please let me know what you think, thanks
   > 
   > Do you mean you are incorporating a solution for #9166 in this PR? Would 
you help me locate the code? Thanks.
   
   I meant using the child to check for columnar execution, and avoiding the R2C 
otherwise in this PR. We can take up the custom rule in the future.





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-28 Thread via GitHub


zhztheplayer commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2760743713

   > cc @zhztheplayer, removing the check seems to have broken tests. I have 
opened #9166 and am adding the check here so that we can move forward with 
the PR. Please let me know what you think, thanks
   
   Do you mean you are incorporating a solution for 
https://github.com/apache/incubator-gluten/issues/9166 in this PR? Would you 
help me locate the code? Thanks.





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-28 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2760671151

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-28 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2760462473

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-28 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2760389735

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-27 Thread via GitHub


ArnavBalyan commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2760212720

   cc @zhztheplayer, removing the check seems to have broken tests. I have 
opened https://github.com/apache/incubator-gluten/issues/9166 and am adding 
the check here so that we can move forward with the PR. Please let me know what 
you think, thanks





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-27 Thread via GitHub


ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2017931919


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,101 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
 limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+  assert(limit >= 0 || (limit == -1 && offset > 0))

Review Comment:
   Sure, updated.






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-27 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2760209609

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-27 Thread via GitHub


jackylee-ch commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2017795655


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,101 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
 limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+  assert(limit >= 0 || (limit == -1 && offset > 0))

Review Comment:
   BTW, this is not a big problem and will not significantly increase execution 
time; I just thought removing it would be better.






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-27 Thread via GitHub


ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2016534126


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,101 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
 limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+  assert(limit >= 0 || (limit == -1 && offset > 0))

Review Comment:
   Since the operator is replaced, the check is needed here because this code 
will execute; otherwise we would miss checking invalid limits.






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-27 Thread via GitHub


jackylee-ch commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2016866987


##
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala:
##
@@ -344,9 +344,11 @@ object OffloadOthers {
 child)
 case plan: CollectLimitExec =>
   logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+  val offset = SparkShimLoader.getSparkShims.getCollectLimitOffset(plan)
   BackendsApiManager.getSparkPlanExecApiInstance.genColumnarCollectLimitExec(
 plan.limit,
-plan.child
+plan.child,
+offset

Review Comment:
   ok






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-27 Thread via GitHub


jackylee-ch commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2016865890


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,101 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
 limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+  assert(limit >= 0 || (limit == -1 && offset > 0))

Review Comment:
   It is a static method that completes checks during the initialization of 
`CollectLimitExec`, so we won't miss anything.
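For reference, the precondition being debated can be restated as a small helper: either a non-negative `limit`, or `limit == -1` ("no limit") combined with a positive `offset`. `CollectLimitArgs` is a hypothetical name used only to show which argument combinations pass; it is not part of Spark or Gluten.

```scala
// Hypothetical helper restating the assertion quoted in this PR:
//   assert(limit >= 0 || (limit == -1 && offset > 0))
object CollectLimitArgs {
  def isValid(limit: Int, offset: Int): Boolean =
    limit >= 0 || (limit == -1 && offset > 0)

  // Throws IllegalArgumentException for combinations the assertion rejects.
  def validate(limit: Int, offset: Int): Unit =
    require(isValid(limit, offset), s"invalid limit=$limit, offset=$offset")
}
```

So `(5, 0)` and `(-1, 3)` pass, while `(-1, 0)` (no limit and no offset) and `(-2, 3)` are rejected.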






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-27 Thread via GitHub


ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2016535508


##
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala:
##
@@ -344,9 +344,11 @@ object OffloadOthers {
 child)
 case plan: CollectLimitExec =>
   logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+  val offset = SparkShimLoader.getSparkShims.getCollectLimitOffset(plan)
   BackendsApiManager.getSparkPlanExecApiInstance.genColumnarCollectLimitExec(
 plan.limit,
-plan.child
+plan.child,
+offset

Review Comment:
   `plan.offset` is not available in the Spark 3.3/3.2 APIs. The offset is 
provided conditionally through shims, depending on the current Spark version.
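A minimal sketch of the shim pattern described above, assuming a toy stand-in for the plan node. The diff calls `SparkShimLoader.getSparkShims.getCollectLimitOffset(plan)`; here `FakeCollectLimit`, `CollectLimitShim`, `Spark32To33Shim`, `Spark34PlusShim` and `ShimLoader` are hypothetical names. To keep the sketch compact, the stand-in always carries an `offset` field, whereas Spark 3.2/3.3's `CollectLimitExec` has none.

```scala
// Hypothetical stand-in for Spark's CollectLimitExec.
final case class FakeCollectLimit(limit: Int, offset: Int)

// Version-specific accessor, so common code never touches `plan.offset` directly.
trait CollectLimitShim {
  def getCollectLimitOffset(plan: FakeCollectLimit): Int
}

// Spark 3.2/3.3: the operator has no offset, so always report 0.
object Spark32To33Shim extends CollectLimitShim {
  def getCollectLimitOffset(plan: FakeCollectLimit): Int = 0
}

// Spark 3.4+: the offset exists and can be read directly.
object Spark34PlusShim extends CollectLimitShim {
  def getCollectLimitOffset(plan: FakeCollectLimit): Int = plan.offset
}

object ShimLoader {
  // Choose a shim from a "major.minor[.patch]" version string.
  def forVersion(sparkVersion: String): CollectLimitShim = {
    val parts = sparkVersion.split('.').take(2).map(_.toInt)
    val (major, minor) = (parts(0), parts(1))
    if (major > 3 || (major == 3 && minor >= 4)) Spark34PlusShim else Spark32To33Shim
  }
}
```

In a real build, each shim would be compiled only against its own Spark version, which is what keeps the shared offload rule source-compatible across versions.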






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-27 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2757954528

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-27 Thread via GitHub


jackylee-ch commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2016395751


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,101 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
 limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+  assert(limit >= 0 || (limit == -1 && offset > 0))

Review Comment:
   nit: This should already be checked in `CollectLimitExec`, so we can remove this.



##
gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala:
##
@@ -344,9 +344,11 @@ object OffloadOthers {
 child)
 case plan: CollectLimitExec =>
   logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+  val offset = SparkShimLoader.getSparkShims.getCollectLimitOffset(plan)
   BackendsApiManager.getSparkPlanExecApiInstance.genColumnarCollectLimitExec(
 plan.limit,
-plan.child
+plan.child,
+offset

Review Comment:
   We can directly use `plan.offset` here.






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-27 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2757759308

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-27 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: 
https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2757722353

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-27 Thread via GitHub


zhztheplayer commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2757445113

   > Please resolve the red CI
   > 
   > ```
   > ColumnarCollectLimitExec - basic limit test *** FAILED ***
   > 2025-03-25T20:14:08.3797544Z   assertionCondition was false Operator ColumnarCollectLimitBaseExec not found in executed plan:
   > 2025-03-25T20:14:08.3798124Z   List(ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
   > 2025-03-25T20:14:08.3798481Z   , ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
   > 2025-03-25T20:14:08.3798784Z   , ColumnarToRow
   > 2025-03-25T20:14:08.3799039Z   +- ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
   > 2025-03-25T20:14:08.3799350Z   , CollectLimit 5
   > 2025-03-25T20:14:08.3799876Z   +- *(1) ColumnarToRow
   > 2025-03-25T20:14:08.3800143Z      +- ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
   > 2025-03-25T20:14:08.3800494Z   ) (GlutenSQLCollectLimitExecSuite.scala:55)
   > ```
   
   @ArnavBalyan I am opening https://github.com/apache/incubator-gluten/pull/9145, which may fix the CI error. Let's see if we can merge that one first.



Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-26 Thread via GitHub


jinchengchenghh commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2755041276

   Please resolve the red CI
   ```
   ColumnarCollectLimitExec - basic limit test *** FAILED ***
   2025-03-25T20:14:08.3797544Z   assertionCondition was false Operator ColumnarCollectLimitBaseExec not found in executed plan:
   2025-03-25T20:14:08.3798124Z   List(ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
   2025-03-25T20:14:08.3798481Z   , ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
   2025-03-25T20:14:08.3798784Z   , ColumnarToRow
   2025-03-25T20:14:08.3799039Z   +- ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
   2025-03-25T20:14:08.3799350Z   , CollectLimit 5
   2025-03-25T20:14:08.3799876Z   +- *(1) ColumnarToRow
   2025-03-25T20:14:08.3800143Z      +- ColumnarRange 0, 1000, 1, 2, 1000, [id#422926L]
   2025-03-25T20:14:08.3800494Z   ) (GlutenSQLCollectLimitExecSuite.scala:55)
   ```
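The failing assertion reports that no `ColumnarCollectLimitBaseExec` node exists in the executed plan: the plan still shows Spark's own `CollectLimit 5` above `ColumnarToRow`, so the limit was not offloaded. Conceptually, the check is a search over the plan tree. The sketch below illustrates that idea under stated assumptions: the node classes (`PlanNode`, `ColumnarCollectLimit`, etc.) and `PlanSearch` are hypothetical stand-ins, not Spark's `SparkPlan` API and not the actual implementation of `assertGlutenOperatorMatch`.

```scala
import scala.reflect.ClassTag

// Hypothetical, minimal plan tree for illustration only; these are not Spark's SparkPlan classes.
sealed trait PlanNode {
  def children: Seq[PlanNode]
}
final case class ColumnarRange(start: Long, end: Long) extends PlanNode {
  def children: Seq[PlanNode] = Nil
}
final case class ColumnarToRow(child: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(child)
}
// Vanilla (row-based) limit, as left in the plan by the failing test.
final case class CollectLimit(limit: Int, child: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(child)
}
// Offloaded columnar limit, which the test expects to find.
final case class ColumnarCollectLimit(limit: Int, child: PlanNode) extends PlanNode {
  def children: Seq[PlanNode] = Seq(child)
}

object PlanSearch {
  /** All nodes of the tree in pre-order (parent before children). */
  def nodes(plan: PlanNode): List[PlanNode] =
    plan :: plan.children.toList.flatMap(nodes)

  /** True if at least one node in the tree is an instance of T. */
  def containsOperator[T <: PlanNode: ClassTag](plan: PlanNode): Boolean = {
    val cls = implicitly[ClassTag[T]].runtimeClass
    nodes(plan).exists(node => cls.isInstance(node))
  }
}
```

With the plan shape from the log, `PlanSearch.containsOperator[ColumnarCollectLimit](CollectLimit(5, ColumnarToRow(ColumnarRange(0, 1000))))` is `false`, which mirrors why the assertion fails.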



Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-26 Thread via GitHub


ArnavBalyan commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2755024613

   cc @jinchengchenghh, gentle reminder: could you take another look and approve if all looks good? Thanks!



Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-25 Thread via GitHub


ArnavBalyan commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2752111427

   Could you please re-trigger the failed test? The failure should be unrelated.



Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-24 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2750014868

   Run Gluten Clickhouse CI on x86



Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-24 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2748724970

   Run Gluten Clickhouse CI on x86



Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-24 Thread via GitHub


ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2010526842


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,102 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
 limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+  assert(limit >= 0 || (limit == -1 && offset > 0))
 
   override def batchType(): Convention.BatchType =
 BackendsApiManager.getSettings.primaryBatchType
 
+  private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+  private lazy val readMetrics =
+SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+  private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .useSortBasedShuffle(outputPartitioning, child.output)
+
+  @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)
+
+  @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+  .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+  readMetrics ++ writeMetrics
+
   /**
-   * Returns an iterator that yields up to `limit` rows in total from the input partitionIter.
+   * Returns an iterator that gives offset to limit rows in total from the input partitionIter.
    * Either retain the entire batch if it fits within the remaining limit, or prune it if it
-   * partially exceeds the remaining limit.
+   * partially exceeds the remaining limit/offset.
*/
-  private def collectLimitedRows(
-  partitionIter: Iterator[ColumnarBatch],
-  limit: Int
-  ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
-  return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+  private def collectWithOffsetAndLimit(
+  inputIter: Iterator[ColumnarBatch],
+  offset: Int,
+  limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
 
-  private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
   private var nextBatch: Option[ColumnarBatch] = None
 
   override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
   }
 
   override def next(): ColumnarBatch = {
-if (!hasNext) {
-  throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches available.")
 val batch = nextBatch.get
 nextBatch = None
 batch
   }
 
   /**
-   * Attempt to fetch the next batch from the underlying iterator if we haven't yet hit the
-   * limit. Returns true if we found a new batch, false otherwise.
+   * Advance the iterator until we find a batch (possibly sliced) that we can return, or exhaust
+   * the input.
*/
-  private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
-  return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
-  rowsCollected += currentBatchRowCount
-  ColumnarBatches.retain(currentBatch)
-  nextBatch = Some(currentBatch)
-} else {
-  val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0, remaining)
-  rowsCollected += remaining
-  nextBatch = Some(prunedBatch)
+  private def fetchNextBatch(): Boolean = {
+
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+  val batch = inputIter.next()
+  val batchSize = batch.numRows()
+
+  if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+  } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)
+
+val prunedBatch =
+  if (startIndex == 0 && needed == batchSize) {
+ColumnarBatches.retain(batch)
+batch
+  } else {
+val sliced = VeloxColumnarBatches.slice(batch, startIndex, needed)

Review Comment:
   Updated
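For readers following the `fetchNextBatch` logic in the diff above, here is a minimal, self-contained sketch of the same skip-then-collect idea. It rests on assumptions: `Batch` is a hypothetical stand-in for `ColumnarBatch`, its `slice` method stands in for `VeloxColumnarBatches.slice`, and the reference counting done by `ColumnarBatches.retain` is omitted. As in the PR, a negative `limit` means "no limit", which the class-level assertion only permits together with a positive `offset`.

```scala
// Hypothetical stand-in for ColumnarBatch: a batch is just a vector of row values.
final case class Batch(rows: Vector[Int]) {
  def numRows: Int = rows.size
  // Stand-in for VeloxColumnarBatches.slice(batch, start, length).
  def slice(start: Int, length: Int): Batch = Batch(rows.slice(start, start + length))
}

object OffsetLimit {
  /** Skips the first `offset` rows, then yields at most `limit` rows (all remaining rows if limit < 0). */
  def collectWithOffsetAndLimit(input: Iterator[Batch], offset: Int, limit: Int): Iterator[Batch] =
    new Iterator[Batch] {
      private var rowsToSkip = math.max(offset, 0)
      private var rowsToCollect = if (limit < 0) Int.MaxValue else limit
      private var nextBatch: Option[Batch] = None

      override def hasNext: Boolean = nextBatch.isDefined || fetchNextBatch()

      override def next(): Batch = {
        if (!hasNext) throw new NoSuchElementException("No more batches available.")
        val batch = nextBatch.get
        nextBatch = None
        batch
      }

      // Advances until a (possibly sliced) batch can be returned, or the input is exhausted.
      private def fetchNextBatch(): Boolean = {
        while (rowsToCollect > 0 && input.hasNext) {
          val batch = input.next()
          val batchSize = batch.numRows
          if (rowsToSkip >= batchSize) {
            rowsToSkip -= batchSize // the whole batch falls inside the offset (empty batches too)
          } else {
            val start = rowsToSkip
            val needed = math.min(rowsToCollect, batchSize - start)
            rowsToSkip = 0
            rowsToCollect -= needed
            // Reuse the batch unchanged when every row is kept; otherwise slice it.
            nextBatch = Some(if (start == 0 && needed == batchSize) batch else batch.slice(start, needed))
            return true
          }
        }
        false
      }
    }
}
```

For example, with three 3-row batches, `offset = 2` and `limit = 4` yields a one-row slice of the first batch followed by the second batch unchanged; the third batch is never pulled from the input.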



Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-24 Thread via GitHub


jinchengchenghh commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2010521849


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,102 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
 limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+  assert(limit >= 0 || (limit == -1 && offset > 0))
 
   override def batchType(): Convention.BatchType =
 BackendsApiManager.getSettings.primaryBatchType
 
+  private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+  private lazy val readMetrics =
+SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+  private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .useSortBasedShuffle(outputPartitioning, child.output)
+
+  @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)
+
+  @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+  .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+  readMetrics ++ writeMetrics
+
   /**
-   * Returns an iterator that yields up to `limit` rows in total from the input partitionIter.
+   * Returns an iterator that gives offset to limit rows in total from the input partitionIter.
    * Either retain the entire batch if it fits within the remaining limit, or prune it if it
-   * partially exceeds the remaining limit.
+   * partially exceeds the remaining limit/offset.
*/
-  private def collectLimitedRows(
-  partitionIter: Iterator[ColumnarBatch],
-  limit: Int
-  ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
-  return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+  private def collectWithOffsetAndLimit(
+  inputIter: Iterator[ColumnarBatch],
+  offset: Int,
+  limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
 
-  private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
   private var nextBatch: Option[ColumnarBatch] = None
 
   override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
   }
 
   override def next(): ColumnarBatch = {
-if (!hasNext) {
-  throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches available.")
 val batch = nextBatch.get
 nextBatch = None
 batch
   }
 
   /**
-   * Attempt to fetch the next batch from the underlying iterator if we haven't yet hit the
-   * limit. Returns true if we found a new batch, false otherwise.
+   * Advance the iterator until we find a batch (possibly sliced) that we can return, or exhaust
+   * the input.
*/
-  private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
-  return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
-  rowsCollected += currentBatchRowCount
-  ColumnarBatches.retain(currentBatch)
-  nextBatch = Some(currentBatch)
-} else {
-  val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0, remaining)
-  rowsCollected += remaining
-  nextBatch = Some(prunedBatch)
+  private def fetchNextBatch(): Boolean = {
+
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+  val batch = inputIter.next()
+  val batchSize = batch.numRows()
+
+  if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+  } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)
+
+val prunedBatch =
+  if (startIndex == 0 && needed == batchSize) {
+ColumnarBatches.retain(batch)
+batch
+  } else {
+val sliced = VeloxColumnarBatches.slice(batch, startIndex, needed)

Review Comment:
   Don't need `val sliced`



Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-24 Thread via GitHub


ArnavBalyan commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2748709643

   cc @jinchengchenghh I've addressed all comments, could you please take a look? Thanks!



Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-24 Thread via GitHub


ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2009753260


##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -67,7 +67,9 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
 assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
   }
 
-  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with filter", Array("3.2", "3.3")) {
+  testWithSpecifiedSparkVersion(

Review Comment:
   updated




Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-24 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2747773889

   Run Gluten Clickhouse CI on x86



Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-24 Thread via GitHub


ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2009974027


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,94 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
 limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+  assert(limit >= 0 || (limit == -1 && offset > 0))
 
   override def batchType(): Convention.BatchType =
 BackendsApiManager.getSettings.primaryBatchType
 
+  private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+  private lazy val readMetrics =
+SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+  private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .useSortBasedShuffle(outputPartitioning, child.output)
+
+  @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)
+
+  @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+  .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+  readMetrics ++ writeMetrics
+
   /**
-   * Returns an iterator that yields up to `limit` rows in total from the input partitionIter.
+   * Returns an iterator that gives offset to limit rows in total from the input partitionIter.
    * Either retain the entire batch if it fits within the remaining limit, or prune it if it
-   * partially exceeds the remaining limit.
+   * partially exceeds the remaining limit/offset.
*/
-  private def collectLimitedRows(
-  partitionIter: Iterator[ColumnarBatch],
-  limit: Int
-  ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
-  return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+  private def collectWithOffsetAndLimit(
+   inputIter: Iterator[ColumnarBatch],
+   offset: Int,
+   limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
 
-  private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
   private var nextBatch: Option[ColumnarBatch] = None
 
   override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
   }
 
   override def next(): ColumnarBatch = {
-if (!hasNext) {
-  throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches available.")
 val batch = nextBatch.get
 nextBatch = None
 batch
   }
 
   /**
-   * Attempt to fetch the next batch from the underlying iterator if we haven't yet hit the
-   * limit. Returns true if we found a new batch, false otherwise.
+   * Advance the iterator until we find a batch (possibly sliced)
+   * that we can return, or exhaust the input.
*/
-  private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
-  return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
-  rowsCollected += currentBatchRowCount
-  ColumnarBatches.retain(currentBatch)
-  nextBatch = Some(currentBatch)
-} else {
-  val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0, remaining)
-  rowsCollected += remaining
-  nextBatch = Some(prunedBatch)
+  private def fetchNextBatch(): Boolean = {
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+  val batch = inputIter.next()
+  val batchSize = batch.numRows()
+
+  if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+  } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)

Review Comment:
   done



Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-24 Thread via GitHub


ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2009916832


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,94 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
 limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+  assert(limit >= 0 || (limit == -1 && offset > 0))
 
   override def batchType(): Convention.BatchType =
 BackendsApiManager.getSettings.primaryBatchType
 
+  private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+  private lazy val readMetrics =
+SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+  private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .useSortBasedShuffle(outputPartitioning, child.output)
+
+  @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)
+
+  @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+  .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+  readMetrics ++ writeMetrics
+
   /**
-   * Returns an iterator that yields up to `limit` rows in total from the input partitionIter.
+   * Returns an iterator that gives offset to limit rows in total from the input partitionIter.
    * Either retain the entire batch if it fits within the remaining limit, or prune it if it
-   * partially exceeds the remaining limit.
+   * partially exceeds the remaining limit/offset.
*/
-  private def collectLimitedRows(
-  partitionIter: Iterator[ColumnarBatch],
-  limit: Int
-  ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
-  return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+  private def collectWithOffsetAndLimit(
+   inputIter: Iterator[ColumnarBatch],
+   offset: Int,
+   limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
 
-  private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
   private var nextBatch: Option[ColumnarBatch] = None
 
   override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
   }
 
   override def next(): ColumnarBatch = {
-if (!hasNext) {
-  throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches available.")
 val batch = nextBatch.get
 nextBatch = None
 batch
   }
 
   /**
-   * Attempt to fetch the next batch from the underlying iterator if we haven't yet hit the
-   * limit. Returns true if we found a new batch, false otherwise.
+   * Advance the iterator until we find a batch (possibly sliced)
+   * that we can return, or exhaust the input.
*/
-  private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
-  return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
-  rowsCollected += currentBatchRowCount
-  ColumnarBatches.retain(currentBatch)
-  nextBatch = Some(currentBatch)
-} else {
-  val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0, remaining)
-  rowsCollected += remaining
-  nextBatch = Some(prunedBatch)
+  private def fetchNextBatch(): Boolean = {
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+  val batch = inputIter.next()
+  val batchSize = batch.numRows()
+
+  if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+  } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)

Review Comment:
   I see, you mean handling this case separately so that we don't slice. Let me do the refactor.



Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-24 Thread via GitHub


jinchengchenghh commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2009805093


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,94 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
 limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+  assert(limit >= 0 || (limit == -1 && offset > 0))
 
   override def batchType(): Convention.BatchType =
 BackendsApiManager.getSettings.primaryBatchType
 
+  private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+  private lazy val readMetrics =
+SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+  private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .useSortBasedShuffle(outputPartitioning, child.output)
+
+  @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)
+
+  @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+  .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+  readMetrics ++ writeMetrics
+
   /**
-   * Returns an iterator that yields up to `limit` rows in total from the input partitionIter.
+   * Returns an iterator that gives offset to limit rows in total from the input partitionIter.
    * Either retain the entire batch if it fits within the remaining limit, or prune it if it
-   * partially exceeds the remaining limit.
+   * partially exceeds the remaining limit/offset.
*/
-  private def collectLimitedRows(
-  partitionIter: Iterator[ColumnarBatch],
-  limit: Int
-  ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
-  return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+  private def collectWithOffsetAndLimit(
+   inputIter: Iterator[ColumnarBatch],
+   offset: Int,
+   limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
 
-  private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
   private var nextBatch: Option[ColumnarBatch] = None
 
   override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
   }
 
   override def next(): ColumnarBatch = {
-if (!hasNext) {
-  throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches available.")
 val batch = nextBatch.get
 nextBatch = None
 batch
   }
 
   /**
-   * Attempt to fetch the next batch from the underlying iterator if we haven't yet hit the
-   * limit. Returns true if we found a new batch, false otherwise.
+   * Advance the iterator until we find a batch (possibly sliced)
+   * that we can return, or exhaust the input.
*/
-  private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
-  return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
-  rowsCollected += currentBatchRowCount
-  ColumnarBatches.retain(currentBatch)
-  nextBatch = Some(currentBatch)
-} else {
-  val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0, remaining)
-  rowsCollected += remaining
-  nextBatch = Some(prunedBatch)
+  private def fetchNextBatch(): Boolean = {
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+  val batch = inputIter.next()
+  val batchSize = batch.numRows()
+
+  if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+  } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)

Review Comment:
   So we don't need to slice in that case; the sliced batch would be the whole batch.
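The reviewer's point is that once the offset has been consumed (`startIndex == 0`) and the whole batch fits within the remaining limit (`needed == batchSize`), slicing would only reproduce the entire batch. A minimal sketch of that fast path, which also returns the slice directly instead of binding it to an intermediate value, is shown below. It assumes a hypothetical `Batch` type in place of `ColumnarBatch`; in the PR, the whole-batch branch additionally calls `ColumnarBatches.retain(batch)`, which this sketch omits.

```scala
// Hypothetical stand-in for ColumnarBatch; slice stands in for VeloxColumnarBatches.slice.
final case class Batch(rows: Vector[Int]) {
  def numRows: Int = rows.size
  def slice(start: Int, length: Int): Batch = Batch(rows.slice(start, start + length))
}

object SliceFastPath {
  /** Returns rows [start, start + needed) of `batch`, reusing the original object when nothing is dropped. */
  def take(batch: Batch, start: Int, needed: Int): Batch =
    if (start == 0 && needed == batch.numRows) batch // no copy: a "slice" would be the whole batch
    else batch.slice(start, needed)
}
```

Returning the same object in the fast path avoids an allocation per batch, which matters when most batches pass through the limit untouched.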



Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-24 Thread via GitHub


ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2009752841


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,94 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
 limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+  assert(limit >= 0 || (limit == -1 && offset > 0))
 
   override def batchType(): Convention.BatchType =
 BackendsApiManager.getSettings.primaryBatchType
 
+  private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+  private lazy val readMetrics =
+SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+  private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .useSortBasedShuffle(outputPartitioning, child.output)
+
+  @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)
+
+  @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+  .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+  readMetrics ++ writeMetrics
+
   /**
-   * Returns an iterator that yields up to `limit` rows in total from the input partitionIter.
+   * Returns an iterator that gives offset to limit rows in total from the input partitionIter.
* Either retain the entire batch if it fits within the remaining limit, or prune it if it
-   * partially exceeds the remaining limit.
+   * partially exceeds the remaining limit/offset.
*/
-  private def collectLimitedRows(
-  partitionIter: Iterator[ColumnarBatch],
-  limit: Int
-  ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
-  return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+  private def collectWithOffsetAndLimit(
+   inputIter: Iterator[ColumnarBatch],
+   offset: Int,
+   limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
 
-  private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
   private var nextBatch: Option[ColumnarBatch] = None
 
   override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
   }
 
   override def next(): ColumnarBatch = {
-if (!hasNext) {
-  throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches available.")
 val batch = nextBatch.get
 nextBatch = None
 batch
   }
 
   /**
-   * Attempt to fetch the next batch from the underlying iterator if we haven't yet hit the
-   * limit. Returns true if we found a new batch, false otherwise.
+   * Advance the iterator until we find a batch (possibly sliced)
+   * that we can return, or exhaust the input.
*/
-  private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
-  return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
-  rowsCollected += currentBatchRowCount
-  ColumnarBatches.retain(currentBatch)
-  nextBatch = Some(currentBatch)
-} else {
-  val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0, remaining)
-  rowsCollected += remaining
-  nextBatch = Some(prunedBatch)
+  private def fetchNextBatch(): Boolean = {
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+  val batch = inputIter.next()
+  val batchSize = batch.numRows()
+
+  if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+  } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)

Review Comment:
   @jinchengchenghh does this address the comment?
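The constructor in the diff above asserts `limit >= 0 || (limit == -1 && offset > 0)`. This allows either a non-negative limit, or `-1` (no limit) paired with a positive offset. Here is a small sketch of that check and of the starting counters in `collectWithOffsetAndLimit`. `CollectLimitArgs`, `Counters` and `initialCounters` are hypothetical names used only here.

```scala
object CollectLimitArgs {
  /** Same condition as the assertion in the diff. */
  def isValid(limit: Int, offset: Int): Boolean =
    limit >= 0 || (limit == -1 && offset > 0)

  /** Hypothetical holder for the two counters the iterator starts with. */
  final case class Counters(rowsToSkip: Int, rowsToCollect: Int)

  /** Starting state as computed in the diff; a negative limit means "collect everything after the offset". */
  def initialCounters(limit: Int, offset: Int): Counters = {
    require(isValid(limit, offset), s"invalid limit=$limit, offset=$offset")
    Counters(
      rowsToSkip = math.max(offset, 0),
      rowsToCollect = if (limit < 0) Int.MaxValue else limit)
  }
}
```

The sketch uses `require`, which throws `IllegalArgumentException` on bad input. The diff's `assert` throws `AssertionError` instead.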




Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-22 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2745109618

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-22 Thread via GitHub


ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2008698522


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,94 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
 limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+  assert(limit >= 0 || (limit == -1 && offset > 0))
 
   override def batchType(): Convention.BatchType =
 BackendsApiManager.getSettings.primaryBatchType
 
+  private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+  private lazy val readMetrics =
+SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+  private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .useSortBasedShuffle(outputPartitioning, child.output)
+
+  @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)
+
+  @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+  .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+  readMetrics ++ writeMetrics
+
   /**
-   * Returns an iterator that yields up to `limit` rows in total from the input partitionIter.
+   * Returns an iterator that gives offset to limit rows in total from the input partitionIter.
* Either retain the entire batch if it fits within the remaining limit, or prune it if it
-   * partially exceeds the remaining limit.
+   * partially exceeds the remaining limit/offset.
*/
-  private def collectLimitedRows(
-  partitionIter: Iterator[ColumnarBatch],
-  limit: Int
-  ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
-  return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+  private def collectWithOffsetAndLimit(
+   inputIter: Iterator[ColumnarBatch],
+   offset: Int,
+   limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
 
-  private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
   private var nextBatch: Option[ColumnarBatch] = None
 
   override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
   }
 
   override def next(): ColumnarBatch = {
-if (!hasNext) {
-  throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches available.")
 val batch = nextBatch.get
 nextBatch = None
 batch
   }
 
   /**
-   * Attempt to fetch the next batch from the underlying iterator if we haven't yet hit the
-   * limit. Returns true if we found a new batch, false otherwise.
+   * Advance the iterator until we find a batch (possibly sliced)
+   * that we can return, or exhaust the input.
*/
-  private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
-  return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
-  rowsCollected += currentBatchRowCount
-  ColumnarBatches.retain(currentBatch)
-  nextBatch = Some(currentBatch)
-} else {
-  val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0, remaining)
-  rowsCollected += remaining
-  nextBatch = Some(prunedBatch)
+  private def fetchNextBatch(): Boolean = {
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+  val batch = inputIter.next()
+  val batchSize = batch.numRows()
+
+  if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+  } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)

Review Comment:
   In that case, startIndex would be 0 and leftoverAfterSkip = batchSize, leading to `val prunedBatch = VeloxColumnarBatches.slice(batch, 0, batchSize)`.
   Could you give an example of batch size, limit, and offset for the above case?




Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-20 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2740273391

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-20 Thread via GitHub


jinchengchenghh commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r2005650398


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -32,88 +32,94 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCollectLimitExec(
 limit: Int,
-child: SparkPlan
-) extends ColumnarCollectLimitBaseExec(limit, child) {
+child: SparkPlan,
+offset: Int = 0
+) extends ColumnarCollectLimitBaseExec(limit, child, offset) {
+
+  assert(limit >= 0 || (limit == -1 && offset > 0))
 
   override def batchType(): Convention.BatchType =
 BackendsApiManager.getSettings.primaryBatchType
 
+  private lazy val writeMetrics =
+SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+  private lazy val readMetrics =
+SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+  private lazy val useSortBasedShuffle: Boolean =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .useSortBasedShuffle(outputPartitioning, child.output)
+
+  @transient private lazy val serializer: Serializer =
+BackendsApiManager.getSparkPlanExecApiInstance
+  .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)
+
+  @transient override lazy val metrics: Map[String, SQLMetric] =
+BackendsApiManager.getMetricsApiInstance
+  .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+  readMetrics ++ writeMetrics
+
   /**
-   * Returns an iterator that yields up to `limit` rows in total from the input partitionIter.
+   * Returns an iterator that gives offset to limit rows in total from the input partitionIter.
* Either retain the entire batch if it fits within the remaining limit, or prune it if it
-   * partially exceeds the remaining limit.
+   * partially exceeds the remaining limit/offset.
*/
-  private def collectLimitedRows(
-  partitionIter: Iterator[ColumnarBatch],
-  limit: Int
-  ): Iterator[ColumnarBatch] = {
-if (partitionIter.isEmpty) {
-  return Iterator.empty
-}
-new Iterator[ColumnarBatch] {
+  private def collectWithOffsetAndLimit(
+   inputIter: Iterator[ColumnarBatch],
+   offset: Int,
+   limit: Int): Iterator[ColumnarBatch] = {
+
+val unlimited = limit < 0
+var rowsToSkip = math.max(offset, 0)
+var rowsToCollect = if (unlimited) Int.MaxValue else limit
 
-  private var rowsCollected = 0
+new Iterator[ColumnarBatch] {
   private var nextBatch: Option[ColumnarBatch] = None
 
   override def hasNext: Boolean = {
-nextBatch.isDefined || fetchNext()
+nextBatch.isDefined || fetchNextBatch()
   }
 
   override def next(): ColumnarBatch = {
-if (!hasNext) {
-  throw new NoSuchElementException("No more batches available.")
-}
+if (!hasNext) throw new NoSuchElementException("No more batches available.")
 val batch = nextBatch.get
 nextBatch = None
 batch
   }
 
   /**
-   * Attempt to fetch the next batch from the underlying iterator if we haven't yet hit the
-   * limit. Returns true if we found a new batch, false otherwise.
+   * Advance the iterator until we find a batch (possibly sliced)
+   * that we can return, or exhaust the input.
*/
-  private def fetchNext(): Boolean = {
-if (rowsCollected >= limit || !partitionIter.hasNext) {
-  return false
-}
-
-val currentBatch = partitionIter.next()
-val currentBatchRowCount = currentBatch.numRows()
-val remaining = limit - rowsCollected
-
-if (currentBatchRowCount <= remaining) {
-  rowsCollected += currentBatchRowCount
-  ColumnarBatches.retain(currentBatch)
-  nextBatch = Some(currentBatch)
-} else {
-  val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0, remaining)
-  rowsCollected += remaining
-  nextBatch = Some(prunedBatch)
+  private def fetchNextBatch(): Boolean = {
+if (rowsToCollect <= 0) return false
+
+while (inputIter.hasNext) {
+  val batch = inputIter.next()
+  val batchSize = batch.numRows()
+
+  if (rowsToSkip >= batchSize) {
+rowsToSkip -= batchSize
+  } else {
+val startIndex = rowsToSkip
+val leftoverAfterSkip = batchSize - startIndex
+rowsToSkip = 0
+
+val needed = math.min(rowsToCollect, leftoverAfterSkip)

Review Comment:
   If needed <= remaining, we still need this logic, so that the whole batch is returned instead of a sliced one:
   ```
   if (currentBatchRowCount <= remaining) {
     rowsCollected += currentBatchRowCount
     ColumnarBatches.retain(currentBatch)
     nextBatch = Some(currentBatch)
   }
   ```

Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-19 Thread via GitHub


ArnavBalyan commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2738566222

   cc @jinchengchenghh I've addressed the comments. Could you please take a look? Thanks.





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-19 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2737549144

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-19 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2736515534

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-18 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2733367485

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-16 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2727562264

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-16 Thread via GitHub


ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r1997651894


##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -125,11 +199,20 @@ case class ColumnarCollectLimitExec(
   if (childRDD.getNumPartitions == 1) childRDD
   else shuffleLimitedPartitions(childRDD)
 
-processedRDD.mapPartitions(partition => collectLimitedRows(partition, limit))
+processedRDD.mapPartitions(
+  partition => {
+val droppedRows = dropLimitedRows(partition, offset)
+val adjustedLimit = Math.max(0, limit - offset)
+collectLimitedRows(droppedRows, adjustedLimit)

Review Comment:
   Yes; however, that would not preserve order. The current implementation closely matches Spark; otherwise users may see unexpected ordering, and UTs may fail. This keeps it close to the Spark implementation and maintains the same order as Spark. Thanks.
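The two-step version in the diff drops `offset` rows and then collects `Math.max(0, limit - offset)` rows, so it yields rows in their original order. The `limit - offset` adjustment suggests that `limit` counts the skipped rows as well. Assuming so, the result is the half-open index range [offset, limit). Below is a minimal sketch on a plain Scala `Iterator` of rows, not Gluten's `ColumnarBatch`. `DropThenTake` and its methods are hypothetical, and non-negative `offset` and `limit` are assumed.

```scala
object DropThenTake {
  /** Two-step form from the diff: drop `offset` rows, then take `max(0, limit - offset)` rows. */
  def dropThenTake[A](rows: Iterator[A], offset: Int, limit: Int): List[A] = {
    val dropped = rows.drop(offset)                  // plays the role of dropLimitedRows(partition, offset)
    val adjustedLimit = math.max(0, limit - offset)
    dropped.take(adjustedLimit).toList               // plays the role of collectLimitedRows(droppedRows, adjustedLimit)
  }

  /** Single-step equivalent: the rows at indices [offset, limit), in input order. */
  def sliceRange[A](rows: Iterator[A], offset: Int, limit: Int): List[A] =
    rows.slice(offset, limit).toList
}
```

For rows `1 to 10` with `offset = 2` and `limit = 6`, both forms return `List(3, 4, 5, 6)`.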



##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -58,7 +58,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
 
   testWithSpecifiedSparkVersion(

Review Comment:
   Done, thanks.






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-16 Thread via GitHub


ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r1997651674


##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -138,4 +140,34 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
 
 assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
   }
+
+  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - offset test", Array("3.4", "3.5")) {

Review Comment:
   Added more tests to cover the above scenario; the Spark UTs should also help.






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-16 Thread via GitHub


ArnavBalyan commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r1997651585


##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -138,4 +140,34 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
 
 assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
   }
+
+  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - offset test", Array("3.4", "3.5")) {

Review Comment:
   For 3.3 it would fail at compile time, since the offset API is not available on CollectLimitExec in older versions.






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-16 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2727517945

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-13 Thread via GitHub


jinchengchenghh commented on code in PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#discussion_r1983013290


##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -138,4 +140,34 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
 
 assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
   }
+
+  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - offset test", Array("3.4", "3.5")) {

Review Comment:
   What's the result for Spark 3.3? Is the result still correct, with only the operator not matched?



##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -58,7 +58,7 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
 
   testWithSpecifiedSparkVersion(

Review Comment:
   testWithSpecifiedSparkVersion -> test



##
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala:
##
@@ -125,11 +199,20 @@ case class ColumnarCollectLimitExec(
   if (childRDD.getNumPartitions == 1) childRDD
   else shuffleLimitedPartitions(childRDD)
 
-processedRDD.mapPartitions(partition => collectLimitedRows(partition, limit))
+processedRDD.mapPartitions(
+  partition => {
+val droppedRows = dropLimitedRows(partition, offset)
+val adjustedLimit = Math.max(0, limit - offset)
+collectLimitedRows(droppedRows, adjustedLimit)

Review Comment:
   Can we enhance collectLimitedRows so that it slices the input RowVector from offset to adjustedLimit?
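One possible reading of this suggestion is to compute, for each batch, the overlap between its global row range and the rows to keep, and then slice once per batch. The sketch assumes the rows to keep are the global indices [offset, limit), that is, `adjustedLimit = limit - offset` rows starting at `offset`. A negative `limit` means no limit. It works on batch row counts only. `RangeSlicing`, `BatchSlice` and `slicesFor` are hypothetical names.

```scala
object RangeSlicing {
  /** Hypothetical description of one slice to emit: rows [start, start + length) of batch `batchIndex`. */
  final case class BatchSlice(batchIndex: Int, start: Int, length: Int)

  /** Intersects each batch's global row range [base, base + size) with [offset, limit). */
  def slicesFor(batchSizes: Seq[Int], offset: Int, limit: Int): Seq[BatchSlice] = {
    val keepFrom = math.max(offset, 0).toLong
    // A negative limit means "no limit"; a limit at or below the offset keeps nothing.
    val keepUntil = if (limit < 0) Long.MaxValue else math.max(limit.toLong, keepFrom)
    var base = 0L // global index of the current batch's first row
    val out = Seq.newBuilder[BatchSlice]
    for ((size, i) <- batchSizes.zipWithIndex) {
      val lo = math.max(base, keepFrom)
      val hi = math.min(base + size, keepUntil)
      if (lo < hi) out += BatchSlice(i, (lo - base).toInt, (hi - lo).toInt)
      base += size
    }
    out.result()
  }
}
```

Unlike an iterator, this sketch scans every batch. A real implementation would stop pulling batches once the global index reaches `limit`.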



##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -67,7 +67,9 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
 assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
   }
 
-  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with filter", Array("3.2", "3.3")) {
+  testWithSpecifiedSparkVersion(

Review Comment:
   Ditto, and likewise for the others.



##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -138,4 +140,34 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
 
 assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
   }
+
+  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - offset test", Array("3.4", "3.5")) {

Review Comment:
   If so, please also add the result check for Spark 3.2 and Spark 3.3.



##
backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala:
##
@@ -138,4 +140,34 @@ class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
 
 assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
   }
+
+  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - offset test", Array("3.4", "3.5")) {

Review Comment:
   Please add tests to cover more code paths, such as limit(12).
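One cheap way to choose test inputs that reach every branch is to cross-check a batch-wise skip/collect against `slice` on the flattened rows. The branches are a fully skipped batch, a whole batch, a partial slice, and an early stop. Below is a hypothetical sketch, independent of Spark and Gluten. Here `n` is the number of rows to return after the offset, and `n < 0` means no limit.

```scala
object OffsetLimitCrossCheck {
  /** Batch-wise skip/collect over in-memory batches, mirroring the counters in the diff. */
  def batchwise(batches: Seq[Vector[Int]], offset: Int, n: Int): Vector[Int] = {
    var toSkip = math.max(offset, 0)
    var toCollect = if (n < 0) Int.MaxValue else n
    val out = Vector.newBuilder[Int]
    val it = batches.iterator
    while (toCollect > 0 && it.hasNext) {
      val batch = it.next()
      if (toSkip >= batch.size) toSkip -= batch.size // batch falls entirely within the offset
      else {
        val needed = math.min(toCollect, batch.size - toSkip)
        out ++= batch.slice(toSkip, toSkip + needed)  // whole batch when toSkip == 0 and needed == size
        toSkip = 0
        toCollect -= needed
      }
    }
    out.result()
  }

  /** Reference result computed on the flattened rows. */
  def reference(batches: Seq[Vector[Int]], offset: Int, n: Int): Vector[Int] = {
    val rows = batches.flatten.toVector
    val from = math.max(offset, 0)
    if (n < 0) rows.drop(from) else rows.slice(from, from + n)
  }

  /** Every (offset, n) pair in the grid where the two disagree; expected to be empty. */
  def mismatches(batches: Seq[Vector[Int]], maxOffset: Int, maxN: Int): Seq[(Int, Int)] =
    for {
      offset <- 0 to maxOffset
      n <- -1 to maxN
      if batchwise(batches, offset, n) != reference(batches, offset, n)
    } yield (offset, n)
}
```

Consider three 5-row batches. With `n = 12` and no offset, the run hits two whole batches plus a partial tail slice. An offset of 7 adds a fully skipped batch and a slice that starts mid-batch.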






Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-05 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2701217641

   Run Gluten Clickhouse CI on x86





Re: [PR] [GLUTEN-8912][VL] Add Offset support for CollectLimitExec [incubator-gluten]

2025-03-05 Thread via GitHub


github-actions[bot] commented on PR #8914:
URL: https://github.com/apache/incubator-gluten/pull/8914#issuecomment-2701216908

   https://github.com/apache/incubator-gluten/issues/8912

