c21 commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509089331



##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1088,39 +1088,79 @@ class StreamingLeftSemiJoinSuite extends 
StreamingJoinSuite {
     testStream(joined)(
       MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
       CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      // states
+      // left: 1, 2, 3, 4 ,5
+      // right: 3, 4, 5, 6, 7
+      assertNumStateRows(total = 10, updated = 10),
       MultiAddData(leftInput, 21)(rightInput, 22),
-      // Watermark = 11, should remove rows having window=[0,10]
+      // Watermark = 11, should remove rows having window=[0,10].
       CheckNewAnswer(),
-      assertNumStateRows(total = 2, updated = 12),
+      // states
+      // left: 21
+      // right: 22
+      //
+      // states evicted
+      // left: 1, 2, 3, 4 ,5 (below watermark)
+      // right: 3, 4, 5, 6, 7 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
       AddData(leftInput, 22),
       CheckNewAnswer(Row(22, 30, 44)),
+      // Unlike inner/outer joins, given left input row matches with right 
input row,
+      // we don't buffer the matched left input row to the state store.
+      //
+      // states
+      // left: 21
+      // right: 22
       assertNumStateRows(total = 2, updated = 0),
       StopStream,
       StartStream(),
 
       AddData(leftInput, 1),
-      // Row not add as 1 < state key watermark = 12
+      // Row not add as 1 < state key watermark = 12.
       CheckNewAnswer(),
-      AddData(rightInput, 11),
-      // Row not add as 11 < state key watermark = 12
-      CheckNewAnswer()
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1),
+      AddData(rightInput, 5),
+      // Row not add as 5 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
     )
   }
 
   test("left semi early state exclusion on left") {
     val (leftInput, rightInput, joined) = 
setupWindowedJoinWithLeftCondition("left_semi")
 
     testStream(joined)(
-      MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3, 4, 5),
-      // The left rows with leftValue <= 4 should not generate their semi join 
row and
+      MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),

Review comment:
       The join condition has `leftValue > 4` on the left side. For a left semi 
join, a logical plan optimization rule (`PushPredicateThroughJoin`) pushes the 
condition down from the join, so there is also a filter operator on the right 
side that filters out rows 1 and 2 before the join. The join result and the 
state store are therefore the same with or without rows 1 and 2, because they 
are filtered out before the join.
   
   Left semi join physical plan:
   
   ```
   *(4) Project [key#3, cast(precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, 
TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) 
- 0) as double) / 1.0E7)) THEN 
(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), 
LongType, TimestampType) as bigint) AS end#46L, leftValue#5]
   +- StreamingSymmetricHashJoin [key#3, named_struct(start, 
precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, 
TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) 
- 0) as double) / 1.0E7)) THEN 
(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, 
TimestampType), end, precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, 
TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) 
- 0) as double) / 1.0E7)) THEN 
(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), 
LongType, TimestampType))], [key#12, window#23-T10000ms], LeftSemi, condition = 
[ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ 
checkpoint = <unknown>, runId = 92a31d09-4275-4ee6-8bba-03e1973b4298, opId = 0, 
ver = 0, numPartitions = 5], 0, state cleanup [ left key predicate: (input[1, 
struct<start:timestamp,end:timestamp>, false].end <= 0), right key predicate: 
(input[1, struct<start:timestamp,end:timestamp>, false].end <= 0) ], 2
      :- Exchange hashpartitioning(key#3, named_struct(start, 
precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, 
TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) 
- 0) as double) / 1.0E7)) THEN 
(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, 
TimestampType), end, precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, 
TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) 
- 0) as double) / 1.0E7)) THEN 
(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), 
LongType, TimestampType)), 5), true, [id=#54]
      :  +- EventTimeWatermark leftTime#4: timestamp, 10 seconds
      :     +- *(1) Project [value#1 AS key#3, timestamp_seconds(value#1) AS 
leftTime#4, (value#1 * 2) AS leftValue#5]
      :        +- *(1) Filter ((value#1 * 2) > 4)
      :           +- StreamingRelation memory, [value#1]
      +- Exchange hashpartitioning(key#12, window#23-T10000ms, 5), true, 
[id=#63]
         +- *(3) Project [key#12, named_struct(start, 
precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(rightTime#13-T10000ms, 
TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(rightTime#13-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) THEN 
(CEIL((cast((precisetimestampconversion(rightTime#13-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(rightTime#13-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, 
TimestampType), end, precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(rightTime#13-T10000ms, 
TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = 
(cast((precisetimestampconversion(rightTime#13-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) THEN 
(CEIL((cast((precisetimestampconversion(rightTime#13-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) + 1) ELSE 
CEIL((cast((precisetimestampconversion(rightTime#13-T10000ms, TimestampType, 
LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), 
LongType, TimestampType)) AS window#23-T10000ms]
            +- EventTimeWatermark rightTime#13: timestamp, 10 seconds
               +- *(2) Project [value#10 AS key#12, timestamp_seconds(value#10) 
AS rightTime#13]
                  +- *(2) Filter ((value#10 * 2) > 4)
                     +- StreamingRelation memory, [value#10]
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to