c21 commented on a change in pull request #29473:
URL: https://github.com/apache/spark/pull/29473#discussion_r479612474



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala
##########
@@ -27,45 +27,48 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.BucketReadStrategyInJoin
 
 /**
- * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
+ * This rule coalesces or repartitions one side of the `SortMergeJoin` and `ShuffledHashJoin`
  * if the following conditions are met:
  *   - Two bucketed tables are joined.
 *   - Join keys match with output partition expressions on their respective sides.
  *   - The larger bucket number is divisible by the smaller bucket number.
- *   - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
  *   - The ratio of the number of buckets is less than the value set in
- *     COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
+ *     COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.

Review comment:
       nit: shouldn't it be `BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO` ?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2655,24 +2655,34 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val COALESCE_BUCKETS_IN_JOIN_ENABLED =
-    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
-      .doc("When true, if two bucketed tables with the different number of 
buckets are joined, " +
-        "the side with a bigger number of buckets will be coalesced to have 
the same number " +
-        "of buckets as the other side. Bigger number of buckets is divisible 
by the smaller " +
-        "number of buckets. Bucket coalescing is applied to sort-merge joins 
and " +
-        "shuffled hash join. Note: Coalescing bucketed table can avoid 
unnecessary shuffling " +
-        "in join, but it also reduces parallelism and could possibly cause OOM 
for " +
-        "shuffled hash join.")
-      .version("3.1.0")
-      .booleanConf
-      .createWithDefault(false)
+  object BucketReadStrategyInJoin extends Enumeration {
+    val COALESCE, REPARTITION, OFF = Value
+  }
 
-  val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
-    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
-      .doc("The ratio of the number of two buckets being coalesced should be 
less than or " +
-        "equal to this value for bucket coalescing to be applied. This 
configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set 
to true.")
+  val BUCKET_READ_STRATEGY_IN_JOIN =
+    buildConf("spark.sql.bucketing.bucketReadStrategyInJoin")
+      .doc("When set to COALESCE, if two bucketed tables with the different 
number of buckets " +

Review comment:
       nit: shall we mention the allowed values first, e.g. "one of COALESCE, REPARTITION, OFF"? Users might not follow the long description here exactly. It's probably also worth mentioning that the default is "OFF", where we neither coalesce nor repartition. A possible opening for the description is sketched below.
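       For example, the description could open along these lines (just a sketch of possible wording, not the final text):

```scala
.doc("Specifies the strategy for reading bucketed tables in a join. Must be one of " +
  "COALESCE, REPARTITION, OFF; the default is OFF, meaning buckets are neither coalesced " +
  "nor repartitioned. When set to COALESCE, ...")
```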

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -548,22 +560,42 @@ case class FileSourceScanExec(
       filesGroupedToBuckets
     }
 
-    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
-      logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
-      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
-      Seq.tabulate(numCoalescedBuckets) { bucketId =>
-        val partitionedFiles = coalescedBuckets.get(bucketId).map {
-          _.values.flatten.toArray
-        }.getOrElse(Array.empty)
-        FilePartition(bucketId, partitionedFiles)
-      }
-    }.getOrElse {
-      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+    if (optionalNewNumBuckets.isEmpty) {
+      val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
         FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
       }
+      new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+    } else {
+      val newNumBuckets = optionalNewNumBuckets.get
+      if (newNumBuckets < bucketSpec.numBuckets) {
+        assert(bucketSpec.numBuckets % newNumBuckets == 0)
+        logInfo(s"Coalescing to $newNumBuckets buckets from 
${bucketSpec.numBuckets} buckets")
+        val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % 
newNumBuckets)
+        val filePartitions = Seq.tabulate(newNumBuckets) { bucketId =>
+          val partitionedFiles = coalescedBuckets
+            .get(bucketId)
+            .map(_.values.flatten.toArray)
+            .getOrElse(Array.empty)
+          FilePartition(bucketId, partitionedFiles)
+        }
+        new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+      } else {
+        assert(newNumBuckets % bucketSpec.numBuckets == 0)
+        logInfo(s"Repartitioning to $newNumBuckets buckets from 
${bucketSpec.numBuckets} buckets")
+        val filePartitions = Seq.tabulate(newNumBuckets) { bucketId =>
+          FilePartition(
+            bucketId,
+            prunedFilesGroupedToBuckets.getOrElse(bucketId % bucketSpec.numBuckets, Array.empty))
+        }
+        // There are now more files to be read.
+        val filesNum = filePartitions.map(_.files.size.toLong).sum
+        val filesSize = filePartitions.map(_.files.map(_.length).sum).sum
+        driverMetrics("numFiles") = filesNum

Review comment:
       per `setFilesNumAndSizeMetric`, should we set `staticFilesNum` here or `numFiles`? See the sketch below for what the former would look like.
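       A minimal sketch of the `staticFilesNum` variant, assuming that is the metric key this class uses for the pre-pruning count (hypothetical placement, not a confirmed fix):

```scala
// Hypothetical alternative, if the pre-pruning ("static") counter is the intended one:
driverMetrics("staticFilesNum") = filesNum
```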

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2655,24 +2655,34 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val COALESCE_BUCKETS_IN_JOIN_ENABLED =
-    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
-      .doc("When true, if two bucketed tables with the different number of 
buckets are joined, " +
-        "the side with a bigger number of buckets will be coalesced to have 
the same number " +
-        "of buckets as the other side. Bigger number of buckets is divisible 
by the smaller " +
-        "number of buckets. Bucket coalescing is applied to sort-merge joins 
and " +
-        "shuffled hash join. Note: Coalescing bucketed table can avoid 
unnecessary shuffling " +
-        "in join, but it also reduces parallelism and could possibly cause OOM 
for " +
-        "shuffled hash join.")
-      .version("3.1.0")
-      .booleanConf
-      .createWithDefault(false)
+  object BucketReadStrategyInJoin extends Enumeration {
+    val COALESCE, REPARTITION, OFF = Value
+  }
 
-  val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
-    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
-      .doc("The ratio of the number of two buckets being coalesced should be 
less than or " +
-        "equal to this value for bucket coalescing to be applied. This 
configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set 
to true.")
+  val BUCKET_READ_STRATEGY_IN_JOIN =
+    buildConf("spark.sql.bucketing.bucketReadStrategyInJoin")

Review comment:
       nit: shall we pick a name consistent with the existing "spark.sql.sources.bucketing" config namespace, e.g. "spark.sql.sources.bucketing.readStrategyInJoin"? No big deal, but "bucketing.bucket..." seems a little verbose. Pointing it out because users may rely on this config for bucketing optimization and raise questions to developers about it. A sketch of the renamed key is below.
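       A minimal sketch of the suggested naming (the key is only a suggestion; the rest of the definition would stay as in this PR):

```scala
// Suggested key, aligned with the existing spark.sql.sources.bucketing.* namespace.
val BUCKET_READ_STRATEGY_IN_JOIN =
  buildConf("spark.sql.sources.bucketing.readStrategyInJoin")
    .doc("...")  // description unchanged from this PR
    .version("3.1.0")
    .stringConf
    .transform(_.toUpperCase(Locale.ROOT))
    .checkValues(BucketReadStrategyInJoin.values.map(_.toString))
    .createWithDefault(BucketReadStrategyInJoin.OFF.toString)
```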

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -164,15 +165,24 @@ case class FileSourceScanExec(
     requiredSchema: StructType,
     partitionFilters: Seq[Expression],
     optionalBucketSet: Option[BitSet],
-    optionalNumCoalescedBuckets: Option[Int],
+    optionalNewNumBuckets: Option[Int],
     dataFilters: Seq[Expression],
     tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec {
 
   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
   override lazy val supportsColumnar: Boolean = {
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+    // `RepartitioningBucketRDD` converts columnar batches to rows to calculate bucket id for each
+    // row, thus columnar is not supported when `RepartitioningBucketRDD` is used to avoid
+    // conversions from batches to rows and back to batches.
+    relation.fileFormat.supportBatch(relation.sparkSession, schema) && !isRepartitioningBuckets
+  }
+
+  @transient private lazy val isRepartitioningBuckets: Boolean = {
+    relation.bucketSpec.isDefined &&
+      optionalNewNumBuckets.isDefined &&
+      optionalNewNumBuckets.get > relation.bucketSpec.get.numBuckets

Review comment:
       we don't want to do the repartition if the user disables bucketing, right?
   better to use `bucketedScan && optionalNewNumBuckets.isDefined && optionalNewNumBuckets.get > relation.bucketSpec.get.numBuckets`? A sketch of that guard is below.
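       A minimal sketch of the suggested guard, assuming `bucketedScan` is the existing flag in this class that reflects whether bucketed reading is actually enabled for the scan:

```scala
@transient private lazy val isRepartitioningBuckets: Boolean = {
  // Only consider repartitioning when the scan really reads bucketed data,
  // i.e. bucketing has not been disabled by the user.
  bucketedScan &&
    optionalNewNumBuckets.isDefined &&
    optionalNewNumBuckets.get > relation.bucketSpec.get.numBuckets
}
```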

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2655,24 +2655,34 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  val COALESCE_BUCKETS_IN_JOIN_ENABLED =
-    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
-      .doc("When true, if two bucketed tables with the different number of 
buckets are joined, " +
-        "the side with a bigger number of buckets will be coalesced to have 
the same number " +
-        "of buckets as the other side. Bigger number of buckets is divisible 
by the smaller " +
-        "number of buckets. Bucket coalescing is applied to sort-merge joins 
and " +
-        "shuffled hash join. Note: Coalescing bucketed table can avoid 
unnecessary shuffling " +
-        "in join, but it also reduces parallelism and could possibly cause OOM 
for " +
-        "shuffled hash join.")
-      .version("3.1.0")
-      .booleanConf
-      .createWithDefault(false)
+  object BucketReadStrategyInJoin extends Enumeration {
+    val COALESCE, REPARTITION, OFF = Value
+  }
 
-  val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
-    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
-      .doc("The ratio of the number of two buckets being coalesced should be 
less than or " +
-        "equal to this value for bucket coalescing to be applied. This 
configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set 
to true.")
+  val BUCKET_READ_STRATEGY_IN_JOIN =
+    buildConf("spark.sql.bucketing.bucketReadStrategyInJoin")
+      .doc("When set to COALESCE, if two bucketed tables with the different 
number of buckets " +
+        "are joined, the side with a bigger number of buckets will be 
coalesced to have the same " +
+        "number of buckets as the other side. When set to REPARTITION, the 
side with a bigger " +
+        "number of buckets will be repartitioned to have the same number of 
buckets as the other " +
+        "side. The bigger number of buckets must be divisible by the smaller 
number of buckets, " +
+        "and the strategy is applied to sort-merge joins and shuffled hash 
joins. " +
+        "Note: Coalescing bucketed table can avoid unnecessary shuffle in 
join, but it also " +
+        "reduces parallelism and could possibly cause OOM for shuffled hash 
join. Repartitioning " +
+        "bucketed table avoids unnecessary shuffle in join while maintaining 
the parallelism " +
+        "at the cost of reading duplicate data.")
+      .version("3.1.0")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(BucketReadStrategyInJoin.values.map(_.toString))
+      .createWithDefault(BucketReadStrategyInJoin.OFF.toString)
+
+  val BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO =
+    buildConf("spark.sql.bucketing.bucketReadStrategyInJoin.maxBucketRatio")

Review comment:
       nit: same as above, might be just "spark.sql.sources.bucketing.readStrategyInJoinMaxBucketRatio"?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -314,7 +324,7 @@ case class FileSourceScanExec(
         val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
 
         // TODO SPARK-24528 Sort order is currently ignored if buckets are coalesced.
-        if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) {
+        if (singleFilePartitions && (optionalNewNumBuckets.isEmpty || 
isRepartitioningBuckets)) {

Review comment:
       we don't need `|| isRepartitioningBuckets` here, right?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -548,22 +560,42 @@ case class FileSourceScanExec(
       filesGroupedToBuckets
     }
 
-    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
-      logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
-      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
-      Seq.tabulate(numCoalescedBuckets) { bucketId =>
-        val partitionedFiles = coalescedBuckets.get(bucketId).map {
-          _.values.flatten.toArray
-        }.getOrElse(Array.empty)
-        FilePartition(bucketId, partitionedFiles)
-      }
-    }.getOrElse {
-      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+    if (optionalNewNumBuckets.isEmpty) {
+      val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
         FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
       }
+      new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+    } else {
+      val newNumBuckets = optionalNewNumBuckets.get
+      if (newNumBuckets < bucketSpec.numBuckets) {
+        assert(bucketSpec.numBuckets % newNumBuckets == 0)
+        logInfo(s"Coalescing to $newNumBuckets buckets from 
${bucketSpec.numBuckets} buckets")
+        val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % 
newNumBuckets)
+        val filePartitions = Seq.tabulate(newNumBuckets) { bucketId =>
+          val partitionedFiles = coalescedBuckets
+            .get(bucketId)
+            .map(_.values.flatten.toArray)
+            .getOrElse(Array.empty)
+          FilePartition(bucketId, partitionedFiles)
+        }
+        new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+      } else {
+        assert(newNumBuckets % bucketSpec.numBuckets == 0)
+        logInfo(s"Repartitioning to $newNumBuckets buckets from 
${bucketSpec.numBuckets} buckets")
+        val filePartitions = Seq.tabulate(newNumBuckets) { bucketId =>
+          FilePartition(
+            bucketId,
+            prunedFilesGroupedToBuckets.getOrElse(bucketId % bucketSpec.numBuckets, Array.empty))
+        }
+        // There are now more files to be read.
+        val filesNum = filePartitions.map(_.files.size.toLong).sum
+        val filesSize = filePartitions.map(_.files.map(_.length).sum).sum
+        driverMetrics("numFiles") = filesNum
+        driverMetrics("filesSize") = filesSize
+        new BucketRepartitioningRDD(
+          fsRelation.sparkSession, readFile, filePartitions, bucketSpec, newNumBuckets, output)

Review comment:
       nit: we can pass `outputPartitioning.asInstanceOf[HashPartitioning].partitionIdExpression`, so we don't need to re-do the logic to get `bucketIdExpression` in `BucketRepartitioningRDD`. A sketch of what that could look like is below.
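       A minimal sketch of the idea, assuming the RDD's constructor is changed to accept the precomputed partition-id expression instead of `bucketSpec` (hypothetical signature, shown only to illustrate the suggestion):

```scala
// Hypothetical constructor: hand the scan's partition-id expression to the RDD so it
// does not have to rebuild HashPartitioning(bucketColumns, numBuckets) itself.
new BucketRepartitioningRDD(
  fsRelation.sparkSession,
  readFile,
  filePartitions,
  outputPartitioning.asInstanceOf[HashPartitioning].partitionIdExpression,
  output)
```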

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/BucketRepartitioningRDD.scala
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * An RDD that filters out the rows that do not belong to the current bucket file being read.
+ */
+private[spark] class BucketRepartitioningRDD(
+    @transient private val sparkSession: SparkSession,
+    readFunction: PartitionedFile => Iterator[InternalRow],
+    @transient override val filePartitions: Seq[FilePartition],
+    bucketSpec: BucketSpec,
+    numRepartitionedBuckets: Int,
+    output: Seq[Attribute])
+  extends FileScanRDD(sparkSession, readFunction, filePartitions) {
+  assert(numRepartitionedBuckets > bucketSpec.numBuckets)

Review comment:
       nit: probably just one assert for `numRepartitionedBuckets > bucketSpec.numBuckets && numRepartitionedBuckets % bucketSpec.numBuckets == 0`, e.g. as sketched below.



