Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]
github-actions[bot] closed pull request #44076: [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test
URL: https://github.com/apache/spark/pull/44076

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]
github-actions[bot] commented on PR #44076: URL: https://github.com/apache/spark/pull/44076#issuecomment-2041662026

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]
WweiL commented on code in PR #44076: URL: https://github.com/apache/spark/pull/44076#discussion_r1437953509

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala: ##
```diff
@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
       thisRow: UnsafeRow,
       subIter: Iterator[InternalRow])
     extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+    // scalastyle:off
     private val iteratorNotEmpty: Boolean = super.hasNext

     override def completion(): Unit = {
       val isLeftSemiWithMatch =
         joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
       // Add to state store only if both removal predicates do not match,
       // and the row is not matched for left side of left semi join.
+println(s"!stateKeyWatermarkPredicateFunc(key): ${!stateKeyWatermarkPredicateFunc(key)}" +
+  s" !stateValueWatermarkPredicateFunc(thisRow): ${!stateValueWatermarkPredicateFunc(thisRow)}")
       val shouldAddToState = !stateKeyWatermarkPredicateFunc(key) &&
         !stateValueWatermarkPredicateFunc(thisRow) && !isLeftSemiWithMatch
       if (shouldAddToState) {
+        println(s"wei==add to state: $thisRow")
```

Review Comment: https://github.com/apache/spark/commit/991726f31a8d182ed6d5b0e59185d97c0c5c532f
Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]
WweiL commented on code in PR #44076: URL: https://github.com/apache/spark/pull/44076#discussion_r1437953371

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala: ##
```diff
@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
       thisRow: UnsafeRow,
       subIter: Iterator[InternalRow])
     extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+    // scalastyle:off
    private val iteratorNotEmpty: Boolean = super.hasNext

     override def completion(): Unit = {
       val isLeftSemiWithMatch =
         joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
       // Add to state store only if both removal predicates do not match,
       // and the row is not matched for left side of left semi join.
+println(s"!stateKeyWatermarkPredicateFunc(key): ${!stateKeyWatermarkPredicateFunc(key)}" +
+  s" !stateValueWatermarkPredicateFunc(thisRow): ${!stateValueWatermarkPredicateFunc(thisRow)}")
       val shouldAddToState = !stateKeyWatermarkPredicateFunc(key) &&
         !stateValueWatermarkPredicateFunc(thisRow) && !isLeftSemiWithMatch
       if (shouldAddToState) {
+        println(s"wei==add to state: $thisRow")
```

Review Comment: stateKeyWatermarkPredicateFunc's wm should be 0 here, not 8.
Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]
WweiL commented on code in PR #44076: URL: https://github.com/apache/spark/pull/44076#discussion_r1437952915

## sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala: ##
```diff
@@ -814,68 +814,183 @@ class MultiStatefulOperatorsSuite
     }
   }

-  test("stream-stream time interval join - output watermark for various intervals") {
-    def testOutputWatermarkInJoin(
-        df: DataFrame,
-        input: MemoryStream[(String, Timestamp)],
-        expectedOutputWatermark: Long): Unit = {
-      testStream(df)(
-        // dummy row to trigger execution
-        AddData(input, ("1", Timestamp.valueOf("2023-01-01 01:00:10"))),
-        CheckAnswer(),
-        Execute { query =>
-          val lastExecution = query.lastExecution
-          val joinOperator = lastExecution.executedPlan.collect {
-            case j: StreamingSymmetricHashJoinExec => j
-          }.head
+  test("SPARK-45637-1 join on window, append mode") {
```

Review Comment: The row is added in the first batch (not a no-data batch) here.
Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]
WweiL commented on code in PR #44076: URL: https://github.com/apache/spark/pull/44076#discussion_r1437952483

## sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala: ##
```diff
@@ -814,68 +814,183 @@ class MultiStatefulOperatorsSuite
     }
   }

-  test("stream-stream time interval join - output watermark for various intervals") {
-    def testOutputWatermarkInJoin(
-        df: DataFrame,
-        input: MemoryStream[(String, Timestamp)],
-        expectedOutputWatermark: Long): Unit = {
-      testStream(df)(
-        // dummy row to trigger execution
-        AddData(input, ("1", Timestamp.valueOf("2023-01-01 01:00:10"))),
-        CheckAnswer(),
-        Execute { query =>
-          val lastExecution = query.lastExecution
-          val joinOperator = lastExecution.executedPlan.collect {
-            case j: StreamingSymmetricHashJoinExec => j
-          }.head
+  test("SPARK-45637-1 join on window, append mode") {
```

Review Comment:
```
-=-=-=-=-=-=-=
currentBatchId = 0, lastExecutionRequiresAnotherBatch = false, isNewDataAvailable = true, shouldConstructNextBatch (with currbatchID) = true
stateKeyWatermarkPredicateFunc: (input[0, struct, false].end <= 0)
stateKeyWatermarkPredicateFunc: (input[0, struct, false].end <= 0)
wei==thisrow: [0,100018,0,0,4c4b40]
this row filtered: [0,100018,0,0,4c4b40]
getJoinedRows -- key: [0,100018,0,0,4c4b40] numValues: 0
!stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,100018,0,0,4c4b40]
wei==thisrow: [0,100018,0,4c4b40,989680]
this row filtered: [0,100018,0,4c4b40,989680]
getJoinedRows -- key: [0,100018,0,4c4b40,989680] numValues: 0
!stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,100018,0,4c4b40,989680]
wei==thisrow: [0,100018,0,0,4c4b40]
this row filtered: [0,100018,0,0,4c4b40]
getJoinedRows -- key: [0,100018,0,0,4c4b40] numValues: 1
joinedRow: {[0,100018,0,0,4c4b40] + [0,100018,0,0,4c4b40]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,100018,0,0,4c4b40]
wei==thisrow: [0,100018,0,0,4c4b40]
this row filtered: [0,100018,0,0,4c4b40]
getJoinedRows -- key: [0,100018,0,0,4c4b40] numValues: 1
joinedRow: {[0,100018,0,0,4c4b40] + [0,100018,0,0,4c4b40]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,100018,0,0,4c4b40]
wei==thisrow: [0,100018,0,0,4c4b40]
this row filtered: [0,100018,0,0,4c4b40]
getJoinedRows -- key: [0,100018,0,0,4c4b40] numValues: 1
joinedRow: {[0,100018,0,0,4c4b40] + [0,100018,0,0,4c4b40]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,100018,0,0,4c4b40]
wei==thisrow: [0,100018,0,0,4c4b40]
this row filtered: [0,100018,0,0,4c4b40]
getJoinedRows -- key: [0,100018,0,0,4c4b40] numValues: 1
joinedRow: {[0,100018,0,0,4c4b40] + [0,100018,0,0,4c4b40]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,100018,0,0,4c4b40]
wei==thisrow: [0,100018,0,0,4c4b40]
this row filtered: [0,100018,0,0,4c4b40]
getJoinedRows -- key: [0,100018,0,0,4c4b40] numValues: 1
joinedRow: {[0,100018,0,0,4c4b40] + [0,100018,0,0,4c4b40]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,100018,0,0,4c4b40]
wei==thisrow: [0,100018,0,4c4b40,989680]
this row filtered: [0,100018,0,4c4b40,989680]
getJoinedRows -- key: [0,100018,0,4c4b40,989680] numValues: 1
joinedRow: {[0,100018,0,4c4b40,989680] + [0,100018,0,4c4b40,989680]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,100018,0,4c4b40,989680]
wei==thisrow: [0,100018,0,4c4b40,989680]
this row filtered: [0,100018,0,4c4b40,989680]
getJoinedRows -- key: [0,100018,0,4c4b40,989680] numValues: 1
joinedRow: {[0,100018,0,4c4b40,989680] + [0,100018,0,4c4b40,989680]}
---joinedRow emitted
!stateKeyWatermarkPredicateFunc(key): true !stateValueWatermarkPredicateFunc(thisRow): true
wei==add to state: [0,100018,0,4c4b40,989680]
wei==thisrow: [0,100018,0,4c4b40,989680]
this row filtered: [0,100018,0,4c4b40,989680]
getJoinedRows -- key: [0,100018,0,4c4b40,989680] numValues: 1
joinedRow: {[0,100018,0,4c4b40,989680]
```
Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]
WweiL commented on PR #44076: URL: https://github.com/apache/spark/pull/44076#issuecomment-1834480537

https://github.com/apache/spark/commit/75d666b95a711787355ca3895057dabadd429023

Seems like the prejoinFilter is just an optimization. I need to understand the cause of removing it.
Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]
WweiL commented on code in PR #44076: URL: https://github.com/apache/spark/pull/44076#discussion_r1411160900

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala: ##
```diff
@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
       thisRow: UnsafeRow,
       subIter: Iterator[InternalRow])
     extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+    // scalastyle:off
     private val iteratorNotEmpty: Boolean = super.hasNext

     override def completion(): Unit = {
       val isLeftSemiWithMatch =
         joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
       // Add to state store only if both removal predicates do not match,
       // and the row is not matched for left side of left semi join.
+println(s"!stateKeyWatermarkPredicateFunc(key): ${!stateKeyWatermarkPredicateFunc(key)}" +
+  s" !stateValueWatermarkPredicateFunc(thisRow): ${!stateValueWatermarkPredicateFunc(thisRow)}")
       val shouldAddToState = !stateKeyWatermarkPredicateFunc(key) &&
         !stateValueWatermarkPredicateFunc(thisRow) && !isLeftSemiWithMatch
       if (shouldAddToState) {
+        println(s"wei==add to state: $thisRow")
```

Review Comment: So what happens here is: in the no-data batch, the wm of `stateKeyWatermarkPredicateFunc` is updated to the new global wm (8). However, the emitted key from both parent window aggregations is [0, 5), hence `stateKeyWatermarkPredicateFunc(key)` returns true, meaning the window is not added to the join state store. As a result, when `SymmetricHashJoinStateManager.getJoinedRows` later wants to load the other side's stored row, it loads nothing.

This is wrong, because the two [0, 5) windows should be joined here. At least one side of the window should be added to the state store, so the other side could load it and join. But because of this check, the rows from both sides aren't added to the state, and they can't be joined.

This looks like an update to the multiple stateful operators support that we need to consider.
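The failure mode above can be modeled in isolation. The following is a hypothetical sketch (`WindowKey` and `stateKeyWatermarkPredicate` are stand-ins, not Spark's actual classes) of how a watermark of 8 seconds makes a [0, 5) window key match the `window.end <= watermark` removal predicate on both join sides, so neither side stores the row:

```scala
// Standalone model of the state-key watermark predicate described above.
object WatermarkPredicateSketch {
  // A window key as (start, end), in milliseconds.
  case class WindowKey(start: Long, end: Long)

  // Mirrors the shape of the predicate: drop keys whose window has closed.
  def stateKeyWatermarkPredicate(watermarkMs: Long)(key: WindowKey): Boolean =
    key.end <= watermarkMs

  def main(args: Array[String]): Unit = {
    val key = WindowKey(0L, 5000L) // the [0, 5) window emitted by both aggregations

    // In the first (data) batch the watermark is still 0, so the key is kept
    // and the row would be added to the join state store.
    assert(!stateKeyWatermarkPredicate(0L)(key))

    // In the no-data batch the global watermark has advanced to 8s; the predicate
    // now matches on BOTH sides, so neither side adds the [0, 5) row to state,
    // and getJoinedRows on either side finds nothing to join.
    assert(stateKeyWatermarkPredicate(8000L)(key))
  }
}
```

Under this model, fixing the behavior would mean evaluating the removal predicate against the watermark in effect when the rows were emitted (0), not the advanced global watermark (8), which matches the "wm should be 0 here, not 8" observation elsewhere in this thread.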
Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]
WweiL commented on code in PR #44076: URL: https://github.com/apache/spark/pull/44076#discussion_r1409957775

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala: ##
```diff
@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
       thisRow: UnsafeRow,
       subIter: Iterator[InternalRow])
     extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+    // scalastyle:off
     private val iteratorNotEmpty: Boolean = super.hasNext

     override def completion(): Unit = {
       val isLeftSemiWithMatch =
         joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
       // Add to state store only if both removal predicates do not match,
       // and the row is not matched for left side of left semi join.
+println(s"!stateKeyWatermarkPredicateFunc(key): ${!stateKeyWatermarkPredicateFunc(key)}" +
+  s" !stateValueWatermarkPredicateFunc(thisRow): ${!stateValueWatermarkPredicateFunc(thisRow)}")
       val shouldAddToState = !stateKeyWatermarkPredicateFunc(key) &&
         !stateValueWatermarkPredicateFunc(thisRow) && !isLeftSemiWithMatch
       if (shouldAddToState) {
+        println(s"wei==add to state: $thisRow")
```

Review Comment: If we comment out the `if (shouldAddToState)` check above, then there is data:
```
[info] - SPARK-45637 window agg + window agg -> join on window, append mode *** FAILED *** (6 seconds, 953 milliseconds)
[info]   == Results ==
[info]   !== Correct Answer - 1 ==                 == Spark Answer - 1 ==
[info]   !struct<_1:int,_2:int,_3:int,_4:int>      struct,count:bigint,count:bigint>
[info]   ![0,5,5,1]                                [[1969-12-31 16:00:00.0,1969-12-31 16:00:05.0],1,5]
[info]
```
My [0, 5, 5, 1] answer is badly formatted, but it means the same as the Spark answer.
Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]
WweiL commented on code in PR #44076: URL: https://github.com/apache/spark/pull/44076#discussion_r1409960967

## sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala: ##
```diff
@@ -814,68 +814,183 @@ class MultiStatefulOperatorsSuite
     }
   }

-  test("stream-stream time interval join - output watermark for various intervals") {
-    def testOutputWatermarkInJoin(
-        df: DataFrame,
-        input: MemoryStream[(String, Timestamp)],
-        expectedOutputWatermark: Long): Unit = {
-      testStream(df)(
-        // dummy row to trigger execution
-        AddData(input, ("1", Timestamp.valueOf("2023-01-01 01:00:10"))),
-        CheckAnswer(),
-        Execute { query =>
-          val lastExecution = query.lastExecution
-          val joinOperator = lastExecution.executedPlan.collect {
-            case j: StreamingSymmetricHashJoinExec => j
-          }.head
+  test("SPARK-45637-1 join on window, append mode") {
```

Review Comment: This is for me to understand the behavior of a no-agg window join.
Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]
WweiL commented on code in PR #44076: URL: https://github.com/apache/spark/pull/44076#discussion_r1409958741

## sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala: ##
```diff
@@ -814,68 +814,183 @@ class MultiStatefulOperatorsSuite
     }
   }

-  test("stream-stream time interval join - output watermark for various intervals") {
-    def testOutputWatermarkInJoin(
-        df: DataFrame,
-        input: MemoryStream[(String, Timestamp)],
-        expectedOutputWatermark: Long): Unit = {
-      testStream(df)(
-        // dummy row to trigger execution
-        AddData(input, ("1", Timestamp.valueOf("2023-01-01 01:00:10"))),
-        CheckAnswer(),
-        Execute { query =>
-          val lastExecution = query.lastExecution
-          val joinOperator = lastExecution.executedPlan.collect {
-            case j: StreamingSymmetricHashJoinExec => j
-          }.head
+  test("SPARK-45637-1 join on window, append mode") {
+    val impressions = MemoryStream[Int]
+    val clicks = MemoryStream[Int]

-          val outputWatermark = joinOperator.produceOutputWatermark(0)
-          assert(outputWatermark.get === expectedOutputWatermark)
-        }
+    val impressionsWithWatermark = impressions.toDF()
+      .withColumn("impressionTime", timestamp_seconds($"value"))
+      .withColumnRenamed("value", "impressionAdId")
+      .withWatermark("impressionTime", "0 seconds")
+
+    // clickTime is always later than impressionTime for clickAdId = impressionAdId
+    // Here we manually set the difference to 2 seconds (see the (Multi)AddData below)
+    val clicksWithWatermark = clicks.toDF()
+      .withColumn("timeSec", timestamp_seconds($"value"))
+      .selectExpr("value as clickAdId", "timeSec + INTERVAL 2 seconds as clickTime")
+      .withWatermark("clickTime", "0 seconds")
+
+    val clicksWindow = clicksWithWatermark.select(
+      window($"clickTime", "5 seconds")
+    )
+
+    val impressionsWindow = impressionsWithWatermark.select(
+      window($"impressionTime", "5 seconds")
+    )
+
+    val clicksAndImpressions = clicksWindow.join(impressionsWindow, "window", "inner")
+
+    withSQLConf((SQLConf.SHUFFLE_PARTITIONS.key, "1")) {
+      testStream(clicksAndImpressions)(
+        MultiAddData(
+          (impressions, Seq(0 to 8: _*)),
+          (clicks, Seq(1, 6))
+        ),
+        CheckAnswer((0, 5, 5, 1))
       )
     }
+  }

-    val input1 = MemoryStream[(String, Timestamp)]
-    val df1 = input1.toDF()
-      .selectExpr("_1 as leftId", "_2 as leftEventTime")
-      .withWatermark("leftEventTime", "5 minutes")
+  test("SPARK-45637 window agg + window agg -> join on window, append mode") {
```

Review Comment: This is the test that was run.
Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]
WweiL commented on code in PR #44076: URL: https://github.com/apache/spark/pull/44076#discussion_r1409957775

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala:

@@ -637,18 +653,22 @@ case class StreamingSymmetricHashJoinExec(
       thisRow: UnsafeRow,
       subIter: Iterator[InternalRow])
     extends CompletionIterator[InternalRow, Iterator[InternalRow]](subIter) {
-
+    // scalastyle:off
     private val iteratorNotEmpty: Boolean = super.hasNext

     override def completion(): Unit = {
       val isLeftSemiWithMatch = joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
       // Add to state store only if both removal predicates do not match,
       // and the row is not matched for left side of left semi join.
+      println(s"!stateKeyWatermarkPredicateFunc(key): ${!stateKeyWatermarkPredicateFunc(key)}" +
+        s" !stateValueWatermarkPredicateFunc(thisRow): ${!stateValueWatermarkPredicateFunc(thisRow)}")
       val shouldAddToState = !stateKeyWatermarkPredicateFunc(key) &&
         !stateValueWatermarkPredicateFunc(thisRow) && !isLeftSemiWithMatch
       if (shouldAddToState) {
+        println(s"wei==add to state: $thisRow")

Review Comment: If the if (shouldAddToState) check above is commented out, then there is data:
```
[info] - SPARK-45637 window agg + window agg -> join on window, append mode *** FAILED *** (6 seconds, 953 milliseconds)
[info]   == Results ==
[info]   !== Correct Answer - 1 ==               == Spark Answer - 1 ==
[info]   !struct<_1:int,_2:int,_3:int,_4:int>    struct<window:struct<start:timestamp,end:timestamp>,count:bigint,count:bigint>
[info]   ![0,5,5,1]                              [[1969-12-31 16:00:00.0,1969-12-31 16:00:05.0],1,5]
[info]
```
My [0, 5, 5, 1] answer is badly written, but the idea is the same as the Spark answer.
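The println debugging in the diff above probes the shouldAddToState condition that decides whether a joined row is buffered in the join state store. For readers following the thread, here is a hedged restatement of just its Boolean shape in plain Python; the argument names are stand-ins for the watermark predicates, not Spark APIs:

```python
def should_add_to_state(key_matches_evict_predicate: bool,
                        value_matches_evict_predicate: bool,
                        is_left_semi_with_match: bool) -> bool:
    # Mirrors the shape of the condition in completion(): buffer the row only
    # if neither the state-key nor the state-value watermark predicate marks
    # it as already expired, and it is not a matched left-side left-semi row.
    return (not key_matches_evict_predicate
            and not value_matches_evict_predicate
            and not is_left_semi_with_match)

# A row whose key already satisfies the eviction predicate is never buffered,
# which is the branch the println statements above are probing.
print(should_add_to_state(True, False, False))   # False
print(should_add_to_state(False, False, False))  # True
```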
Re: [PR] [DO-NOT-REVIEW][DRAFT] Spark 45637 multiple state test [spark]
WweiL commented on code in PR #44076: URL: https://github.com/apache/spark/pull/44076#discussion_r1408970548

## sql/core/src/test/scala/org/apache/spark/sql/streaming/MultiStatefulOperatorsSuite.scala:

@@ -878,6 +878,147 @@ class MultiStatefulOperatorsSuite
     testOutputWatermarkInJoin(join3, input1, -40L * 1000 - 1)
   }

+  test("SPARK-45637 window agg + window agg -> join on window, append mode") {
+    val impressions = MemoryStream[Int]
+    val clicks = MemoryStream[Int]
+
+    val impressionsWithWatermark = impressions.toDF()
+      .withColumn("impressionTime", timestamp_seconds($"value"))
+      .withColumnRenamed("value", "impressionAdId")
+      .withWatermark("impressionTime", "0 seconds")
+
+    // clickTime is always later than impressionTime for clickAdId = impressionAdId
+    // Here we manually set the difference to 2 seconds (see the (Multi)AddData below)
+    val clicksWithWatermark = clicks.toDF()
+      .withColumn("timeSec", timestamp_seconds($"value"))
+      .selectExpr("value as clickAdId", "timeSec + INTERVAL 2 seconds as clickTime")
+      .withWatermark("clickTime", "0 seconds")
+
+    val clicksWindow = clicksWithWatermark.groupBy(
+      window($"clickTime", "5 seconds")
+    ).count()
+
+    val impressionsWindow = impressionsWithWatermark.groupBy(
+      window($"impressionTime", "5 seconds")
+    ).count()
+
+    val clicksAndImpressions = clicksWindow.join(impressionsWindow, "window", "inner")
+
+    withSQLConf((SQLConf.SHUFFLE_PARTITIONS.key, "1")) {
+      testStream(clicksAndImpressions)(
+        MultiAddData(
+          (impressions, Seq(0 to 8: _*)),
+          (clicks, Seq(6))
+        ),
+        // data batch triggered
+
+        // global watermark: (0, 0) [drop, evict]
+        // impression [impressionAdId, impressionTime]:
+        //   wm: (0, 0)
+        //   input: (0, 0), (1, 1), (2, 2), (3, 3), ..., (8, 8)
+        //   agg:   [0, 5) -> 5, [5, 10) -> 4
+        //   state: [0, 5) -> 5, [5, 10) -> 4
+        //   output: None
+        // click [clickAdId, clickTime]:
+        //   wm: (0, 0)
+        //   input: (6, 8)
+        //   agg:   [0, 5) -> 0, [5, 10) -> 1
+        //   state: [0, 5) -> 0, [5, 10) -> 1
+        //   output: None
+        // join:
+        //   all None
+
+        // no-data batch triggered (shouldRunAnotherBatch)
+
+        // global watermark: (0, 8) (default is min across multiple watermarks)
+        // impression:
+        //   wm: (0, 8)
+        //   input: None
+        //   agg:   None
+        //   state: [5, 10) -> 4
+        //   output: [0, 5) -> 5 (actually no, the state is not evicting any rows, why?)

Review Comment: Why? For the logs, see the PR description.
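The walkthrough above notes that the global watermark "is min across multiple watermarks" by default. Here is a small sketch of that policy as I understand it, under the assumption (matching the test) that each input declares a 0-second delay; next_watermark is an illustrative helper, not Spark code:

```python
def next_watermark(max_event_times, delays):
    # With the default spark.sql.streaming.multipleWatermarkPolicy = "min",
    # the query-level watermark advances to the minimum of each input's
    # candidate: (max event time seen so far) - (that input's delay).
    return min(m - d for m, d in zip(max_event_times, delays))

# In the walkthrough: impressions saw event times up to 8s and clicks saw
# clickTime up to 8s (value 6 plus the 2-second offset), both with a
# "0 seconds" delay, so the no-data batch runs with eviction watermark 8.
print(next_watermark([8, 8], [0, 0]))  # 8
```

A slower input drags the global watermark down, e.g. next_watermark([8, 3], [0, 0]) stays at 3, which is why a lagging stream can keep windows from being evicted.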