neilramaswamy opened a new pull request, #44323:
URL: https://github.com/apache/spark/pull/44323
### What changes were proposed in this pull request?

In these changes, we modify the stream-stream join state removal logic so that the decision to drop state for one side of the join is based on whether a watermark exists on the _other_ side.

### Why are the changes needed?

Currently, we drop state for side `A` only if there is a watermark on side `A`. However, that is not the correct condition: in a stream-stream join, state removal from one side should happen when there is a watermark on the _other_ side (together with a time-interval condition).

If we have the time-interval condition `L > R + 10`, the minimum value of `L` event times that we need to hold onto is `min(R + 10)`, and we can drop all state from the left that is _less_ than that quantity. Because adding a constant does not change which value is smallest, `min(R + 10)` is `min(R) + 10`, and `min(R)` is the watermark of `R`. So, to drop state from `L`, we should always check whether there is a watermark on `R`.

This is not to say that our state watermark removal predicate is wrong; it is actually correct. The (latent) mistake in our implementation is in the condition we check to decide whether to apply state removal at all.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests, which fail against the current Apache Spark master.

### Was this patch authored or co-authored using generative AI tooling?

No.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
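The state-removal reasoning from "Why are the changes needed?" can be illustrated with a small, self-contained model. This is a minimal sketch under stated assumptions, not Spark's implementation: event times and watermarks are plain integers, `None` means "no watermark defined", and every function name below is hypothetical. It contrasts the old gate (checking the watermark of the side being cleaned) with the proposed one (checking the _other_ side), for the condition `L > R + bound`.

```python
from typing import List, Optional

# Hypothetical model of a stream-stream join with the time-interval
# condition  L > R + bound  (bound = 10 for `L > R + 10`).
# Watermarks are integer event times; None means "no watermark defined".


def should_remove_left_state_old(left_wm: Optional[int]) -> bool:
    # Pre-fix gate described in the PR: look at the watermark of the
    # same side whose state would be removed.
    return left_wm is not None


def should_remove_left_state_new(right_wm: Optional[int]) -> bool:
    # Gate proposed in the PR: look at the watermark of the *other* side.
    return right_wm is not None


def left_state_threshold(right_wm: int, bound: int) -> int:
    # min(R + bound) == min(R) + bound, with min(R) taken as R's watermark.
    return right_wm + bound


def evict_left(left_times: List[int], right_wm: Optional[int], bound: int) -> List[int]:
    """Return the left-side event times that stay in state."""
    if not should_remove_left_state_new(right_wm):
        # Without a watermark on R, future right rows are unbounded below,
        # so no left row can be ruled out as a future match.
        return list(left_times)
    threshold = left_state_threshold(right_wm, bound)
    # Following the PR's wording, drop rows strictly less than the threshold.
    # Keeping rows equal to it is conservative, since `L > R + bound` is strict.
    return [t for t in left_times if t >= threshold]


if __name__ == "__main__":
    # R has a watermark of 100; L has none.
    print(should_remove_left_state_old(None))   # old gate never fires
    print(should_remove_left_state_new(100))    # new gate fires
    print(evict_left([95, 105, 110, 120], 100, 10))
```

With a watermark of 100 on `R` and none on `L`, the old gate never allows left-side cleanup, even though `R`'s watermark already bounds which left rows can still match. The new gate fires and, with a threshold of 110, keeps only `[110, 120]` out of `[95, 105, 110, 120]`.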
