HeartSaVioR opened a new pull request #36002:
URL: https://github.com/apache/spark/pull/36002


   ### What changes were proposed in this pull request?
   
   This PR fixes a correctness issue in stream-stream outer join with the RocksDB 
state store provider. The issue can occur when the following conditions are met:
   
   * The query is a stream-stream time interval outer join
     * left outer join is affected on the left side, right outer join on the 
right side, and full outer join on both sides
   * At batch N, non-late row(s) are produced on the problematic side
   * In the same batch (batch N), some row(s) on the problematic side are 
evicted by the watermark condition
   
   
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L327-L339
   
   
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L619-L627
   
   
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L195-L201
   
   
https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L208-L223
   
   The root cause is the same as in 
[SPARK-38320](https://issues.apache.org/jira/browse/SPARK-38320): weak read 
consistency on the iterator, especially with the RocksDB state store provider. 
(Quoting from SPARK-38320: The problem is due to the StateStore.iterator not 
reflecting StateStore changes made after its creation.)
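
   The behavior quoted above can be modeled with a small sketch. `SnapshotStore` 
and `StaleIteratorDemo` are hypothetical names invented for this illustration; 
they are not Spark's StateStore API, and the snapshot-on-creation behavior is an 
assumption chosen to mimic the weak read consistency described here.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a state store whose iterator is a snapshot
// taken at creation time. This is NOT Spark's StateStore API.
final class SnapshotStore[K, V] {
  private val data = mutable.LinkedHashMap.empty[K, V]

  def put(key: K, value: V): Unit = data.update(key, value)

  def get(key: K): Option[V] = data.get(key)

  // Copy the entries eagerly so that later puts are invisible to this iterator.
  def iterator: Iterator[(K, V)] = data.toList.iterator
}

object StaleIteratorDemo {
  // Returns what the iterator saw, and what a point lookup sees afterwards.
  def run(): (List[(String, Int)], Option[Int]) = {
    val store = new SnapshotStore[String, Int]
    store.put("k1", 1)
    val it = store.iterator // snapshot is taken here
    store.put("k1", 2)      // update made after the iterator was created
    (it.toList, store.get("k1"))
  }

  def main(args: Array[String]): Unit = println(run())
}
```

   In this model the iterator still yields the old value for `k1`, while a point 
lookup returns the new one.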
   
   More specifically, if updates performed while processing input rows change 
the number of values for a grouping key, the change is not seen in 
SymmetricHashJoinStateManager.removeByValueCondition, and the method performs 
the eviction with an out-of-sync number of values.
   
   Even worse, if the method performs the eviction and updates the number of 
values for the grouping key, it "overwrites" the number of values, effectively 
dropping all rows inserted in the same batch.
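
   A rough model of how a stale value count can drop rows is sketched below. 
`JoinState`, `countSnapshot`, and `evictOlderThan` are hypothetical, and the 
two-map layout (key to count, and (key, index) to value) is a simplification 
assumed for illustration; it is not the actual algorithm in 
SymmetricHashJoinStateManager.removeByValueCondition.

```scala
import scala.collection.mutable

object StaleCountDemo {
  // Simplified, hypothetical state layout:
  //   numValues: key -> number of values
  //   values:    (key, index) -> event time of the stored row
  final class JoinState {
    private val numValues = mutable.Map.empty[String, Int]
    private val values = mutable.Map.empty[(String, Int), Long]

    def append(key: String, eventTime: Long): Unit = {
      val n = numValues.getOrElse(key, 0)
      values((key, n)) = eventTime
      numValues(key) = n + 1
    }

    // Snapshot of key -> count; later appends are not visible through it.
    def countSnapshot(): Iterator[(String, Int)] = numValues.toList.iterator

    // Evicts rows older than the watermark, trusting the counts it is given,
    // then writes the new count back (this is the "overwrite").
    def evictOlderThan(counts: Iterator[(String, Int)], watermark: Long): Unit =
      counts.foreach { case (key, count) =>
        val kept = (0 until count).map(i => values((key, i))).filter(_ >= watermark)
        kept.zipWithIndex.foreach { case (t, i) => values((key, i)) = t }
        numValues(key) = kept.size
      }

    def rows(key: String): List[Long] =
      (0 until numValues.getOrElse(key, 0)).map(i => values((key, i))).toList
  }

  def run(snapshotBeforeInput: Boolean): List[Long] = {
    val state = new JoinState
    state.append("k", 10L)            // row kept from an earlier batch
    val early = state.countSnapshot() // created before batch N's input is processed
    state.append("k", 20L)            // non-late row arriving in batch N
    val counts = if (snapshotBeforeInput) early else state.countSnapshot()
    state.evictOlderThan(counts, watermark = 15L)
    state.rows("k")
  }
}
```

   With the early snapshot, the eviction sees a count of 1, removes the old 
row, and writes back a count of 0, so the row with event time 20 becomes 
unreachable. With a snapshot taken after the input is processed, that row 
survives.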
   
   This PR changes the outer iterators to be lazily evaluated, ensuring that all 
updates made while processing input rows are reflected "before" the outer 
iterators are initialized.
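
   The effect of deferring iterator creation can be illustrated with a minimal 
sketch. `SnapshotState`, `eagerOuter`, and `lazyOuter` are hypothetical names, 
and the use of `lazy val` here is only one way to express late evaluation; it 
is not necessarily how the patch itself is written.

```scala
import scala.collection.mutable

object LazyOuterIteratorDemo {
  // Hypothetical state whose iterator is a snapshot taken at creation time.
  final class SnapshotState {
    private val rows = mutable.ArrayBuffer.empty[Int]
    def add(row: Int): Unit = rows += row
    def iterator: Iterator[Int] = rows.toList.iterator
  }

  // Processing an input row has a side effect: the row is stored in state.
  private def processInput(state: SnapshotState, input: Seq[Int]): Iterator[String] =
    input.iterator.map { row => state.add(row); s"in:$row" }

  def eagerOuter(input: Seq[Int]): List[String] = {
    val state = new SnapshotState
    // Snapshot is taken immediately, before any input row is processed.
    val outer = state.iterator.map(r => s"state:$r")
    val inner = processInput(state, input).toList
    inner ++ outer.toList
  }

  def lazyOuter(input: Seq[Int]): List[String] = {
    val state = new SnapshotState
    // Not evaluated until first use, which happens after the input is consumed.
    lazy val outer = state.iterator.map(r => s"state:$r")
    val inner = processInput(state, input).toList
    inner ++ outer.toList
  }
}
```

   In this sketch `eagerOuter` never sees the rows stored while processing 
input, whereas `lazyOuter` sees all of them because its snapshot is taken only 
after the input iterator has been fully consumed.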
   
   ### Why are the changes needed?
   
   The bug is described in the section above.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   A new unit test (UT) was added.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


