c21 commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r470201781



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
##########
@@ -1188,4 +1188,53 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
         classOf[BroadcastNestedLoopJoinExec]))
     }
   }
+
+  test("SPARK-32399: Full outer shuffled hash join") {
+    val inputDFs = Seq(
+      // Test unique join key
+      (spark.range(10).selectExpr("id as k1"),
+        spark.range(30).selectExpr("id as k2"),
+        $"k1" === $"k2"),
+      // Test non-unique join key
+      (spark.range(10).selectExpr("id % 5 as k1"),
+        spark.range(30).selectExpr("id % 5 as k2"),
+        $"k1" === $"k2"),
+      // Test string join key
+      (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+        spark.range(30).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test build side at right
+      (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+        spark.range(10).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test NULL join key
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value 
as k1"),
+        spark.range(30).map(i => if (i % 4 == 0) i else 
null).selectExpr("value as k2"),
+        $"k1" === $"k2"),
+      // Test multiple join keys
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(
+        "value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as 
long) as k3"),
+        spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr(
+          "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as 
long) as k6"),
+        $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6")
+    )
+    inputDFs.foreach { case (df1, df2, joinExprs) =>
+      withSQLConf(
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",

Review comment:
       > I am wondering if you could add some "derivation" for the use of the 
magic constant 80 in the code comments ?
   
   @agrawaldevesh - I feel that is hard to do. The value depends on the estimated
size of the join children, and we have multiple test cases here.

##########
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##########
@@ -428,6 +428,62 @@ public MapIterator destructiveIterator() {
     return new MapIterator(numValues, new Location(), true);
   }
 
+  /**
+   * Iterator for the entries of this map. This is to first iterate over key 
index array
+   * `longArray` then accessing values in `dataPages`. NOTE: this is different 
from `MapIterator`
+   * in the sense that key index is preserved here
+   * (See `UnsafeHashedRelation` for example of usage).
+   */
+  public final class MapIteratorWithKeyIndex implements Iterator<Location> {
+
+    private int keyIndex = 0;
+    private int numRecords;
+    private final Location loc;
+
+    private MapIteratorWithKeyIndex(int numRecords, Location loc) {
+      this.numRecords = numRecords;
+      this.loc = loc;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return numRecords > 0;
+    }
+
+    @Override
+    public Location next() {
+      if (!loc.isDefined() || !loc.nextValue()) {
+        while (longArray.get(keyIndex * 2) == 0) {
+          keyIndex++;
+        }
+        loc.with(keyIndex, (int) longArray.get(keyIndex * 2 + 1), true);
+        keyIndex++;
+      }
+      numRecords--;
+      return loc;
+    }
+  }
+
+  /**
+   * Returns an iterator for iterating over the entries of this map,
+   * by first iterating over the key index inside hash map's `longArray`.
+   *
+   * For efficiency, all calls to `next()` will return the same {@link 
Location} object.
+   *
+   * The returned iterator is NOT thread-safe. If the map is modified while 
iterating over it,
+   * the behavior of the returned iterator is undefined.
+   */
+  public MapIteratorWithKeyIndex iteratorWithKeyIndex() {
+    return new MapIteratorWithKeyIndex(numValues, new Location());
+  }
+
+  /**
+   * The maximum number of allowed keys index.

Review comment:
       @cloud-fan - the key index is 0-based and non-contiguous, e.g. `0, 1, 
2, 3, 7, ...`. The allowed key index values are `[0, longArray.size() / 2 - 1]`. 
So if we want this method to be `max key index`, we should change it to return 
`(int) (longArray.size() / 2 - 1)`, and we need to change 
`ShuffledHashJoinExec.fullOuterJoinWithUniqueKey.matchedKeys` to be `new 
BitSet(hashedRelation.maxKeysIndex + 1)`. Do you think it's worth making the 
change here?
   
   Currently this is the `max number of key indexes`, which is `(int) 
(longArray.size() / 2)`, and this looks correct to me. I added a comment 
describing the range of allowed key index values. Does that look better?

##########
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##########
@@ -601,6 +657,14 @@ public boolean isDefined() {
       return isDefined;
     }
 
+    /**
+     * Returns index for key.
+     */
+    public int getKeyIndex() {

Review comment:
       @agrawaldevesh - replied back in above comment, thanks.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getValueWithKeyIndex(keys)
+        if (matched != null) {
+          val keyIndex = matched.getKeyIndex
+          val buildRow = matched.getValue
+          val joinRow = joinRowWithBuild(buildRow)
+          if (boundCondition(joinRow)) {
+            matchedKeys.set(keyIndex)
+            joinRow
+          } else {
+            joinRowWithBuild(buildNullRow)
+          }
+        } else {
+          joinRowWithBuild(buildNullRow)
+        }
+      }
+    }
+
+    // Process build side with filtering out rows looked up and
+    // passed join condition already
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      valueRowWithKeyIndex =>
+        val keyIndex = valueRowWithKeyIndex.getKeyIndex
+        val isMatched = matchedKeys.get(keyIndex)
+        if (!isMatched) {
+          val buildRow = valueRowWithKeyIndex.getValue
+          Some(streamNullJoinRowWithBuild(buildRow))
+        } else {
+          None
+        }
+    }
+
+    streamResultIter ++ buildResultIter
+  }
+
+  /**
+   * Full outer shuffled hash join with non-unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `HashSet[Long]` is used to track matched rows with
+   *    key index (Int) and value index (Int) together.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index and value index from `HashSet`.
+   */
+  private def fullOuterJoinWithNonUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedRows = new mutable.HashSet[Long]
+
+    def markRowMatched(keyIndex: Int, valueIndex: Int): Unit = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.add(rowIndex)
+    }
+
+    def isRowMatched(keyIndex: Int, valueIndex: Int): Boolean = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.contains(rowIndex)
+    }
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.flatMap { srow =>
+      val joinRow = joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        Iterator.single(joinRowWithBuild(buildNullRow))
+      } else {
+        val matched = hashedRelation.getWithKeyIndex(keys)
+        if (matched != null) {
+          val (keyIndex, buildIter) = (matched._1, matched._2.zipWithIndex)
+
+          new RowIterator {
+            private var found = false
+            override def advanceNext(): Boolean = {
+              while (buildIter.hasNext) {
+                val (buildRow, valueIndex) = buildIter.next()
+                if (boundCondition(joinRowWithBuild(buildRow))) {

Review comment:
       @agrawaldevesh - yes.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getValueWithKeyIndex(keys)
+        if (matched != null) {
+          val keyIndex = matched.getKeyIndex
+          val buildRow = matched.getValue
+          val joinRow = joinRowWithBuild(buildRow)
+          if (boundCondition(joinRow)) {
+            matchedKeys.set(keyIndex)
+            joinRow
+          } else {
+            joinRowWithBuild(buildNullRow)
+          }
+        } else {
+          joinRowWithBuild(buildNullRow)
+        }
+      }
+    }
+
+    // Process build side with filtering out rows looked up and
+    // passed join condition already
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      valueRowWithKeyIndex =>
+        val keyIndex = valueRowWithKeyIndex.getKeyIndex
+        val isMatched = matchedKeys.get(keyIndex)
+        if (!isMatched) {
+          val buildRow = valueRowWithKeyIndex.getValue
+          Some(streamNullJoinRowWithBuild(buildRow))
+        } else {
+          None
+        }
+    }
+
+    streamResultIter ++ buildResultIter
+  }
+
+  /**
+   * Full outer shuffled hash join with non-unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `HashSet[Long]` is used to track matched rows with
+   *    key index (Int) and value index (Int) together.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index and value index from `HashSet`.
+   */
+  private def fullOuterJoinWithNonUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedRows = new mutable.HashSet[Long]
+
+    def markRowMatched(keyIndex: Int, valueIndex: Int): Unit = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.add(rowIndex)
+    }
+
+    def isRowMatched(keyIndex: Int, valueIndex: Int): Boolean = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.contains(rowIndex)
+    }
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.flatMap { srow =>
+      val joinRow = joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        Iterator.single(joinRowWithBuild(buildNullRow))
+      } else {
+        val matched = hashedRelation.getWithKeyIndex(keys)
+        if (matched != null) {
+          val (keyIndex, buildIter) = (matched._1, matched._2.zipWithIndex)
+
+          new RowIterator {
+            private var found = false
+            override def advanceNext(): Boolean = {
+              while (buildIter.hasNext) {
+                val (buildRow, valueIndex) = buildIter.next()
+                if (boundCondition(joinRowWithBuild(buildRow))) {
+                  markRowMatched(keyIndex, valueIndex)
+                  found = true
+                  return true
+                }
+              }
+              if (!found) {
+                joinRowWithBuild(buildNullRow)
+                found = true

Review comment:
       @agrawaldevesh - @viirya raised the same question. See my comment 
[here](https://github.com/apache/spark/pull/29342#discussion_r467505397). By the way, the 
[existing left/right outer join for BHJ/SHJ has the same logic, which you may want to 
check as 
well](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L204-L222).

##########
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##########
@@ -428,6 +428,62 @@ public MapIterator destructiveIterator() {
     return new MapIterator(numValues, new Location(), true);
   }
 
+  /**
+   * Iterator for the entries of this map. This is to first iterate over key 
index array
+   * `longArray` then accessing values in `dataPages`. NOTE: this is different 
from `MapIterator`
+   * in the sense that key index is preserved here
+   * (See `UnsafeHashedRelation` for example of usage).
+   */
+  public final class MapIteratorWithKeyIndex implements Iterator<Location> {
+
+    private int keyIndex = 0;

Review comment:
       @agrawaldevesh - are you suggesting a naming scheme such as 
`MapIteratorWithPos`, `getPos`, `maxNumPos`, `iteratorWithPos`? I feel this is 
more confusing than `KeyIndex`, and we need some `KeyIndex` notion in 
`HashedRelation` and `ShuffledHashJoinExec` anyway. How about just leaving it as 
`keyIndex` here? 

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getValueWithKeyIndex(keys)
+        if (matched != null) {
+          val keyIndex = matched.getKeyIndex
+          val buildRow = matched.getValue
+          val joinRow = joinRowWithBuild(buildRow)
+          if (boundCondition(joinRow)) {
+            matchedKeys.set(keyIndex)
+            joinRow
+          } else {
+            joinRowWithBuild(buildNullRow)
+          }
+        } else {
+          joinRowWithBuild(buildNullRow)
+        }
+      }
+    }
+
+    // Process build side with filtering out rows looked up and
+    // passed join condition already
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      valueRowWithKeyIndex =>
+        val keyIndex = valueRowWithKeyIndex.getKeyIndex
+        val isMatched = matchedKeys.get(keyIndex)
+        if (!isMatched) {
+          val buildRow = valueRowWithKeyIndex.getValue
+          Some(streamNullJoinRowWithBuild(buildRow))
+        } else {
+          None
+        }
+    }
+
+    streamResultIter ++ buildResultIter
+  }
+
+  /**
+   * Full outer shuffled hash join with non-unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `HashSet[Long]` is used to track matched rows with
+   *    key index (Int) and value index (Int) together.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index and value index from `HashSet`.
+   */
+  private def fullOuterJoinWithNonUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedRows = new mutable.HashSet[Long]
+
+    def markRowMatched(keyIndex: Int, valueIndex: Int): Unit = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.add(rowIndex)
+    }
+
+    def isRowMatched(keyIndex: Int, valueIndex: Int): Boolean = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.contains(rowIndex)
+    }
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.flatMap { srow =>
+      val joinRow = joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        Iterator.single(joinRowWithBuild(buildNullRow))
+      } else {
+        val matched = hashedRelation.getWithKeyIndex(keys)
+        if (matched != null) {
+          val (keyIndex, buildIter) = (matched._1, matched._2.zipWithIndex)
+
+          new RowIterator {
+            private var found = false
+            override def advanceNext(): Boolean = {
+              while (buildIter.hasNext) {
+                val (buildRow, valueIndex) = buildIter.next()
+                if (boundCondition(joinRowWithBuild(buildRow))) {
+                  markRowMatched(keyIndex, valueIndex)
+                  found = true
+                  return true
+                }
+              }
+              if (!found) {

Review comment:
       @cloud-fan - sounds good, added.

##########
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##########
@@ -428,6 +428,62 @@ public MapIterator destructiveIterator() {
     return new MapIterator(numValues, new Location(), true);
   }
 
+  /**
+   * Iterator for the entries of this map. This is to first iterate over key 
index array
+   * `longArray` then accessing values in `dataPages`. NOTE: this is different 
from `MapIterator`
+   * in the sense that key index is preserved here
+   * (See `UnsafeHashedRelation` for example of usage).
+   */
+  public final class MapIteratorWithKeyIndex implements Iterator<Location> {
+
+    private int keyIndex = 0;
+    private int numRecords;
+    private final Location loc;
+
+    private MapIteratorWithKeyIndex(int numRecords, Location loc) {
+      this.numRecords = numRecords;
+      this.loc = loc;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return numRecords > 0;
+    }
+
+    @Override
+    public Location next() {
+      if (!loc.isDefined() || !loc.nextValue()) {
+        while (longArray.get(keyIndex * 2) == 0) {
+          keyIndex++;
+        }
+        loc.with(keyIndex, (int) longArray.get(keyIndex * 2 + 1), true);

Review comment:
       @cloud-fan - sure, updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow

Review comment:
       @cloud-fan - I need to set the stream side row to NULL only once here. If 
we want to reuse `joinRow`, we can use a `lazy val` and pass it as a by-name 
parameter. Updated.

##########
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##########
@@ -428,6 +428,62 @@ public MapIterator destructiveIterator() {
     return new MapIterator(numValues, new Location(), true);
   }
 
+  /**
+   * Iterator for the entries of this map. This is to first iterate over key 
index array
+   * `longArray` then accessing values in `dataPages`. NOTE: this is different 
from `MapIterator`
+   * in the sense that key index is preserved here
+   * (See `UnsafeHashedRelation` for example of usage).
+   */
+  public final class MapIteratorWithKeyIndex implements Iterator<Location> {
+
+    private int keyIndex = 0;
+    private int numRecords;
+    private final Location loc;
+
+    private MapIteratorWithKeyIndex(int numRecords, Location loc) {
+      this.numRecords = numRecords;
+      this.loc = loc;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return numRecords > 0;
+    }
+
+    @Override
+    public Location next() {
+      if (!loc.isDefined() || !loc.nextValue()) {
+        while (longArray.get(keyIndex * 2) == 0) {
+          keyIndex++;
+        }
+        loc.with(keyIndex, (int) longArray.get(keyIndex * 2 + 1), true);
+        keyIndex++;

Review comment:
       > Basically keyIndex can grow beyond the longArray.size() if numRecords 
is sufficiently big ?
   
   @agrawaldevesh - no. If that happens, then there's a bug in 
`BytesToBytesMap`. I intentionally skip bounds checking on every key probe 
to save CPU by avoiding unnecessary work. But if others also think 
we should add it, I can add a bounds check too.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getValueWithKeyIndex(keys)
+        if (matched != null) {
+          val keyIndex = matched.getKeyIndex
+          val buildRow = matched.getValue
+          val joinRow = joinRowWithBuild(buildRow)
+          if (boundCondition(joinRow)) {
+            matchedKeys.set(keyIndex)
+            joinRow
+          } else {
+            joinRowWithBuild(buildNullRow)
+          }
+        } else {
+          joinRowWithBuild(buildNullRow)
+        }
+      }
+    }
+
+    // Process build side with filtering out rows looked up and
+    // passed join condition already
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      valueRowWithKeyIndex =>
+        val keyIndex = valueRowWithKeyIndex.getKeyIndex
+        val isMatched = matchedKeys.get(keyIndex)
+        if (!isMatched) {
+          val buildRow = valueRowWithKeyIndex.getValue
+          Some(streamNullJoinRowWithBuild(buildRow))
+        } else {
+          None
+        }
+    }
+
+    streamResultIter ++ buildResultIter
+  }
+
+  /**
+   * Full outer shuffled hash join with non-unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `HashSet[Long]` is used to track matched rows with
+   *    key index (Int) and value index (Int) together.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index and value index from `HashSet`.
+   */
+  private def fullOuterJoinWithNonUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedRows = new mutable.HashSet[Long]
+
+    def markRowMatched(keyIndex: Int, valueIndex: Int): Unit = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.add(rowIndex)
+    }
+
+    def isRowMatched(keyIndex: Int, valueIndex: Int): Boolean = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.contains(rowIndex)
+    }
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.flatMap { srow =>
+      val joinRow = joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        Iterator.single(joinRowWithBuild(buildNullRow))
+      } else {
+        val matched = hashedRelation.getWithKeyIndex(keys)
+        if (matched != null) {
+          val (keyIndex, buildIter) = (matched._1, matched._2.zipWithIndex)
+
+          new RowIterator {
+            private var found = false
+            override def advanceNext(): Boolean = {
+              while (buildIter.hasNext) {
+                val (buildRow, valueIndex) = buildIter.next()
+                if (boundCondition(joinRowWithBuild(buildRow))) {
+                  markRowMatched(keyIndex, valueIndex)
+                  found = true
+                  return true
+                }
+              }
+              if (!found) {
+                joinRowWithBuild(buildNullRow)
+                found = true
+                return true
+              }
+              false
+            }
+            override def getRow: InternalRow = joinRow
+          }.toScala
+        } else {
+          Iterator.single(joinRowWithBuild(buildNullRow))
+        }
+      }
+    }
+
+    // Process build side with filtering out rows looked up and
+    // passed join condition already
+    var prevKeyIndex = -1
+    var valueIndex = -1
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      valueRowWithKeyIndex =>
+        val keyIndex = valueRowWithKeyIndex.getKeyIndex
+        if (prevKeyIndex == -1 || keyIndex != prevKeyIndex) {

Review comment:
       @agrawaldevesh - `prevKeyIndex = keyIndex` cannot be the last statement, 
as the return value is the `if (!isMatched) {} else {}` expression. Changed the 
other place as suggested.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
##########
@@ -116,7 +116,9 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
    *
    * - Shuffle hash join:
    *     Only supported for equi-joins, while the join keys do not need to be 
sortable.
-   *     Supported for all join types except full outer joins.
+   *     Supported for all join types.
+   *     Building hash map from table is a memory-intensive operation and it 
could cause OOM

Review comment:
       @agrawaldevesh - I am referring to the same "hash map" as mentioned 
elsewhere in this file, e.g. this [other 
comment](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L156).

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -179,6 +235,63 @@ private[joins] class UnsafeHashedRelation(
     }
   }
 
+  override def getWithKeyIndex(key: InternalRow): (Int, Iterator[InternalRow]) 
= {
+    val unsafeKey = key.asInstanceOf[UnsafeRow]
+    val map = binaryMap  // avoid the compiler error
+    val loc = new map.Location  // this could be allocated in stack
+    binaryMap.safeLookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
+      unsafeKey.getSizeInBytes, loc, unsafeKey.hashCode())
+    if (loc.isDefined) {
+      (loc.getKeyIndex,
+        new Iterator[UnsafeRow] {
+          private var _hasNext = true
+          override def hasNext: Boolean = _hasNext
+          override def next(): UnsafeRow = {
+            resultRow.pointTo(loc.getValueBase, loc.getValueOffset, 
loc.getValueLength)
+            _hasNext = loc.nextValue()
+            resultRow
+          }
+        })
+    } else {
+      null
+    }
+  }
+
+  override def getValueWithKeyIndex(key: InternalRow): ValueRowWithKeyIndex = {
+    val unsafeKey = key.asInstanceOf[UnsafeRow]
+    val map = binaryMap  // avoid the compiler error
+    val loc = new map.Location  // this could be allocated in stack
+    binaryMap.safeLookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
+      unsafeKey.getSizeInBytes, loc, unsafeKey.hashCode())
+    if (loc.isDefined) {
+      resultRow.pointTo(loc.getValueBase, loc.getValueOffset, 
loc.getValueLength)
+      valueRowWithKeyIndex.updates(loc.getKeyIndex, resultRow)
+    } else {
+      null
+    }
+  }
+
+  override def valuesWithKeyIndex(): Iterator[ValueRowWithKeyIndex] = {
+    val iter = binaryMap.iteratorWithKeyIndex()
+
+    new Iterator[ValueRowWithKeyIndex] {
+      override def hasNext: Boolean = iter.hasNext
+
+      override def next(): ValueRowWithKeyIndex = {
+        if (!hasNext) {
+          throw new NoSuchElementException("End of the iterator")
+        }
+        val loc = iter.next()
+        resultRow.pointTo(loc.getValueBase, loc.getValueOffset, 
loc.getValueLength)

Review comment:
       @agrawaldevesh - I'm not sure I fully understand this, but if you are 
talking about potentially mixing up `resultRow` and 
`valueRowWithKeyIndex`, yes, it can lead to arbitrary kinds of bugs. One can argue 
that the same problem exists for `resultRow` and 
[`JoinedRow`](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala).
`valueRowWithKeyIndex` is a wrapper for `resultRow`, just as `JoinedRow` is 
a wrapper for `resultRow`.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -66,6 +66,30 @@ private[execution] sealed trait HashedRelation extends 
KnownSizeEstimation {
     throw new UnsupportedOperationException
   }
 
+  /**
+   * Returns key index and matched rows.

Review comment:
       @agrawaldevesh - do you have any concrete suggestion to make it better? 
Honestly, I feel this is the best I can do for now.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getValueWithKeyIndex(keys)
+        if (matched != null) {
+          val keyIndex = matched.getKeyIndex
+          val buildRow = matched.getValue
+          val joinRow = joinRowWithBuild(buildRow)
+          if (boundCondition(joinRow)) {
+            matchedKeys.set(keyIndex)
+            joinRow
+          } else {
+            joinRowWithBuild(buildNullRow)
+          }
+        } else {
+          joinRowWithBuild(buildNullRow)
+        }
+      }
+    }
+
+    // Process build side with filtering out rows looked up and
+    // passed join condition already
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      valueRowWithKeyIndex =>
+        val keyIndex = valueRowWithKeyIndex.getKeyIndex
+        val isMatched = matchedKeys.get(keyIndex)
+        if (!isMatched) {
+          val buildRow = valueRowWithKeyIndex.getValue
+          Some(streamNullJoinRowWithBuild(buildRow))
+        } else {
+          None
+        }
+    }
+
+    streamResultIter ++ buildResultIter
+  }
+
+  /**
+   * Full outer shuffled hash join with non-unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `HashSet[Long]` is used to track matched rows with
+   *    key index (Int) and value index (Int) together.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index and value index from `HashSet`.
+   */
+  private def fullOuterJoinWithNonUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedRows = new mutable.HashSet[Long]
+
+    def markRowMatched(keyIndex: Int, valueIndex: Int): Unit = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.add(rowIndex)
+    }
+
+    def isRowMatched(keyIndex: Int, valueIndex: Int): Boolean = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.contains(rowIndex)
+    }
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.flatMap { srow =>
+      val joinRow = joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        Iterator.single(joinRowWithBuild(buildNullRow))
+      } else {
+        val matched = hashedRelation.getWithKeyIndex(keys)
+        if (matched != null) {
+          val (keyIndex, buildIter) = (matched._1, matched._2.zipWithIndex)
+
+          new RowIterator {
+            private var found = false
+            override def advanceNext(): Boolean = {
+              while (buildIter.hasNext) {
+                val (buildRow, valueIndex) = buildIter.next()
+                if (boundCondition(joinRowWithBuild(buildRow))) {
+                  markRowMatched(keyIndex, valueIndex)

Review comment:
       > Can we reuse an existing term or it or define this semantic somewhere 
above clearly ?
   
   Do you have any concrete suggestion/alternative here? I see your point, 
but I am not sure how to make it better. Please be explicit and elaborate, and 
bear with my poor understanding.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getValueWithKeyIndex(keys)
+        if (matched != null) {
+          val keyIndex = matched.getKeyIndex
+          val buildRow = matched.getValue
+          val joinRow = joinRowWithBuild(buildRow)
+          if (boundCondition(joinRow)) {
+            matchedKeys.set(keyIndex)
+            joinRow
+          } else {
+            joinRowWithBuild(buildNullRow)
+          }
+        } else {
+          joinRowWithBuild(buildNullRow)
+        }
+      }
+    }
+
+    // Process build side with filtering out rows looked up and
+    // passed join condition already
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      valueRowWithKeyIndex =>
+        val keyIndex = valueRowWithKeyIndex.getKeyIndex
+        val isMatched = matchedKeys.get(keyIndex)
+        if (!isMatched) {
+          val buildRow = valueRowWithKeyIndex.getValue
+          Some(streamNullJoinRowWithBuild(buildRow))
+        } else {
+          None
+        }
+    }
+
+    streamResultIter ++ buildResultIter
+  }
+
+  /**
+   * Full outer shuffled hash join with non-unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `HashSet[Long]` is used to track matched rows with
+   *    key index (Int) and value index (Int) together.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index and value index from `HashSet`.
+   */
+  private def fullOuterJoinWithNonUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedRows = new mutable.HashSet[Long]
+
+    def markRowMatched(keyIndex: Int, valueIndex: Int): Unit = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.add(rowIndex)
+    }
+
+    def isRowMatched(keyIndex: Int, valueIndex: Int): Boolean = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.contains(rowIndex)
+    }
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.flatMap { srow =>
+      val joinRow = joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        Iterator.single(joinRowWithBuild(buildNullRow))
+      } else {
+        val matched = hashedRelation.getWithKeyIndex(keys)
+        if (matched != null) {
+          val (keyIndex, buildIter) = (matched._1, matched._2.zipWithIndex)
+
+          new RowIterator {
+            private var found = false
+            override def advanceNext(): Boolean = {
+              while (buildIter.hasNext) {
+                val (buildRow, valueIndex) = buildIter.next()
+                if (boundCondition(joinRowWithBuild(buildRow))) {
+                  markRowMatched(keyIndex, valueIndex)
+                  found = true
+                  return true
+                }
+              }
+              if (!found) {
+                joinRowWithBuild(buildNullRow)
+                found = true
+                return true
+              }
+              false
+            }
+            override def getRow: InternalRow = joinRow
+          }.toScala
+        } else {
+          Iterator.single(joinRowWithBuild(buildNullRow))
+        }
+      }
+    }
+
+    // Process build side with filtering out rows looked up and
+    // passed join condition already
+    var prevKeyIndex = -1
+    var valueIndex = -1
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      valueRowWithKeyIndex =>
+        val keyIndex = valueRowWithKeyIndex.getKeyIndex
+        if (prevKeyIndex == -1 || keyIndex != prevKeyIndex) {
+          prevKeyIndex = keyIndex
+          valueIndex = -1
+        }
+        valueIndex += 1
+        val isMatched = isRowMatched(keyIndex, valueIndex)
+        if (!isMatched) {
+          val buildRow = valueRowWithKeyIndex.getValue
+          Some(streamNullJoinRowWithBuild(buildRow))

Review comment:
       @agrawaldevesh - I don't think so. See my comment below for the code 
pointer in `HashJoin.scala`. Thanks.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getValueWithKeyIndex(keys)
+        if (matched != null) {
+          val keyIndex = matched.getKeyIndex
+          val buildRow = matched.getValue
+          val joinRow = joinRowWithBuild(buildRow)
+          if (boundCondition(joinRow)) {
+            matchedKeys.set(keyIndex)
+            joinRow
+          } else {
+            joinRowWithBuild(buildNullRow)
+          }
+        } else {
+          joinRowWithBuild(buildNullRow)
+        }
+      }
+    }
+
+    // Process build side with filtering out rows looked up and
+    // passed join condition already
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      valueRowWithKeyIndex =>
+        val keyIndex = valueRowWithKeyIndex.getKeyIndex
+        val isMatched = matchedKeys.get(keyIndex)
+        if (!isMatched) {
+          val buildRow = valueRowWithKeyIndex.getValue
+          Some(streamNullJoinRowWithBuild(buildRow))
+        } else {
+          None
+        }
+    }
+
+    streamResultIter ++ buildResultIter
+  }
+
+  /**
+   * Full outer shuffled hash join with non-unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the matched rows from build side be looked up.
+   *    A `HashSet[Long]` is used to track matched rows with
+   *    key index (Int) and value index (Int) together.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index and value index from `HashSet`.
+   */
+  private def fullOuterJoinWithNonUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedRows = new mutable.HashSet[Long]
+
+    def markRowMatched(keyIndex: Int, valueIndex: Int): Unit = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.add(rowIndex)
+    }
+
+    def isRowMatched(keyIndex: Int, valueIndex: Int): Boolean = {
+      val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
+      matchedRows.contains(rowIndex)
+    }
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.flatMap { srow =>
+      val joinRow = joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        Iterator.single(joinRowWithBuild(buildNullRow))
+      } else {
+        val matched = hashedRelation.getWithKeyIndex(keys)
+        if (matched != null) {
+          val (keyIndex, buildIter) = (matched._1, matched._2.zipWithIndex)
+
+          new RowIterator {
+            private var found = false
+            override def advanceNext(): Boolean = {
+              while (buildIter.hasNext) {
+                val (buildRow, valueIndex) = buildIter.next()
+                if (boundCondition(joinRowWithBuild(buildRow))) {
+                  markRowMatched(keyIndex, valueIndex)
+                  found = true
+                  return true
+                }
+              }
+              if (!found) {
+                joinRowWithBuild(buildNullRow)
+                found = true
+                return true
+              }
+              false
+            }
+            override def getRow: InternalRow = joinRow
+          }.toScala
+        } else {
+          Iterator.single(joinRowWithBuild(buildNullRow))
+        }
+      }
+    }
+
+    // Process build side with filtering out rows looked up and

Review comment:
       @agrawaldevesh - sure, updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -110,14 +138,39 @@ private[execution] object HashedRelation {
 
     if (!input.hasNext) {
       EmptyHashedRelation
-    } else if (key.length == 1 && key.head.dataType == LongType) {
+    } else if (key.length == 1 && key.head.dataType == LongType && 
!allowsNullKey) {
+      // NOTE: LongHashedRelation does not support NULL keys.
       LongHashedRelation(input, key, sizeEstimate, mm, isNullAware)
     } else {
-      UnsafeHashedRelation(input, key, sizeEstimate, mm, isNullAware)
+      UnsafeHashedRelation(input, key, sizeEstimate, mm, isNullAware, 
allowsNullKey)
     }
   }
 }
 
+/**
+ * A wrapper for key index and value in InternalRow type.
+ * Designed to be instantiated once per thread and reused.
+ */
+private[execution] class ValueRowWithKeyIndex {
+  private var keyIndex: Int = _
+  private var value: InternalRow = _
+
+  /** Updates this ValueRowWithKeyIndex.  Returns itself. */
+  def updates(newKeyIndex: Int, newValue: InternalRow): ValueRowWithKeyIndex = 
{

Review comment:
       @agrawaldevesh - I feel this is a very minor nit, but I changed it anyway.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the build-side rows that were matched during the lookup.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getValueWithKeyIndex(keys)
+        if (matched != null) {
+          val keyIndex = matched.getKeyIndex
+          val buildRow = matched.getValue
+          val joinRow = joinRowWithBuild(buildRow)
+          if (boundCondition(joinRow)) {
+            matchedKeys.set(keyIndex)
+            joinRow
+          } else {
+            joinRowWithBuild(buildNullRow)
+          }
+        } else {
+          joinRowWithBuild(buildNullRow)
+        }
+      }
+    }
+
+    // Process build side with filtering out rows looked up and
+    // passed join condition already
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      valueRowWithKeyIndex =>
+        val keyIndex = valueRowWithKeyIndex.getKeyIndex
+        val isMatched = matchedKeys.get(keyIndex)
+        if (!isMatched) {
+          val buildRow = valueRowWithKeyIndex.getValue
+          Some(streamNullJoinRowWithBuild(buildRow))
+        } else {
+          None
+        }
+    }
+
+    streamResultIter ++ buildResultIter
+  }
+
+  /**
+   * Full outer shuffled hash join with non-unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the build-side rows that were matched during the lookup.
+   *    A `HashSet[Long]` is used to track matched rows with
+   *    key index (Int) and value index (Int) together.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index and value index from `HashSet`.
+   */
+  private def fullOuterJoinWithNonUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedRows = new mutable.HashSet[Long]

Review comment:
       > but any of those have got to be better than HashSet[Long]
   
   @agrawaldevesh - I thought we had agreed on the execution plan proposed 
before. I did find `OpenHashSet`, but to make sure it's better than `HashSet`, 
shouldn't we do some benchmark measurement first? Could this be a follow-up?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,210 @@ case class ShuffledHashJoinExec(
     val numOutputRows = longMetric("numOutputRows")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, 
buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows)
+      joinType match {
+        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case _ => join(streamIter, hashed, numOutputRows)
+      }
+    }
+  }
+
+  private def fullOuterJoin(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+    val joinKeys = streamSideKeyGenerator()
+    val joinRow = new JoinedRow
+    val (joinRowWithStream, joinRowWithBuild) = {
+      buildSide match {
+        case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+        case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+      }
+    }
+    val buildNullRow = new GenericInternalRow(buildOutput.length)
+    val streamNullRow = new GenericInternalRow(streamedOutput.length)
+    val streamNullJoinRow = new JoinedRow
+    val streamNullJoinRowWithBuild = {
+      buildSide match {
+        case BuildLeft =>
+          streamNullJoinRow.withRight(streamNullRow)
+          streamNullJoinRow.withLeft _
+        case BuildRight =>
+          streamNullJoinRow.withLeft(streamNullRow)
+          streamNullJoinRow.withRight _
+      }
+    }
+
+    val iter = if (hashedRelation.keyIsUnique) {
+      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
+    } else {
+      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, 
joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, 
streamNullRow)
     }
+
+    val resultProj = UnsafeProjection.create(output, output)
+    iter.map { r =>
+      numOutputRows += 1
+      resultProj(r)
+    }
+  }
+
+  /**
+   * Full outer shuffled hash join with unique join keys:
+   * 1. Process rows from stream side by looking up hash relation.
+   *    Mark the build-side rows that were matched during the lookup.
+   *    A `BitSet` is used to track matched rows with key index.
+   * 2. Process rows from build side by iterating hash relation.
+   *    Filter out rows from build side being matched already,
+   *    by checking key index from `BitSet`.
+   */
+  private def fullOuterJoinWithUniqueKey(
+      streamIter: Iterator[InternalRow],
+      hashedRelation: HashedRelation,
+      joinKeys: UnsafeProjection,
+      joinRowWithStream: InternalRow => JoinedRow,
+      joinRowWithBuild: InternalRow => JoinedRow,
+      streamNullJoinRowWithBuild: InternalRow => JoinedRow,
+      buildNullRow: GenericInternalRow,
+      streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+    val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+    // Process stream side with looking up hash relation
+    val streamResultIter = streamIter.map { srow =>
+      joinRowWithStream(srow)
+      val keys = joinKeys(srow)
+      if (keys.anyNull) {
+        joinRowWithBuild(buildNullRow)
+      } else {
+        val matched = hashedRelation.getValueWithKeyIndex(keys)
+        if (matched != null) {
+          val keyIndex = matched.getKeyIndex
+          val buildRow = matched.getValue
+          val joinRow = joinRowWithBuild(buildRow)
+          if (boundCondition(joinRow)) {
+            matchedKeys.set(keyIndex)
+            joinRow
+          } else {
+            joinRowWithBuild(buildNullRow)
+          }
+        } else {
+          joinRowWithBuild(buildNullRow)
+        }
+      }
+    }
+
+    // Process build side with filtering out rows looked up and
+    // passed join condition already
+    val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+      valueRowWithKeyIndex =>
+        val keyIndex = valueRowWithKeyIndex.getKeyIndex
+        val isMatched = matchedKeys.get(keyIndex)
+        if (!isMatched) {
+          val buildRow = valueRowWithKeyIndex.getValue
+          Some(streamNullJoinRowWithBuild(buildRow))

Review comment:
       @agrawaldevesh - SHJ/BHJ inner join non-codegen path - 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L159-L166
 .




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to