HeartSaVioR opened a new pull request, #53930:
URL: https://github.com/apache/spark/pull/53930
### What changes were proposed in this pull request?
This PR proposes to implement the new state format for stream-stream join,
based on the new event-time-aware state key encoding.
The new state format aims to eliminate the need for a full scan during
eviction and when populating unmatched rows. The eviction overhead should be
bound to the actual number of state rows to be evicted (indirectly determined
by how far the watermark advances), but the existing state format performs a
full scan, which can take more than 2 seconds on 1,000,000 rows even when there
are zero rows to evict. With the new state format, the eviction overhead is
bound to the actual number of rows to evict, taking around 30 ms or less on
1,000,000 rows when there are zero rows to evict.
To achieve this, we make a drastic change to the data structure: we move away
from the logical array and introduce a secondary index in addition to the main
data.
Each side of the join will use two (virtual) column families (4 column
families in total), which are the following:
* KeyWithTsToValuesStore
  * Primary data store
  * (key, event time) -> values
    * each element in values consists of (value, matched)
* TsWithKeyTypeStore
  * Secondary index for efficient eviction
  * (event time, key) -> numValues
    * numValues is kept up to date whenever a new value is added to values in
the primary data store
    * This is to track the number of deleted rows accurately. It's optional,
but the metric has been useful, so we want to keep it as it is.
As the key formats imply, KeyWithTsToValuesStore will use
`EventTimeAsPostfixStateEncoderSpec`, and TsWithKeyTypeStore will use
`EventTimeAsPrefixStateEncoderSpec`.
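To make the role of the secondary index concrete, here is a minimal Python sketch (not Spark's actual Scala implementation; all names and structures are illustrative). It models the primary store keyed by (key, event time) and an ordered secondary index keyed by (event time, key), so that eviction only touches the prefix of entries older than the watermark instead of scanning the whole state:

```python
import bisect

class JoinStateSketch:
    """Illustrative model of the two column families per join side."""

    def __init__(self):
        # KeyWithTsToValuesStore: (key, event_time) -> list of (value, matched)
        self.primary = {}
        # TsWithKeyTypeStore: ordered (event_time, key) entries -> numValues
        self.secondary_keys = []    # sorted list of (event_time, key)
        self.secondary_counts = {}  # (event_time, key) -> numValues

    def put(self, key, event_time, value):
        values = self.primary.setdefault((key, event_time), [])
        values.append((value, False))  # matched flag starts as False
        sk = (event_time, key)
        if sk not in self.secondary_counts:
            bisect.insort(self.secondary_keys, sk)
        # numValues is kept up to date on every insert into the primary store.
        self.secondary_counts[sk] = len(values)

    def evict(self, watermark_ms):
        """Remove all rows with event_time < watermark_ms.

        Entries to evict form a contiguous prefix of the ordered secondary
        index, so the cost is proportional to the number of evicted entries,
        not to the total state size.
        """
        cut = bisect.bisect_left(self.secondary_keys, (watermark_ms,))
        evicted_rows = 0
        for ts, key in self.secondary_keys[:cut]:
            evicted_rows += self.secondary_counts.pop((ts, key))
            del self.primary[(key, ts)]
        del self.secondary_keys[:cut]
        return evicted_rows

state = JoinStateSketch()
state.put("a", 100, "v1")
state.put("a", 100, "v2")
state.put("b", 50, "v3")
print(state.evict(60))   # only ("b", 50) is older than the watermark
```

The key point the sketch captures is that `evict` never visits state rows with timestamps at or beyond the watermark, which is what bounds its cost to the number of evicted rows.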
The granularity of the event-time timestamp is 1 millisecond, which is in
line with the granularity of watermark advancement. This granularity acts as a
knob controlling the number of keys versus the number of values per key,
trading off the precision of watermark-based eviction against the size of the
key space (which may impact performance).
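The trade-off can be illustrated with a hypothetical bucketing function (this is not part of the PR; the PR fixes the granularity at 1 ms, and `bucket` is purely for illustration). Truncating event times to a coarser bucket shrinks the number of distinct (key, timestamp) entries, but rows can then only be evicted once the watermark passes the whole bucket:

```python
def bucket(event_time_ms, granularity_ms):
    # Truncate the timestamp down to the start of its granularity bucket.
    return (event_time_ms // granularity_ms) * granularity_ms

event_times = [1000, 1003, 1007, 1250, 1990]

# 1 ms granularity (the chosen default): one entry per distinct timestamp.
fine = {bucket(t, 1) for t in event_times}

# 1000 ms granularity: far fewer entries, but eviction becomes coarser
# since all five rows now share the single bucket starting at 1000.
coarse = {bucket(t, 1000) for t in event_times}

print(len(fine), len(coarse))  # 5 1
```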
Note that the number of bytes written will be "increased" compared to the
existing state format. Assuming 10 rows with 10 distinct timestamps are bound
to a single key:
* existing state format: the key bytes would appear 11 times (1 time in
KeyToNumValuesStore, 10 times in KeyWithIndexToValueStore)
* new state format: the key bytes would appear 20 times (10 times in
KeyWithTsToValuesStore, 10 times in TsWithKeyTypeStore)
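The counts above follow directly from the per-store entry rules described in the PR; a tiny calculation makes the generalization explicit (the helper functions are illustrative, not from the codebase):

```python
def key_appearances_existing(n_rows):
    # 1 entry in KeyToNumValuesStore, plus one KeyWithIndexToValueStore
    # entry per row (each carrying the key bytes).
    return 1 + n_rows

def key_appearances_new(n_distinct_timestamps):
    # One KeyWithTsToValuesStore entry per (key, ts) pair, plus one
    # TsWithKeyTypeStore entry per (ts, key) pair; rows sharing a
    # timestamp share a single entry in each store.
    return 2 * n_distinct_timestamps

print(key_appearances_existing(10))  # 11
print(key_appearances_new(10))       # 20
```

Note that the new format's write amplification depends on the number of distinct timestamps per key, not the number of rows, so rows clustered on fewer timestamps narrow the gap.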
This could incur a performance regression on insertion, but retrieval and
eviction improve with the new state format, which offsets the regression and
can show much better performance in scenarios with short batch durations and
huge state store sizes.
There are several follow-ups to this state format implementation, which can
be addressed on top of this:
* yet to cover the case of a regular join where there is no event time in
either the join condition or the value (**may require a change to the state
format**)
  * addressing this is needed to unify the state format and minimize the
maintenance cost
  * the 1 millisecond timestamp granularity does not perform well for this
case
  * we need to find a way to avoid the regression on insertion, since that
join case has no eviction and therefore gains no strong benefit from this
format
* further optimizations using RocksDB offerings: WriteBatch (for batched
writes), MGET, etc.
* retrieving matched rows within the "scope" of timestamps (in time-interval
join)
  * while the format is ready to support ordered scans by timestamp, this
needs another state store API to define the range of keys to scan, which
requires some effort
### Why are the changes needed?
The cost of full-scan-based eviction is severe enough to prevent
stream-stream join from achieving lower latency. Also, the logic for
maintaining the logical array is complicated to maintain, and its performance
characteristics are less predictable given the behavior of deleting an element
at a random index (the value at the last index is moved into the deleted
index).
### Does this PR introduce _any_ user-facing change?
No. At this point, this state format is not integrated with the actual
stream-stream join operator; follow-up work is needed to integrate it and
eventually introduce the user-facing change.
### How was this patch tested?
New UT suites, plus refactoring the existing suite to test both time-window
and time-interval cases.
### Was this patch authored or co-authored using generative AI tooling?
No.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]