HeartSaVioR opened a new pull request, #53911:
URL: https://github.com/apache/spark/pull/53911
### What changes were proposed in this pull request?
This PR changes the state store layer to introduce key encodings that treat
event time as a first-class component of the key.
Proposed key encodings:
* `EventTimeAsPrefixStateEncoderSpec`
  * Place event time as a prefix, and the key as the remaining part of the
    serialized format
* `EventTimeAsPostfixStateEncoderSpec`
  * Place the key first, and event time as a postfix of the serialized format
Event time is encoded as is: an 8-byte long in big-endian binary format, so
the byte-wise ordering of the encoded value matches the natural ordering of the
long values (event time is never negative). The serialization format of the key
is unchanged, e.g. for UnsafeRow it is the same as the underlying binary
format.
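
As a rough illustration of why big-endian matters here, the following Python sketch (not Spark code; the helper names `encode_event_time` and `decode_event_time` are made up for this example) encodes a few non-negative timestamps as 8-byte big-endian longs and checks that sorting the raw bytes gives the same order as sorting the numbers. The little-endian variant is included only for contrast.

```python
import struct

def encode_event_time(event_time_ms: int) -> bytes:
    """Encode a non-negative event time as an 8-byte big-endian signed long."""
    if event_time_ms < 0:
        # Assumption taken from the PR text: event time has no negative value.
        raise ValueError("event time is assumed to be non-negative")
    return struct.pack(">q", event_time_ms)

def decode_event_time(raw: bytes) -> int:
    """Decode an 8-byte big-endian signed long back into an int."""
    return struct.unpack(">q", raw)[0]

timestamps = [1_700_000_000_000, 0, 255, 256, 65_536, 1]

# Big-endian: lexicographic byte order equals numeric order.
encoded = [encode_event_time(ts) for ts in timestamps]
sorted_by_bytes = [decode_event_time(b) for b in sorted(encoded)]
assert sorted_by_bytes == sorted(timestamps)

# Little-endian, for contrast: byte order no longer follows numeric order
# (256 is encoded as 00 01 ... and sorts before 255, encoded as ff 00 ...).
little = sorted(struct.pack("<q", ts) for ts in timestamps)
little_order = [struct.unpack("<q", b)[0] for b in little]
assert little_order != sorted(timestamps)
```

A store that orders keys by unsigned byte comparison can therefore serve range scans over event time directly on the encoded bytes.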
These encodings are specializations of the existing prefix and range key
encodings:
* `EventTimeAsPrefixStateEncoderSpec` supports range scans by event time.
* `EventTimeAsPostfixStateEncoderSpec` supports prefix scans by key, and
additionally supports range scans over the event time that follows the key.
NOTE: The range scan by event time is scoped to a single key.
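
The two layouts can be sketched with a sorted list of byte strings standing in for the state store. This is only an illustration under simplifying assumptions: the function names are hypothetical, the keys are plain fixed-length byte strings rather than UnsafeRows, and `bisect` replaces a real store iterator.

```python
import bisect
import struct

def ts_bytes(event_time: int) -> bytes:
    """8-byte big-endian encoding of a non-negative event time."""
    return struct.pack(">q", event_time)

def encode_prefix(key: bytes, event_time: int) -> bytes:
    """Prefix layout: [event time][key]."""
    return ts_bytes(event_time) + key

def encode_postfix(key: bytes, event_time: int) -> bytes:
    """Postfix layout: [key][event time]."""
    return key + ts_bytes(event_time)

entries = [(b"user-a", 30), (b"user-b", 10), (b"user-a", 10), (b"user-b", 20)]

# Sorted byte strings stand in for the ordered key space of the store.
prefix_store = sorted(encode_prefix(k, t) for k, t in entries)
postfix_store = sorted(encode_postfix(k, t) for k, t in entries)

def range_scan_all_keys(store, start, end):
    """Prefix layout: entries with start <= event time < end, across all keys."""
    lo = bisect.bisect_left(store, ts_bytes(start))
    hi = bisect.bisect_left(store, ts_bytes(end))
    return [(raw[8:], struct.unpack(">q", raw[:8])[0]) for raw in store[lo:hi]]

def range_scan_one_key(store, key, start, end):
    """Postfix layout: entries of one key with start <= event time < end."""
    lo = bisect.bisect_left(store, key + ts_bytes(start))
    hi = bisect.bisect_left(store, key + ts_bytes(end))
    return [(raw[:-8], struct.unpack(">q", raw[-8:])[0]) for raw in store[lo:hi]]

across_keys = range_scan_all_keys(prefix_store, 10, 30)
single_key = range_scan_one_key(postfix_store, b"user-a", 0, 20)
```

With the prefix layout the scan returns entries for every key in the time window, ordered by event time; with the postfix layout the same kind of scan is confined to the entries of `user-a`.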
Compared to the existing prefix/range key encodings, this eliminates the
overhead of combining two UnsafeRows, which is 12 bytes per key overall (8
bytes of null-tracking bitset, 4 bytes for storing the length of one of the two
UnsafeRows). It can also skip the projection(s) on deserialization.
To maximize the benefit, this PR also proposes a new set of state store
operations (API) that either take the event time as a separate parameter or
return the event time as part of the result. This makes the specs easy to use,
since callers no longer need to combine the key and event time into a single
UnsafeRow. Because this duplicates the full set of operations, and the new key
encoder specs do not work properly with the existing state store operations,
this PR introduces a new trait (`EventTimeAwareStateOperations`) to hold the
new APIs. Callers are expected to obtain an implementation of the trait before
working with a CF (column family) that uses one of the new key encoder specs.
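
To make the shape of such an API concrete, here is a small in-memory sketch in Python. It is not the `EventTimeAwareStateOperations` trait: the class name and the method names `put`, `get`, `remove` and `iterator` are hypothetical, chosen only to show operations that accept the event time as a separate argument and hand it back as a separate field, using the postfix layout internally.

```python
import struct
from typing import Dict, Iterator, Optional, Tuple

class EventTimeAwareOpsSketch:
    """Hypothetical in-memory stand-in; method names are illustrative only."""

    def __init__(self) -> None:
        self._data: Dict[bytes, bytes] = {}

    @staticmethod
    def _encode(key: bytes, event_time: int) -> bytes:
        # Postfix layout: serialized key followed by 8-byte big-endian time.
        return key + struct.pack(">q", event_time)

    def put(self, key: bytes, event_time: int, value: bytes) -> None:
        # The caller never builds a combined key; it passes the parts.
        self._data[self._encode(key, event_time)] = value

    def get(self, key: bytes, event_time: int) -> Optional[bytes]:
        return self._data.get(self._encode(key, event_time))

    def remove(self, key: bytes, event_time: int) -> None:
        self._data.pop(self._encode(key, event_time), None)

    def iterator(self) -> Iterator[Tuple[bytes, int, bytes]]:
        # Event time comes back as its own field, not embedded in the key.
        for raw in sorted(self._data):
            event_time = struct.unpack(">q", raw[-8:])[0]
            yield raw[:-8], event_time, self._data[raw]

ops = EventTimeAwareOpsSketch()
ops.put(b"k1", 200, b"late")
ops.put(b"k1", 100, b"early")
ops.remove(b"k1", 200)
remaining = list(ops.iterator())
```

The point of the sketch is the call shape: the encoding of key plus event time stays inside the store, so the caller does not pay for building and tearing down a combined row.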
The API to obtain the implementation of the trait is as follows:
`initiateEventTimeAwareStateOperations(columnFamilyName):
EventTimeAwareStateOperations`
This PR also changes `RocksDBKeyStateEncoder` to handle encoding and decoding
with event time. Ideally there would be a separate interface for
event-time-aware encoders, but the existing codebase is tightly coupled to
`RocksDBKeyStateEncoder`, so this PR simply defines additional methods to cover
the new type of encoding. The new method `supportEventTime` distinguishes
"key only" encoding from "key + event time" encoding.
The change is already large, so this PR only enables the new key encodings
with UnsafeRow. Supporting Avro will be follow-up work.
### Why are the changes needed?
The existing key encodings are too general for this use case and incur
noticeable overhead: additional bytes in the serialized format, as well as
unnecessary projections to combine the key and event time and to split them
apart again. The proposed key encodings serve the same purpose with minimal
overhead, given that they only need to handle an event time (8 bytes long,
never negative) along with the key.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New test suite.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: claude-4.5-sonnet
The above was used to create the new test suite. No other part was generated
by an LLM.