HeartSaVioR commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509080312
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1088,39 +1088,79 @@ class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
testStream(joined)(
MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+ // states
+ // left: 1, 2, 3, 4 ,5
+ // right: 3, 4, 5, 6, 7
+ assertNumStateRows(total = 10, updated = 10),
MultiAddData(leftInput, 21)(rightInput, 22),
- // Watermark = 11, should remove rows having window=[0,10]
+ // Watermark = 11, should remove rows having window=[0,10].
CheckNewAnswer(),
- assertNumStateRows(total = 2, updated = 12),
+ // states
+ // left: 21
+ // right: 22
+ //
+ // states evicted
+ // left: 1, 2, 3, 4 ,5 (below watermark)
+ // right: 3, 4, 5, 6, 7 (below watermark)
+ assertNumStateRows(total = 2, updated = 2),
AddData(leftInput, 22),
CheckNewAnswer(Row(22, 30, 44)),
+ // Unlike inner/outer joins, given left input row matches with right input row,
+ // we don't buffer the matched left input row to the state store.
+ //
+ // states
+ // left: 21
+ // right: 22
assertNumStateRows(total = 2, updated = 0),
StopStream,
StartStream(),
AddData(leftInput, 1),
- // Row not add as 1 < state key watermark = 12
+ // Row not add as 1 < state key watermark = 12.
CheckNewAnswer(),
- AddData(rightInput, 11),
- // Row not add as 11 < state key watermark = 12
- CheckNewAnswer()
+ // states
+ // left: 21
+ // right: 22
+ assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1),
+ AddData(rightInput, 5),
+ // Row not add as 5 < state key watermark = 12.
+ CheckNewAnswer(),
+ // states
+ // left: 21
+ // right: 22
+ assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
)
}
test("left semi early state exclusion on left") {
val (leftInput, rightInput, joined) =
setupWindowedJoinWithLeftCondition("left_semi")
testStream(joined)(
- MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3, 4, 5),
- // The left rows with leftValue <= 4 should not generate their semi join row and
+ MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
Review comment:
I see you excluded 1 and 2 from the right input in this commit, yet neither
the output nor the state rows changed. Could you please explain this, in
particular why 1 and 2 on the right side were not added to the state?