c21 commented on a change in pull request #29342:
URL: https://github.com/apache/spark/pull/29342#discussion_r471022667
##########
File path: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
##########
@@ -428,6 +428,68 @@ public MapIterator destructiveIterator() {
return new MapIterator(numValues, new Location(), true);
}
+ /**
+ * Iterator for the entries of this map. This is to first iterate over key
index array
Review comment:
@maropu - sure, updated.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
##########
@@ -66,6 +66,30 @@ private[execution] sealed trait HashedRelation extends
KnownSizeEstimation {
throw new UnsupportedOperationException
}
+ /**
+ * Returns an iterator for key index and matched rows.
+ *
+ * Returns null if there is no matched rows.
+ */
+ def getWithKeyIndex(key: InternalRow): Iterator[ValueRowWithKeyIndex]
+
+ /**
+ * Returns key index and matched single row.
+ *
+ * Returns null if there is no matched rows.
+ */
+ def getValueWithKeyIndex(key: InternalRow): ValueRowWithKeyIndex
+
+ /**
+ * Returns an iterator for keys index and rows of InternalRow type.
+ */
+ def valuesWithKeyIndex(): Iterator[ValueRowWithKeyIndex]
+
+ /**
+ * Returns the maximum number of allowed keys index.
+ */
+ def maxNumKeysIndex: Int
Review comment:
@cloud-fan - per your comment in the other place, I take it that the current
naming is okay as well; let me know if that's not the case, thanks.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,217 @@ case class ShuffledHashJoinExec(
val numOutputRows = longMetric("numOutputRows")
streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter,
buildIter) =>
val hashed = buildHashedRelation(buildIter)
- join(streamIter, hashed, numOutputRows)
+ joinType match {
+ case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+ case _ => join(streamIter, hashed, numOutputRows)
+ }
+ }
+ }
+
+ private def fullOuterJoin(
+ streamIter: Iterator[InternalRow],
+ hashedRelation: HashedRelation,
+ numOutputRows: SQLMetric): Iterator[InternalRow] = {
+ val joinKeys = streamSideKeyGenerator()
+ val joinRow = new JoinedRow
+ val (joinRowWithStream, joinRowWithBuild) = {
+ buildSide match {
+ case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+ case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+ }
+ }
+ val buildNullRow = new GenericInternalRow(buildOutput.length)
+ val streamNullRow = new GenericInternalRow(streamedOutput.length)
+ lazy val streamNullJoinRowWithBuild = {
+ buildSide match {
+ case BuildLeft =>
+ joinRow.withRight(streamNullRow)
+ joinRow.withLeft _
+ case BuildRight =>
+ joinRow.withLeft(streamNullRow)
+ joinRow.withRight _
+ }
+ }
+
+ val iter = if (hashedRelation.keyIsUnique) {
+ fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys,
joinRowWithStream,
+ joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow,
streamNullRow)
+ } else {
+ fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys,
joinRowWithStream,
+ joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow,
streamNullRow)
}
+
+ val resultProj = UnsafeProjection.create(output, output)
+ iter.map { r =>
+ numOutputRows += 1
+ resultProj(r)
+ }
+ }
+
+ /**
+ * Full outer shuffled hash join with unique join keys:
+ * 1. Process rows from stream side by looking up hash relation.
+ * Mark the matched rows from build side be looked up.
+ * A `BitSet` is used to track matched rows with key index.
+ * 2. Process rows from build side by iterating hash relation.
+ * Filter out rows from build side being matched already,
+ * by checking key index from `BitSet`.
+ */
+ private def fullOuterJoinWithUniqueKey(
+ streamIter: Iterator[InternalRow],
+ hashedRelation: HashedRelation,
+ joinKeys: UnsafeProjection,
+ joinRowWithStream: InternalRow => JoinedRow,
+ joinRowWithBuild: InternalRow => JoinedRow,
+ streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
+ buildNullRow: GenericInternalRow,
+ streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+ val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
+
+ // Process stream side with looking up hash relation
+ val streamResultIter = streamIter.map { srow =>
+ joinRowWithStream(srow)
+ val keys = joinKeys(srow)
+ if (keys.anyNull) {
+ joinRowWithBuild(buildNullRow)
+ } else {
+ val matched = hashedRelation.getValueWithKeyIndex(keys)
+ if (matched != null) {
+ val keyIndex = matched.getKeyIndex
+ val buildRow = matched.getValue
+ val joinRow = joinRowWithBuild(buildRow)
+ if (boundCondition(joinRow)) {
+ matchedKeys.set(keyIndex)
+ joinRow
+ } else {
+ joinRowWithBuild(buildNullRow)
+ }
+ } else {
+ joinRowWithBuild(buildNullRow)
+ }
+ }
+ }
+
+ // Process build side with filtering out the matched rows
+ val buildResultIter = hashedRelation.valuesWithKeyIndex().flatMap {
+ valueRowWithKeyIndex =>
+ val keyIndex = valueRowWithKeyIndex.getKeyIndex
+ val isMatched = matchedKeys.get(keyIndex)
+ if (!isMatched) {
+ val buildRow = valueRowWithKeyIndex.getValue
+ Some(streamNullJoinRowWithBuild(buildRow))
+ } else {
+ None
+ }
+ }
+
+ streamResultIter ++ buildResultIter
+ }
+
+ /**
+ * Full outer shuffled hash join with non-unique join keys:
+ * 1. Process rows from stream side by looking up hash relation.
+ * Mark the matched rows from build side be looked up.
+ * A `HashSet[Long]` is used to track matched rows with
+ * key index (Int) and value index (Int) together.
+ * 2. Process rows from build side by iterating hash relation.
+ * Filter out rows from build side being matched already,
+ * by checking key index and value index from `HashSet`.
+ *
+ * The "value index" is defined as the index of the tuple in the chain
+ * of tuples having the same key. For example, if certain key is found
thrice,
+ * the value indices of its tuples will be 0, 1 and 2.
+ * Note that value indices of tuples with different keys are incomparable.
+ */
+ private def fullOuterJoinWithNonUniqueKey(
+ streamIter: Iterator[InternalRow],
+ hashedRelation: HashedRelation,
+ joinKeys: UnsafeProjection,
+ joinRowWithStream: InternalRow => JoinedRow,
+ joinRowWithBuild: InternalRow => JoinedRow,
+ streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
+ buildNullRow: GenericInternalRow,
+ streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+ val matchedRows = new mutable.HashSet[Long]
+
Review comment:
@maropu - good call. Yes, there are no metrics for this extra
bitset/hashset yet in the current approach. It's not hard to add, but it needs
more discussion. E.g., the size of `bitset` is pretty clear and easy to get,
but the size of `HashSet[Long]` seems to need more discussion if we want to get
an accurate number. To avoid more complexity in this PR, I want to make it a
followup and address it separately. Thanks.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,217 @@ case class ShuffledHashJoinExec(
val numOutputRows = longMetric("numOutputRows")
streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter,
buildIter) =>
val hashed = buildHashedRelation(buildIter)
- join(streamIter, hashed, numOutputRows)
+ joinType match {
+ case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+ case _ => join(streamIter, hashed, numOutputRows)
+ }
+ }
+ }
+
+ private def fullOuterJoin(
+ streamIter: Iterator[InternalRow],
+ hashedRelation: HashedRelation,
+ numOutputRows: SQLMetric): Iterator[InternalRow] = {
+ val joinKeys = streamSideKeyGenerator()
+ val joinRow = new JoinedRow
+ val (joinRowWithStream, joinRowWithBuild) = {
+ buildSide match {
+ case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)
+ case BuildRight => (joinRow.withLeft _, joinRow.withRight _)
+ }
+ }
+ val buildNullRow = new GenericInternalRow(buildOutput.length)
+ val streamNullRow = new GenericInternalRow(streamedOutput.length)
+ lazy val streamNullJoinRowWithBuild = {
+ buildSide match {
+ case BuildLeft =>
+ joinRow.withRight(streamNullRow)
+ joinRow.withLeft _
+ case BuildRight =>
+ joinRow.withLeft(streamNullRow)
+ joinRow.withRight _
+ }
+ }
+
+ val iter = if (hashedRelation.keyIsUnique) {
+ fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys,
joinRowWithStream,
+ joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow,
streamNullRow)
+ } else {
+ fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys,
joinRowWithStream,
+ joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow,
streamNullRow)
}
+
+ val resultProj = UnsafeProjection.create(output, output)
+ iter.map { r =>
+ numOutputRows += 1
+ resultProj(r)
+ }
+ }
+
+ /**
+ * Full outer shuffled hash join with unique join keys:
+ * 1. Process rows from stream side by looking up hash relation.
+ * Mark the matched rows from build side be looked up.
+ * A `BitSet` is used to track matched rows with key index.
+ * 2. Process rows from build side by iterating hash relation.
+ * Filter out rows from build side being matched already,
+ * by checking key index from `BitSet`.
+ */
+ private def fullOuterJoinWithUniqueKey(
+ streamIter: Iterator[InternalRow],
+ hashedRelation: HashedRelation,
+ joinKeys: UnsafeProjection,
+ joinRowWithStream: InternalRow => JoinedRow,
+ joinRowWithBuild: InternalRow => JoinedRow,
+ streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
+ buildNullRow: GenericInternalRow,
+ streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
+ val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
Review comment:
@maropu - replied back to the original comment, thanks.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
##########
@@ -71,8 +85,217 @@ case class ShuffledHashJoinExec(
val numOutputRows = longMetric("numOutputRows")
streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter,
buildIter) =>
val hashed = buildHashedRelation(buildIter)
- join(streamIter, hashed, numOutputRows)
+ joinType match {
+ case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+ case _ => join(streamIter, hashed, numOutputRows)
+ }
+ }
+ }
+
+ private def fullOuterJoin(
+ streamIter: Iterator[InternalRow],
+ hashedRelation: HashedRelation,
Review comment:
@maropu - good call. Added unit tests for the `empty build side`, `empty
stream side`, and `empty stream and build side` join cases in `JoinSuite.scala`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]