c21 commented on a change in pull request #32550:
URL: https://github.com/apache/spark/pull/32550#discussion_r634066487
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
##########
@@ -172,6 +190,14 @@ case object NO_BROADCAST_HASH extends JoinStrategyHint {
override def hintAliases: Set[String] = Set.empty
}
+/**
+ * An internal hint to encourage shuffle hash join, used by adaptive query execution.
+ */
+case object PREFER_SHUFFLE_HASH extends JoinStrategyHint {
Review comment:
Sorry if I missed anything, but why can't we reuse the existing hint
`SHUFFLE_HASH`?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -598,6 +598,16 @@ object SQLConf {
.bytesConf(ByteUnit.BYTE)
.createOptional
+ val ADAPTIVE_SHUFFLE_HASH_JOIN_LOCAL_MAP_THRESHOLD =
+ buildConf("spark.sql.adaptive.shuffledHashJoinLocalMapThreshold")
+ .doc("Configures the maximum size in bytes for per partition that can be allowed to build " +
Review comment:
nit: unnecessary `for`
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
##########
@@ -255,12 +255,28 @@ trait JoinSelectionHelper {
val buildLeft = if (hintOnly) {
hintToShuffleHashJoinLeft(hint)
} else {
- canBuildLocalHashMapBySize(left, conf) && muchSmaller(left, right)
+ if (hintToPreferShuffleHashJoinLeft(hint)) {
Review comment:
What about the case where both the left and right sides meet the requirement for
AQE shuffled hash join (i.e. every partition size < threshold)? Here we always
build left. Should we build right when the right side is smaller than the left side?
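A rough sketch of the selection I have in mind, not a proposed patch. `SideStats`, `BuildSideChoice` and the `BuildSide` objects below are hypothetical stand-ins made up for illustration, not Spark's actual classes, and I am assuming we can get a total size per side:

```scala
// Hypothetical stand-ins for illustration only; these are not Spark APIs.
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

// Assumed per-side summary: whether every shuffle partition is under the
// local-map threshold, plus the side's total size in bytes.
final case class SideStats(allPartitionsUnderThreshold: Boolean, sizeInBytes: Long)

object BuildSideChoice {
  // If both sides qualify, build the hash map on the smaller side (ties go left).
  // If only one side qualifies, build on that side. Otherwise return None,
  // meaning the AQE preference does not apply.
  def choose(left: SideStats, right: SideStats): Option[BuildSide] =
    (left.allPartitionsUnderThreshold, right.allPartitionsUnderThreshold) match {
      case (true, true) =>
        if (left.sizeInBytes <= right.sizeInBytes) Some(BuildLeft) else Some(BuildRight)
      case (true, false) => Some(BuildLeft)
      case (false, true) => Some(BuildRight)
      case (false, false) => None
    }
}
```

With this shape, the current behaviour corresponds to always returning `BuildLeft` in the first case.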
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
##########
@@ -28,7 +28,7 @@ import org.apache.spark.util.Utils
class AQEOptimizer(conf: SQLConf) extends RuleExecutor[LogicalPlan] {
private val defaultBatches = Seq(
Batch("Eliminate Unnecessary Join", Once, EliminateUnnecessaryJoin),
- Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin)
+ Batch("Dynamic JoinSelection", Once, DynamicJoinSelection)
Review comment:
nit: `Dynamic Join Selection`?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -598,6 +598,16 @@ object SQLConf {
.bytesConf(ByteUnit.BYTE)
.createOptional
+ val ADAPTIVE_SHUFFLE_HASH_JOIN_LOCAL_MAP_THRESHOLD =
+ buildConf("spark.sql.adaptive.shuffledHashJoinLocalMapThreshold")
+ .doc("Configures the maximum size in bytes for per partition that can be allowed to build " +
+ "local hash map. If all the partition size not larger than this threshold, join " +
+ "selection may use shuffled hash join instead of sort merge join regardless of the " +
Review comment:
I feel `may` is quite vague here. Can't we just say `prefer to use
shuffled hash join instead of sort merge join regardless of ...`?