c21 commented on a change in pull request #29473:
URL: https://github.com/apache/spark/pull/29473#discussion_r479612474
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceOrRepartitionBucketsInJoin.scala
##########
@@ -27,45 +27,48 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.BucketReadStrategyInJoin

 /**
- * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
+ * This rule coalesces or repartitions one side of the `SortMergeJoin` and `ShuffledHashJoin`
  * if the following conditions are met:
  *   - Two bucketed tables are joined.
  *   - Join keys match with output partition expressions on their respective sides.
  *   - The larger bucket number is divisible by the smaller bucket number.
- *   - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true.
  *   - The ratio of the number of buckets is less than the value set in
- *     COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
+ *     COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
Review comment:
nit: shouldn't it be `BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO`?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2655,24 +2655,34 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

-  val COALESCE_BUCKETS_IN_JOIN_ENABLED =
-    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
-      .doc("When true, if two bucketed tables with the different number of buckets are joined, " +
-        "the side with a bigger number of buckets will be coalesced to have the same number " +
-        "of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
-        "number of buckets. Bucket coalescing is applied to sort-merge joins and " +
-        "shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling " +
-        "in join, but it also reduces parallelism and could possibly cause OOM for " +
-        "shuffled hash join.")
-      .version("3.1.0")
-      .booleanConf
-      .createWithDefault(false)
+  object BucketReadStrategyInJoin extends Enumeration {
+    val COALESCE, REPARTITION, OFF = Value
+  }

-  val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
-    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
-      .doc("The ratio of the number of two buckets being coalesced should be less than or " +
-        "equal to this value for bucket coalescing to be applied. This configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
+  val BUCKET_READ_STRATEGY_IN_JOIN =
+    buildConf("spark.sql.bucketing.bucketReadStrategyInJoin")
+      .doc("When set to COALESCE, if two bucketed tables with the different number of buckets " +
Review comment:
nit: shall we first mention that the allowed values are "one of COALESCE, REPARTITION, OFF"? Users might not follow the long description here exactly. It's also probably worth mentioning that the default is "OFF", in which case we neither coalesce nor repartition.
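For illustration, the doc string could lead with the allowed values along these lines (hypothetical wording, not a concrete patch):

```scala
// Hypothetical rewording that surfaces the allowed values and the default first.
.doc("Allowed values are: COALESCE, REPARTITION, OFF (the default, in which case buckets " +
  "are neither coalesced nor repartitioned). When set to COALESCE, ...")
```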
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -548,22 +560,42 @@ case class FileSourceScanExec(
       filesGroupedToBuckets
     }

-    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
-      logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
-      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
-      Seq.tabulate(numCoalescedBuckets) { bucketId =>
-        val partitionedFiles = coalescedBuckets.get(bucketId).map {
-          _.values.flatten.toArray
-        }.getOrElse(Array.empty)
-        FilePartition(bucketId, partitionedFiles)
-      }
-    }.getOrElse {
-      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+    if (optionalNewNumBuckets.isEmpty) {
+      val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
         FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
       }
+      new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+    } else {
+      val newNumBuckets = optionalNewNumBuckets.get
+      if (newNumBuckets < bucketSpec.numBuckets) {
+        assert(bucketSpec.numBuckets % newNumBuckets == 0)
+        logInfo(s"Coalescing to $newNumBuckets buckets from ${bucketSpec.numBuckets} buckets")
+        val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % newNumBuckets)
+        val filePartitions = Seq.tabulate(newNumBuckets) { bucketId =>
+          val partitionedFiles = coalescedBuckets
+            .get(bucketId)
+            .map(_.values.flatten.toArray)
+            .getOrElse(Array.empty)
+          FilePartition(bucketId, partitionedFiles)
+        }
+        new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+      } else {
+        assert(newNumBuckets % bucketSpec.numBuckets == 0)
+        logInfo(s"Repartitioning to $newNumBuckets buckets from ${bucketSpec.numBuckets} buckets")
+        val filePartitions = Seq.tabulate(newNumBuckets) { bucketId =>
+          FilePartition(
+            bucketId,
+            prunedFilesGroupedToBuckets.getOrElse(bucketId % bucketSpec.numBuckets, Array.empty))
+        }
+        // There are now more files to be read.
+        val filesNum = filePartitions.map(_.files.size.toLong).sum
+        val filesSize = filePartitions.map(_.files.map(_.length).sum).sum
+        driverMetrics("numFiles") = filesNum
Review comment:
per `setFilesNumAndSizeMetric`, should we set `staticFilesNum` here or `numFiles`?
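For context, `setFilesNumAndSizeMetric` in this file routes the counts to either the static or the dynamic metrics; a paraphrased sketch of that dispatch (from memory, not the exact source):

```scala
// Sketch: "static" counts (no dynamic partition pruning involved) go to the
// staticFilesNum/staticFilesSize metrics, otherwise to numFiles/filesSize.
if (!static || partitionFilters.exists(isDynamicPruningFilter)) {
  driverMetrics("numFiles") = filesNum
  driverMetrics("filesSize") = filesSize
} else {
  driverMetrics("staticFilesNum") = filesNum
  driverMetrics("staticFilesSize") = filesSize
}
```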
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2655,24 +2655,34 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

-  val COALESCE_BUCKETS_IN_JOIN_ENABLED =
-    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
-      .doc("When true, if two bucketed tables with the different number of buckets are joined, " +
-        "the side with a bigger number of buckets will be coalesced to have the same number " +
-        "of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
-        "number of buckets. Bucket coalescing is applied to sort-merge joins and " +
-        "shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling " +
-        "in join, but it also reduces parallelism and could possibly cause OOM for " +
-        "shuffled hash join.")
-      .version("3.1.0")
-      .booleanConf
-      .createWithDefault(false)
+  object BucketReadStrategyInJoin extends Enumeration {
+    val COALESCE, REPARTITION, OFF = Value
+  }

-  val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
-    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
-      .doc("The ratio of the number of two buckets being coalesced should be less than or " +
-        "equal to this value for bucket coalescing to be applied. This configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
+  val BUCKET_READ_STRATEGY_IN_JOIN =
+    buildConf("spark.sql.bucketing.bucketReadStrategyInJoin")
Review comment:
nit: shall we use a name consistent with the existing config namespace "spark.sql.sources.bucketing", e.g. "spark.sql.sources.bucketing.readStrategyInJoin"? No big deal, but "bucketing.bucket..." seems a bit verbose. Pointing this out because users may rely on this config for bucketing optimization and raise questions for developers about it.
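i.e., something along these lines (only the key changes; the rest of the declaration stays as is):

```scala
// Suggested key, reusing the existing "spark.sql.sources.bucketing" namespace.
val BUCKET_READ_STRATEGY_IN_JOIN =
  buildConf("spark.sql.sources.bucketing.readStrategyInJoin")
```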
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -164,15 +165,24 @@ case class FileSourceScanExec(
     requiredSchema: StructType,
     partitionFilters: Seq[Expression],
     optionalBucketSet: Option[BitSet],
-    optionalNumCoalescedBuckets: Option[Int],
+    optionalNewNumBuckets: Option[Int],
     dataFilters: Seq[Expression],
     tableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec {

   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
   override lazy val supportsColumnar: Boolean = {
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+    // `RepartitioningBucketRDD` converts columnar batches to rows to calculate bucket id for each
+    // row, thus columnar is not supported when `RepartitioningBucketRDD` is used to avoid
+    // conversions from batches to rows and back to batches.
+    relation.fileFormat.supportBatch(relation.sparkSession, schema) && !isRepartitioningBuckets
+  }
+
+  @transient private lazy val isRepartitioningBuckets: Boolean = {
+    relation.bucketSpec.isDefined &&
+      optionalNewNumBuckets.isDefined &&
+      optionalNewNumBuckets.get > relation.bucketSpec.get.numBuckets
Review comment:
we don't want to do the repartition if the user disables bucketing, right? Better to go with `bucketedScan && optionalNewNumBuckets.isDefined && optionalNewNumBuckets.get > relation.bucketSpec.get.numBuckets`?
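A minimal sketch of the suggested guard, assuming `bucketedScan` is the existing flag that reflects whether this scan actually reads bucketed data:

```scala
@transient private lazy val isRepartitioningBuckets: Boolean = {
  // A bucket spec alone is not enough: if the user disabled bucketed reading,
  // we should not repartition either.
  bucketedScan &&
    optionalNewNumBuckets.isDefined &&
    optionalNewNumBuckets.get > relation.bucketSpec.get.numBuckets
}
```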
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2655,24 +2655,34 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

-  val COALESCE_BUCKETS_IN_JOIN_ENABLED =
-    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled")
-      .doc("When true, if two bucketed tables with the different number of buckets are joined, " +
-        "the side with a bigger number of buckets will be coalesced to have the same number " +
-        "of buckets as the other side. Bigger number of buckets is divisible by the smaller " +
-        "number of buckets. Bucket coalescing is applied to sort-merge joins and " +
-        "shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling " +
-        "in join, but it also reduces parallelism and could possibly cause OOM for " +
-        "shuffled hash join.")
-      .version("3.1.0")
-      .booleanConf
-      .createWithDefault(false)
+  object BucketReadStrategyInJoin extends Enumeration {
+    val COALESCE, REPARTITION, OFF = Value
+  }

-  val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO =
-    buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio")
-      .doc("The ratio of the number of two buckets being coalesced should be less than or " +
-        "equal to this value for bucket coalescing to be applied. This configuration only " +
-        s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.")
+  val BUCKET_READ_STRATEGY_IN_JOIN =
+    buildConf("spark.sql.bucketing.bucketReadStrategyInJoin")
+      .doc("When set to COALESCE, if two bucketed tables with the different number of buckets " +
+        "are joined, the side with a bigger number of buckets will be coalesced to have the same " +
+        "number of buckets as the other side. When set to REPARTITION, the side with a bigger " +
+        "number of buckets will be repartitioned to have the same number of buckets as the other " +
+        "side. The bigger number of buckets must be divisible by the smaller number of buckets, " +
+        "and the strategy is applied to sort-merge joins and shuffled hash joins. " +
+        "Note: Coalescing bucketed table can avoid unnecessary shuffle in join, but it also " +
+        "reduces parallelism and could possibly cause OOM for shuffled hash join. Repartitioning " +
+        "bucketed table avoids unnecessary shuffle in join while maintaining the parallelism " +
+        "at the cost of reading duplicate data.")
+      .version("3.1.0")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(BucketReadStrategyInJoin.values.map(_.toString))
+      .createWithDefault(BucketReadStrategyInJoin.OFF.toString)
+
+  val BUCKET_READ_STRATEGY_IN_JOIN_MAX_BUCKET_RATIO =
+    buildConf("spark.sql.bucketing.bucketReadStrategyInJoin.maxBucketRatio")
Review comment:
nit: same as above, might be just "spark.sql.sources.bucketing.readStrategyInJoinMaxBucketRatio"?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -314,7 +324,7 @@ case class FileSourceScanExec(
       val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)

       // TODO SPARK-24528 Sort order is currently ignored if buckets are coalesced.
-      if (singleFilePartitions && optionalNumCoalescedBuckets.isEmpty) {
+      if (singleFilePartitions && (optionalNewNumBuckets.isEmpty || isRepartitioningBuckets)) {
Review comment:
we don't need `|| isRepartitioningBuckets` here, right?
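i.e., the condition would revert to gating on an unchanged bucket count only (sketch of the suggestion):

```scala
// Suggested simplification: keep the sort-order fast path only when the
// number of buckets is left unchanged.
if (singleFilePartitions && optionalNewNumBuckets.isEmpty) {
  // ... existing ordering logic ...
}
```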
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -548,22 +560,42 @@ case class FileSourceScanExec(
       filesGroupedToBuckets
     }

-    val filePartitions = optionalNumCoalescedBuckets.map { numCoalescedBuckets =>
-      logInfo(s"Coalescing to ${numCoalescedBuckets} buckets")
-      val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % numCoalescedBuckets)
-      Seq.tabulate(numCoalescedBuckets) { bucketId =>
-        val partitionedFiles = coalescedBuckets.get(bucketId).map {
-          _.values.flatten.toArray
-        }.getOrElse(Array.empty)
-        FilePartition(bucketId, partitionedFiles)
-      }
-    }.getOrElse {
-      Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
+    if (optionalNewNumBuckets.isEmpty) {
+      val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
         FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty))
       }
+      new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+    } else {
+      val newNumBuckets = optionalNewNumBuckets.get
+      if (newNumBuckets < bucketSpec.numBuckets) {
+        assert(bucketSpec.numBuckets % newNumBuckets == 0)
+        logInfo(s"Coalescing to $newNumBuckets buckets from ${bucketSpec.numBuckets} buckets")
+        val coalescedBuckets = prunedFilesGroupedToBuckets.groupBy(_._1 % newNumBuckets)
+        val filePartitions = Seq.tabulate(newNumBuckets) { bucketId =>
+          val partitionedFiles = coalescedBuckets
+            .get(bucketId)
+            .map(_.values.flatten.toArray)
+            .getOrElse(Array.empty)
+          FilePartition(bucketId, partitionedFiles)
+        }
+        new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
+      } else {
+        assert(newNumBuckets % bucketSpec.numBuckets == 0)
+        logInfo(s"Repartitioning to $newNumBuckets buckets from ${bucketSpec.numBuckets} buckets")
+        val filePartitions = Seq.tabulate(newNumBuckets) { bucketId =>
+          FilePartition(
+            bucketId,
+            prunedFilesGroupedToBuckets.getOrElse(bucketId % bucketSpec.numBuckets, Array.empty))
+        }
+        // There are now more files to be read.
+        val filesNum = filePartitions.map(_.files.size.toLong).sum
+        val filesSize = filePartitions.map(_.files.map(_.length).sum).sum
+        driverMetrics("numFiles") = filesNum
+        driverMetrics("filesSize") = filesSize
+        new BucketRepartitioningRDD(
+          fsRelation.sparkSession, readFile, filePartitions, bucketSpec, newNumBuckets, output)
Review comment:
nit: we can pass `outputPartitioning.asInstanceOf[HashPartitioning].partitionIdExpression`, so we don't need to re-derive the `bucketIdExpression` logic inside `BucketRepartitioningRDD`.
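A sketch of that call site under the suggestion; `HashPartitioning.partitionIdExpression` already encodes `pmod(hash(exprs), numPartitions)`, so the RDD could take the expression directly (the resulting constructor signature here is illustrative):

```scala
// Pass the already-derived partition-id expression instead of rebuilding the
// bucket-id logic from bucketSpec inside the RDD (illustrative signature).
new BucketRepartitioningRDD(
  fsRelation.sparkSession,
  readFile,
  filePartitions,
  outputPartitioning.asInstanceOf[HashPartitioning].partitionIdExpression,
  output)
```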
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/BucketRepartitioningRDD.scala
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * An RDD that filters out the rows that do not belong to the current bucket file being read.
+ */
+private[spark] class BucketRepartitioningRDD(
+    @transient private val sparkSession: SparkSession,
+    readFunction: PartitionedFile => Iterator[InternalRow],
+    @transient override val filePartitions: Seq[FilePartition],
+    bucketSpec: BucketSpec,
+    numRepartitionedBuckets: Int,
+    output: Seq[Attribute])
+  extends FileScanRDD(sparkSession, readFunction, filePartitions) {
+  assert(numRepartitionedBuckets > bucketSpec.numBuckets)
Review comment:
nit: probably just one assert for `numRepartitionedBuckets > bucketSpec.numBuckets && numRepartitionedBuckets % bucketSpec.numBuckets == 0`.
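i.e. (sketch):

```scala
// Combined precondition: strictly more buckets, and an exact multiple.
assert(numRepartitionedBuckets > bucketSpec.numBuckets &&
  numRepartitionedBuckets % bucketSpec.numBuckets == 0)
```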