HeartSaVioR opened a new pull request #36002:
URL: https://github.com/apache/spark/pull/36002
### What changes were proposed in this pull request?
This PR fixes a correctness issue in stream-stream outer join with the RocksDB
state store provider, which can occur under the following conditions:
* stream-stream time interval outer join
* a left outer join is affected on the left side, a right outer join on the
right side, and a full outer join on both sides
* At batch N, non-late row(s) are produced on the affected side
* In the same batch (batch N), some row(s) on the affected side are
evicted by the watermark condition
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L327-L339
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L619-L627
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L195-L201
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L208-L223
The root cause is the same as
[SPARK-38320](https://issues.apache.org/jira/browse/SPARK-38320): weak read
consistency of the iterator, especially with the RocksDB state store provider.
(Quoting from SPARK-38320: "The problem is due to the StateStore.iterator not
reflecting StateStore changes made after its creation.")
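To illustrate the weak read consistency quoted above, here is a minimal Scala sketch. `SnapshotStore` and `SnapshotDemo` are hypothetical toy types, not Spark's `StateStore` API; the assumption being modeled is simply that `iterator()` copies the store's contents when it is created.

```scala
import scala.collection.mutable

// Hypothetical toy store, NOT Spark's StateStore: iterator() returns a
// snapshot taken at creation time, so later writes are invisible to it.
class SnapshotStore {
  private val data = mutable.LinkedHashMap.empty[String, Long]

  def put(key: String, value: Long): Unit = data.update(key, value)
  def get(key: String): Option[Long] = data.get(key)

  // Copies the current contents; the returned iterator never sees later puts.
  def iterator(): Iterator[(String, Long)] = data.toList.iterator
}

object SnapshotDemo {
  def run(): List[(String, Long)] = {
    val store = new SnapshotStore
    store.put("k1", 1L)
    val it = store.iterator() // snapshot taken here
    store.put("k1", 3L)       // update made after the iterator was created
    store.put("k2", 1L)
    it.toList                 // still reflects the state at creation time
  }
}
```

`SnapshotDemo.run()` yields only `("k1", 1)`: neither the new value for `k1` nor the new key `k2` is visible through the older iterator, while `store.get` would return the latest values.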
More specifically, if updates performed while processing input rows change the
number of values for a grouping key, the update is not visible to
SymmetricHashJoinStateManager.removeByValueCondition, and the method performs
the eviction with an out-of-sync number of values.
To make matters worse, when the method performs the eviction and updates the
number of values for the grouping key, it "overwrites" that count, effectively
dropping all rows inserted in the same batch.
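The following toy model sketches how a stale count turns into lost rows. It rests on simplifying assumptions: `ToyJoinState` and `EvictionDemo` are hypothetical names, not Spark code. Each grouping key keeps a count plus event times stored by index, and eviction trusts a count read from a snapshot iterator, then writes the new count back.

```scala
import scala.collection.mutable

// Hypothetical simplified join-side state, NOT SymmetricHashJoinStateManager:
// a per-key count plus (key, index) -> event time.
final class ToyJoinState {
  val numValues = mutable.LinkedHashMap.empty[String, Int]
  val values = mutable.HashMap.empty[(String, Int), Long]

  def append(key: String, eventTime: Long): Unit = {
    val n = numValues.getOrElse(key, 0)
    values((key, n)) = eventTime
    numValues(key) = n + 1
  }

  // Snapshot iterator over the per-key counts (weak read consistency).
  def countIterator(): Iterator[(String, Int)] = numValues.toList.iterator

  // Removes rows with eventTime <= watermark, trusting the counts it is given,
  // then overwrites the stored count. Slots past the new count are left
  // unreachable in this toy model.
  def evict(watermark: Long, counts: Iterator[(String, Int)]): Unit =
    counts.foreach { case (key, n) =>
      val kept = (0 until n).map(i => values((key, i))).filter(_ > watermark)
      kept.zipWithIndex.foreach { case (t, i) => values((key, i)) = t }
      numValues(key) = kept.size
    }

  def rowsFor(key: String): Seq[Long] =
    (0 until numValues.getOrElse(key, 0)).map(i => values((key, i)))
}

object EvictionDemo {
  // Key "k" already holds rows with event times 5 and 20 from earlier batches.
  private def seeded(): ToyJoinState = {
    val s = new ToyJoinState
    s.append("k", 5L)
    s.append("k", 20L)
    s
  }

  // Buggy ordering: the count iterator is created before input is processed.
  def staleCount(): Seq[Long] = {
    val s = seeded()
    val counts = s.countIterator() // snapshot sees count = 2
    s.append("k", 30L)             // non-late row in batch N, count becomes 3
    s.evict(10L, counts)           // watermark = 10, evicts the row at t = 5
    s.rowsFor("k")
  }

  // Correct ordering: the iterator is created after input is processed.
  def freshCount(): Seq[Long] = {
    val s = seeded()
    s.append("k", 30L)
    s.evict(10L, s.countIterator()) // snapshot sees count = 3
    s.rowsFor("k")
  }
}
```

With the stale snapshot, only the row at event time 20 survives: the non-late row appended in batch N (event time 30) is dropped because the count written back ignores it. With a fresh snapshot, both rows at 20 and 30 are kept.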
This PR changes the outer iterators to be evaluated lazily, ensuring that all
updates made while processing input rows are reflected "before" the outer
iterators are initialized.
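A minimal sketch of the late-evaluation idea, assuming a snapshotting iterator like the one described above. `LazyIterator` and `LazyDemo` are hypothetical helpers for illustration, not the actual change in this PR: the wrapper defers creating its underlying iterator until the first `hasNext`/`next` call, so updates made in between are visible.

```scala
import scala.collection.mutable

// Hypothetical helper: the underlying (snapshotting) iterator is created only
// when the wrapper is first consumed, not when the wrapper is constructed.
final class LazyIterator[A](create: () => Iterator[A]) extends Iterator[A] {
  private lazy val underlying: Iterator[A] = create()
  override def hasNext: Boolean = underlying.hasNext
  override def next(): A = underlying.next()
}

object LazyDemo {
  def run(): (List[Int], List[Int]) = {
    val state = mutable.ArrayBuffer(1, 2)
    // Both iterators snapshot `state`; they differ only in when that happens.
    val eager = state.toList.iterator
    val deferred = new LazyIterator(() => state.toList.iterator)
    state += 3 // update made "while processing input rows"
    (eager.toList, deferred.toList)
  }
}
```

`LazyDemo.run()` returns `(List(1, 2), List(1, 2, 3))`: the eager iterator misses the update, while the deferred one takes its snapshot after the update and sees it. The design choice is that construction order no longer matters, only first use.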
### Why are the changes needed?
The bug is described in the section above.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit test added.