c21 commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r507955252



##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and complete mode",
+    Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute))),
+    OutputMode.Complete(),
+    Seq("is not supported in Complete output mode"))
+
+  // Left ousemiter joins: stream-stream allowed with join on watermark attribute

Review comment:
       @HeartSaVioR - sorry, updated.

##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed

Review comment:
       @HeartSaVioR - sure, will create a followup JIRA.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -347,24 +361,29 @@ case class StreamingSymmetricHashJoinExec(
 
       // Processing time between inner output completion and here comes from the outer portion of a
       // join, and thus counts as removal time as we remove old state from one side while iterating.
-      if (innerOutputCompletionTimeNs != 0) {
+      if (hashJoinOutputCompletionTimeNs != 0) {
         allRemovalsTimeMs +=
-          math.max(NANOSECONDS.toMillis(System.nanoTime - innerOutputCompletionTimeNs), 0)
+          math.max(NANOSECONDS.toMillis(System.nanoTime - hashJoinOutputCompletionTimeNs), 0)
       }
 
       allRemovalsTimeMs += timeTakenMs {
         // Remove any remaining state rows which aren't needed because they're below the watermark.
         //
         // For inner joins, we have to remove unnecessary state rows from both sides if possible.
+        //
         // For outer joins, we have already removed unnecessary state rows from the outer side
         // (e.g., left side for left outer join) while generating the outer "null" outputs. Now, we
         // have to remove unnecessary state rows from the other side (e.g., right side for the left
         // outer join) if possible. In all cases, nothing needs to be outputted, hence the removal
         // needs to be done greedily by immediately consuming the returned iterator.
+        //
+        // For left semi joins, we have to remove unnecessary state rows from both sides if
+        // possible.
         val cleanupIter = joinType match {
           case Inner => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
           case LeftOuter => rightSideJoiner.removeOldState()
           case RightOuter => leftSideJoiner.removeOldState()
+          case LeftSemi => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()

Review comment:
       @HeartSaVioR - updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -330,11 +338,17 @@ case class StreamingSymmetricHashJoinExec(
           }
         }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
 
-        innerOutputIter ++ outerOutputIter
+        hashJoinOutputIter ++ outerOutputIter
+      case LeftSemi =>

Review comment:
       @HeartSaVioR - updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,15 +99,22 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.
+   *                                    This is used for right side of left 
semi join in
+   *                                    [[StreamingSymmetricHashJoinExec]] 
only.
    */
   def getJoinedRows(
       key: UnsafeRow,
       generateJoinedRow: InternalRow => JoinedRow,
-      predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+      predicate: JoinedRow => Boolean,
+      joinOnlyFirstTimeMatchedRow: Boolean = false): Iterator[JoinedRow] = {
     val numValues = keyToNumValues.get(key)
     keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
       val joinedRow = generateJoinedRow(keyIdxToValue.value)
-      if (predicate(joinedRow)) {
+      if (joinOnlyFirstTimeMatchedRow && keyIdxToValue.matched) {

Review comment:
       @HeartSaVioR - makes sense to me, though the cost for `generateJoinedRow` is minor. Updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -151,7 +151,7 @@ case class StreamingSymmetricHashJoinExec(
       stateWatermarkPredicates = JoinStateWatermarkPredicates(), stateFormatVersion, left, right)
   }
 
-  if (stateFormatVersion < 2 && joinType != Inner) {
+  if (stateFormatVersion < 2 && (joinType == LeftOuter || joinType == RightOuter)) {

Review comment:
       updated.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to