This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new e306fc6  test: Reduce end-to-end test time (#109)
e306fc6 is described below

commit e306fc656ec41f814050a85fdb24d607e2a64147
Author: Chao Sun <sunc...@apache.org>
AuthorDate: Wed Feb 28 16:21:21 2024 -0800

    test: Reduce end-to-end test time (#109)
---
 .../apache/comet/exec/CometAggregateSuite.scala    |  27 +-
 ...Suite.scala => CometColumnarShuffleSuite.scala} | 683 +++++----------------
 .../comet/exec/CometNativeShuffleSuite.scala       | 216 +++++++
 3 files changed, 381 insertions(+), 545 deletions(-)

diff --git 
a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
index 04735b5..d64a3a3 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometAggregateSuite.scala
@@ -537,23 +537,18 @@ class CometAggregateSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
               withSQLConf(CometConf.COMET_BATCH_SIZE.key -> 
batchSize.toString) {
 
                 // Test all combinations of different aggregation & group-by 
types
-                (1 to 4).foreach { col =>
-                  (1 to 14).foreach { gCol =>
-                    withView("v") {
-                      sql(s"CREATE TEMP VIEW v AS SELECT _g$gCol, _$col FROM 
tbl ORDER BY _$col")
-                      checkSparkAnswer(s"SELECT _g$gCol, FIRST(_$col) FROM v 
GROUP BY _g$gCol")
-                      checkSparkAnswer(s"SELECT _g$gCol, LAST(_$col) FROM v 
GROUP BY _g$gCol")
-                    }
-                    checkSparkAnswer(s"SELECT _g$gCol, SUM(_$col) FROM tbl 
GROUP BY _g$gCol")
-                    checkSparkAnswer(
-                      s"SELECT _g$gCol, SUM(DISTINCT _$col) FROM tbl GROUP BY 
_g$gCol")
-                    checkSparkAnswer(s"SELECT _g$gCol, COUNT(_$col) FROM tbl 
GROUP BY _g$gCol")
-                    checkSparkAnswer(
-                      s"SELECT _g$gCol, COUNT(DISTINCT _$col) FROM tbl GROUP 
BY _g$gCol")
-                    checkSparkAnswer(
-                      s"SELECT _g$gCol, MIN(_$col), MAX(_$col) FROM tbl GROUP 
BY _g$gCol")
-                    checkSparkAnswer(s"SELECT _g$gCol, AVG(_$col) FROM tbl 
GROUP BY _g$gCol")
+                (1 to 14).foreach { gCol =>
+                  withView("v") {
+                    sql(s"CREATE TEMP VIEW v AS SELECT _g$gCol, _1, _2, _3, _4 
" +
+                      "FROM tbl ORDER BY _1, _2, _3, _4")
+                    checkSparkAnswer(s"SELECT _g$gCol, FIRST(_1), FIRST(_2), 
FIRST(_3), " +
+                      s"FIRST(_4), LAST(_1), LAST(_2), LAST(_3), LAST(_4) FROM 
v GROUP BY _g$gCol")
                   }
+                  checkSparkAnswer(s"SELECT _g$gCol, SUM(_1), SUM(_2), 
COUNT(_3), COUNT(_4), " +
+                    s"MIN(_1), MAX(_4), AVG(_2), AVG(_4) FROM tbl GROUP BY 
_g$gCol")
+                  checkSparkAnswer(s"SELECT _g$gCol, SUM(DISTINCT _3) FROM tbl 
GROUP BY _g$gCol")
+                  checkSparkAnswer(
+                    s"SELECT _g$gCol, COUNT(DISTINCT _1) FROM tbl GROUP BY 
_g$gCol")
                 }
               }
             }
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
similarity index 54%
rename from spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala
rename to 
spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index 0d7c73d..fec6197 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -24,28 +24,23 @@ import org.scalatest.Tag
 
 import org.apache.hadoop.fs.Path
 import org.apache.spark.{Partitioner, SparkConf}
-import org.apache.spark.sql.{CometTestBase, Row}
+import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
 import org.apache.spark.sql.comet.execution.shuffle.{CometShuffleDependency, 
CometShuffleExchangeExec, CometShuffleManager}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 import org.apache.comet.CometConf
-import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
-
-abstract class CometShuffleSuiteBase extends CometTestBase with 
AdaptiveSparkPlanHelper {
 
+abstract class CometColumnarShuffleSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
   protected val adaptiveExecutionEnabled: Boolean
-
-  protected val fastMergeEnabled: Boolean = true
-
   protected val numElementsForceSpillThreshold: Int = 10
 
   override protected def sparkConf: SparkConf = {
     val conf = super.sparkConf
     conf
       .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, 
adaptiveExecutionEnabled.toString)
-      .set("spark.shuffle.unsafe.fastMergeEnabled", fastMergeEnabled.toString)
   }
 
   protected val asyncShuffleEnable: Boolean
@@ -55,7 +50,10 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
     super.test(testName, testTags: _*) {
       withSQLConf(
         CometConf.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED.key -> 
asyncShuffleEnable.toString,
-        CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> 
numElementsForceSpillThreshold.toString) {
+        CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> 
numElementsForceSpillThreshold.toString,
+        CometConf.COMET_EXEC_ENABLED.key -> "false",
+        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
         testFun
       }
     }
@@ -63,33 +61,10 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
 
   import testImplicits._
 
-  test("Native shuffle with dictionary of binary") {
-    Seq("true", "false").foreach { dictionaryEnabled =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
-        withParquetTable(
-          (0 until 1000).map(i => (i % 5, (i % 5).toString.getBytes())),
-          "tbl",
-          dictionaryEnabled.toBoolean) {
-          val shuffled = sql("SELECT * FROM tbl").repartition(2, $"_2")
-
-          checkCometExchange(shuffled, 1, true)
-          checkSparkAnswer(shuffled)
-        }
-      }
-    }
-  }
-
   test("columnar shuffle on nested struct including nulls") {
     Seq(10, 201).foreach { numPartitions =>
       Seq("1.0", "10.0").foreach { ratio =>
-        withSQLConf(
-          CometConf.COMET_EXEC_ENABLED.key -> "false",
-          CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
+        withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> 
ratio) {
           withParquetTable(
             (0 until 50).map(i =>
               (i, Seq((i + 1, i.toString), null, (i + 3, (i + 3).toString)), i 
+ 1)),
@@ -99,8 +74,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
               .repartition(numPartitions, $"_1", $"_2", $"_3")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
         }
       }
@@ -110,11 +84,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
   test("columnar shuffle on struct including nulls") {
     Seq(10, 201).foreach { numPartitions =>
       Seq("1.0", "10.0").foreach { ratio =>
-        withSQLConf(
-          CometConf.COMET_EXEC_ENABLED.key -> "false",
-          CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
+        withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> 
ratio) {
           val data: Seq[(Int, (Int, String))] =
             Seq((1, (0, "1")), (2, (3, "3")), (3, null))
           withParquetTable(data, "tbl") {
@@ -123,8 +93,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
               .repartition(numPartitions, $"_1", $"_2")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
         }
       }
@@ -137,8 +106,6 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
         Seq("1.0", "10.0").foreach { ratio =>
           withSQLConf(
             CometConf.COMET_EXEC_ENABLED.key -> execEnabled,
-            CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
             CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
             withParquetTable((0 until 50).map(i => (Map(Seq(i, i + 1) -> i), i 
+ 1)), "tbl") {
               val df = sql("SELECT * FROM tbl")
@@ -146,9 +113,8 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 .repartition(numPartitions, $"_1", $"_2")
                 .sortWithinPartitions($"_2")
 
-              checkSparkAnswer(df)
               // Array map key array element fallback to Spark shuffle for now
-              checkCometExchange(df, 0, false)
+              checkShuffleAnswer(df, 0)
             }
 
             withParquetTable((0 until 50).map(i => (Map(i -> Seq(i, i + 1)), i 
+ 1)), "tbl") {
@@ -157,9 +123,8 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 .repartition(numPartitions, $"_1", $"_2")
                 .sortWithinPartitions($"_2")
 
-              checkSparkAnswer(df)
               // Array map value array element fallback to Spark shuffle for 
now
-              checkCometExchange(df, 0, false)
+              checkShuffleAnswer(df, 0)
             }
 
             withParquetTable((0 until 50).map(i => (Map((i, i.toString) -> i), 
i + 1)), "tbl") {
@@ -168,9 +133,8 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 .repartition(numPartitions, $"_1", $"_2")
                 .sortWithinPartitions($"_2")
 
-              checkSparkAnswer(df)
               // Struct map key array element fallback to Spark shuffle for now
-              checkCometExchange(df, 0, false)
+              checkShuffleAnswer(df, 0)
             }
 
             withParquetTable((0 until 50).map(i => (Map(i -> (i, i.toString)), 
i + 1)), "tbl") {
@@ -179,9 +143,8 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 .repartition(numPartitions, $"_1", $"_2")
                 .sortWithinPartitions($"_2")
 
-              checkSparkAnswer(df)
               // Struct map value array element fallback to Spark shuffle for 
now
-              checkCometExchange(df, 0, false)
+              checkShuffleAnswer(df, 0)
             }
           }
         }
@@ -195,8 +158,6 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
         Seq("1.0", "10.0").foreach { ratio =>
           withSQLConf(
             CometConf.COMET_EXEC_ENABLED.key -> execEnabled,
-            CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
             CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
             withParquetTable(
               (0 until 50).map(i => ((Seq(Map(1 -> i)), Map(2 -> i), Map(3 -> 
i)), i + 1)),
@@ -206,9 +167,8 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 .repartition(numPartitions, $"_1", $"_2")
                 .sortWithinPartitions($"_2")
 
-              checkSparkAnswer(df)
               // Map array element fallback to Spark shuffle for now
-              checkCometExchange(df, 0, false)
+              checkShuffleAnswer(df, 0)
             }
           }
         }
@@ -220,9 +180,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
     withSQLConf(
       // AQE has `ShuffleStage` which is a leaf node which blocks
       // collecting `CometShuffleExchangeExec` node.
-      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
-      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-      CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
       withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
         val df = sql("SELECT * FROM tbl")
         val shuffled = df
@@ -280,11 +238,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
 
     Seq(10, 201).foreach { numPartitions =>
       Seq("1.0", "10.0").foreach { ratio =>
-        withSQLConf(
-          CometConf.COMET_EXEC_ENABLED.key -> "false",
-          CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
+        withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> 
ratio) {
           // Boolean key
           withParquetTable(genTuples(50, Seq(true, false)), "tbl") {
             val df = sql("SELECT * FROM tbl")
@@ -305,8 +259,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 $"_13")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
 
           // Byte key
@@ -329,8 +282,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 $"_13")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
 
           // Short key
@@ -353,8 +305,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 $"_13")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
 
           // Int key
@@ -377,8 +328,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 $"_13")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
 
           // Long key
@@ -401,8 +351,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 $"_13")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
 
           // Float key
@@ -425,8 +374,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 $"_13")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
 
           // Double key
@@ -449,8 +397,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 $"_13")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
 
           // Date key
@@ -475,8 +422,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 $"_13")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
 
           // Timestamp key
@@ -503,8 +449,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 $"_13")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
 
           // Decimal key
@@ -531,8 +476,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 $"_13")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
 
           // String key
@@ -555,8 +499,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 $"_13")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
 
           // Binary key
@@ -581,8 +524,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 $"_13")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
         }
       }
@@ -592,11 +534,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
   test("columnar shuffle on array") {
     Seq(10, 201).foreach { numPartitions =>
       Seq("1.0", "10.0").foreach { ratio =>
-        withSQLConf(
-          CometConf.COMET_EXEC_ENABLED.key -> "false",
-          CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
+        withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> 
ratio) {
           withParquetTable(
             (0 until 50).map(i =>
               (
@@ -617,8 +555,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
               .repartition(numPartitions, $"_1", $"_2", $"_3", $"_4", $"_5")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
         }
       }
@@ -629,11 +566,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
     Seq("false", "true").foreach { execEnabled =>
       Seq(10, 201).foreach { numPartitions =>
         Seq("1.0", "10.0").foreach { ratio =>
-          withSQLConf(
-            CometConf.COMET_EXEC_ENABLED.key -> execEnabled,
-            CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-            CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
+          withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> 
ratio) {
             withParquetTable(
               (0 until 50).map(i => (Seq(Seq(i + 1), Seq(i + 2), Seq(i + 3)), 
i + 1)),
               "tbl") {
@@ -642,9 +575,8 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
                 .repartition(numPartitions, $"_1", $"_2")
                 .sortWithinPartitions($"_1")
 
-              checkSparkAnswer(df)
               // Nested array fallback to Spark shuffle for now
-              checkCometExchange(df, 0, false)
+              checkShuffleAnswer(df, 0)
             }
           }
         }
@@ -655,11 +587,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
   test("columnar shuffle on nested struct") {
     Seq(10, 201).foreach { numPartitions =>
       Seq("1.0", "10.0").foreach { ratio =>
-        withSQLConf(
-          CometConf.COMET_EXEC_ENABLED.key -> "false",
-          CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-          CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> ratio) {
+        withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> 
ratio) {
           withParquetTable(
             (0 until 50).map(i =>
               ((i, 2.toString, (i + 1).toLong, (3.toString, i + 1, (i + 
2).toLong)), i + 1)),
@@ -669,44 +597,20 @@ abstract class CometShuffleSuiteBase extends 
CometTestBase with AdaptiveSparkPla
               .repartition(numPartitions, $"_1", $"_2")
               .sortWithinPartitions($"_1")
 
-            checkSparkAnswer(df)
-            checkCometExchange(df, 1, false)
+            checkShuffleAnswer(df, 1)
           }
         }
       }
     }
   }
 
-  test("fix: Dictionary arrays imported from native should not be overridden") 
{
-    Seq(10, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "true",
-        CometConf.COMET_BATCH_SIZE.key -> "10",
-        CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
-        withParquetTable((0 until 50).map(i => (1.toString, 2.toString, (i + 
1).toLong)), "tbl") {
-          val df = sql("SELECT * FROM tbl")
-            .filter($"_1" === 1.toString)
-            .repartition(numPartitions, $"_1", $"_2")
-            .sortWithinPartitions($"_1")
-          checkSparkAnswerAndOperator(df)
-        }
-      }
-    }
-  }
-
   test("fix: closing sliced dictionary Comet vector should not close 
dictionary array") {
     (0 to 10).foreach { _ =>
       withSQLConf(
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
         CometConf.COMET_BATCH_SIZE.key -> "10",
-        CometConf.COMET_EXEC_ENABLED.key -> "false",
         CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1.1",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+        CometConf.COMET_EXEC_SHUFFLE_SPILL_THRESHOLD.key -> "1000000000") {
         val table1 = (0 until 1000)
           .map(i => (111111.toString, 2222222.toString, 3333333.toString, 
i.toLong))
           .toDF("a", "b", "c", "d")
@@ -718,7 +622,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
             val df = sql(
               "select a, b, count(distinct h) from tbl_a, tbl_b " +
                 "where c = e and b = '2222222' and a not like '2' group by a, 
b")
-            checkSparkAnswer(df)
+            checkShuffleAnswer(df, 4)
           }
         }
       }
@@ -727,11 +631,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
 
   test("fix: Dictionary field should have distinct dict_id") {
     Seq(10, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "false",
-        CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+      withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> 
"2.0") {
         withParquetTable(
           (0 until 10000).map(i => (1.toString, 2.toString, (i + 1).toLong)),
           "tbl") {
@@ -740,7 +640,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
               "SELECT * FROM tbl")
               .count())
           val shuffled = sql("SELECT * FROM tbl").repartition(numPartitions, 
$"_1")
-          checkSparkAnswer(shuffled)
+          checkShuffleAnswer(shuffled, 1)
         }
       }
     }
@@ -748,18 +648,14 @@ abstract class CometShuffleSuiteBase extends 
CometTestBase with AdaptiveSparkPla
 
   test("dictionary shuffle") {
     Seq(10, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "false",
-        CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "2.0",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+      withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> 
"2.0") {
         withParquetTable((0 until 10000).map(i => (1.toString, (i + 
1).toLong)), "tbl") {
           assert(
             sql("SELECT * FROM tbl").repartition(numPartitions, $"_1").count() 
== sql(
               "SELECT * FROM tbl")
               .count())
           val shuffled = sql("SELECT * FROM 
tbl").select($"_1").repartition(numPartitions, $"_1")
-          checkSparkAnswer(shuffled)
+          checkShuffleAnswer(shuffled, 1)
         }
       }
     }
@@ -767,33 +663,24 @@ abstract class CometShuffleSuiteBase extends 
CometTestBase with AdaptiveSparkPla
 
   test("dictionary shuffle: fallback to string") {
     Seq(10, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "false",
-        CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> "1000000000.0",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+      withSQLConf(CometConf.COMET_SHUFFLE_PREFER_DICTIONARY_RATIO.key -> 
"1000000000.0") {
         withParquetTable((0 until 10000).map(i => (1.toString, (i + 
1).toLong)), "tbl") {
           assert(
             sql("SELECT * FROM tbl").repartition(numPartitions, $"_1").count() 
== sql(
               "SELECT * FROM tbl")
               .count())
           val shuffled = sql("SELECT * FROM 
tbl").select($"_1").repartition(numPartitions, $"_1")
-          checkSparkAnswer(shuffled)
+          checkShuffleAnswer(shuffled, 1)
         }
       }
     }
   }
 
   test("fix: inMemSorter should be reset after spilling") {
-    withSQLConf(
-      CometConf.COMET_EXEC_ENABLED.key -> "false",
-      CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
-      withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") {
-        assert(
-          sql("SELECT * FROM tbl").repartition(201, $"_1").count() == 
sql("SELECT * FROM tbl")
-            .count())
-      }
+    withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") {
+      assert(
+        sql("SELECT * FROM tbl").repartition(201, $"_1").count() == 
sql("SELECT * FROM tbl")
+          .count())
     }
   }
 
@@ -820,14 +707,9 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
           $"_18",
           $"_19",
           $"_20").foreach { col =>
-          withSQLConf(
-            CometConf.COMET_EXEC_ENABLED.key -> "false",
-            CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-            CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
-            readParquetFile(path.toString) { df =>
-              val shuffled = df.select(col).repartition(numPartitions, col)
-              checkSparkAnswer(shuffled)
-            }
+          readParquetFile(path.toString) { df =>
+            val shuffled = df.select(col).repartition(numPartitions, col)
+            checkShuffleAnswer(shuffled, 1)
           }
         }
       }
@@ -836,17 +718,14 @@ abstract class CometShuffleSuiteBase extends 
CometTestBase with AdaptiveSparkPla
 
   test("fix: StreamReader should always set useDecimal128 as true") {
     Seq(10, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+      withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") {
         withTempPath { dir =>
           val data = makeDecimalRDD(1000, DecimalType(12, 2), false)
           data.write.parquet(dir.getCanonicalPath)
           readParquetFile(dir.getCanonicalPath) { df =>
             {
               val shuffled = df.repartition(numPartitions, $"dec")
-              checkSparkAnswer(shuffled)
+              checkShuffleAnswer(shuffled, 1)
             }
           }
         }
@@ -856,18 +735,13 @@ abstract class CometShuffleSuiteBase extends 
CometTestBase with AdaptiveSparkPla
 
   test("fix: Native Unsafe decimal accessors return incorrect results") {
     Seq(10, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+      withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") {
         withTempPath { dir =>
           val data = makeDecimalRDD(1000, DecimalType(22, 2), false)
           data.write.parquet(dir.getCanonicalPath)
           readParquetFile(dir.getCanonicalPath) { df =>
-            {
-              val shuffled = df.repartition(numPartitions, $"dec")
-              checkSparkAnswer(shuffled)
-            }
+            val shuffled = df.repartition(numPartitions, $"dec")
+            checkShuffleAnswer(shuffled, 1)
           }
         }
       }
@@ -876,10 +750,7 @@ abstract class CometShuffleSuiteBase extends CometTestBase 
with AdaptiveSparkPla
 
   test("Comet shuffle reader should respect spark.comet.batchSize") {
     Seq(10, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+      withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "true") {
         withParquetTable((0 until 10000).map(i => (1, (i + 1).toLong)), "tbl") 
{
           assert(
             sql("SELECT * FROM tbl").repartition(numPartitions, $"_1").count() 
== sql(
@@ -889,14 +760,11 @@ abstract class CometShuffleSuiteBase extends 
CometTestBase with AdaptiveSparkPla
     }
   }
 
-  test("Arrow shuffle should work with BatchScan") {
+  test("columnar shuffle should work with BatchScan") {
     withSQLConf(
       SQLConf.USE_V1_SOURCE_LIST.key -> "", // Use DataSourceV2
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false", // Disable AQE
-      CometConf.COMET_SCAN_ENABLED.key -> "false", // Disable CometScan to use 
Spark BatchScan
-      CometConf.COMET_EXEC_ENABLED.key -> "false",
-      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-      CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
+      CometConf.COMET_SCAN_ENABLED.key -> "false") { // Disable CometScan to 
use Spark BatchScan
       withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
         val df = sql("SELECT * FROM tbl")
         val shuffled = df
@@ -913,131 +781,69 @@ abstract class CometShuffleSuiteBase extends 
CometTestBase with AdaptiveSparkPla
 
   test("Columnar shuffle for large shuffle partition number") {
     Seq(10, 200, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "false",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
-        withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-          val df = sql("SELECT * FROM tbl")
+      withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+        val df = sql("SELECT * FROM tbl")
 
-          val shuffled = df.repartitionByRange(numPartitions, $"_2")
+        val shuffled = df.repartitionByRange(numPartitions, $"_2")
 
-          val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
-          // `CometSerializedShuffleHandle` is used for large shuffle 
partition number,
-          // i.e., sort-based shuffle writer
-          cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
-            .contains("CometSerializedShuffleHandle")
+        val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
+        // `CometSerializedShuffleHandle` is used for large shuffle partition 
number,
+        // i.e., sort-based shuffle writer
+        cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
+          .contains("CometSerializedShuffleHandle")
 
-          checkSparkAnswer(shuffled)
-        }
+        checkSparkAnswer(shuffled)
       }
     }
   }
 
-  test("grouped aggregate: Comet shuffle") {
-    withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
-      withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") {
-        val df = sql("SELECT count(_2), sum(_2) FROM tbl GROUP BY _1")
-        checkCometExchange(df, 1, true)
-        checkSparkAnswerAndOperator(df)
-      }
-    }
-  }
+  test("hash-based columnar shuffle") {
+    Seq(10, 200, 201).foreach { numPartitions =>
+      withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+        val df = sql("SELECT * FROM tbl")
 
-  test("hash shuffle: Comet shuffle") {
-    // Disable CometExec to explicit test Comet Arrow shuffle path
-    Seq(true, false).foreach { execEnabled =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> 
(!execEnabled).toString) {
-        withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-          val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
-          val shuffled1 = df.repartition(10, $"_1")
+        val shuffled1 =
+          df.repartitionByRange(numPartitions, 
$"_2").limit(2).repartition(numPartitions, $"_1")
 
-          // If Comet execution is disabled, `Sort` operator is Spark operator
-          // and jvm arrow shuffle is applied.
-          checkCometExchange(shuffled1, 1, execEnabled)
-          checkSparkAnswer(shuffled1)
+        // 3 exchanges are expected: 1) shuffle to repartition by range, 2) 
shuffle to global limit, 3) hash shuffle
+        checkShuffleAnswer(shuffled1, 3)
 
-          val shuffled2 = df.repartition(10, $"_1", $"_2")
+        val shuffled2 = df
+          .repartitionByRange(numPartitions, $"_2")
+          .limit(2)
+          .repartition(numPartitions, $"_1", $"_2")
 
-          checkCometExchange(shuffled2, 1, execEnabled)
-          checkSparkAnswer(shuffled2)
+        checkShuffleAnswer(shuffled2, 3)
 
-          val shuffled3 = df.repartition(10, $"_2", $"_1")
+        val shuffled3 = df
+          .repartitionByRange(numPartitions, $"_2")
+          .limit(2)
+          .repartition(numPartitions, $"_2", $"_1")
 
-          checkCometExchange(shuffled3, 1, execEnabled)
-          checkSparkAnswer(shuffled3)
-        }
+        checkShuffleAnswer(shuffled3, 3)
       }
     }
   }
 
-  test("Comet shuffle: different data type") {
-    // Disable CometExec to explicit test Comet native shuffle path
-    Seq(true, false).foreach { execEnabled =>
-      Seq(true, false).foreach { dictionaryEnabled =>
-        withTempDir { dir =>
-          val path = new Path(dir.toURI.toString, "test.parquet")
-          makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 
10000)
-          val all_types = if (isSpark34Plus) {
-            Seq(
-              $"_1",
-              $"_2",
-              $"_3",
-              $"_4",
-              $"_5",
-              $"_6",
-              $"_7",
-              $"_8",
-              $"_9",
-              $"_10",
-              $"_11",
-              $"_13",
-              $"_14",
-              $"_15",
-              $"_16",
-              $"_17",
-              $"_18",
-              $"_19",
-              $"_20")
-          } else {
-            Seq(
-              $"_1",
-              $"_2",
-              $"_3",
-              $"_4",
-              $"_5",
-              $"_6",
-              $"_7",
-              $"_8",
-              $"_9",
-              $"_10",
-              $"_11",
-              $"_13",
-              $"_15",
-              $"_16",
-              $"_18",
-              $"_19",
-              $"_20")
-          }
-          all_types.foreach { col =>
-            withSQLConf(
-              CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
-              CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-              "parquet.enable.dictionary" -> dictionaryEnabled.toString) {
-              readParquetFile(path.toString) { df =>
-                val shuffled = df
-                  .select($"_1")
-                  .repartition(10, col)
-                checkCometExchange(shuffled, 1, true)
-                if (execEnabled) {
-                  checkSparkAnswerAndOperator(shuffled)
-                } else {
-                  checkSparkAnswer(shuffled)
-                }
+  test("columnar shuffle: different data type") {
+    Seq(true, false).foreach { dictionaryEnabled =>
+      withTempDir { dir =>
+        val path = new Path(dir.toURI.toString, "test.parquet")
+        makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 
1000)
+
+        Seq(10, 201).foreach { numPartitions =>
+          (1 to 20).map(i => s"_$i").foreach { c =>
+            readParquetFile(path.toString) { df =>
+              val shuffled = df
+                .select($"_1")
+                .repartition(numPartitions, col(c))
+              val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
+              if (numPartitions > 200) {
+                // For sort-based shuffle writer
+                
cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
+                  .contains("CometSerializedShuffleHandle")
               }
+              checkSparkAnswer(shuffled)
             }
           }
         }
@@ -1045,275 +851,94 @@ abstract class CometShuffleSuiteBase extends 
CometTestBase with AdaptiveSparkPla
     }
   }
 
-  test("hash shuffle: Comet columnar shuffle") {
-    Seq(10, 200, 201).foreach { numPartitions =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> "false",
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
-        withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-          val df = sql("SELECT * FROM tbl")
-
-          val shuffled1 =
-            df.repartitionByRange(numPartitions, 
$"_2").limit(2).repartition(numPartitions, $"_1")
-
-          // 3 exchanges are expected: 1) shuffle to repartition by range, 2) 
shuffle to global limit, 3) hash shuffle
-          checkCometExchange(shuffled1, 3, false)
-          checkSparkAnswer(shuffled1)
+  test("native operator after columnar shuffle") {
+    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+      val df = sql("SELECT * FROM tbl")
 
-          val shuffled2 = df
-            .repartitionByRange(numPartitions, $"_2")
-            .limit(2)
-            .repartition(numPartitions, $"_1", $"_2")
+      val shuffled1 = df
+        .repartition(10, $"_2")
+        .select($"_1", $"_1" + 1, $"_2" + 2)
+        .repartition(10, $"_1")
+        .filter($"_1" > 1)
 
-          checkCometExchange(shuffled2, 3, false)
-          checkSparkAnswer(shuffled2)
-
-          val shuffled3 = df
-            .repartitionByRange(numPartitions, $"_2")
-            .limit(2)
-            .repartition(numPartitions, $"_2", $"_1")
-
-          checkCometExchange(shuffled3, 3, false)
-          checkSparkAnswer(shuffled3)
-        }
-      }
-    }
-  }
+      // 2 Comet shuffle exchanges are expected
+      checkShuffleAnswer(shuffled1, 2)
 
-  test("Comet columnar shuffle shuffle: different data type") {
-    Seq(10, 200, 201).foreach { numPartitions =>
-      Seq(true, false).foreach { dictionaryEnabled =>
-        withTempDir { dir =>
-          val path = new Path(dir.toURI.toString, "test.parquet")
-          makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 
10000)
-
-          Seq(
-            $"_1",
-            $"_2",
-            $"_3",
-            $"_4",
-            $"_5",
-            $"_6",
-            $"_7",
-            $"_8",
-            $"_9",
-            $"_10",
-            $"_11",
-            $"_13",
-            $"_14",
-            $"_15",
-            $"_16",
-            $"_17",
-            $"_18",
-            $"_19",
-            $"_20").foreach { col =>
-            withSQLConf(
-              CometConf.COMET_EXEC_ENABLED.key -> "false",
-              CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true",
-              CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
-              readParquetFile(path.toString) { df =>
-                val shuffled = df
-                  .select($"_1")
-                  .repartition(numPartitions, col)
-                val cometShuffleExecs = checkCometExchange(shuffled, 1, false)
-                if (numPartitions > 200) {
-                  // For sort-based shuffle writer
-                  
cometShuffleExecs(0).shuffleDependency.shuffleHandle.getClass.getName
-                    .contains("CometSerializedShuffleHandle")
-                }
-                checkSparkAnswer(shuffled)
-              }
-            }
-          }
-        }
-      }
-    }
-  }
+      val shuffled2 = df
+        .repartitionByRange(10, $"_2")
+        .select($"_1", $"_1" + 1, $"_2" + 2)
+        .repartition(10, $"_1")
+        .filter($"_1" > 1)
 
-  test("Comet native operator after Comet shuffle") {
-    Seq(true, false).foreach { columnarShuffle =>
-      withSQLConf(
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> 
columnarShuffle.toString) {
-        withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-          val df = sql("SELECT * FROM tbl")
-
-          val shuffled1 = df
-            .repartition(10, $"_2")
-            .select($"_1", $"_1" + 1, $"_2" + 2)
-            .repartition(10, $"_1")
-            .filter($"_1" > 1)
-
-          // 2 Comet shuffle exchanges are expected
-          checkCometExchange(shuffled1, 2, !columnarShuffle)
-          checkSparkAnswer(shuffled1)
-
-          val shuffled2 = df
-            .repartitionByRange(10, $"_2")
-            .select($"_1", $"_1" + 1, $"_2" + 2)
-            .repartition(10, $"_1")
-            .filter($"_1" > 1)
-
-          // 2 Comet shuffle exchanges are expected, if columnar shuffle is 
enabled
-          if (columnarShuffle) {
-            checkCometExchange(shuffled2, 2, !columnarShuffle)
-          } else {
-            // Because the first exchange from the bottom is range exchange 
which native shuffle
-            // doesn't support. So Comet exec operators stop before the first 
exchange and thus
-            // there is no Comet exchange.
-            checkCometExchange(shuffled2, 0, true)
-          }
-          checkSparkAnswer(shuffled2)
-        }
-      }
+      // 2 Comet shuffle exchanges are expected, if columnar shuffle is enabled
+      checkShuffleAnswer(shuffled2, 2)
     }
   }
 
-  test("Comet shuffle: single partition") {
-    Seq(true, false).foreach { execEnabled =>
-      withSQLConf(
-        CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
-        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> 
(!execEnabled).toString) {
-        withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-          val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
+  test("columnar shuffle: single partition") {
+    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+      val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
 
-          val shuffled = df.repartition(1)
+      val shuffled = df.repartition(1)
 
-          checkCometExchange(shuffled, 1, execEnabled)
-          checkSparkAnswer(shuffled)
-        }
-      }
+      checkShuffleAnswer(shuffled, 1)
     }
   }
 
-  test("fix: comet native shuffle with binary data") {
-    withSQLConf(
-      CometConf.COMET_EXEC_ENABLED.key -> "true",
-      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-      CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
-      withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-        val df = sql("SELECT cast(cast(_1 as STRING) as BINARY) as binary, _2 
FROM tbl")
-
-        val shuffled = df.repartition(1, $"binary")
+  test("sort-based columnar shuffle metrics") {
+    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+      val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
+      val shuffled = df.repartition(201, $"_1")
 
-        checkCometExchange(shuffled, 1, true)
-        checkSparkAnswer(shuffled)
-      }
-    }
-  }
+      checkShuffleAnswer(shuffled, 1)
 
-  test("Comet shuffle metrics") {
-    withSQLConf(
-      CometConf.COMET_EXEC_ENABLED.key -> "true",
-      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-      CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false") {
-      withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-        val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
-        val shuffled = df.repartition(10, $"_1")
+      // Materialize the shuffled data
+      shuffled.collect()
+      val metrics = find(shuffled.queryExecution.executedPlan) {
+        case _: CometShuffleExchangeExec => true
+        case _ => false
+      }.map(_.metrics).get
 
-        checkCometExchange(shuffled, 1, true)
-        checkSparkAnswer(shuffled)
+      assert(metrics.contains("shuffleRecordsWritten"))
+      assert(metrics("shuffleRecordsWritten").value == 5L)
 
-        // Materialize the shuffled data
-        shuffled.collect()
-        val metrics = find(shuffled.queryExecution.executedPlan) {
-          case _: CometShuffleExchangeExec => true
-          case _ => false
-        }.map(_.metrics).get
+      assert(metrics.contains("shuffleBytesWritten"))
+      assert(metrics("shuffleBytesWritten").value > 0)
 
-        assert(metrics.contains("shuffleRecordsWritten"))
-        assert(metrics("shuffleRecordsWritten").value == 5L)
-      }
+      assert(metrics.contains("shuffleWriteTime"))
+      assert(metrics("shuffleWriteTime").value > 0)
     }
   }
 
-  test("sort-based shuffle metrics") {
-    withSQLConf(
-      CometConf.COMET_EXEC_ENABLED.key -> "false",
-      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-      CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "true") {
-      withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
-        val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
-        val shuffled = df.repartition(201, $"_1")
-
-        checkCometExchange(shuffled, 1, false)
-        checkSparkAnswer(shuffled)
-
-        // Materialize the shuffled data
-        shuffled.collect()
-        val metrics = find(shuffled.queryExecution.executedPlan) {
-          case _: CometShuffleExchangeExec => true
-          case _ => false
-        }.map(_.metrics).get
-
-        assert(metrics.contains("shuffleRecordsWritten"))
-        assert(metrics("shuffleRecordsWritten").value == 5L)
-
-        assert(metrics.contains("shuffleBytesWritten"))
-        assert(metrics("shuffleBytesWritten").value > 0)
-
-        assert(metrics.contains("shuffleWriteTime"))
-        assert(metrics("shuffleWriteTime").value > 0)
-      }
-    }
+  /**
+   * Checks that `df` produces the same answer as Spark does, and has the 
`expectedNum` Comet
+   * exchange operators.
+   */
+  private def checkShuffleAnswer(df: DataFrame, expectedNum: Int): Unit = {
+    checkCometExchange(df, expectedNum, false)
+    checkSparkAnswer(df)
   }
 }
 
-class CometAsyncShuffleSuite extends CometShuffleSuiteBase {
+class CometAsyncShuffleSuite extends CometColumnarShuffleSuite {
   override protected val asyncShuffleEnable: Boolean = true
 
   protected val adaptiveExecutionEnabled: Boolean = true
 }
 
-class CometAsyncNonFastMergeShuffleSuite extends CometShuffleSuiteBase {
-  override protected val fastMergeEnabled: Boolean = false
-
-  protected val adaptiveExecutionEnabled: Boolean = true
-
-  protected val asyncShuffleEnable: Boolean = true
-}
-
-class CometNonFastMergeShuffleSuite extends CometShuffleSuiteBase {
-  override protected val fastMergeEnabled: Boolean = false
-
-  protected val adaptiveExecutionEnabled: Boolean = true
-
-  protected val asyncShuffleEnable: Boolean = false
-}
-
-class CometShuffleSuite extends CometShuffleSuiteBase {
+class CometShuffleSuite extends CometColumnarShuffleSuite {
   override protected val asyncShuffleEnable: Boolean = false
 
   protected val adaptiveExecutionEnabled: Boolean = true
-
-  import testImplicits._
-
-  // TODO: this test takes ~5mins to run, we should reduce the test time.
-  // Because this test takes too long, we only have it in `CometShuffleSuite`.
-  test("fix: Too many task completion listener of ArrowReaderIterator causes 
OOM") {
-    withSQLConf(
-      CometConf.COMET_EXEC_ENABLED.key -> "true",
-      CometConf.COMET_BATCH_SIZE.key -> "1",
-      CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false",
-      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
-      withParquetTable((0 until 1000000).map(i => (1, (i + 1).toLong)), "tbl") 
{
-        assert(
-          sql("SELECT * FROM tbl").repartition(201, $"_1").count() == 
sql("SELECT * FROM tbl")
-            .count())
-      }
-    }
-  }
 }
 
-class DisableAQECometShuffleSuite extends CometShuffleSuiteBase {
+class DisableAQECometShuffleSuite extends CometColumnarShuffleSuite {
   override protected val asyncShuffleEnable: Boolean = false
 
   protected val adaptiveExecutionEnabled: Boolean = false
 }
 
-class DisableAQECometAsyncShuffleSuite extends CometShuffleSuiteBase {
+class DisableAQECometAsyncShuffleSuite extends CometColumnarShuffleSuite {
   override protected val asyncShuffleEnable: Boolean = true
 
   protected val adaptiveExecutionEnabled: Boolean = false
diff --git 
a/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala 
b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
new file mode 100644
index 0000000..c35763c
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeShuffleSuite.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.exec
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{CometTestBase, DataFrame}
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions.col
+
+import org.apache.comet.CometConf
+import org.apache.comet.CometSparkSessionExtensions.isSpark34Plus
+
+class CometNativeShuffleSuite extends CometTestBase with 
AdaptiveSparkPlanHelper {
+  override protected def test(testName: String, testTags: Tag*)(testFun: => 
Any)(implicit
+      pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(
+        CometConf.COMET_EXEC_ENABLED.key -> "true",
+        CometConf.COMET_COLUMNAR_SHUFFLE_ENABLED.key -> "false",
+        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
+        testFun
+      }
+    }
+  }
+
+  import testImplicits._
+
+  // TODO: this test takes a long time to run, we should reduce the test time.
+  test("fix: Too many task completion listener of ArrowReaderIterator causes 
OOM") {
+    withSQLConf(CometConf.COMET_BATCH_SIZE.key -> "1") {
+      withParquetTable((0 until 100000).map(i => (1, (i + 1).toLong)), "tbl") {
+        assert(
+          sql("SELECT * FROM tbl").repartition(201, $"_1").count() == 
sql("SELECT * FROM tbl")
+            .count())
+      }
+    }
+  }
+
+  test("native shuffle: different data type") {
+    Seq(true, false).foreach { execEnabled =>
+      Seq(true, false).foreach { dictionaryEnabled =>
+        withTempDir { dir =>
+          val path = new Path(dir.toURI.toString, "test.parquet")
+          makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 
1000)
+          var allTypes: Seq[Int] = (1 to 20)
+          if (isSpark34Plus) {
+            allTypes = allTypes.filterNot(Set(14, 17).contains)
+          }
+          allTypes.map(i => s"_$i").foreach { c =>
+            withSQLConf(
+              CometConf.COMET_EXEC_ENABLED.key -> execEnabled.toString,
+              "parquet.enable.dictionary" -> dictionaryEnabled.toString) {
+              readParquetFile(path.toString) { df =>
+                val shuffled = df
+                  .select($"_1")
+                  .repartition(10, col(c))
+                checkShuffleAnswer(shuffled, 1, checkNativeOperators = 
execEnabled)
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  test("hash-based native shuffle") {
+    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+      val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
+      val shuffled1 = df.repartition(10, $"_1")
+      checkShuffleAnswer(shuffled1, 1)
+
+      val shuffled2 = df.repartition(10, $"_1", $"_2")
+      checkShuffleAnswer(shuffled2, 1)
+
+      val shuffled3 = df.repartition(10, $"_2", $"_1")
+      checkShuffleAnswer(shuffled3, 1)
+    }
+  }
+
+  test("native shuffle: single partition") {
+    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+      val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
+
+      val shuffled = df.repartition(1)
+      checkShuffleAnswer(shuffled, 1)
+    }
+  }
+
+  test("native shuffle with dictionary of binary") {
+    Seq("true", "false").foreach { dictionaryEnabled =>
+      withParquetTable(
+        (0 until 1000).map(i => (i % 5, (i % 5).toString.getBytes())),
+        "tbl",
+        dictionaryEnabled.toBoolean) {
+        val shuffled = sql("SELECT * FROM tbl").repartition(2, $"_2")
+        checkShuffleAnswer(shuffled, 1)
+      }
+    }
+  }
+
+  test("native operator after native shuffle") {
+    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+      val df = sql("SELECT * FROM tbl")
+
+      val shuffled1 = df
+        .repartition(10, $"_2")
+        .select($"_1", $"_1" + 1, $"_2" + 2)
+        .repartition(10, $"_1")
+        .filter($"_1" > 1)
+
+      // 2 Comet shuffle exchanges are expected
+      checkShuffleAnswer(shuffled1, 2)
+
+      val shuffled2 = df
+        .repartitionByRange(10, $"_2")
+        .select($"_1", $"_1" + 1, $"_2" + 2)
+        .repartition(10, $"_1")
+        .filter($"_1" > 1)
+
+      // Because the first exchange from the bottom is range exchange which 
native shuffle
+      // doesn't support. So Comet exec operators stop before the first 
exchange and thus
+      // there is no Comet exchange.
+      checkShuffleAnswer(shuffled2, 0)
+    }
+  }
+
+  test("grouped aggregate: native shuffle") {
+    withParquetTable((0 until 5).map(i => (i, i + 1)), "tbl") {
+      val df = sql("SELECT count(_2), sum(_2) FROM tbl GROUP BY _1")
+      checkShuffleAnswer(df, 1, checkNativeOperators = true)
+    }
+  }
+
+  test("native shuffle metrics") {
+    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+      val df = sql("SELECT * FROM tbl").sortWithinPartitions($"_1".desc)
+      val shuffled = df.repartition(10, $"_1")
+
+      checkShuffleAnswer(shuffled, 1)
+
+      // Materialize the shuffled data
+      shuffled.collect()
+      val metrics = find(shuffled.queryExecution.executedPlan) {
+        case _: CometShuffleExchangeExec => true
+        case _ => false
+      }.map(_.metrics).get
+
+      assert(metrics.contains("shuffleRecordsWritten"))
+      assert(metrics("shuffleRecordsWritten").value == 5L)
+    }
+  }
+
+  test("fix: Dictionary arrays imported from native should not be overridden") 
{
+    Seq(10, 201).foreach { numPartitions =>
+      withSQLConf(
+        CometConf.COMET_BATCH_SIZE.key -> "10",
+        CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
+        CometConf.COMET_EXEC_ALL_EXPR_ENABLED.key -> "true") {
+        withParquetTable((0 until 50).map(i => (1.toString, 2.toString, (i + 
1).toLong)), "tbl") {
+          val df = sql("SELECT * FROM tbl")
+            .filter($"_1" === 1.toString)
+            .repartition(numPartitions, $"_1", $"_2")
+            .sortWithinPartitions($"_1")
+          checkSparkAnswerAndOperator(df)
+        }
+      }
+    }
+  }
+
+  test("fix: Comet native shuffle with binary data") {
+    withParquetTable((0 until 5).map(i => (i, (i + 1).toLong)), "tbl") {
+      val df = sql("SELECT cast(cast(_1 as STRING) as BINARY) as binary, _2 
FROM tbl")
+
+      val shuffled = df.repartition(1, $"binary")
+      checkShuffleAnswer(shuffled, 1)
+    }
+  }
+
+  /**
+   * Checks that `df` produces the same answer as Spark does, and has the 
`expectedNum` Comet
+   * exchange operators. When `checkNativeOperators` is true, this also checks 
that all operators
+   * used by `df` are Comet native operators.
+   */
+  private def checkShuffleAnswer(
+      df: DataFrame,
+      expectedNum: Int,
+      checkNativeOperators: Boolean = false): Unit = {
+    checkCometExchange(df, expectedNum, true)
+    if (checkNativeOperators) {
+      checkSparkAnswerAndOperator(df)
+    } else {
+      checkSparkAnswer(df)
+    }
+  }
+}

Reply via email to