agrawaldevesh commented on a change in pull request #29104:
URL: https://github.com/apache/spark/pull/29104#discussion_r460176730



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -323,11 +374,20 @@ private[joins] object UnsafeHashedRelation {
     // Create a mapping of buildKeys -> rows
     val keyGenerator = UnsafeProjection.create(key)
     var numFields = 0
+    val numKeys = key.length
+    val isOriginalInputEmpty = !input.hasNext
+    var allNullColumnKeyExistsInOriginalInput: Boolean = false
     while (input.hasNext) {
       val row = input.next().asInstanceOf[UnsafeRow]
       numFields = row.numFields()
       val key = keyGenerator(row)
-      if (!key.anyNull) {
+      if (isNullAware &&
+        !allNullColumnKeyExistsInOriginalInput &&
+        (0 until numKeys).forall(key.isNullAt)) {

Review comment:
       Dumb question, which I know I have asked before: Can `numKeys` be more 
than 1 now, given that we are only considering the single-key case? If not, 
should this `allNullColumnKeyExistsInOriginalInput` be changed to 
`keyWithSingleNullColumnExistsInOriginalInput` (as an example name), in addition 
to changing the `forall` above to simply check for `key.isNullAt(0)`?
   
   If you do consider making this change, just a reminder to change 
`allNullColumnKeyExistsInOriginalInput` in the `LongHashedRelation` too

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -454,6 +491,48 @@ case class BroadcastHashJoinExec(
     val (matched, checkCondition, _) = getJoinCondition(ctx, input)
     val numOutput = metricTerm(ctx, "numOutputRows")
 
+    // fast stop if isOriginalInputEmpty = true, should accept all rows in 
streamedSide
+    if (broadcastRelation.value.isOriginalInputEmpty) {
+      return s"""
+                |// Anti Join isOriginalInputEmpty(true) accept all
+                |$numOutput.add(1);
+                |${consume(ctx, input)}
+          """.stripMargin
+    }
+
+    if (isNullAwareAntiJoin) {
+      if (broadcastRelation.value.allNullColumnKeyExistsInOriginalInput) {
+        return s"""
+                  |// NAAJ
+                  |// isOriginalInputEmpty(false) 
allNullColumnKeyExistsInOriginalInput(true)
+                  |// reject all
+            """.stripMargin
+      } else {
+        val found = ctx.freshName("found")
+        return s"""
+                  |// NAAJ
+                  |// isOriginalInputEmpty(false) 
allNullColumnKeyExistsInOriginalInput(false)
+                  |boolean $found = false;
+                  |// generate join key for stream side
+                  |${keyEv.code}
+                  |// anyNull is equivalent to allNull since it's a 
single-column key.

Review comment:
       nit: same, this reference to allNull is distracting now.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -133,10 +142,38 @@ case class BroadcastHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
 
     val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
-    streamedPlan.execute().mapPartitions { streamedIter =>
-      val hashed = broadcastRelation.value.asReadOnlyCopy()
-      
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
-      join(streamedIter, hashed, numOutputRows)
+    if (isNullAwareAntiJoin) {
+      streamedPlan.execute().mapPartitionsInternal { streamedIter =>
+        val hashed = broadcastRelation.value.asReadOnlyCopy()
+        
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
+        if (hashed.isOriginalInputEmpty) {
+          streamedIter
+        } else if (hashed.allNullColumnKeyExistsInOriginalInput) {
+          Iterator.empty
+        } else {
+          val keyGenerator = UnsafeProjection.create(
+            BindReferences.bindReferences[Expression](
+              leftKeys,
+              AttributeSeq(left.output))
+          )
+          streamedIter.filter(row => {
+            val lookupKey: UnsafeRow = keyGenerator(row)
+            // anyNull is equivalent to allNull since it's a single-column key.

Review comment:
       nit: I think this comment is distracting now that there is no such thing 
as 'allNull'

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -896,22 +969,32 @@ private[joins] object LongHashedRelation {
 
     // Create a mapping of key -> rows
     var numFields = 0
+    val isOriginalInputEmpty: Boolean = !input.hasNext
+    var allNullColumnKeyExistsInOriginalInput: Boolean = false
     while (input.hasNext) {
       val unsafeRow = input.next().asInstanceOf[UnsafeRow]
       numFields = unsafeRow.numFields()
       val rowKey = keyGenerator(unsafeRow)
       if (!rowKey.isNullAt(0)) {
+        // LongToUnsafeRowMap can't insert null key
         val key = rowKey.getLong(0)
         map.append(key, unsafeRow)
+      } else if (!allNullColumnKeyExistsInOriginalInput) {

Review comment:
       Should this check also incorporate `isNullAware`, i.e. `else if 
(isNullAware && !allNullColumnKeyExistsInOriginalInput)`?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -81,6 +81,34 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
    */
   def asReadOnlyCopy(): HashedRelation
 
+
+  /**
+   * Normally HashedRelation is built from an Source (input: 
Iterator[InternalRow]).
+   * This indicates the original input is empty.
+   * Note that, the hashed relation can be empty despite the input being not 
empty,
+   * since the hashed relation skips over null keys.
+   */
+  var isOriginalInputEmpty: Boolean
+
+  def setOriginalInputEmtpy(isOriginalInputEmpty: Boolean): HashedRelation = {
+    this.isOriginalInputEmpty = isOriginalInputEmpty
+    this
+  }
+
+  /**
+   * It's only used in null aware anti join.
+   * This will be set true if Source (input: Iterator[InternalRow]) contains a 
key,
+   * which is allNullColumn.

Review comment:
       nit: Can you further expand allNullColumn in the comment: eg, "contains 
a key, which has all null columns"

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -81,6 +81,34 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
    */
   def asReadOnlyCopy(): HashedRelation
 
+
+  /**
+   * Normally HashedRelation is built from an Source (input: 
Iterator[InternalRow]).
+   * This indicates the original input is empty.
+   * Note that, the hashed relation can be empty despite the input being not 
empty,
+   * since the hashed relation skips over null keys.
+   */
+  var isOriginalInputEmpty: Boolean
+
+  def setOriginalInputEmtpy(isOriginalInputEmpty: Boolean): HashedRelation = {
+    this.isOriginalInputEmpty = isOriginalInputEmpty
+    this
+  }
+
+  /**
+   * It's only used in null aware anti join.
+   * This will be set true if Source (input: Iterator[InternalRow]) contains a 
key,
+   * which is allNullColumn.
+   */
+  var allNullColumnKeyExistsInOriginalInput: Boolean
+
+  def setAllNullColumnKeyExistsInOriginalInput(
+      allNullColumnKeyExistsInOriginalInput: Boolean): HashedRelation = {
+    this.allNullColumnKeyExistsInOriginalInput =

Review comment:
       nit: I kind of think this line does not need to be split into two.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -923,9 +1006,9 @@ case class HashedRelationBroadcastMode(key: 
Seq[Expression])
       sizeHint: Option[Long]): HashedRelation = {
     sizeHint match {
       case Some(numRows) =>
-        HashedRelation(rows, canonicalized.key, numRows.toInt)
+        HashedRelation(rows, canonicalized.key, numRows.toInt, isNullAware = 
isNullAware)

Review comment:
       nit: Should this simply be `isNullAware` (i.e., passed in without a named 
argument)?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -896,22 +969,32 @@ private[joins] object LongHashedRelation {
 
     // Create a mapping of key -> rows
     var numFields = 0
+    val isOriginalInputEmpty: Boolean = !input.hasNext
+    var allNullColumnKeyExistsInOriginalInput: Boolean = false
     while (input.hasNext) {
       val unsafeRow = input.next().asInstanceOf[UnsafeRow]
       numFields = unsafeRow.numFields()
       val rowKey = keyGenerator(unsafeRow)
       if (!rowKey.isNullAt(0)) {
+        // LongToUnsafeRowMap can't insert null key
         val key = rowKey.getLong(0)
         map.append(key, unsafeRow)
+      } else if (!allNullColumnKeyExistsInOriginalInput) {
+        // LongHashedRelation stores single-column key

Review comment:
       nit: I think this comment on line 983 applies more globally to 
LongHashedRelation: either remove it or fold it into the class description. The 
same might be said for the comment on line 979 above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to