Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/8383#discussion_r38561395
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
---
@@ -271,3 +298,224 @@ private class RightOuterIterator(
override def getRow: InternalRow = resultProj(joinedRow)
}
+
+private class SortMergeFullJoinScanner(
+ leftKeyGenerator: Projection,
+ rightKeyGenerator: Projection,
+ keyOrdering: Ordering[InternalRow],
+ leftIter: RowIterator,
+ numLeftRows: LongSQLMetric,
+ rightIter: RowIterator,
+ numRightRows: LongSQLMetric,
+ boundCondition: InternalRow => Boolean,
+ leftNullRow: InternalRow,
+ rightNullRow: InternalRow) {
+ private[this] val joinedRow: JoinedRow = new JoinedRow()
+ private[this] var leftRow: InternalRow = _
+ private[this] var leftRowKey: InternalRow = _
+ private[this] var rightRow: InternalRow = _
+ private[this] var rightRowKey: InternalRow = _
+
+ private[this] var leftIndex: Int = _
+ private[this] var rightIndex: Int = _
+ private[this] val leftMatches: ArrayBuffer[InternalRow] = new
ArrayBuffer[InternalRow]
+ private[this] val rightMatches: ArrayBuffer[InternalRow] = new
ArrayBuffer[InternalRow]
+
+ private[this] var notMatchBitSet: BitSet = _
+ private[this] val foundRightIndex: ArrayBuffer[Int] = new
ArrayBuffer[Int]
+
+ private[this] var leftAdvanced: Boolean = false
+ private[this] var rightAdvanced: Boolean = false
+
+ // --- Private methods
--------------------------------------------------------------------------
+
+ /**
+ * Advance the left iterator and compute the new row's join key.
+ * @return true if the left iterator returned a row and false otherwise.
+ */
+ private def advancedLeft(): Boolean = {
+ if (leftIter.advanceNext()) {
+ leftRow = leftIter.getRow
+ leftRowKey = leftKeyGenerator(leftRow)
+ numLeftRows += 1
+ true
+ } else {
+ leftRow = null
+ leftRowKey = null
+ false
+ }
+ }
+
+ /**
+ * Advance the right iterator and compute the new row's join key.
+ * @return true if the right iterator returned a row and false otherwise.
+ */
+ private def advancedRight(): Boolean = {
+ if (rightIter.advanceNext()) {
+ rightRow = rightIter.getRow
+ rightRowKey = rightKeyGenerator(rightRow)
+ numRightRows += 1
+ true
+ } else {
+ rightRow = null
+ rightRowKey = null
+ false
+ }
+ }
+
+ /**
+ * Consume rows from both iterators. Keep record matching rows in
buffers until their
+ * row keys are different.
+ */
+ private def findMatchingRows(matchingKey: InternalRow): Unit = {
+ leftMatches.clear()
+ rightMatches.clear()
+
+ do {
+ leftMatches += leftRow.copy()
+ leftAdvanced = advancedLeft()
+ } while (leftAdvanced && (keyOrdering.compare(leftRowKey, matchingKey)
== 0))
+
+ do {
+ rightMatches += rightRow.copy()
+ rightAdvanced = advancedRight()
+ } while (rightAdvanced && (keyOrdering.compare(matchingKey,
rightRowKey) == 0))
+
+ leftIndex = 0
+ rightIndex = 0
+ notMatchBitSet = new BitSet(rightMatches.size)
+ }
+
+ /**
+ * Scan for next matching rows in buffers of both sides.
+ * @return true if next row(s) are found and false otherwise.
+ */
+ private def scanNextInBuffered(): Boolean = {
+ if (leftIndex < leftMatches.size) {
+ var found = false
+ do {
+ found = boundCondition(joinedRow(leftMatches(leftIndex),
rightMatches(rightIndex)))
+ if (!found) {
+ if (!foundRightIndex.contains(rightIndex)) {
+ notMatchBitSet.set(rightIndex)
+ }
+ rightIndex += 1
+ }
+ } while (!found && rightIndex < rightMatches.size)
+
+ // No match can be found
+ // Output only left row
+ if (!found) {
+ joinedRow(leftMatches(leftIndex), rightNullRow)
+ leftIndex += 1
+ rightIndex = 0
+ } else {
+ notMatchBitSet.unset(rightIndex)
+ foundRightIndex += rightIndex
+ rightIndex += 1
+ if (rightIndex == rightMatches.size) {
+ leftIndex += 1
+ rightIndex = 0
+ }
+ }
+ if (leftIndex == leftMatches.size) {
+ rightIndex = 0
+ foundRightIndex.clear()
+ }
+ true
+ } else if (rightIndex < rightMatches.size) {
+ // Output those right rows not matched by any left row
+ rightIndex = notMatchBitSet.nextSetBit(rightIndex)
+ if (rightIndex >= 0) {
+ joinedRow(leftNullRow, rightMatches(rightIndex))
+ rightIndex += 1
+ true
+ } else {
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ private def canCompare: Boolean = {
+ (leftAdvanced && rightAdvanced ) && (!leftRowKey.anyNull ||
!rightRowKey.anyNull)
+ }
+
+ // --- Public methods
--------------------------------------------------------------------------
+
+ def getLeftRow(): InternalRow = leftRow
--- End diff --
Is this needed anymore?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]