agrawaldevesh commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r470229887



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side as looked up.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side that were matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+    // Process stream side by looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getValueWithKeyIndex(keys)
+        if (matched != null) {
+          val keyIndex = matched.getKeyIndex
+          val buildRow = matched.getValue
+          val joinRow = joinRowWithBuild(buildRow)
+          if (boundCondition(joinRow)) {
+            matchedKeys.set(keyIndex)
+            joinRow
+          } else {
+            joinRowWithBuild(buildNullRow)
+          }
+        } else {
+          joinRowWithBuild(buildNullRow)
+        }
+      }
+    }
+
+    // Process build side, filtering out rows that were already looked up and
+    // passed the join condition
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      valueRowWithKeyIndex =>
+        val keyIndex = valueRowWithKeyIndex.getKeyIndex
+        val isMatched = matchedKeys.get(keyIndex)
+        if (!isMatched) {
+          val buildRow = valueRowWithKeyIndex.getValue
+          Some(streamNullJoinRowWithBuild(buildRow))
+        } else {
+          None
+        }
+    }
+
+    streamResultIter ++ buildResultIter
+  }
+
+  /**
+   * Full outer shuffled hash join with non-unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side as looked up.
+   *    A `HashSet[Long]` is used to track matched rows with
+   *    key index (Int) and value index (Int) together.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side that were matched already,
+   *    by checking key index and value index from `HashSet`.
+   */
+  private def fullOuterJoinWithNonUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedRows = new mutable.HashSet[Long]
+
+    def markRowMatched(keyIndex: Int, valueIndex: Int): Unit = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.add(rowIndex)
+    }
+
+    def isRowMatched(keyIndex: Int, valueIndex: Int): Boolean = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.contains(rowIndex)
+    }
+
+    // Process stream side by looking up hash relation
+    val streamResultIter = streamIter.flatMap { srow =>
+      val joinRow = joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        Iterator.single(joinRowWithBuild(buildNullRow))
+      } else {
+        val matched = hashedRelation.getWithKeyIndex(keys)
+        if (matched != null) {
+          val (keyIndex, buildIter) = (matched._1, matched._2.zipWithIndex)
+
+          new RowIterator {
+            private var found = false
+            override def advanceNext(): Boolean = {
+              while (buildIter.hasNext) {
+                val (buildRow, valueIndex) = buildIter.next()
+                if (boundCondition(joinRowWithBuild(buildRow))) {
+                  markRowMatched(keyIndex, valueIndex)
+                  found = true
+                  return true
+                }
+              }
+              if (!found) {
+                joinRowWithBuild(buildNullRow)
+                found = true

Review comment:
       Actually I see now why this is required. Thanks.
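   
   For other readers of this thread, a minimal, self-contained sketch of the pattern under discussion (a hypothetical PadOnceIterator, not the PR's code): build holds the build-side rows for one stream row, cond is the join condition, and padded is the stream row joined with the build-side null row. The "found = true" after emitting the padded row is what prevents the stream row from being padded again on the next call.
   
       // Hypothetical, simplified stand-in for the RowIterator in the diff above.
       final class PadOnceIterator[T](build: Iterator[T], cond: T => Boolean, padded: T)
           extends Iterator[T] {
         private var found = false              // anything emitted for this stream row yet?
         private var pending: Option[T] = None
       
         private def advance(): Unit = if (pending.isEmpty) {
           while (pending.isEmpty && build.hasNext) {
             val row = build.next()
             if (cond(row)) { pending = Some(row); found = true }  // real match
           }
           if (pending.isEmpty && !found) {
             pending = Some(padded)             // no build row matched: pad with nulls, once
             found = true                       // without this, hasNext would pad again
           }
         }
       
         override def hasNext: Boolean = { advance(); pending.isDefined }
         override def next(): T = { advance(); val r = pending.get; pending = None; r }
       }
   
   For example, new PadOnceIterator[Int](Iterator(1, 2, 3), _ > 10, -1).toList yields List(-1), while _ > 1 as the condition yields List(2, 3) with no padded row.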

##########
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##########
@@ -428,6 +428,64 @@ public MapIterator destructiveIterator() {
     return new MapIterator(numValues, new Location(), true);
   }
 
+  /**
+   * Iterator for the entries of this map. This first iterates over the key index array
+   * `longArray` and then accesses values in `dataPages`. NOTE: this is different from
+   * `MapIterator` in the sense that the key index is preserved here
+   * (see `UnsafeHashedRelation` for an example of usage).
+   */
+  public final class MapIteratorWithKeyIndex implements Iterator<Location> {
+
+    private int keyIndex = 0;
+    private int numRecords;
+    private final Location loc;
+
+    private MapIteratorWithKeyIndex(int numRecords, Location loc) {
+      this.numRecords = numRecords;
+      this.loc = loc;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return numRecords > 0;
+    }
+
+    @Override
+    public Location next() {
+      if (!loc.isDefined() || !loc.nextValue()) {
+        while (longArray.get(keyIndex * 2) == 0) {
+          keyIndex++;
+        }
+        loc.with(keyIndex, 0, true);
+        keyIndex++;
+      }
+      numRecords--;
+      return loc;
+    }
+  }
+
+  /**
+   * Returns an iterator for iterating over the entries of this map,
+   * by first iterating over the key index inside hash map's `longArray`.
+   *
+   * For efficiency, all calls to `next()` will return the same {@link Location} object.
+   *
+   * The returned iterator is NOT thread-safe. If the map is modified while iterating over
+   * it, the behavior of the returned iterator is undefined.
+   */
+  public MapIteratorWithKeyIndex iteratorWithKeyIndex() {

Review comment:
       This is the only instantiation of MapIteratorWithKeyIndex. How about adding a default constructor with these args? That would alleviate my questions around the bounds checking of numRecords.
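   
   A rough sketch of what I mean (hypothetical code; numValues and Location are taken from the surrounding class in the quoted diff): the no-arg constructor derives numRecords from the map itself, so the bound holds by construction.
   
       // Sketch only: pin numRecords to the map's own numValues.
       private MapIteratorWithKeyIndex() {
         this.numRecords = numValues;  // total number of values in the map
         this.loc = new Location();
       }
       
       public MapIteratorWithKeyIndex iteratorWithKeyIndex() {
         return new MapIteratorWithKeyIndex();
       }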

##########
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##########
@@ -428,6 +428,62 @@ public MapIterator destructiveIterator() {
     return new MapIterator(numValues, new Location(), true);
   }
 
+  /**
+   * Iterator for the entries of this map. This first iterates over the key index array
+   * `longArray` and then accesses values in `dataPages`. NOTE: this is different from
+   * `MapIterator` in the sense that the key index is preserved here
+   * (see `UnsafeHashedRelation` for an example of usage).
+   */
+  public final class MapIteratorWithKeyIndex implements Iterator<Location> {
+
+    private int keyIndex = 0;
+    private int numRecords;
+    private final Location loc;
+
+    private MapIteratorWithKeyIndex(int numRecords, Location loc) {
+      this.numRecords = numRecords;
+      this.loc = loc;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return numRecords > 0;
+    }
+
+    @Override
+    public Location next() {
+      if (!loc.isDefined() || !loc.nextValue()) {
+        while (longArray.get(keyIndex * 2) == 0) {
+          keyIndex++;
+        }
+        loc.with(keyIndex, (int) longArray.get(keyIndex * 2 + 1), true);
+        keyIndex++;

Review comment:
       Obviously you would do the bounds checking in the constructor of this iterator and not per row.
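   
   Concretely, something like this sketch (assuming the constructor keeps its current arguments):
   
       private MapIteratorWithKeyIndex(int numRecords, Location loc) {
         // Validate once here rather than on every next() call.
         assert numRecords >= 0 && numRecords <= numValues
           : "numRecords out of range: " + numRecords;
         this.numRecords = numRecords;
         this.loc = loc;
       }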

##########
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##########
@@ -428,6 +428,62 @@ public MapIterator destructiveIterator() {
     return new MapIterator(numValues, new Location(), true);
   }
 
+  /**
+   * Iterator for the entries of this map. This first iterates over the key index array
+   * `longArray` and then accesses values in `dataPages`. NOTE: this is different from
+   * `MapIterator` in the sense that the key index is preserved here
+   * (see `UnsafeHashedRelation` for an example of usage).
+   */
+  public final class MapIteratorWithKeyIndex implements Iterator<Location> {
+
+    private int keyIndex = 0;

Review comment:
       There is no existing mention of keyIndex in either HashedRelation or ShuffledHashJoinExec. That is a term this PR is introducing, and thus this PR should either define it clearly or stick to consistent nomenclature (which sounds hard to do, I agree!).
   
   I am okay with the words keyIndex/valueIndex if you can please define clearly what they mean somewhere. How about defining them in BytesToBytesMap?
   
   For example:
   keyIndex: the index in longArray where the key is stored.
   valueIndex: the index of the tuple in the chain of tuples having the same key. That is, if a certain key is found thrice, the value indices of its tuples will be 0, 1, and 2. Note that value indices of tuples with different keys are incomparable.
   
   (Actually I still don't fully understand the concept of valueIndex :-P, so this is my "guess" of what it means.)
   
   Is the meaning of keyIndex and valueIndex the same in HashedRelation and other places?
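   
   As a sketch, the definitions could live in a comment in BytesToBytesMap along these lines (my wording, i.e. a guess, not the PR's final text):
   
       /**
        * keyIndex:   the index of a key in the map's internal `longArray`, i.e. the
        *             slot holding that key's record pointer; unique per distinct key.
        * valueIndex: the ordinal of a value within the chain of values sharing one
        *             key. If a key occurs three times, its values have valueIndex
        *             0, 1, and 2. Value indices of different keys are incomparable.
        */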

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side as looked up.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side that were matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+    // Process stream side by looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getValueWithKeyIndex(keys)
+        if (matched != null) {
+          val keyIndex = matched.getKeyIndex
+          val buildRow = matched.getValue
+          val joinRow = joinRowWithBuild(buildRow)
+          if (boundCondition(joinRow)) {
+            matchedKeys.set(keyIndex)
+            joinRow
+          } else {
+            joinRowWithBuild(buildNullRow)
+          }
+        } else {
+          joinRowWithBuild(buildNullRow)
+        }
+      }
+    }
+
+    // Process build side, filtering out rows that were already looked up and
+    // passed the join condition
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      valueRowWithKeyIndex =>
+        val keyIndex = valueRowWithKeyIndex.getKeyIndex
+        val isMatched = matchedKeys.get(keyIndex)
+        if (!isMatched) {
+          val buildRow = valueRowWithKeyIndex.getValue
+          Some(streamNullJoinRowWithBuild(buildRow))

Review comment:
       Oh great. Thanks for the pointer. 

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -179,6 +235,63 @@ private[joins] class UnsafeHashedRelation(
     }
   }
 
+  override def getWithKeyIndex(key: InternalRow): (Int, Iterator[InternalRow]) = {
+    val unsafeKey = key.asInstanceOf[UnsafeRow]
+    val map = binaryMap  // avoid the compiler error
+    val loc = new map.Location  // this could be allocated in stack
+    binaryMap.safeLookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
+      unsafeKey.getSizeInBytes, loc, unsafeKey.hashCode())
+    if (loc.isDefined) {
+      (loc.getKeyIndex,
+        new Iterator[UnsafeRow] {
+          private var _hasNext = true
+          override def hasNext: Boolean = _hasNext
+          override def next(): UnsafeRow = {
+            resultRow.pointTo(loc.getValueBase, loc.getValueOffset, loc.getValueLength)
+            _hasNext = loc.nextValue()
+            resultRow
+          }
+        })
+    } else {
+      null
+    }
+  }
+
+  override def getValueWithKeyIndex(key: InternalRow): ValueRowWithKeyIndex = {
+    val unsafeKey = key.asInstanceOf[UnsafeRow]
+    val map = binaryMap  // avoid the compiler error
+    val loc = new map.Location  // this could be allocated in stack
+    binaryMap.safeLookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
+      unsafeKey.getSizeInBytes, loc, unsafeKey.hashCode())
+    if (loc.isDefined) {
+      resultRow.pointTo(loc.getValueBase, loc.getValueOffset, loc.getValueLength)
+      valueRowWithKeyIndex.updates(loc.getKeyIndex, resultRow)
+    } else {
+      null
+    }
+  }
+
+  override def valuesWithKeyIndex(): Iterator[ValueRowWithKeyIndex] = {
+    val iter = binaryMap.iteratorWithKeyIndex()
+
+    new Iterator[ValueRowWithKeyIndex] {
+      override def hasNext: Boolean = iter.hasNext
+
+      override def next(): ValueRowWithKeyIndex = {
+        if (!hasNext) {
+          throw new NoSuchElementException("End of the iterator")
+        }
+        val loc = iter.next()
+        resultRow.pointTo(loc.getValueBase, loc.getValueOffset, loc.getValueLength)

Review comment:
       Okay. Sorry, I am new to this code as well :-P But if this is the precedent then fine.
   
   If I were coding this, I would have allocated a new row inside valueRowWithKeyIndex, and then called valueRowWithKeyIndex.row.pointTo(loc.getValueBase, loc.getValueOffset, loc.getValueLength).
   
   But it's okay since there is already precedent.
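   
   For concreteness, a sketch of the shape I had in mind (hypothetical; the PR's actual ValueRowWithKeyIndex differs, and numFields would come from the build-side schema):
   
       import org.apache.spark.sql.catalyst.expressions.UnsafeRow
       
       // The wrapper owns its row, so callers point that row at the current map
       // location instead of reusing the relation-wide resultRow.
       class ValueRowWithKeyIndex(numFields: Int) {
         val row = new UnsafeRow(numFields)
         private var keyIndex: Int = -1
         def withKeyIndex(ki: Int): this.type = { keyIndex = ki; this }
         def getKeyIndex: Int = keyIndex
       }
       
       // Inside next():
       //   valueRowWithKeyIndex.row.pointTo(loc.getValueBase, loc.getValueOffset, loc.getValueLength)
       //   valueRowWithKeyIndex.withKeyIndex(loc.getKeyIndex)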

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side as looked up.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side that were matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+    // Process stream side by looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getValueWithKeyIndex(keys)
+        if (matched != null) {
+          val keyIndex = matched.getKeyIndex
+          val buildRow = matched.getValue
+          val joinRow = joinRowWithBuild(buildRow)
+          if (boundCondition(joinRow)) {
+            matchedKeys.set(keyIndex)
+            joinRow
+          } else {
+            joinRowWithBuild(buildNullRow)
+          }
+        } else {
+          joinRowWithBuild(buildNullRow)
+        }
+      }
+    }
+
+    // Process build side, filtering out rows that were already looked up and
+    // passed the join condition
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      valueRowWithKeyIndex =>
+        val keyIndex = valueRowWithKeyIndex.getKeyIndex
+        val isMatched = matchedKeys.get(keyIndex)
+        if (!isMatched) {
+          val buildRow = valueRowWithKeyIndex.getValue
+          Some(streamNullJoinRowWithBuild(buildRow))
+        } else {
+          None
+        }
+    }
+
+    streamResultIter ++ buildResultIter
+  }
+
+  /**
+   * Full outer shuffled hash join with non-unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side as looked up.
+   *    A `HashSet[Long]` is used to track matched rows with
+   *    key index (Int) and value index (Int) together.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side that were matched already,
+   *    by checking key index and value index from `HashSet`.
+   */
+  private def fullOuterJoinWithNonUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedRows = new mutable.HashSet[Long]

Review comment:
       Sure. A follow-up is fine.
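   
   For reference, a minimal sketch of what such a follow-up could look like (my assumption, not the actual follow-up change): swapping mutable.HashSet[Long] for Spark's specialized OpenHashSet[Long] keeps the same (keyIndex, valueIndex) packing while avoiding per-entry boxing and node allocation.
   
       import org.apache.spark.util.collection.OpenHashSet
       
       val matchedRows = new OpenHashSet[Long]
       
       // Pack (keyIndex, valueIndex) into one Long. The mask guards against sign
       // extension; valueIndex from zipWithIndex is always >= 0 anyway.
       def markRowMatched(keyIndex: Int, valueIndex: Int): Unit =
         matchedRows.add((keyIndex.toLong << 32) | (valueIndex & 0xFFFFFFFFL))
       
       def isRowMatched(keyIndex: Int, valueIndex: Int): Boolean =
         matchedRows.contains((keyIndex.toLong << 32) | (valueIndex & 0xFFFFFFFFL))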




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


