agrawaldevesh commented on a change in pull request #29104:
URL: https://github.com/apache/spark/pull/29104#discussion_r457857768
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -498,6 +547,8 @@ case class BroadcastHashJoinExec(
| }
| }
|}
+ |// special case for NullAwareAntiJoin, if anyNull in streamedRow,
row should be dropped.
Review comment:
Wow! Look at how tiny that code diff is. Great job!
##########
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##########
@@ -171,6 +171,23 @@
private volatile MapIterator destructiveIterator = null;
private LinkedList<UnsafeSorterSpillWriter> spillWriters = new
LinkedList<>();
+ private boolean anyNullKeyExists = false;
+
+ public boolean inputEmpty()
+ {
+ return ((numKeys == 0) && !anyNullKeyExists);
Review comment:
nit: The outer parens can be dropped.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -71,6 +71,16 @@ private[execution] sealed trait HashedRelation extends
KnownSizeEstimation {
*/
def keyIsUnique: Boolean
+ /**
+ * is input: Iterator[InternalRow] empty
+ */
+ def inputEmpty: Boolean
+
+ /**
+ * anyNull key exists in input
Review comment:
This needs more context on why it is worth tracking: perhaps a note to the
effect that it is only used by the null-aware anti join.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -71,6 +71,16 @@ private[execution] sealed trait HashedRelation extends
KnownSizeEstimation {
*/
def keyIsUnique: Boolean
+ /**
+ * is input: Iterator[InternalRow] empty
Review comment:
Can you expand this comment, please? You could add: "Note that the hashed
relation can be empty even though the Iterator[InternalRow] is not empty, since
the hashed relation skips over null keys."
##########
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##########
@@ -171,6 +171,23 @@
private volatile MapIterator destructiveIterator = null;
private LinkedList<UnsafeSorterSpillWriter> spillWriters = new
LinkedList<>();
+ private boolean anyNullKeyExists = false;
+
+ public boolean inputEmpty()
+ {
+ return ((numKeys == 0) && !anyNullKeyExists);
+ }
+
+ public boolean isAnyNullKeyExists()
+ {
+ return anyNullKeyExists;
+ }
+
+ public void setAnyNullKeyExists(boolean anyNullKeyExists)
+ {
+ this.anyNullKeyExists = anyNullKeyExists;
Review comment:
Just making sure I am reading this code right: no extra scan of the rows is
done to find out whether any null keys exist?
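To make the question concrete, here is a minimal sketch of what "no extra
scan" could look like: the flag is set during the same pass that inserts the
keys. `BuildSideSketch` and its `append` method are hypothetical names for
illustration only and are not the PR's code or the `BytesToBytesMap` API; only
`inputEmpty` and `isAnyNullKeyExists` mirror the names in the diff above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the build side of a hashed relation.
class BuildSideSketch {
    private final Map<Integer, String> rows = new HashMap<>();
    private boolean anyNullKeyExists = false;

    // Called once per input row; a null key is remembered but never stored.
    public void append(Integer key, String value) {
        if (key == null) {
            anyNullKeyExists = true;
        } else {
            rows.put(key, value);
        }
    }

    public int numKeys() {
        return rows.size();
    }

    // True only if no row at all was appended, null-keyed rows included.
    public boolean inputEmpty() {
        return numKeys() == 0 && !anyNullKeyExists;
    }

    public boolean isAnyNullKeyExists() {
        return anyNullKeyExists;
    }
}
```

With this shape, an input that holds only null-keyed rows ends up with
`numKeys() == 0` while `inputEmpty()` is still false, which is the distinction
the null-aware anti join needs.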
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
##########
@@ -388,3 +390,36 @@ object PhysicalWindow {
case _ => None
}
}
+
+object ExtractSingleColumnNullAwareAntiJoin extends JoinSelectionHelper {
+
+ // SingleColumn NullAwareAntiJoin
+ // streamedSideKeys, buildSideKeys
+ // currently these two return Seq[Expression] should have only one element
+ private type ReturnType = (Seq[Expression], Seq[Expression])
+
+ /**
+ * See. [SPARK-32290]
+ * LeftAnti(condition: Or(EqualTo(a=b), IsNull(EqualTo(a=b)))
+ * will almost certainly be planned as a Broadcast Nested Loop join,
+ * which is very time consuming because it's an O(M*N) calculation.
+ * But if it's a single column case, and buildSide data is small enough,
+ * O(M*N) calculation could be optimized into O(M) using hash lookup instead
of loop lookup.
+ */
+ def unapply(join: Join): Option[ReturnType] = join match {
+ case Join(left, right, LeftAnti,
+ Some(Or(EqualTo(leftAttr: AttributeReference, rightAttr:
AttributeReference),
+ IsNull(EqualTo(tmpLeft: AttributeReference, tmpRight:
AttributeReference)))), _)
+ if SQLConf.get.nullAwareAntiJoinOptimizeEnabled &&
+ leftAttr.semanticEquals(tmpLeft) &&
rightAttr.semanticEquals(tmpRight) &&
Review comment:
Should this also refer to `canEvaluate`, as done in `ExtractEquiJoinKeys`?
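
For context, the single-column semantics that the quoted doc comment describes
(a LeftAnti join on `a = b OR isnull(a = b)`, answered with a hash lookup
instead of a nested loop) can be sketched as below. This is only an
illustration under the usual `NOT IN` semantics; `NullAwareAntiJoinSketch` and
`antiJoin` are hypothetical names, plain Java collections stand in for the
hashed relation, and none of it is the PR's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of a single-column null-aware anti join.
class NullAwareAntiJoinSketch {

    static List<Integer> antiJoin(List<Integer> streamed, List<Integer> build) {
        // Build phase: a single pass collects non-null keys and notes any null key.
        Set<Integer> keys = new HashSet<>();
        boolean anyNullKey = false;
        for (Integer b : build) {
            if (b == null) {
                anyNullKey = true;
            } else {
                keys.add(b);
            }
        }

        List<Integer> out = new ArrayList<>();
        if (build.isEmpty()) {
            // Empty build input: nothing can match, so every streamed row survives.
            out.addAll(streamed);
            return out;
        }
        if (anyNullKey) {
            // A null build key makes isnull(a = b) true for every streamed row.
            return out;
        }
        // Probe phase: one hash lookup per streamed row.
        for (Integer s : streamed) {
            // A null streamed key is dropped: null = b is null for every build row.
            if (s != null && !keys.contains(s)) {
                out.add(s);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(antiJoin(Arrays.asList(1, 2, null), Arrays.asList(2, 3)));
    }
}
```

The probe loop does one lookup per streamed row, which is where the O(M)
claim in the doc comment comes from, assuming the build side fits in memory.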