c21 commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508876329



##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and complete mode",

Review comment:
       @viirya - updated for all places.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -246,37 +249,43 @@ case class StreamingSymmetricHashJoinExec(
 
     //  Join one side input using the other side's buffered/state rows. Here is how it is done.
     //
-    //  - `leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner)` generates all rows from
-    //    matching new left input with stored right input, and also stores all the left input
+    //  - `leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner)`
+    //    - inner, left outer, right outer join: generates all rows from matching new left input

Review comment:
       @viirya - updated.

##########
File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and update mode",

Review comment:
       @viirya - updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -347,22 +360,28 @@ case class StreamingSymmetricHashJoinExec(
 
       // Processing time between inner output completion and here comes from the outer portion of a

Review comment:
       @viirya - updated.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.

Review comment:
       @HeartSaVioR - after more thought, I think early eviction is more 
complicated than it first appeared and needs further design.
   
   For left semi join state store eviction:
   * [If the watermark predicate is on 
value](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L542),
 we can add a method to `SymmetricHashJoinStateManager`, or change the existing 
method `SymmetricHashJoinStateManager.removeByValueCondition()`, to remove 
matched values while iterating over all values.
   
   * [If the watermark predicate is on 
key](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L540),
 only the values of keys that pass the predicate are iterated, not all values 
in the state store. So we need a more efficient way to evict matched values 
here; otherwise we would have to iterate over all keys and values to find the 
matched values.
   
   So I think this piece of code may still be needed (it cannot be removed 
completely). Maybe we can store the matched rows in some other data structure 
after getting all matched rows here. I feel a follow-up PR is appropriate, 
WDYT? @HeartSaVioR 
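   
   To make the difference between the two cases concrete, here is a minimal 
toy sketch. It is not the `SymmetricHashJoinStateManager` API: `ToyJoinState`, 
`Row`, `State`, `removeByValue`, `removeByKey`, and `rowsVisited` are all 
hypothetical names, and the state is modeled as a plain in-memory map from key 
to buffered rows with a `matched` flag.

```scala
import scala.collection.mutable

// Toy model only: every name below is hypothetical and not part of Spark.
object ToyJoinState {
  // A buffered row with its event time and a "has been matched" flag.
  final case class Row(eventTime: Long, var matched: Boolean = false)

  final class State {
    private val rows = mutable.Map.empty[Int, mutable.ArrayBuffer[Row]]
    // Counts how many buffered rows an eviction pass had to look at.
    var rowsVisited: Long = 0L

    def append(key: Int, row: Row): Unit =
      rows.getOrElseUpdate(key, mutable.ArrayBuffer.empty[Row]) += row

    // Value-predicate case: every row is visited anyway, so already-matched
    // rows can be dropped in the same pass at no extra iteration cost.
    def removeByValue(expired: Row => Boolean, alsoDropMatched: Boolean): Unit = {
      for (key <- rows.keys.toList) {
        val buf = rows(key)
        rowsVisited += buf.size
        val kept = buf.filterNot(r => expired(r) || (alsoDropMatched && r.matched))
        if (kept.isEmpty) rows.remove(key) else rows(key) = kept
      }
    }

    // Key-predicate case: only keys that pass the predicate are touched, so
    // matched rows stored under surviving keys are never seen by this pass.
    def removeByKey(expired: Int => Boolean): Unit = {
      for (key <- rows.keys.toList if expired(key)) {
        rowsVisited += rows(key).size
        rows.remove(key)
      }
    }

    def size: Int = rows.valuesIterator.map(_.size).sum
  }
}
```

   In this toy model, `removeByKey` leaves a matched row behind whenever its 
key survives the predicate, which is why evicting matched rows in that case 
would need either a full scan or a separate structure tracking them.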

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -347,22 +360,28 @@ case class StreamingSymmetricHashJoinExec(
 
       // Processing time between inner output completion and here comes from the outer portion of a
       // join, and thus counts as removal time as we remove old state from one side while iterating.
-      if (innerOutputCompletionTimeNs != 0) {
+      if (hashJoinOutputCompletionTimeNs != 0) {
         allRemovalsTimeMs +=
-          math.max(NANOSECONDS.toMillis(System.nanoTime - innerOutputCompletionTimeNs), 0)
+          math.max(NANOSECONDS.toMillis(System.nanoTime - hashJoinOutputCompletionTimeNs), 0)
       }
 
       allRemovalsTimeMs += timeTakenMs {
         // Remove any remaining state rows which aren't needed because they're below the watermark.
         //
-        // For inner joins, we have to remove unnecessary state rows from both sides if possible.
+        // For inner and left semi joins, we have to remove unnecessary state rows from both sides
+        // if possible.
+        //
         // For outer joins, we have already removed unnecessary state rows from the outer side
         // (e.g., left side for left outer join) while generating the outer "null" outputs. Now, we
         // have to remove unnecessary state rows from the other side (e.g., right side for the left
         // outer join) if possible. In all cases, nothing needs to be outputted, hence the removal
         // needs to be done greedily by immediately consuming the returned iterator.
+        //
+        // For left semi joins, we have to remove unnecessary state rows from both sides if
+        // possible.

Review comment:
       @viirya - removed.






