Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2024-04-08 Thread via GitHub


github-actions[bot] closed pull request #44076: [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test
URL: https://github.com/apache/spark/pull/44076


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2024-04-07 Thread via GitHub


github-actions[bot] commented on PR #44076:
URL: https://github.com/apache/spark/pull/44076#issuecomment-2041662026

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!





Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-12-28 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1437953509


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##
@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
 thisRow: UnsafeRow,
 subIter: Iterator[InternalRow])
   extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+  // scalastyle:off
   private val iteratorNotEmpty: Boolean = super.hasNext
 
   override def completion(): Unit = {
 val isLeftSemiWithMatch =
   joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
 // Add to state store only if both removal predicates do not match,
 // and the row is not matched for left side of left semi join.
+    println(s"!stateKeyWatermarkPredicateFunc(key): ${!stateKeyWatermarkPredicateFunc(key)}" +
+      s" !stateValueWatermarkPredicateFunc(thisRow): ${!stateValueWatermarkPredicateFunc(thisRow)}")
 val shouldAddToState =
   !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
   !isLeftSemiWithMatch
 if (shouldAddToState) {
+  println(s"wei==add to state: $thisRow")

Review Comment:
   
https://github.com/apache/spark/commit/991726f31a8d182ed6d5b0e59185d97c0c5c532f






Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-12-28 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1437953371


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##
@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
 thisRow: UnsafeRow,
 subIter: Iterator[InternalRow])
   extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+  // scalastyle:off
   private val iteratorNotEmpty: Boolean = super.hasNext
 
   override def completion(): Unit = {
 val isLeftSemiWithMatch =
   joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
 // Add to state store only if both removal predicates do not match,
 // and the row is not matched for left side of left semi join.
+println(s"!stateKeyWatermarkPredicateFunc(key): 
${!stateKeyWatermarkPredicateFunc(key)}" +
+  s" !stateValueWatermarkPredicateFunc(thisRow): 
${!stateValueWatermarkPredicateFunc(thisRow)}")
 val shouldAddToState =
   !stateKeyWatermarkPredicateFunc(key) && 
!stateValueWatermarkPredicateFunc(thisRow) &&
   !isLeftSemiWithMatch
 if (shouldAddToState) {
+  println(s"wei==add to state: $thisRow")

Review Comment:
   `stateKeyWatermarkPredicateFunc`'s watermark should be 0 here, not 8
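The watermark point can be illustrated with a tiny model (plain Python, not Spark code; the function name and millisecond units are made up for illustration): the join's key-removal predicate drops a state key whose window end is at or below the watermark, so a [0, 5) window survives under watermark 0 but is dropped under watermark 8.

```python
# Toy model (not Spark code) of the join state key-removal predicate:
# a state key counts as expired when its window end is <= the watermark.
def state_key_watermark_predicate(window_end_ms: int, watermark_ms: int) -> bool:
    """True means the key is 'expired' and will NOT be added to the join state store."""
    return window_end_ms <= watermark_ms

window_end = 5_000  # the [0, 5) window emitted by the parent aggregation

# With the pre-aggregation watermark (0), the window survives and can be joined.
assert state_key_watermark_predicate(window_end, 0) is False

# With the advanced global watermark (8s), the window is wrongly considered expired.
assert state_key_watermark_predicate(window_end, 8_000) is True
```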






Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-12-28 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1437952915


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##
@@ -814,68 +814,183 @@ class MultiStatefulOperatorsSuite
 }
   }
 
-  test("stream-stream time interval join - output watermark for various intervals") {
-def testOutputWatermarkInJoin(
-df: DataFrame,
-input: MemoryStream[(String, Timestamp)],
-expectedOutputWatermark: Long): Unit = {
-  testStream(df)(
-// dummy row to trigger execution
-AddData(input, ("1", Timestamp.valueOf("2023-01-01 01:00:10"))),
-CheckAnswer(),
-Execute { query =>
-  val lastExecution = query.lastExecution
-  val joinOperator = lastExecution.executedPlan.collect {
-case j: StreamingSymmetricHashJoinExec => j
-  }.head
+  test("SPARK-45637-1 join on window, append mode") {

Review Comment:
   The row is added in the first batch (not a no-data batch) here






Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-12-28 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1437952483


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##
@@ -814,68 +814,183 @@ class MultiStatefulOperatorsSuite
 }
   }
 
-  test("stream-stream time interval join - output watermark for various intervals") {
-def testOutputWatermarkInJoin(
-df: DataFrame,
-input: MemoryStream[(String, Timestamp)],
-expectedOutputWatermark: Long): Unit = {
-  testStream(df)(
-// dummy row to trigger execution
-AddData(input, ("1", Timestamp.valueOf("2023-01-01 01:00:10"))),
-CheckAnswer(),
-Execute { query =>
-  val lastExecution = query.lastExecution
-  val joinOperator = lastExecution.executedPlan.collect {
-case j: StreamingSymmetricHashJoinExec => j
-  }.head
+  test("SPARK-45637-1 join on window, append mode") {

Review Comment:
   ```
   -=-=-=-=-=-=-=
   currentBatchId = 0, lastExecutionRequiresAnotherBatch = false, isNewDataAvailable = true, shouldConstructNextBatch (with currbatchID) = true
   stateKeyWatermarkPredicateFunc: (input[0, struct, false].end <= 0)
   stateKeyWatermarkPredicateFunc: (input[0, struct, false].end <= 0)
   wei==thisrow: [0,100018,0,0,4c4b40]
   this row filtered: [0,100018,0,0,4c4b40]
   getJoinedRows -- key: [0,100018,0,0,4c4b40] numValues: 0
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,100018,0,0,4c4b40]
   wei==thisrow: [0,100018,0,4c4b40,989680]
   this row filtered: [0,100018,0,4c4b40,989680]
   getJoinedRows -- key: [0,100018,0,4c4b40,989680] numValues: 0
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,100018,0,4c4b40,989680]
   wei==thisrow: [0,100018,0,0,4c4b40]
   this row filtered: [0,100018,0,0,4c4b40]
   getJoinedRows -- key: [0,100018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,100018,0,0,4c4b40] + [0,100018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,100018,0,0,4c4b40]
   wei==thisrow: [0,100018,0,0,4c4b40]
   this row filtered: [0,100018,0,0,4c4b40]
   getJoinedRows -- key: [0,100018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,100018,0,0,4c4b40] + [0,100018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,100018,0,0,4c4b40]
   wei==thisrow: [0,100018,0,0,4c4b40]
   this row filtered: [0,100018,0,0,4c4b40]
   getJoinedRows -- key: [0,100018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,100018,0,0,4c4b40] + [0,100018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,100018,0,0,4c4b40]
   wei==thisrow: [0,100018,0,0,4c4b40]
   this row filtered: [0,100018,0,0,4c4b40]
   getJoinedRows -- key: [0,100018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,100018,0,0,4c4b40] + [0,100018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,100018,0,0,4c4b40]
   wei==thisrow: [0,100018,0,0,4c4b40]
   this row filtered: [0,100018,0,0,4c4b40]
   getJoinedRows -- key: [0,100018,0,0,4c4b40] numValues: 1
   joinedRow: {[0,100018,0,0,4c4b40] + [0,100018,0,0,4c4b40]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,100018,0,0,4c4b40]
   wei==thisrow: [0,100018,0,4c4b40,989680]
   this row filtered: [0,100018,0,4c4b40,989680]
   getJoinedRows -- key: [0,100018,0,4c4b40,989680] numValues: 1
   joinedRow: {[0,100018,0,4c4b40,989680] + [0,100018,0,4c4b40,989680]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,100018,0,4c4b40,989680]
   wei==thisrow: [0,100018,0,4c4b40,989680]
   this row filtered: [0,100018,0,4c4b40,989680]
   getJoinedRows -- key: [0,100018,0,4c4b40,989680] numValues: 1
   joinedRow: {[0,100018,0,4c4b40,989680] + [0,100018,0,4c4b40,989680]}
   ---joinedRow emitted
   !stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
   wei==add to state: [0,100018,0,4c4b40,989680]
   wei==thisrow: [0,100018,0,4c4b40,989680]
   this row filtered: [0,100018,0,4c4b40,989680]
   getJoinedRows -- key: [0,100018,0,4c4b40,989680] numValues: 1
   joinedRow: {[0,100018,0,4c4b40,989680] 
   ```

Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-11-30 Thread via GitHub


WweiL commented on PR #44076:
URL: https://github.com/apache/spark/pull/44076#issuecomment-1834480537

   
https://github.com/apache/spark/commit/75d666b95a711787355ca3895057dabadd429023
   Seems like the preJoinFilter is just an optimization; I need to understand the effect of removing it
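The idea of a pre-join filter can be sketched with a toy model (plain Python, not Spark's API; all names here are made up for illustration): it discards input rows that can never satisfy the join condition before they touch the state store, so removing it should only affect performance, not correctness.

```python
# Hypothetical sketch of a pre-join filter: drop rows that can never match
# before they are stored in join state. The filter is purely an optimization;
# any row it removes would have produced no join output anyway.
def pre_join_filter(rows, condition):
    return [r for r in rows if condition(r)]

rows = [{"adId": 1, "ts": 3}, {"adId": 2, "ts": None}]

# A row with no event time can never satisfy a time-bound join condition.
kept = pre_join_filter(rows, lambda r: r["ts"] is not None)
assert kept == [{"adId": 1, "ts": 3}]
```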





Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-11-30 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1411160900


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##
@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
 thisRow: UnsafeRow,
 subIter: Iterator[InternalRow])
   extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+  // scalastyle:off
   private val iteratorNotEmpty: Boolean = super.hasNext
 
   override def completion(): Unit = {
 val isLeftSemiWithMatch =
   joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
 // Add to state store only if both removal predicates do not match,
 // and the row is not matched for left side of left semi join.
+    println(s"!stateKeyWatermarkPredicateFunc(key): ${!stateKeyWatermarkPredicateFunc(key)}" +
+      s" !stateValueWatermarkPredicateFunc(thisRow): ${!stateValueWatermarkPredicateFunc(thisRow)}")
 val shouldAddToState =
   !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
   !isLeftSemiWithMatch
 if (shouldAddToState) {
+  println(s"wei==add to state: $thisRow")

Review Comment:
   So what happens here is that in the no-data batch, the watermark of `stateKeyWatermarkPredicateFunc` is updated to the new global watermark (8). However, the key emitted from both parent window aggregations is [0, 5), so `stateKeyWatermarkPredicateFunc(key)` returns true, meaning the window is not added to the join state store. As a result, when `SymmetricHashJoinStateManager.getJoinedRows` later wants to load the other side's stored row, it loads nothing.
   
   This is wrong, because the two [0, 5) windows should be joined here. At least one side's window should be added to the state store, so the other side can load it and join.
   
   This looks like an update to the multiple stateful operators support that we need to consider.
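The failure mode described above can be sketched as a toy model (plain Python, not Spark code; names and millisecond units are illustrative): once the watermark has advanced to 8s, the [0, 5) window fails the add-to-state check on both sides, so neither side's row ever reaches the state store and the join produces nothing.

```python
# Toy model of the described bug: the no-data batch has advanced the
# watermark to 8s, so the [0, 5) window fails shouldAddToState on BOTH sides.
WATERMARK_MS = 8_000

def should_add_to_state(window_end_ms: int) -> bool:
    # Mirrors !stateKeyWatermarkPredicateFunc(key): keep only non-expired keys.
    return not (window_end_ms <= WATERMARK_MS)

left_state, right_state = {}, {}
window = (0, 5_000)  # the [0, 5) window emitted by both parent aggregations

if should_add_to_state(window[1]):
    left_state[window] = ["left row"]
if should_add_to_state(window[1]):
    right_state[window] = ["right row"]

# getJoinedRows on either side now finds nothing, so no join output is produced.
assert left_state.get(window, []) == []
assert right_state.get(window, []) == []
```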






Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-11-30 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1411160900


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##
@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
 thisRow: UnsafeRow,
 subIter: Iterator[InternalRow])
   extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+  // scalastyle:off
   private val iteratorNotEmpty: Boolean = super.hasNext
 
   override def completion(): Unit = {
 val isLeftSemiWithMatch =
   joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
 // Add to state store only if both removal predicates do not match,
 // and the row is not matched for left side of left semi join.
+    println(s"!stateKeyWatermarkPredicateFunc(key): ${!stateKeyWatermarkPredicateFunc(key)}" +
+      s" !stateValueWatermarkPredicateFunc(thisRow): ${!stateValueWatermarkPredicateFunc(thisRow)}")
 val shouldAddToState =
   !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
   !isLeftSemiWithMatch
 if (shouldAddToState) {
+  println(s"wei==add to state: $thisRow")

Review Comment:
   So what happens here is that in the no-data batch, the watermark of `stateKeyWatermarkPredicateFunc` is updated to the new global watermark (8). However, the key emitted from both parent window aggregations is [0, 5), so `stateKeyWatermarkPredicateFunc(key)` returns true, meaning the window is not added to the join state store. As a result, when `SymmetricHashJoinStateManager.getJoinedRows` later wants to load the other side's stored row, it loads nothing.
   
   This is wrong, because the two [0, 5) windows should be joined here. At least one side's window should be added to the state store, so the other side can load it and join. But because of this check, the rows from both sides aren't added to the state and they can't be joined.
   
   This looks like an update to the multiple stateful operators support that we need to consider.






Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-11-30 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1411160900


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##
@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
 thisRow: UnsafeRow,
 subIter: Iterator[InternalRow])
   extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+  // scalastyle:off
   private val iteratorNotEmpty: Boolean = super.hasNext
 
   override def completion(): Unit = {
 val isLeftSemiWithMatch =
   joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
 // Add to state store only if both removal predicates do not match,
 // and the row is not matched for left side of left semi join.
+    println(s"!stateKeyWatermarkPredicateFunc(key): ${!stateKeyWatermarkPredicateFunc(key)}" +
+      s" !stateValueWatermarkPredicateFunc(thisRow): ${!stateValueWatermarkPredicateFunc(thisRow)}")
 val shouldAddToState =
   !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
   !isLeftSemiWithMatch
 if (shouldAddToState) {
+  println(s"wei==add to state: $thisRow")

Review Comment:
   So what happens here is that in the no-data batch, the watermark of `stateKeyWatermarkPredicateFunc` is updated to the new global watermark (8). However, the key emitted from both parent window aggregations is [0, 5), so `stateKeyWatermarkPredicateFunc(key)` returns true, meaning the window is not added to the join state store.
   
   This is wrong, because the two [0, 5) windows should be joined here. At least one side's window should be added to the state store, so the other side can load it and join. But because of this check, the rows from both sides aren't added to the state and they can't be joined.
   
   This looks like an update to the multiple stateful operators support that we need to consider.






Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-11-30 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1411160900


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##
@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
 thisRow: UnsafeRow,
 subIter: Iterator[InternalRow])
   extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+  // scalastyle:off
   private val iteratorNotEmpty: Boolean = super.hasNext
 
   override def completion(): Unit = {
 val isLeftSemiWithMatch =
   joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
 // Add to state store only if both removal predicates do not match,
 // and the row is not matched for left side of left semi join.
+    println(s"!stateKeyWatermarkPredicateFunc(key): ${!stateKeyWatermarkPredicateFunc(key)}" +
+      s" !stateValueWatermarkPredicateFunc(thisRow): ${!stateValueWatermarkPredicateFunc(thisRow)}")
 val shouldAddToState =
   !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
   !isLeftSemiWithMatch
 if (shouldAddToState) {
+  println(s"wei==add to state: $thisRow")

Review Comment:
   So what happens here is that in the no-data batch, the watermark of `stateKeyWatermarkPredicateFunc` is updated to the new global watermark (8). However, the key emitted from both parent window aggregations is [0, 5), so `stateKeyWatermarkPredicateFunc(key)` returns true, meaning the window should not be added to the join state store.
   
   This is wrong, because the two [0, 5) windows should be joined here. At least one side's window should be added to the state store, so the other side can load it and join. But because of this check, the rows from both sides aren't added to the state and they can't be joined.
   
   This looks like an update to the multiple stateful operators support that we need to consider.






Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-11-30 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1411160900


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##
@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
 thisRow: UnsafeRow,
 subIter: Iterator[InternalRow])
   extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+  // scalastyle:off
   private val iteratorNotEmpty: Boolean = super.hasNext
 
   override def completion(): Unit = {
 val isLeftSemiWithMatch =
   joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
 // Add to state store only if both removal predicates do not match,
 // and the row is not matched for left side of left semi join.
+    println(s"!stateKeyWatermarkPredicateFunc(key): ${!stateKeyWatermarkPredicateFunc(key)}" +
+      s" !stateValueWatermarkPredicateFunc(thisRow): ${!stateValueWatermarkPredicateFunc(thisRow)}")
 val shouldAddToState =
   !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
   !isLeftSemiWithMatch
 if (shouldAddToState) {
+  println(s"wei==add to state: $thisRow")

Review Comment:
   So what happens here is that in the no-data batch, the watermark of `stateKeyWatermarkPredicateFunc` is updated to the new global watermark (8), and the key emitted from both parent window aggregations is [0, 5). Hence `stateKeyWatermarkPredicateFunc(key)` returns true, meaning the window should not be added to the join state store.
   
   This is wrong, because the two [0, 5) windows should be joined here.
   
   This looks like an update to the multiple stateful operators support that we need to consider.






Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-11-30 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1411160900


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##
@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
 thisRow: UnsafeRow,
 subIter: Iterator[InternalRow])
   extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+  // scalastyle:off
   private val iteratorNotEmpty: Boolean = super.hasNext
 
   override def completion(): Unit = {
 val isLeftSemiWithMatch =
   joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
 // Add to state store only if both removal predicates do not match,
 // and the row is not matched for left side of left semi join.
+    println(s"!stateKeyWatermarkPredicateFunc(key): ${!stateKeyWatermarkPredicateFunc(key)}" +
+      s" !stateValueWatermarkPredicateFunc(thisRow): ${!stateValueWatermarkPredicateFunc(thisRow)}")
 val shouldAddToState =
   !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
   !isLeftSemiWithMatch
 if (shouldAddToState) {
+  println(s"wei==add to state: $thisRow")

Review Comment:
   So what happens here is that in the no-data batch, the watermark of `stateKeyWatermarkPredicateFunc` is updated to the new global watermark (8). However, the key emitted from both parent window aggregations is [0, 5), so `stateKeyWatermarkPredicateFunc(key)` returns true, meaning the window should not be added to the join state store.
   
   This is wrong, because the two [0, 5) windows should be joined here.
   
   This looks like an update to the multiple stateful operators support that we need to consider.






Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-11-30 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1409957775


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##
@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
 thisRow: UnsafeRow,
 subIter: Iterator[InternalRow])
   extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+  // scalastyle:off
   private val iteratorNotEmpty: Boolean = super.hasNext
 
   override def completion(): Unit = {
 val isLeftSemiWithMatch =
   joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
 // Add to state store only if both removal predicates do not match,
 // and the row is not matched for left side of left semi join.
+    println(s"!stateKeyWatermarkPredicateFunc(key): ${!stateKeyWatermarkPredicateFunc(key)}" +
+      s" !stateValueWatermarkPredicateFunc(thisRow): ${!stateValueWatermarkPredicateFunc(thisRow)}")
 val shouldAddToState =
   !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
   !isLeftSemiWithMatch
 if (shouldAddToState) {
+  println(s"wei==add to state: $thisRow")

Review Comment:
   If we comment out the `if (shouldAddToState)` above, then there is data:
   ```
   [info] - SPARK-45637 window agg + window agg -> join on window, append mode *** FAILED *** (6 seconds, 953 milliseconds)
   [info]   == Results ==
   [info]   !== Correct Answer - 1 ==  == Spark Answer - 1 ==
   [info]   !struct<_1:int,_2:int,_3:int,_4:int>   struct,count:bigint,count:bigint>
   [info]   ![0,5,5,1] [[1969-12-31 16:00:00.0,1969-12-31 16:00:05.0],1,5]
   [info] 
   ```
   My [0, 5, 5, 1] answer is badly formatted but it means the same as the Spark answer
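The two answers line up once you account for timezone display (a sketch under the assumption that the test JVM runs in America/Los_Angeles, i.e. UTC-8 at the epoch, which is why epoch seconds 0 and 5 print as 1969-12-31 16:00:00 and 16:00:05):

```python
from datetime import datetime, timedelta, timezone

# Assumed display timezone: UTC-8 (America/Los_Angeles at the epoch).
PST = timezone(timedelta(hours=-8))

# Epoch second 0 rendered in that zone is the window start Spark printed.
epoch = datetime.fromtimestamp(0, tz=PST)
assert epoch.strftime("%Y-%m-%d %H:%M:%S") == "1969-12-31 16:00:00"

# Epoch second 5 is the window end, matching the [0, 5) window in the log.
window_end = datetime.fromtimestamp(5, tz=PST)
assert window_end.strftime("%Y-%m-%d %H:%M:%S") == "1969-12-31 16:00:05"
```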






Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-11-29 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1409960967


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##
@@ -814,68 +814,183 @@ class MultiStatefulOperatorsSuite
 }
   }
 
-  test("stream-stream time interval join - output watermark for various intervals") {
-def testOutputWatermarkInJoin(
-df: DataFrame,
-input: MemoryStream[(String, Timestamp)],
-expectedOutputWatermark: Long): Unit = {
-  testStream(df)(
-// dummy row to trigger execution
-AddData(input, ("1", Timestamp.valueOf("2023-01-01 01:00:10"))),
-CheckAnswer(),
-Execute { query =>
-  val lastExecution = query.lastExecution
-  val joinOperator = lastExecution.executedPlan.collect {
-case j: StreamingSymmetricHashJoinExec => j
-  }.head
+  test("SPARK-45637-1 join on window, append mode") {

Review Comment:
   This is for me to understand the behavior of a window join without aggregation






Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-11-29 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1409958741


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##
@@ -814,68 +814,183 @@ class MultiStatefulOperatorsSuite
 }
   }
 
-  test("stream-stream time interval join - output watermark for various intervals") {
-def testOutputWatermarkInJoin(
-df: DataFrame,
-input: MemoryStream[(String, Timestamp)],
-expectedOutputWatermark: Long): Unit = {
-  testStream(df)(
-// dummy row to trigger execution
-AddData(input, ("1", Timestamp.valueOf("2023-01-01 01:00:10"))),
-CheckAnswer(),
-Execute { query =>
-  val lastExecution = query.lastExecution
-  val joinOperator = lastExecution.executedPlan.collect {
-case j: StreamingSymmetricHashJoinExec => j
-  }.head
+  test("SPARK-45637-1 join on window, append mode") {
+val impressions = MemoryStream[Int]
+val clicks = MemoryStream[Int]
 
-  val outputWatermark = joinOperator.produceOutputWatermark(0)
-  assert(outputWatermark.get === expectedOutputWatermark)
-}
+val impressionsWithWatermark = impressions.toDF()
+  .withColumn("impressionTime", timestamp_seconds($"value"))
+  .withColumnRenamed("value", "impressionAdId")
+  .withWatermark("impressionTime", "0 seconds")
+
+// clickTime is always later than impressionTime for clickAdId = impressionAdId
+// Here we manually set the difference to 2 seconds (see the (Multi)AddData below)
+val clicksWithWatermark = clicks.toDF()
+  .withColumn("timeSec", timestamp_seconds($"value"))
+  .selectExpr("value as clickAdId", "timeSec + INTERVAL 2 seconds as clickTime")
+  .withWatermark("clickTime", "0 seconds")
+
+val clicksWindow = clicksWithWatermark.select(
+  window($"clickTime", "5 seconds")
+)
+
+val impressionsWindow = impressionsWithWatermark.select(
+  window($"impressionTime", "5 seconds")
+)
+
+val clicksAndImpressions = clicksWindow.join(impressionsWindow, "window", "inner")
+
+withSQLConf((SQLConf.SHUFFLE_PARTITIONS.key, "1")) {
+  testStream(clicksAndImpressions)(
+MultiAddData(
+  (impressions, Seq(0 to 8: _*)),
+  (clicks, Seq(1, 6))
+),
+CheckAnswer((0, 5, 5, 1))
   )
 }
+  }
 
-val input1 = MemoryStream[(String, Timestamp)]
-val df1 = input1.toDF()
-  .selectExpr("_1 as leftId", "_2 as leftEventTime")
-  .withWatermark("leftEventTime", "5 minutes")
+  test("SPARK-45637 window agg + window agg -> join on window, append mode") {

Review Comment:
   This is the test that was run.
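
   The window contents asserted in the tests above can be sanity-checked outside Spark. The sketch below is a pure-Python model (not Spark code) of tumbling-window assignment as done by `window(col, "5 seconds")`, using the same inputs as the quoted tests: impressions 0..8 arriving at `timestamp_seconds(value)`, and clicks 1 and 6 arriving at `value + 2` seconds (the manual offset set in the test).

   ```python
   from collections import Counter

   def tumbling_window(ts, size):
       """Assign a timestamp (seconds) to its tumbling window [start, end)."""
       start = (ts // size) * size
       return (start, start + size)

   # Impressions 0..8 arrive at t = value seconds.
   impressions = Counter(tumbling_window(t, 5) for t in range(9))
   # Clicks 1 and 6 arrive at t = value + 2 seconds.
   clicks = Counter(tumbling_window(v + 2, 5) for v in (1, 6))

   print(impressions)  # window [0, 5) -> 5 rows, window [5, 10) -> 4 rows
   print(clicks)       # window [0, 5) -> 1 row,  window [5, 10) -> 1 row

   # An inner join on the window column keeps windows present on both sides.
   joined = sorted(set(impressions) & set(clicks))
   print(joined)       # [(0, 5), (5, 10)]
   ```

   This matches the per-side aggregation state written in the trace comments below the tests: `[0, 5) -> 5, [5, 10) -> 4` for impressions. Which of the joined windows is actually emitted in append mode then depends on the watermark, not on this assignment step.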






Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-11-29 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1409957775


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:
##
@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
 thisRow: UnsafeRow,
 subIter: Iterator[InternalRow])
   extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+  // scalastyle:off
   private val iteratorNotEmpty: Boolean = super.hasNext
 
   override def completion(): Unit = {
 val isLeftSemiWithMatch =
   joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
 // Add to state store only if both removal predicates do not match,
 // and the row is not matched for left side of left semi join.
+println(s"!stateKeyWatermarkPredicateFunc(key): ${!stateKeyWatermarkPredicateFunc(key)}" +
+  s" !stateValueWatermarkPredicateFunc(thisRow): ${!stateValueWatermarkPredicateFunc(thisRow)}")
 val shouldAddToState =
   !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
   !isLeftSemiWithMatch
 if (shouldAddToState) {
+  println(s"wei==add to state: $thisRow")

Review Comment:
   If the `if (shouldAddToState)` above is commented out, then there is data:
   ```
   [info] - SPARK-45637 window agg + window agg -> join on window, append mode *** FAILED *** (6 seconds, 953 milliseconds)
   [info]   == Results ==
   [info]   !== Correct Answer - 1 ==  == Spark Answer - 1 ==
   [info]   !struct<_1:int,_2:int,_3:int,_4:int>   struct<window:struct<start:timestamp,end:timestamp>,count:bigint,count:bigint>
   [info]   ![0,5,5,1] [[1969-12-31 16:00:00.0,1969-12-31 16:00:05.0],1,5]
   [info] 
   ```
   My [0, 5, 5, 1] answer is badly written, but the idea is the same as the Spark answer.
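
   The `shouldAddToState` condition quoted in the diff above can be modeled with plain predicates. The sketch below is an illustrative Python model only: the predicate closures and the watermark value are hypothetical stand-ins, not the real Spark implementations. A row is added to the join state only when neither the key-side nor the value-side watermark predicate marks it as removable, and it is not an already-matched left-semi row.

   ```python
   def make_watermark_predicate(watermark_ms):
       """Returns a predicate that is True when an event time is at or
       before the watermark, i.e. the row could be dropped/evicted."""
       def predicate(event_time_ms):
           return event_time_ms <= watermark_ms
       return predicate

   # Hypothetical eviction watermark of 8 seconds, mirroring the no-data
   # batch in the test trace.
   state_key_wm_pred = make_watermark_predicate(8_000)
   state_value_wm_pred = make_watermark_predicate(8_000)

   def should_add_to_state(key_time_ms, row_time_ms, is_left_semi_with_match=False):
       # Mirrors the quoted Scala:
       #   !stateKeyWatermarkPredicateFunc(key) &&
       #   !stateValueWatermarkPredicateFunc(thisRow) && !isLeftSemiWithMatch
       return (not state_key_wm_pred(key_time_ms)
               and not state_value_wm_pred(row_time_ms)
               and not is_left_semi_with_match)

   print(should_add_to_state(10_000, 10_000))  # True: row is still ahead of the watermark
   print(should_add_to_state(5_000, 5_000))    # False: row is already late
   ```

   Under this model, skipping the `if (shouldAddToState)` guard (as the debug experiment above does) stores even rows that the watermark predicates would otherwise keep out of state.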






Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]

2023-11-29 Thread via GitHub


WweiL commented on code in PR #44076:
URL: https://github.com/apache/spark/pull/44076#discussion_r1408970548


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:
##
@@ -878,6 +878,147 @@ class MultiStatefulOperatorsSuite
 testOutputWatermarkInJoin(join3, input1, -40L * 1000 - 1)
   }
 
+  test("SPARK-45637 window agg + window agg -> join on window, append mode") {
+val impressions = MemoryStream[Int]
+val clicks = MemoryStream[Int]
+
+val impressionsWithWatermark = impressions.toDF()
+  .withColumn("impressionTime", timestamp_seconds($"value"))
+  .withColumnRenamed("value", "impressionAdId")
+  .withWatermark("impressionTime", "0 seconds")
+
+// clickTime is always later than impressionTime for clickAdId = impressionAdId
+// Here we manually set the difference to 2 seconds (see the (Multi)AddData below)
+val clicksWithWatermark = clicks.toDF()
+  .withColumn("timeSec", timestamp_seconds($"value"))
+  .selectExpr("value as clickAdId", "timeSec + INTERVAL 2 seconds as clickTime")
+  .withWatermark("clickTime", "0 seconds")
+
+val clicksWindow = clicksWithWatermark.groupBy(
+  window($"clickTime", "5 seconds")
+).count()
+
+val impressionsWindow = impressionsWithWatermark.groupBy(
+  window($"impressionTime", "5 seconds")
+).count()
+
+val clicksAndImpressions = clicksWindow.join(impressionsWindow, "window", "inner")
+
+withSQLConf((SQLConf.SHUFFLE_PARTITIONS.key, "1")) {
+  testStream(clicksAndImpressions)(
+MultiAddData(
+  (impressions, Seq(0 to 8: _*)),
+  (clicks, Seq(6))
+),
+// data batch triggered
+
+// global watermark: (0, 0) [drop, evict]
+// impression [impressionAdId, impressionTime]:
+//wm: (0, 0)
+//input:  (0, 0), (1, 1), (2, 2), (3, 3), ..., (8, 8)
+//agg:[0, 5) -> 5, [5, 10) -> 4
+//state:  [0, 5) -> 5, [5, 10) -> 4
+//output: None
+// click [clickAdId, clickTime]
+//wm: (0, 0)
+//input:  (6, 8)
+//agg:[0, 5) -> 0, [5, 10) -> 1
+//state:  [0, 5) -> 0, [5, 10) -> 1
+//output: None
+// join:
+//all None
+
+// no-data batch triggered (shouldRunAnotherBatch)
+
+// global watermark: (0, 8) (default is min across multiple watermarks)
+// impression:
+//wm: (0, 8)
+//input:  None
+//agg:None
+//state:  [5, 10) -> 4
+//output: [0, 5) -> 5 (actually no, the state is not evicting any rows, why?)

Review Comment:
   Why? For the logs, see the PR description.
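
   The eviction question in the trace above can be sketched as a small model: the global watermark defaults to the minimum across the streams' individual watermarks, and a finished window should be evictable once its end is at or below that watermark. This is a pure-Python illustration of the intended semantics, not the actual Spark operator logic:

   ```python
   def global_watermark(per_stream_watermarks):
       """Spark's default multiple-watermark policy: take the minimum."""
       return min(per_stream_watermarks)

   def evictable(window, watermark):
       """A tumbling window [start, end) is finalized once end <= watermark."""
       start, end = window
       return end <= watermark

   # Both sides saw event times up to t = 8s, so each stream's watermark
   # advances to 8 before the no-data batch.
   wm = global_watermark([8, 8])

   # window -> count, as in the impression-side aggregation state above.
   state = {(0, 5): 5, (5, 10): 4}
   evicted = {w: c for w, c in state.items() if evictable(w, wm)}
   retained = {w: c for w, c in state.items() if not evictable(w, wm)}

   print(wm)        # 8
   print(evicted)   # {(0, 5): 5}  -> what append mode should emit downstream
   print(retained)  # {(5, 10): 4}
   ```

   Under this model the `[0, 5)` window ought to be evicted and emitted in the no-data batch, which is why the observed non-eviction in the trace is surprising.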





