GitHub user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8579#discussion_r38975823
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala ---
    @@ -271,3 +297,189 @@ private class RightOuterIterator(
     
       override def getRow: InternalRow = resultProj(joinedRow)
     }
    +
    +private class SortMergeFullOuterJoinScanner(
    +    leftKeyGenerator: Projection,
    +    rightKeyGenerator: Projection,
    +    keyOrdering: Ordering[InternalRow],
    +    leftIter: RowIterator,
    +    numLeftRows: LongSQLMetric,
    +    rightIter: RowIterator,
    +    numRightRows: LongSQLMetric,
    +    boundCondition: InternalRow => Boolean,
    +    leftNullRow: InternalRow,
    +    rightNullRow: InternalRow)  {
    +  // Single output row object; it is re-pointed at each emitted (left, right) pair.
    +  private[this] val joinedRow: JoinedRow = new JoinedRow()
    +  // Current head row of each input and its join key; both become null once that input
    +  // is exhausted.
    +  private[this] var leftRow: InternalRow = _
    +  private[this] var leftRowKey: InternalRow = _
    +  private[this] var rightRow: InternalRow = _
    +  private[this] var rightRowKey: InternalRow = _
    +
    +  // State for the buffered key group: scan cursors, the buffered rows (copies), and bit
    +  // sets recording which buffered rows have satisfied boundCondition with some partner.
    +  private[this] var leftIndex: Int = 0
    +  private[this] var rightIndex: Int = 0
    +  private[this] val leftMatches: ArrayBuffer[InternalRow] = new 
ArrayBuffer[InternalRow]
    +  private[this] val rightMatches: ArrayBuffer[InternalRow] = new 
ArrayBuffer[InternalRow]
    +  private[this] var leftMatched: BitSet = new BitSet(1)
    +  private[this] var rightMatched: BitSet = new BitSet(1)
    +
    +  // Load the first row (and its key) from each input before any scanning happens.
    +  advancedLeft()
    +  advancedRight()
    +
    +  // --- Private methods --------------------------------------------------------------------------
    +
    +  /**
    +   * Moves the left input forward by one row, caching that row and its join key and counting
    +   * it in `numLeftRows`. When the input is exhausted, both cached values are reset to null.
    +   *
    +   * @return whether a new left row is now available.
    +   */
    +  private def advancedLeft(): Boolean = {
    +    val hasRow = leftIter.advanceNext()
    +    if (hasRow) {
    +      leftRow = leftIter.getRow
    +      leftRowKey = leftKeyGenerator(leftRow)
    +      numLeftRows += 1
    +    } else {
    +      leftRow = null
    +      leftRowKey = null
    +    }
    +    hasRow
    +  }
    +
    +  /**
    +   * Moves the right input forward by one row, caching that row and its join key and counting
    +   * it in `numRightRows`. When the input is exhausted, both cached values are reset to null.
    +   *
    +   * @return whether a new right row is now available.
    +   */
    +  private def advancedRight(): Boolean = {
    +    val hasRow = rightIter.advanceNext()
    +    if (hasRow) {
    +      rightRow = rightIter.getRow
    +      rightRowKey = rightKeyGenerator(rightRow)
    +      numRightRows += 1
    +    } else {
    +      rightRow = null
    +      rightRowKey = null
    +    }
    +    hasRow
    +  }
    +
    +  /**
    +   * Loads the next key group. From each input it buffers a copy of every row whose join key
    +   * equals `matchingKey`, consuming rows until that input's key differs or it is exhausted.
    +   * It also rewinds both buffer cursors and clears the matched-row bit sets, replacing a bit
    +   * set when the group has more rows than the set can hold.
    +   *
    +   * NOTE(review): `matchingKey` is compared against keys computed after the iterators
    +   * advance, so it presumably must not alias `leftRowKey`/`rightRowKey`. Confirm that
    +   * callers pass a copy.
    +   */
    +  private def findMatchingRows(matchingKey: InternalRow): Unit = {
    +    // Start a fresh group: no buffered rows, and both cursors at the beginning.
    +    leftIndex = 0
    +    rightIndex = 0
    +    leftMatches.clear()
    +    rightMatches.clear()
    +
    +    // Drain the group from each input. Rows are copied, presumably because the iterators
    +    // may reuse their row objects.
    +    while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
    +      leftMatches += leftRow.copy()
    +      advancedLeft()
    +    }
    +    while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
    +      rightMatches += rightRow.copy()
    +      advancedRight()
    +    }
    +
    +    // Reuse a bit set when it is already large enough for the group; otherwise replace it.
    +    def resetBits(bits: BitSet, needed: Int): BitSet =
    +      if (needed > bits.capacity) {
    +        new BitSet(needed)
    +      } else {
    +        bits.clear()
    +        bits
    +      }
    +    leftMatched = resetBits(leftMatched, leftMatches.size)
    +    rightMatched = resetBits(rightMatched, rightMatches.size)
    +  }
    +
    +  /**
    +   * Produces the next output row for the buffered key group and writes it into `joinedRow`.
    +   *
    +   * Buffered left rows are visited in order. For each one, buffered right rows are scanned
    +   * from `rightIndex`, and each pair that satisfies `boundCondition` is emitted (one per
    +   * call), with both sides marked as matched. After that scan, a left row that never matched
    +   * is emitted padded with `rightNullRow`. Once all left rows are done, each right row that
    +   * never matched is emitted padded with `leftNullRow`. `leftIndex` and `rightIndex` record
    +   * where to resume on the next call.
    +   *
    +   * @return true if `joinedRow` holds a new output row, false once the group is exhausted.
    +   */
    +  private def scanNextInBuffered(): Boolean = {
    +    while (leftIndex < leftMatches.size) {
    +      // Skip right rows that fail the join condition against the current left row.
    +      while (rightIndex < rightMatches.size
    +        && !boundCondition(joinedRow(leftMatches(leftIndex), 
rightMatches(rightIndex)))) {
    +        rightIndex += 1
    +      }
    +      if (rightIndex < rightMatches.size) {
    +        // Matched with condition: record both sides; the next call resumes after this
    +        // right row.
    +        leftMatched.set(leftIndex)
    +        rightMatched.set(rightIndex)
    +        rightIndex += 1
    +        return true
    +      }
    +      // The current left row has been tried against every right row. Advance to the next
    +      // left row and restart the right-side scan.
    +      leftIndex += 1
    +      rightIndex = 0
    +      // Emit the finished left row padded with nulls if it never satisfied the condition.
    +      if (!leftMatched.get(leftIndex - 1)) {
    +        joinedRow(leftMatches(leftIndex - 1), rightNullRow)
    +        return true
    +      }
    +    }
    +
    +    // All left rows are done: emit each remaining right row that no left row matched.
    +    while (rightIndex < rightMatches.size && rightMatched.get(rightIndex)) 
{
    +      rightIndex += 1
    +    }
    +    if (rightIndex < rightMatches.size) {
    +      joinedRow(leftNullRow, rightMatches(rightIndex))
    +      rightIndex += 1
    +      true
    +    } else {
    +      false
    +    }
    +  }
    +
    +  // --- Public methods --------------------------------------------------------------------------
    +
    +  /**
    +   * Returns the output row written by the most recent successful `advanceNext()` call.
    +   * The same [[JoinedRow]] instance is returned on every call.
    +   */
    +  def getJoinedRow(): JoinedRow = {
    +    joinedRow
    +  }
    +
    +  def advanceNext(): Boolean = {
    +    // We already buffered some matching rows, use them directly
    +    if (leftIndex <= leftMatches.size || rightIndex <= rightMatches.size) {
    +      if (scanNextInBuffered()) {
    +        return true
    +      }
    +    }
    +
    +    // Both left and right iterators have rows and they can be compared
    +    if (leftRow != null && (leftRowKey.anyNull || rightRow == null)) {
    +      // Only consume row(s) from left iterator
    +      joinedRow(leftRow.copy(), rightNullRow)
    +      advancedLeft()
    +      true
    +    } else if (rightRow != null && (rightRowKey.anyNull || leftRow == 
null)) {
    +      // Only consume row(s) from right iterator
    --- End diff --
    
    It's always a single row, right? Also, this comment doesn't really add much
value, so I think we can just remove it. The same goes for the one at L445.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to