[GitHub] [spark] cloud-fan commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-18 Thread GitBox


cloud-fan commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r442150993



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, 
ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` if the following 
conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective 
sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than the value set in
+ * COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.
+ */
+case class CoalesceBucketsInSortMergeJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int, conf: SQLConf): 
Option[Int] = {
+assert(numBuckets1 != numBuckets2)
+val (small, large) = (math.min(numBuckets1, numBuckets2), 
math.max(numBuckets1, numBuckets2))
+// A bucket can be coalesced only if the bigger number of buckets is 
divisible by the smaller
+// number of buckets because bucket id is calculated by modding the total 
number of buckets.
+if (large % small == 0 &&
+  large / small <= 
conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO)) {
+  Some(small)
+} else {
+  None
+}
+  }
+
+  private def updateNumCoalescedBuckets(plan: SparkPlan, numCoalescedBuckets: 
Int): SparkPlan = {
+plan.transformUp {
+  case f: FileSourceScanExec =>
+f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+}
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED)) {
+  return plan
+}
+
+plan transform {
+  case ExtractSortMergeJoinWithBuckets(smj, numLeftBuckets, 
numRightBuckets)
+if numLeftBuckets != numRightBuckets =>
+mayCoalesce(numLeftBuckets, numRightBuckets, conf).map { 
numCoalescedBuckets =>
+  if (numCoalescedBuckets != numLeftBuckets) {
+smj.copy(left = updateNumCoalescedBuckets(smj.left, 
numCoalescedBuckets))
+  } else {
+smj.copy(right = updateNumCoalescedBuckets(smj.right, 
numCoalescedBuckets))
+  }
+}.getOrElse(smj)
+  case other => other
+}
+  }
+}
+
+/**
+ * An extractor that extracts `SortMergeJoinExec` where both sides of the join 
have the bucketed
+ * tables and are consisted of only the scan operation.
+ */
+object ExtractSortMergeJoinWithBuckets {
+  private def isScanOperation(plan: SparkPlan): Boolean = plan match {
+case f: FilterExec => isScanOperation(f.child)
+case p: ProjectExec => isScanOperation(p.child)
+case _: FileSourceScanExec => true
+case _ => false
+  }
+
+  private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
+plan.collectFirst {
+  case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
+  f.optionalNumCoalescedBuckets.isEmpty =>
+f.relation.bucketSpec.get
+}
+  }
+
+  /**
+   * The join keys should match with expressions for output partitioning. Note 
that
+   * the ordering does not matter because it will be handled in 
`EnsureRequirements`.
+   */
+  private def satisfiesOutputPartitioning(
+  keys: Seq[Expression],
+  partitioning: Partitioning): Boolean = {
+partitioning match {
+  case HashPartitioning(exprs, _) if exprs.length == keys.length =>
+exprs.forall(e => 

[GitHub] [spark] cloud-fan commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-18 Thread GitBox


cloud-fan commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r442150335



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala
##
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, 
ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` if the following 
conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join keys match with output partition expressions on their respective 
sides.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than the value set in
+ * COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.
+ */
+case class CoalesceBucketsInSortMergeJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int, conf: SQLConf): 
Option[Int] = {
+assert(numBuckets1 != numBuckets2)
+val (small, large) = (math.min(numBuckets1, numBuckets2), 
math.max(numBuckets1, numBuckets2))
+// A bucket can be coalesced only if the bigger number of buckets is 
divisible by the smaller
+// number of buckets because bucket id is calculated by modding the total 
number of buckets.
+if (large % small == 0 &&
+  large / small <= 
conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO)) {
+  Some(small)
+} else {
+  None
+}
+  }
+
+  private def updateNumCoalescedBuckets(plan: SparkPlan, numCoalescedBuckets: 
Int): SparkPlan = {
+plan.transformUp {
+  case f: FileSourceScanExec =>
+f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+}
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED)) {
+  return plan
+}
+
+plan transform {
+  case ExtractSortMergeJoinWithBuckets(smj, numLeftBuckets, 
numRightBuckets)
+if numLeftBuckets != numRightBuckets =>
+mayCoalesce(numLeftBuckets, numRightBuckets, conf).map { 
numCoalescedBuckets =>
+  if (numCoalescedBuckets != numLeftBuckets) {
+smj.copy(left = updateNumCoalescedBuckets(smj.left, 
numCoalescedBuckets))
+  } else {
+smj.copy(right = updateNumCoalescedBuckets(smj.right, 
numCoalescedBuckets))
+  }
+}.getOrElse(smj)
+  case other => other
+}
+  }
+}
+
+/**
+ * An extractor that extracts `SortMergeJoinExec` where both sides of the join 
have the bucketed
+ * tables and are consisted of only the scan operation.
+ */
+object ExtractSortMergeJoinWithBuckets {
+  private def isScanOperation(plan: SparkPlan): Boolean = plan match {
+case f: FilterExec => isScanOperation(f.child)
+case p: ProjectExec => isScanOperation(p.child)
+case _: FileSourceScanExec => true
+case _ => false
+  }
+
+  private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = {
+plan.collectFirst {
+  case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty &&
+  f.optionalNumCoalescedBuckets.isEmpty =>
+f.relation.bucketSpec.get
+}
+  }
+
+  /**
+   * The join keys should match with expressions for output partitioning. Note 
that
+   * the ordering does not matter because it will be handled in 
`EnsureRequirements`.

Review comment:
   what can go wrong if we check the order here as well?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL 

[GitHub] [spark] cloud-fan commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-16 Thread GitBox


cloud-fan commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r440783817



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, 
ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` if the following 
conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED is set to true.
+ *   - The ratio of the number of buckets is less than the value set in
+ * COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.
+ */
+case class CoalesceBucketsInSortMergeJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int, conf: SQLConf): 
Option[Int] = {
+assert(numBuckets1 != numBuckets2)
+val (small, large) = (math.min(numBuckets1, numBuckets2), 
math.max(numBuckets1, numBuckets2))
+// A bucket can be coalesced only if the bigger number of buckets is 
divisible by the smaller
+// number of buckets because bucket id is calculated by modding the total 
number of buckets.
+if (large % small == 0 &&
+  large / small <= 
conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO)) {
+  Some(small)
+} else {
+  None
+}
+  }
+
+  private def updateNumCoalescedBuckets(plan: SparkPlan, numCoalescedBuckets: 
Int): SparkPlan = {
+plan.transformUp {
+  case f: FileSourceScanExec =>
+f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+}
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+if (!conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED)) {
+  return plan
+}
+
+plan transform {
+  case ExtractSortMergeJoinWithBuckets(smj, numLeftBuckets, 
numRightBuckets)
+if numLeftBuckets != numRightBuckets =>
+mayCoalesce(numLeftBuckets, numRightBuckets, conf).map { 
numCoalescedBuckets =>
+  if (numCoalescedBuckets != numLeftBuckets) {
+smj.copy(left = updateNumCoalescedBuckets(smj.left, 
numCoalescedBuckets))
+  } else {
+smj.copy(right = updateNumCoalescedBuckets(smj.right, 
numCoalescedBuckets))
+  }
+}.getOrElse(smj)
+  case other => other
+}
+  }
+}
+
+/**
+ * An extractor that extracts `SortMergeJoinExec` where both sides of the join 
have the bucketed
+ * tables and are consisted of only the scan operation.
+ */
+object ExtractSortMergeJoinWithBuckets {
+  private def isScanOperation(plan: SparkPlan): Boolean = plan match {
+case f: FilterExec => isScanOperation(f.child)
+case p: ProjectExec => isScanOperation(p.child)

Review comment:
   @viirya this is a good point! We should apply this optimizer rule more 
conservatively.
   
   For a sort-merge join with join keys `[k1, k2, ...]`, we should coalesce the 
buckets only if the bucket keys are also `[k1, k2, ...]`. The keys can be renamed 
by a Project, and we should handle that case as well.
   
   Examples:
   `t1(bucket by a, b) JOIN t2(bucket by c, d) ON a = c AND b = d` should apply
   `t1(bucket by a, b) JOIN (SELECT c AS x, d AS y FROM t2(bucket by c, d)) ON 
a = x AND b = y` should apply
   `t1(bucket by a) JOIN t2(bucket by c) ON b = d` should not apply





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: 

[GitHub] [spark] cloud-fan commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-16 Thread GitBox


cloud-fan commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r440779860



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##
@@ -165,6 +166,7 @@ case class FileSourceScanExec(
 requiredSchema: StructType,
 partitionFilters: Seq[Expression],
 optionalBucketSet: Option[BitSet],
+optionalNumCoalescedBuckets: Option[Int],

Review comment:
   It's easy to fix: just update `FileSourceScanExec.metadata`. Let's do it 
in this PR





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-11 Thread GitBox


cloud-fan commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r438811200



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2595,6 +2595,26 @@ object SQLConf {
   .checkValue(_ > 0, "The timeout value must be positive")
   .createWithDefault(10L)
 
+  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
+buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
+  .doc("When true, if two bucketed tables with the different number of 
buckets are joined, " +
+"the side with a bigger number of buckets will be coalesced to have 
the same number " +
+"of buckets as the other side. Bucket coalescing is applied only to 
sort-merge joins " +
+"and only when the bigger number of buckets is divisible by the 
smaller number of buckets.")
+  .version("3.1.0")
+  .booleanConf
+  .createWithDefault(false)
+
+  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO =
+
buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio")
+  .doc("The ratio of the number of two buckets being coalesced should be 
less than or " +
+"equal to this value for bucket coalescing to be applied. This 
configuration only " +
+s"has an effect when 
'${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.")
+  .version("3.1.0")
+  .intConf
+  .checkValue(_ > 0, "The difference must be positive.")
+  .createWithDefault(10)

Review comment:
   I don't know what the best default value is, but it seems better to pick 
2 ^ n.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-01 Thread GitBox


cloud-fan commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r433262285



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2586,6 +2586,26 @@ object SQLConf {
   .checkValue(_ > 0, "The timeout value must be positive")
   .createWithDefault(10L)
 
+  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
+buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
+  .doc("When true, if two bucketed tables with the different number of 
buckets are joined, " +
+"the side with a bigger number of buckets will be coalesced to have 
the same number " +
+"of buckets as the other side. Bucket coalescing is applied only to 
sort-merge joins " +
+"and only when the bigger number of buckets is divisible by the 
smaller number of buckets.")
+  .version("3.1.0")
+  .booleanConf
+  .createWithDefault(false)
+
+  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_NUM_BUCKETS_DIFF =
+
buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxNumBucketsDiff")
+  .doc("The difference in count of two buckets being coalesced should be 
less than or " +

Review comment:
   Shall we use the ratio rather than the absolute difference? E.g. coalescing 
128 buckets to 1 does not look good. Maybe we can say: we can coalesce the 
buckets to at most n times smaller.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-01 Thread GitBox


cloud-fan commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r433269587



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2885,6 +2905,12 @@ class SQLConf extends Serializable with Logging {
 LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY))
   }
 
+  def coalesceBucketsInSortMergeJoinEnabled: Boolean =
+getConf(COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED)
+
+  def coalesceBucketsInSortMergeJoinMaxNumBucketsDiff: Int =

Review comment:
   nit: we usually don't add a method if the config is only used once. Just 
inline it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-01 Thread GitBox


cloud-fan commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r433268298



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala
##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, 
ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` if the following 
conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED is set to true.
+ *   - The difference in the number of buckets is less than the value set in
+ * COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_NUM_BUCKETS_DIFF.
+ */
+case class CoalesceBucketsInSortMergeJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int, conf: SQLConf): 
Option[Int] = {
+assert(numBuckets1 != numBuckets2)
+val (small, large) = (math.min(numBuckets1, numBuckets2), 
math.max(numBuckets1, numBuckets2))
+// A bucket can be coalesced only if the bigger number of buckets is 
divisible by the smaller
+// number of buckets because bucket id is calculated by modding the total 
number of buckets.
+if ((large % small == 0) &&
+  ((large - small) <= 
conf.coalesceBucketsInSortMergeJoinMaxNumBucketsDiff)) {
+  Some(small)
+} else {
+  None
+}
+  }
+
+  private def updateNumCoalescedBuckets(plan: SparkPlan, numCoalescedBuckets: 
Int): SparkPlan = {
+plan.transformUp {
+  case f: FileSourceScanExec =>
+f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+}
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+if (conf.coalesceBucketsInSortMergeJoinEnabled) {
+  plan transform {
+case ExtractSortMergeJoinWithBuckets(smj, numLeftBuckets, 
numRightBuckets)
+  if numLeftBuckets != numRightBuckets =>
+  mayCoalesce(numLeftBuckets, numRightBuckets, conf).map { 
numCoalescedBuckets =>
+if (numCoalescedBuckets != numLeftBuckets) {
+  smj.copy(left = updateNumCoalescedBuckets(smj.left, 
numCoalescedBuckets))
+} else {
+  smj.copy(right = updateNumCoalescedBuckets(smj.right, 
numCoalescedBuckets))
+}
+  }.getOrElse(smj)
+case other => other
+  }
+} else {
+  plan
+}
+  }
+}
+
+/**
+ * An extractor that extracts `SortMergeJoinExec` where both sides of the join 
have the bucketed
+ * tables and are consisted of only the scan operation.
+ */
+object ExtractSortMergeJoinWithBuckets {
+  private def isScanOperation(plan: SparkPlan): Boolean = {

Review comment:
   nit:
   ```
   private def isScanOperation(plan: SparkPlan): Boolean = {
 case f: FilterExec => isScanOperation(f.child)
 case p: ProjectExec => isScanOperation(p.child)
 case  _: FileSourceScanExec => true
 case _ => false
   }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-01 Thread GitBox


cloud-fan commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r433266907



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala
##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, 
ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule coalesces one side of the `SortMergeJoin` if the following 
conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED is set to true.
+ *   - The difference in the number of buckets is less than the value set in
+ * COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_NUM_BUCKETS_DIFF.
+ */
+case class CoalesceBucketsInSortMergeJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int, conf: SQLConf): 
Option[Int] = {
+assert(numBuckets1 != numBuckets2)
+val (small, large) = (math.min(numBuckets1, numBuckets2), 
math.max(numBuckets1, numBuckets2))
+// A bucket can be coalesced only if the bigger number of buckets is 
divisible by the smaller
+// number of buckets because bucket id is calculated by modding the total 
number of buckets.
+if ((large % small == 0) &&
+  ((large - small) <= 
conf.coalesceBucketsInSortMergeJoinMaxNumBucketsDiff)) {
+  Some(small)
+} else {
+  None
+}
+  }
+
+  private def updateNumCoalescedBuckets(plan: SparkPlan, numCoalescedBuckets: 
Int): SparkPlan = {
+plan.transformUp {
+  case f: FileSourceScanExec =>
+f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
+}
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+if (conf.coalesceBucketsInSortMergeJoinEnabled) {

Review comment:
   nit: a common code style is:
   ```
   if (not enabled) return plan
   ...
   ```
   
   This can save one indentation level





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-01 Thread GitBox


cloud-fan commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r433262285



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2586,6 +2586,26 @@ object SQLConf {
   .checkValue(_ > 0, "The timeout value must be positive")
   .createWithDefault(10L)
 
+  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
+buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
+  .doc("When true, if two bucketed tables with the different number of 
buckets are joined, " +
+"the side with a bigger number of buckets will be coalesced to have 
the same number " +
+"of buckets as the other side. Bucket coalescing is applied only to 
sort-merge joins " +
+"and only when the bigger number of buckets is divisible by the 
smaller number of buckets.")
+  .version("3.1.0")
+  .booleanConf
+  .createWithDefault(false)
+
+  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_NUM_BUCKETS_DIFF =
+
buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxNumBucketsDiff")
+  .doc("The difference in count of two buckets being coalesced should be 
less than or " +

Review comment:
   Shall we use the ratio rather than the absolute difference? E.g. coalescing 
128 buckets to 1 does not look good. Maybe we can say: we can coalesce the 
buckets to at most n times smaller.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if applicable

2020-06-01 Thread GitBox


cloud-fan commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r433262285



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2586,6 +2586,26 @@ object SQLConf {
   .checkValue(_ > 0, "The timeout value must be positive")
   .createWithDefault(10L)
 
+  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED =
+buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled")
+  .doc("When true, if two bucketed tables with the different number of 
buckets are joined, " +
+"the side with a bigger number of buckets will be coalesced to have 
the same number " +
+"of buckets as the other side. Bucket coalescing is applied only to 
sort-merge joins " +
+"and only when the bigger number of buckets is divisible by the 
smaller number of buckets.")
+  .version("3.1.0")
+  .booleanConf
+  .createWithDefault(false)
+
+  val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_NUM_BUCKETS_DIFF =
+
buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxNumBucketsDiff")
+  .doc("The difference in count of two buckets being coalesced should be 
less than or " +

Review comment:
   Shall we use the ratio rather than the absolute difference? E.g. coalescing 
128 buckets to 1 does not look good. Maybe we can say: we can coalesce the 
buckets to at most n times smaller.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org