agrawaldevesh commented on a change in pull request #29104:
URL: https://github.com/apache/spark/pull/29104#discussion_r457589817
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2671,6 +2671,25 @@ object SQLConf {
.checkValue(_ > 0, "The difference must be positive.")
.createWithDefault(4)
+ val NULL_AWARE_ANTI_JOIN_OPTIMIZE_ENABLED =
Review comment:
I am assuming you only have these two codepaths / configs so that both can be
exercised for testing, right?
Do you expect us to enter production with both
NULL_AWARE_ANTI_JOIN_OPTIMIZE_ENABLED and NULL_AWARE_ANTI_JOIN_OPTIMIZE_USE_BHJ?
My assumption is that the current preference leans toward
NULL_AWARE_ANTI_JOIN_OPTIMIZE_USE_BHJ, and that
NULL_AWARE_ANTI_JOIN_OPTIMIZE_ENABLED will be pruned as the PR progresses. Is
that fair to say?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -2671,6 +2671,25 @@ object SQLConf {
.checkValue(_ > 0, "The difference must be positive.")
.createWithDefault(4)
+ val NULL_AWARE_ANTI_JOIN_OPTIMIZE_ENABLED =
+ buildConf("spark.sql.nullAware.antiJoin.optimize.enabled")
+ .internal()
+ .doc("When true, NULL-aware anti join execution will be planed into " +
+ "BroadcastNullAwareHashJoinExec, " +
Review comment:
Should this doc string be updated now?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -385,55 +408,101 @@ case class BroadcastHashJoinExec(
val (matched, checkCondition, _) = getJoinCondition(ctx, input)
val numOutput = metricTerm(ctx, "numOutputRows")
- if (uniqueKeyCodePath) {
- val found = ctx.freshName("found")
- s"""
- |boolean $found = false;
- |// generate join key for stream side
- |${keyEv.code}
- |// Check if the key has nulls.
- |if (!($anyNull)) {
- | // Check if the HashedRelation exists.
- | UnsafeRow $matched = (UnsafeRow)$relationTerm.getValue(${keyEv.value});
- | if ($matched != null) {
- | // Evaluate the condition.
- | $checkCondition {
- | $found = true;
- | }
- | }
- |}
- |if (!$found) {
- | $numOutput.add(1);
- | ${consume(ctx, input)}
- |}
- """.stripMargin
+ if (isNullAwareAntiJoin) {
+ require(leftKeys.length == 1, "leftKeys length should be 1")
+ require(rightKeys.length == 1, "rightKeys length should be 1")
+ require(right.output.length == 1, "not in subquery hash join optimize only single column.")
+ require(joinType == LeftAnti, "joinType must be LeftAnti.")
+ require(buildSide == BuildRight, "buildSide must be BuildRight.")
+ require(SQLConf.get.nullAwareAntiJoinOptimizeEnabled,
+ "nullAwareAntiJoinOptimizeEnabled must turn on for BroadcastNullAwareHashJoinExec.")
+ require(checkCondition == "", "not in subquery hash join optimize empty condition.")
+
+ if (broadcastRelation.value.inputEmpty) {
+ s"""
+ |// singleColumn NAAJ inputEmpty(true) accept all
+ |$numOutput.add(1);
+ |${consume(ctx, input)}
+ """.stripMargin
+ } else if (broadcastRelation.value.anyNullKeyExists) {
+ s"""
+ |// singleColumn NAAJ inputEmpty(false) anyNullKeyExists(true) reject all
+ """.stripMargin
+ } else {
+ val found = ctx.freshName("found")
+ s"""
+ |// singleColumn NAAJ inputEmpty(false) anyNullKeyExists(false)
+ |boolean $found = false;
+ |// generate join key for stream side
+ |${keyEv.code}
+ |// Check if the key has nulls.
+ |if (!($anyNull)) {
+ | // Check if the HashedRelation exists.
+ | UnsafeRow $matched = (UnsafeRow)$relationTerm.getValue(${keyEv.value});
+ | if ($matched != null) {
+ | $found = true;
+ | }
+ |} else {
+ | $found = true;
+ |}
+ |
+ |if (!$found) {
+ | $numOutput.add(1);
+ | ${consume(ctx, input)}
+ |}
+ """.stripMargin
+ }
} else {
Review comment:
I am curious whether it might be possible to apply the NAAJ special handling
first, and then fall back onto the regular hash probing logic (which has an
optimization for uniqueKeyCodePath). Something like this (this only holds for
the single-key case):
- Check the NAAJ special cases: the build side has a null key, or the build
side is genuinely empty.
- If the stream side has a null key, ignore that stream row.
- Then probe the HT regularly: if there is any match, don't return the row
from the stream side; else return it.
My point is that uniqueKeyCodePath pertains only to the last of the steps
above and perhaps does not need to be handled separately depending on whether
NAAJ is enabled or not.
I haven't thought through this, so it may not work at all :-P
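To make the suggested ordering concrete, here is a minimal Java sketch of the per-row decision. All names (`NaajSketch`, `emitRow`, `buildSideHasNull`) are hypothetical stand-ins, not Spark's generated code; the point is only that the unique-key probe is the final step after the NAAJ special cases are handled:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the single-key NAAJ decision order suggested above.
// None of these names come from Spark's codegen; they only illustrate the flow.
public class NaajSketch {
    /** Returns true iff the anti join should emit the stream-side row. */
    static boolean emitRow(Integer streamKey,
                           Set<Integer> buildKeys,
                           boolean buildSideHasNull) {
        // 1. NAAJ special cases on the build side.
        if (buildKeys.isEmpty() && !buildSideHasNull) {
            return true;            // genuinely empty build side: accept all rows
        }
        if (buildSideHasNull) {
            return false;           // any null build-side key: reject all rows
        }
        // 2. A null stream-side key can never satisfy NOT IN: drop the row.
        if (streamKey == null) {
            return false;
        }
        // 3. Regular hash probe (the uniqueKeyCodePath step): emit only on no match.
        return !buildKeys.contains(streamKey);
    }

    public static void main(String[] args) {
        Set<Integer> build = new HashSet<>(Arrays.asList(1, 2));
        System.out.println(emitRow(3, build, false));    // no match: emit
        System.out.println(emitRow(1, build, false));    // match: drop
        System.out.println(emitRow(null, build, false)); // null stream key: drop
        System.out.println(emitRow(3, build, true));     // null on build side: drop
    }
}
```

Only step 3 touches the hash table, which is why it could plausibly share the existing unique-key probing code regardless of whether NAAJ is enabled.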
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -385,55 +408,101 @@ case class BroadcastHashJoinExec(
val (matched, checkCondition, _) = getJoinCondition(ctx, input)
val numOutput = metricTerm(ctx, "numOutputRows")
- if (uniqueKeyCodePath) {
- val found = ctx.freshName("found")
- s"""
- |boolean $found = false;
- |// generate join key for stream side
- |${keyEv.code}
- |// Check if the key has nulls.
- |if (!($anyNull)) {
- | // Check if the HashedRelation exists.
- | UnsafeRow $matched = (UnsafeRow)$relationTerm.getValue(${keyEv.value});
- | if ($matched != null) {
- | // Evaluate the condition.
- | $checkCondition {
- | $found = true;
- | }
- | }
- |}
- |if (!$found) {
- | $numOutput.add(1);
- | ${consume(ctx, input)}
- |}
- """.stripMargin
+ if (isNullAwareAntiJoin) {
+ require(leftKeys.length == 1, "leftKeys length should be 1")
+ require(rightKeys.length == 1, "rightKeys length should be 1")
+ require(right.output.length == 1, "not in subquery hash join optimize only single column.")
+ require(joinType == LeftAnti, "joinType must be LeftAnti.")
+ require(buildSide == BuildRight, "buildSide must be BuildRight.")
+ require(SQLConf.get.nullAwareAntiJoinOptimizeEnabled,
+ "nullAwareAntiJoinOptimizeEnabled must turn on for BroadcastNullAwareHashJoinExec.")
Review comment:
nit: nullAwareAntiJoinOptimizeEnabled must be on for
BroadcastNullAwareHashJoinExec
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -44,7 +45,7 @@ case class BroadcastHashJoinExec(
buildSide: BuildSide,
condition: Option[Expression],
left: SparkPlan,
- right: SparkPlan)
+ right: SparkPlan, isNullAwareAntiJoin: Boolean = false)
Review comment:
nit: Can you put `isNullAwareAntiJoin` on a new line?
Also, just to make sure: this should show up in the `.sql.out` files. Since
it changes the string representation of the node, you would probably need to
update some of the `.out` files. I think that is the intended behavior.
##########
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##########
@@ -171,6 +171,29 @@
private volatile MapIterator destructiveIterator = null;
private LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();
+ private boolean inputEmpty = false;
+ private boolean anyNullKeyExists = false;
+
+ public boolean isInputEmpty()
Review comment:
Is `isInputEmpty() == ((numKeys == 0) && !anyNullKeyExists)`? If so, it can
be inlined.
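If that equivalence holds, the flag could be derived rather than stored. A minimal Java sketch of the inlining idea (the class and methods are illustrative only; the field names mirror the diff, and it assumes null keys are counted via `anyNullKeyExists` but never stored, so they don't bump `numKeys`):

```java
// Hypothetical illustration of the inlining suggested above: derive
// "input empty" from existing state instead of a separately tracked flag.
public class MapStateSketch {
    private int numKeys = 0;                  // non-null keys stored in the map
    private boolean anyNullKeyExists = false; // null keys are seen but not stored

    public void appendKey()     { numKeys++; }
    public void appendNullKey() { anyNullKeyExists = true; }

    // True only when no row at all was appended. Null keys don't increase
    // numKeys, which is exactly why the !anyNullKeyExists term is needed.
    public boolean isInputEmpty() {
        return numKeys == 0 && !anyNullKeyExists;
    }

    public static void main(String[] args) {
        MapStateSketch m = new MapStateSketch();
        System.out.println(m.isInputEmpty()); // nothing appended yet
        m.appendNullKey();
        System.out.println(m.isInputEmpty()); // a null-keyed row was appended
    }
}
```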
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]