HeartSaVioR opened a new pull request, #53930:
URL: https://github.com/apache/spark/pull/53930

   ### What changes were proposed in this pull request?
   
   This PR proposes to implement a new state format for stream-stream join, based on the new event-time-aware state key encodings.
   
   The new state format is designed to eliminate the need for a full scan during eviction and when emitting unmatched rows. The overhead of eviction should be bound to the actual number of state rows to be evicted (indirectly driven by the amount of watermark advancement), but with the existing state format we have been doing a full scan, which can take more than 2 seconds over 1,000,000 rows even when there are zero rows to evict. With the new state format, the overhead of eviction is bound to the actual number of rows evicted, taking around 30 ms or less over 1,000,000 rows when there are zero rows to evict.
   
   To achieve this, we make a drastic change to the data structure: we move away from the logical array and introduce a secondary index in addition to the main data.
   
   Each side of the join will use two (virtual) column families (4 column families in total), as follows:
   
   * KeyWithTsToValuesStore
     * Primary data store
     * (key, event time) -> values
     * Each element in values consists of (value, matched)
   * TsWithKeyTypeStore
     * Secondary index for efficient eviction
     * (event time, key) -> numValues
     * numValues is kept up to date whenever a new value is added to values in the primary data store
       * This is to track the number of deleted rows accurately. It's optional, but the metric has been useful, so we want to keep it as it is.
   
   As the format of key part implies, KeyWithTsToValuesStore will use 
`EventTimeAsPostfixStateEncoderSpec`, and TsWithKeyTypeStore will use 
`EventTimeAsPrefixStateEncoderSpec`.
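   As an illustration only, the two stores and the bounded eviction they enable could be sketched with a toy in-memory Python model. This is not the actual RocksDB-backed implementation; the names mirror the PR's column families, but the code and data are hypothetical:
   
   ```python
   import bisect
   
   primary = {}   # KeyWithTsToValuesStore: (key, event_time) -> [(value, matched)]
   index = []     # TsWithKeyTypeStore, kept sorted: [(event_time, key, numValues)]
   
   def put(key, ts, value):
       # Append to the primary store; the matched flag starts as False.
       primary.setdefault((key, ts), []).append((value, False))
       # Keep numValues in the secondary index in sync with the primary store.
       i = bisect.bisect_left(index, (ts, key, 0))
       if i < len(index) and index[i][:2] == (ts, key):
           index[i] = (ts, key, index[i][2] + 1)
       else:
           index.insert(i, (ts, key, 1))
   
   def evict(watermark_ts):
       # A prefix scan over the sorted (event_time, key) index: the cost is
       # bound to the number of evicted entries, not to the total state size.
       cutoff = bisect.bisect_left(index, (watermark_ts, "", 0))
       evicted_rows = 0
       for ts, key, num_values in index[:cutoff]:
           del primary[(key, ts)]
           evicted_rows += num_values
       del index[:cutoff]
       return evicted_rows
   ```
   
   For example, after inserting two values at (key "a", event time 100) and one at ("b", 50), `evict(100)` touches only the single index entry below the watermark, regardless of how much other state exists.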
   
   The granularity of the event-time timestamp is 1 millisecond, in line with the granularity of watermark advancement. This granularity acts as a knob controlling the number of keys vs. the number of values per key, trading off the granularity of watermark-based eviction against the size of the key space (which may impact performance).
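   To make the trade-off concrete, here is a small hypothetical illustration (the bucketing knob shown here is not part of this PR): coarsening the timestamp granularity merges distinct event times into fewer (key, timestamp) entries, at the cost of coarser eviction.
   
   ```python
   # Event times in milliseconds for a single join key (example data).
   event_times_ms = [1000, 1001, 1002, 1999, 2000]
   
   # At 1 ms granularity, every distinct event time produces its own
   # (key, timestamp) entry in the stores.
   entries_at_1ms = {t for t in event_times_ms}
   
   # Bucketing to 1 s granularity (hypothetical knob) shrinks the key space,
   # but eviction could then only happen at whole-second boundaries.
   entries_at_1s = {t // 1000 for t in event_times_ms}
   
   print(len(entries_at_1ms), len(entries_at_1s))  # 5 entries vs. 2 entries
   ```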
   
   Note that the number of bytes to write is increased compared to the existing state format. Assuming 10 rows with 10 distinct timestamps are bound to a single key:
   
   * existing state format: key bytes would appear 11 times (once in KeyToNumValuesStore, 10 times in KeyWithIndexToValueStore)
   * new state format: key bytes would appear 20 times (10 times in KeyWithTsToValuesStore, 10 times in TsWithKeyTypeStore)
   
   This could incur a performance regression on insertion, but retrieval and eviction are improved with the new state format, which offsets the regression and can show much better performance in scenarios with shorter batch durations and huge state store sizes.
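   The counts above can be checked with a quick back-of-the-envelope calculation (illustrative only):
   
   ```python
   rows = 10                 # rows bound to a single join key
   distinct_timestamps = 10  # each row here carries a distinct event time
   
   # Existing format: the key is written once in KeyToNumValuesStore and
   # once per row in KeyWithIndexToValueStore.
   existing_key_writes = 1 + rows
   
   # New format: the key is written once per distinct timestamp in
   # KeyWithTsToValuesStore, and once per (timestamp, key) entry in
   # TsWithKeyTypeStore.
   new_key_writes = distinct_timestamps + distinct_timestamps
   
   print(existing_key_writes, new_key_writes)  # 11 20
   ```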
   
   There are several follow-ups for this state format implementation, which can be addressed on top of this PR:
   
   * the case of a regular join, where there is no event time in either the join condition or the value, is not yet covered (**may require a change to the state format**)
     * addressing this is needed to unify the state format and minimize the maintenance cost
     * the 1 millisecond timestamp granularity does not work well performance-wise for this case
     * we need to find a way to avoid the regression on insertion, since that join case has no eviction and hence does not get a strong benefit from this format
   * further optimizations with RocksDB offering: WriteBatch (for batched 
writes), MGET, etc.
   * retrieving matched rows with the "scope" of timestamps (in time-interval 
join)
     * while the format is ready to support ordered scans by timestamp, this needs another state store API to define the range of keys to scan, which requires some effort
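   Since the timestamp is encoded as an ordered prefix in TsWithKeyTypeStore, a time-interval lookup reduces to a bounded range scan. A hypothetical sketch of what such a state store API could return (names and data are illustrative, not the proposed API):
   
   ```python
   import bisect
   
   # Sorted secondary index entries: (event_time, key), as in TsWithKeyTypeStore.
   index = sorted([(50, "b"), (100, "a"), (150, "a"), (300, "c")])
   
   def scan_range(lo_ts, hi_ts):
       # Return entries with lo_ts <= event_time < hi_ts via two binary
       # searches; no full scan of the index is needed.
       start = bisect.bisect_left(index, (lo_ts, ""))
       end = bisect.bisect_left(index, (hi_ts, ""))
       return index[start:end]
   
   print(scan_range(100, 200))  # [(100, 'a'), (150, 'a')]
   ```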
   
   ### Why are the changes needed?
   
   The cost of eviction based on a full scan is severe enough to keep stream-stream join from achieving lower latency. Also, the logic of maintaining the logical array is complicated to maintain, and its performance characteristics are less predictable given the behavior of deleting an element at a random index (moving the value at the last index into the deleted index).
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. At this point, this state format is not integrated with the actual stream-stream join operator; follow-up work is needed for the integration that would finally introduce the user-facing change.
   
   ### How was this patch tested?
   
   New UT suites, plus refactoring the existing suite to test both the time-window and time-interval cases.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

