neilramaswamy commented on code in PR #48297:
URL: https://github.com/apache/spark/pull/48297#discussion_r1796403387
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -668,13 +668,38 @@ case class StreamingSymmetricHashJoinExec(
       private val iteratorNotEmpty: Boolean = super.hasNext
       override def completion(): Unit = {
-        val isLeftSemiWithMatch =
-          joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
-        // Add to state store only if both removal predicates do not match,
-        // and the row is not matched for left side of left semi join.
-        val shouldAddToState =
-          !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
-          !isLeftSemiWithMatch
+        // The criteria for whether the input has to be added into the state store or not:
+        // - Left side: adding the input to the state store can be skipped if it's already
+        //   matched and the join type is left semi.
+        //   For other cases, the input should be added, including the case it's going to be
+        //   evicted in this batch. It hasn't yet been evaluated with inputs from the right
+        //   side "for this batch".
Review Comment:
super nit: "for this batch" doesn't need to be in quotes.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##########
@@ -668,13 +668,38 @@ case class StreamingSymmetricHashJoinExec(
       private val iteratorNotEmpty: Boolean = super.hasNext
       override def completion(): Unit = {
-        val isLeftSemiWithMatch =
-          joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
-        // Add to state store only if both removal predicates do not match,
-        // and the row is not matched for left side of left semi join.
-        val shouldAddToState =
-          !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
-          !isLeftSemiWithMatch
+        // The criteria for whether the input has to be added into the state store or not:
+        // - Left side: adding the input to the state store can be skipped if it's already
+        //   matched and the join type is left semi.
+        //   For other cases, the input should be added, including the case it's going to be
+        //   evicted in this batch. It hasn't yet been evaluated with inputs from the right
+        //   side "for this batch".
+        //   Refer to the classdoc of StreamingSymmetricHashJoinExec about how stream-stream
+        //   join works.
+        // - Right side: for this side, the evaluation with inputs from the left side "for this batch"
Review Comment:
ditto
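
For readers following the thread: below is a minimal, self-contained sketch of the branching this new comment describes. The PR's actual replacement code is elided from the hunks above, so the shape is an assumption reconstructed from the identifiers visible in the removed lines (joinType, joinSide, iteratorNotEmpty, and the two watermark predicate functions); the sealed trait and parameters are stand-ins, not Spark's internals.

    // Toy model of the add-to-state decision; every name here mirrors an
    // identifier from the removed diff lines, but the structure is assumed.
    sealed trait JoinSide
    case object LeftSide extends JoinSide
    case object RightSide extends JoinSide

    def shouldAddToState(
        joinSide: JoinSide,
        isLeftSemi: Boolean,        // joinType == LeftSemi
        matchedThisBatch: Boolean,  // iteratorNotEmpty in the real code
        keyEvictable: Boolean,      // stateKeyWatermarkPredicateFunc(key)
        valueEvictable: Boolean     // stateValueWatermarkPredicateFunc(thisRow)
    ): Boolean = joinSide match {
      case LeftSide =>
        // Skip only an already-matched left-semi input. Everything else is
        // added, even rows evicted in this same batch, because they have not
        // yet been evaluated against this batch's right-side input.
        !(isLeftSemi && matchedThisBatch)
      case RightSide =>
        // The right side has already been evaluated against this batch's
        // left-side input, so rows the removal predicates would evict can
        // be skipped.
        !keyEvictable && !valueEvictable
    }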
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##########
@@ -878,6 +878,59 @@ class MultiStatefulOperatorsSuite
     testOutputWatermarkInJoin(join3, input1, -40L * 1000 - 1)
   }
+ test("SPARK-49829 time window agg per each source followed by stream-stream
join") {
+ val inputStream1 = MemoryStream[Long]
+ val inputStream2 = MemoryStream[Long]
+
+ val df1 = inputStream1.toDF()
+ .selectExpr("value", "timestamp_seconds(value) AS ts")
+ .withWatermark("ts", "5 seconds")
+
+ val df2 = inputStream2.toDF()
+ .selectExpr("value", "timestamp_seconds(value) AS ts")
+ .withWatermark("ts", "5 seconds")
+
+ val df1Window = df1.groupBy(
+ window($"ts", "10 seconds")
+ ).agg(sum("value").as("sum_df1"))
+
+ val df2Window = df2.groupBy(
+ window($"ts", "10 seconds")
+ ).agg(sum("value").as("sum_df2"))
+
+ val joined = df1Window.join(df2Window, "window", "inner")
+ .selectExpr("CAST(window.end AS long) AS window_end", "sum_df1",
"sum_df2")
+
+    // The test verifies the case where both sides produce input as time window (append mode)
+    // for a stream-stream join whose join condition is equality of the time window.
+    // Inputs are produced into the stream-stream join when the time windows are completed,
+    // meaning they will be evicted in this batch for the stream-stream join as well.
+    // (NOTE: the join condition does not delay the state watermark in stream-stream join.)
+    // Before SPARK-49829, the left side did not add the input to the state store if it was
+    // going to be evicted in this batch, which broke the match between input from the left
+    // side and input from the right side "for this batch".
Review Comment:
Can you comment below which records specifically were getting dropped _before_ your changes?
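
To make the question concrete, here is one plausible continuation of the test above (the actual AddData/CheckNewAnswer steps are elided from this hunk, so the values below are illustrative, not the PR's):

    testStream(joined)(
      // Values 0-9 land in window [0, 10); with the 5s watermark delay the
      // aggregates finalize that window only once the watermark passes 15.
      // Nothing is emitted yet.
      MultiAddData(inputStream1, 5L, 8L)(inputStream2, 3L, 9L),
      CheckNewAnswer(),
      // 21 advances the watermark to 16, closing window [0, 10) on both
      // sides. Both aggregated rows reach the join and are evicted in the
      // same batch. Before SPARK-49829 the left-side row was never stored,
      // so the right-side row found no match and the (10, 13, 12) output
      // row was dropped; after the fix it is produced.
      MultiAddData(inputStream1, 21L)(inputStream2, 21L),
      CheckNewAnswer((10L, 13L, 12L))
    )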
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -1966,4 +2046,125 @@ class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
       assertNumStateRows(total = 9, updated = 4)
     )
   }
+
+  test("SPARK-49829 two chained stream-stream left outer joins among three input streams") {
Review Comment:
Can you also comment on this test which record was previously being dropped?
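
For context while reviewing (the test body is elided from this hunk), the topology under test presumably looks like the sketch below; stream and column names are illustrative, not the PR's code:

    // Two chained stream-stream left outer joins over three input streams.
    // A left-side row that the first join adds and evicts in the same batch
    // must still be written to its state store, or its unmatched output
    // never reaches the second join.
    val stream1 = MemoryStream[Long]
    val stream2 = MemoryStream[Long]
    val stream3 = MemoryStream[Long]

    val df1 = stream1.toDF()
      .selectExpr("value AS v1", "timestamp_seconds(value) AS ts1")
      .withWatermark("ts1", "0 seconds")
    val df2 = stream2.toDF()
      .selectExpr("value AS v2", "timestamp_seconds(value) AS ts2")
      .withWatermark("ts2", "0 seconds")
    val df3 = stream3.toDF()
      .selectExpr("value AS v3", "timestamp_seconds(value) AS ts3")
      .withWatermark("ts3", "0 seconds")

    val chained = df1
      .join(df2, expr("ts1 = ts2"), "leftOuter")
      .join(df3, expr("ts1 = ts3"), "leftOuter")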
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala:
##########
@@ -1559,6 +1581,64 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite {
       )
     }
   }
+
+ test("SPARK-49829 left-outer join, input being unmatched is between WM for
late event and " +
+ "WM for eviction") {
+
+ withTempDir { checkpoint =>
+ // This config needs to be set, otherwise no-data batch will be
triggered and after
+ // no-data batch, WM for late event and WM for eviction would be same.
+ withSQLConf(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key ->
"false") {
+ val memoryStream1 = MemoryStream[(String, Int)]
+ val memoryStream2 = MemoryStream[(String, Int)]
+
+ val data1 = memoryStream1.toDF()
+ .selectExpr("_1 AS key", "timestamp_seconds(_2) AS eventTime")
+ .withWatermark("eventTime", "0 seconds")
+ val data2 = memoryStream2.toDF()
+ .selectExpr("_1 AS key", "timestamp_seconds(_2) AS eventTime")
+ .withWatermark("eventTime", "0 seconds")
+
+ val joinedDf = data1.join(data2, Seq("key", "eventTime"), "leftOuter")
+ .selectExpr("key", "CAST(eventTime AS long) AS eventTime")
+
+ def assertLeftRows(expected: Seq[Row]): AssertOnQuery = {
+ assertStateStoreRows(0L, "left", expected) { df =>
+ df.selectExpr("value.key", "CAST(value.eventTime AS long)")
+ }
+ }
+
+ def assertRightRows(expected: Seq[Row]): AssertOnQuery = {
+ assertStateStoreRows(0L, "right", expected) { df =>
+ df.selectExpr("value.key", "CAST(value.eventTime AS long)")
+ }
+ }
+
+ testStream(joinedDf)(
+ StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+ // batch 0
+ // WM: late record = 0, eviction = 0
+ MultiAddData(
+ (memoryStream1, Seq(("a", 1), ("b", 2))),
+ (memoryStream2, Seq(("b", 2), ("c", 1)))
+ ),
+ CheckNewAnswer(("b", 2)),
+ assertLeftRows(Seq(Row("a", 1), Row("b", 2))),
+ assertRightRows(Seq(Row("b", 2), Row("c", 1))),
+ // batch 1
+ // WM: late record = 0, eviction = 2
+ // Before Spark introduces multiple stateful operator, WM for late
record was same as
+ // WM for eviction, hence ("d", 1) was treated as late record.
+ // With the multiple state operator, ("d", 1) is added in batch 1
but also evicted in
+ // batch 1. Before SPARK-49829, this wasn't producing unmatched row,
and it is fixed.
Review Comment:
The key thing here is that 1 is less than the state watermark. The state watermark is 2, since we are joining on event-time directly and the watermark delay is 0 seconds. Thus, state watermark = eviction watermark - 0 = eviction watermark.
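
Spelled out with the numbers from the batches above (a worked sketch, not test code):

    // After batch 0 the max event time seen is 2 and the declared delay is
    // 0 seconds, so the eviction watermark entering batch 1 is 2. Joining
    // on eventTime directly adds no extra allowance on the join key, hence:
    val evictionWatermark = 2L
    val joinKeyDelay      = 0L  // withWatermark("eventTime", "0 seconds")
    val stateWatermark    = evictionWatermark - joinKeyDelay  // = 2
    // ("d", 1) has eventTime 1, below the state watermark of 2, so it is
    // added and evicted within batch 1; before SPARK-49829 it was dropped
    // without ("d", 1) ever being emitted as an unmatched left row.
    assert(1L < stateWatermark)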
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]