c21 commented on a change in pull request #32476:
URL: https://github.com/apache/spark/pull/32476#discussion_r630570546



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
##########
@@ -431,6 +433,66 @@ case class SortMergeJoinExec(
     // Copy the streamed keys as class members so they could be used in next 
function call.
     val matchedKeyVars = copyKeys(ctx, streamedKeyVars)
 
+    // Handle the case when the streamed row has any NULL keys.
+    val handleStreamedAnyNull = joinType match {
+      case _: InnerLike =>
+        // Skip streamed row.
+        s"""
+           |$streamedRow = null;
+           |continue;
+         """.stripMargin
+      case LeftOuter | RightOuter =>
+        // Eagerly return streamed row. Only call `matches.clear()` when 
`matches.isEmpty()` is
+        // false, to reduce unnecessary computation.
+        s"""
+           |if (!$matches.isEmpty()) {
+           |  $matches.clear();
+           |}
+           |return false;
+         """.stripMargin
+      case x =>
+        throw new IllegalArgumentException(
+          s"SortMergeJoin.genScanner should not take $x as the JoinType")
+    }
+
+    // Handle the case when the streamed keys have no match on the buffered side.
+    val handleStreamedWithoutMatch = joinType match {
+      case _: InnerLike =>
+        // Skip streamed row.
+        s"$streamedRow = null;"
+      case LeftOuter | RightOuter =>
+        // Eagerly return with streamed row.
+        "return false;"
+      case x =>
+        throw new IllegalArgumentException(
+          s"SortMergeJoin.genScanner should not take $x as the JoinType")
+    }
+
+    // Generate a function to scan both streamed and buffered sides to find a 
match.
+    // Return whether a match is found.
+    //
+    // `streamedIter`: the iterator for streamed side.
+    // `bufferedIter`: the iterator for buffered side.
+    // `streamedRow`: the current row from streamed side.
+    //                When `streamedIter` is empty, `streamedRow` is null.
+    // `matches`: the rows from buffered side already matched with 
`streamedRow`.
+    //            `matches` is buffered and reused for all `streamedRow`s 
having same join keys.
+    //            If there is no match with `streamedRow`, `matches` is empty.
+    // `bufferedRow`: the current matched row from buffered side.
+    //
+    // The function has the following steps:
+    //  - Step 1: Find the next `streamedRow` with non-null join keys.
+    //            For `streamedRow` with null join keys 
(`handleStreamedAnyNull`):
+    //            1. Inner join: skip the row.
+    //            2. Left/Right Outer join: clear the previous `matches` if 
needed, keep the row,
+    //                                      and return false.
+    //  - Step 2: Find the `matches` from buffered side having same join keys 
with `streamedRow`.
+    //            If previous `matches` is not empty, check the join keys and 
clear the `matches`
+    //            if the keys are not matched. Use `bufferedRow` to iterate 
buffered side to put
+    //            all matched rows into `matches`. Return true when getting 
all matched rows.
+    //            For `streamedRow` without `matches` 
(`handleStreamedWithoutMatch`):
+    //            1. Inner join: skip the row.
+    //            2. Left/Right Outer join: keep the row and return false.

Review comment:
       @cloud-fan - updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
##########
@@ -431,6 +433,66 @@ case class SortMergeJoinExec(
     // Copy the streamed keys as class members so they could be used in next 
function call.
     val matchedKeyVars = copyKeys(ctx, streamedKeyVars)
 
+    // Handle the case when the streamed row has any NULL keys.
+    val handleStreamedAnyNull = joinType match {
+      case _: InnerLike =>
+        // Skip streamed row.
+        s"""
+           |$streamedRow = null;
+           |continue;
+         """.stripMargin
+      case LeftOuter | RightOuter =>
+        // Eagerly return streamed row. Only call `matches.clear()` when 
`matches.isEmpty()` is
+        // false, to reduce unnecessary computation.
+        s"""
+           |if (!$matches.isEmpty()) {
+           |  $matches.clear();
+           |}
+           |return false;
+         """.stripMargin
+      case x =>
+        throw new IllegalArgumentException(
+          s"SortMergeJoin.genScanner should not take $x as the JoinType")
+    }
+
+    // Handle the case when the streamed keys have no match on the buffered side.
+    val handleStreamedWithoutMatch = joinType match {
+      case _: InnerLike =>
+        // Skip streamed row.
+        s"$streamedRow = null;"
+      case LeftOuter | RightOuter =>
+        // Eagerly return with streamed row.
+        "return false;"
+      case x =>
+        throw new IllegalArgumentException(
+          s"SortMergeJoin.genScanner should not take $x as the JoinType")
+    }
+
+    // Generate a function to scan both streamed and buffered sides to find a 
match.
+    // Return whether a match is found.
+    //
+    // `streamedIter`: the iterator for streamed side.
+    // `bufferedIter`: the iterator for buffered side.
+    // `streamedRow`: the current row from streamed side.
+    //                When `streamedIter` is empty, `streamedRow` is null.
+    // `matches`: the rows from buffered side already matched with 
`streamedRow`.
+    //            `matches` is buffered and reused for all `streamedRow`s 
having same join keys.
+    //            If there is no match with `streamedRow`, `matches` is empty.
+    // `bufferedRow`: the current matched row from buffered side.
+    //
+    // The function has the following steps:
+    //  - Step 1: Find the next `streamedRow` with non-null join keys.
+    //            For `streamedRow` with null join keys 
(`handleStreamedAnyNull`):
+    //            1. Inner join: skip the row.
+    //            2. Left/Right Outer join: clear the previous `matches` if 
needed, keep the row,
+    //                                      and return false.
+    //  - Step 2: Find the `matches` from buffered side having same join keys 
with `streamedRow`.
+    //            If previous `matches` is not empty, check the join keys and 
clear the `matches`

Review comment:
       @cloud-fan - updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to