HeartSaVioR opened a new pull request #33038:
URL: https://github.com/apache/spark/pull/33038
### What changes were proposed in this pull request?
This PR proposes to introduce a new feature, "prefix match scan", on the state
store, which enables users of the state store (mostly stateful operators) to
group keys into logical groups and scan the keys within the same group
efficiently. This PR brings API changes, though they are developer-facing APIs.
* Registering the prefix key
We propose to make an explicit change to the init() method of
StateStoreProvider, as below:
```
def init(
    stateStoreId: StateStoreId,
    keySchema: StructType,
    valueSchema: StructType,
    numColsPrefixKey: Int,
    storeConfs: StateStoreConf,
    hadoopConf: Configuration): Unit
```
Please note that we also remove the unused parameter "keyIndexOrdinal". That
parameter is coupled with getRange(), which we remove as well; see below for
the rationale.
Here we provide the number of columns used to project the prefix key from the
full key. If the operator doesn't leverage prefix match scan, the value can
(and should) be 0, because supporting prefix match scan may bring extra
overhead, and a value of 0 lets the state store provider optimize the
underlying storage format.
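For illustration only, here is a minimal sketch of how a stateful operator might call the new init(); the key schema, column names, and the surrounding values (`provider`, `stateStoreId`, `valueSchema`, `storeConf`) are assumptions for this example, not code from this PR:
```
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.types._

// Hypothetical full key: (userId, sessionStart); the leftmost column
// is registered as the prefix key.
val keySchema = new StructType()
  .add("userId", StringType)
  .add("sessionStart", LongType)

provider.init(
  stateStoreId,
  keySchema,
  valueSchema,
  numColsPrefixKey = 1,  // use 0 if the operator never needs prefix match scan
  storeConfs = storeConf,
  hadoopConf = new Configuration())
```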
We would like to apply some restrictions on the prefix key to simplify the
functionality (a small sketch follows this list):
* The prefix key is a proper part of the full key; it can't be the same as the full key.
    * That said, the full key is (prefix key + remaining parts), and both the prefix key and the remaining parts must have at least one column.
* We always take the columns from the leftmost, sequentially, like `seq.take(nums)`.
    * We don't allow reordering of the columns.
* We only guarantee "equality" comparison against prefix keys, and don't support a prefix "range" scan.
    * We only support scanning the keys which match the prefix key.
    * E.g. we don't support a range scan from user A to user B due to technical complexity. That's the reason we can't leverage the existing getRange API.
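As a concrete illustration of the "leftmost columns" rule, here is a tiny sketch with a hypothetical (userId, sessionStart) key; since StructType is a Seq of StructField, the prefix key schema is literally a take on the full key schema:
```
import org.apache.spark.sql.types._

// Hypothetical full key: (userId, sessionStart).
val keySchema = new StructType()
  .add("userId", StringType)
  .add("sessionStart", LongType)

// With numColsPrefixKey = 1, the prefix key schema is the leftmost
// column(s) of the full key, analogous to seq.take(nums).
val prefixKeySchema = StructType(keySchema.take(1))
// => contains only StructField("userId", StringType)
```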
As mentioned, we want to make an explicit change to the init() method of
StateStoreProvider, which breaks backward compatibility, on the assumption
that 3rd party state store providers would need to update their code anyway to
support prefix match scan. Given that the RocksDB state store provider is
being donated to the OSS and is planned to be available in Spark 3.2, the
majority of users would migrate to the built-in state store providers, which
should remedy the concern.
* Scanning key-value pairs matching the prefix key
We propose to add a new method to the ReadStateStore (and StateStore by
inheritance), as below:
```
def prefixScan(prefixKey: UnsafeRow): Iterator[UnsafeRowPair]
```
We require callers to pass a `prefixKey` which has the same schema as the
registered prefix key schema. In other words, the schema of the parameter
`prefixKey` should match the projection of the prefix key from the full key,
based on the number of columns registered for the prefix key.
The method contract is clear: the method returns an iterator over the
key-value pairs whose prefix key matches the given prefix key. Callers should
rely only on this contract and should not expect any other characteristics
based on implementation details of a specific state store provider.
From the caller's point of view, the prefix key is only used for retrieving
key-value pairs via prefix match scan. Callers should keep using the full key
for CRUD operations.
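To make the contract concrete, here is a minimal usage sketch, assuming a store registered with numColsPrefixKey = 1 over a hypothetical (userId, sessionStart) key; the column name and the `process` helper are assumptions for illustration:
```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

// Build an UnsafeRow carrying only the prefix key columns.
val prefixKeySchema = new StructType().add("userId", StringType)
val prefixProj = UnsafeProjection.create(prefixKeySchema)
val prefixKey: UnsafeRow = prefixProj(InternalRow(UTF8String.fromString("user-a")))

// Iterate over all pairs whose full key starts with ("user-a", ...).
store.prefixScan(prefixKey).foreach { pair =>
  // pair.key is the FULL key; keep using it for get/put/remove.
  process(pair.key, pair.value)
}
```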
Note that this PR also proposes a breaking change: the removal of getRange(),
which was never implemented properly and hence never called properly.
### Why are the changes needed?
* Introducing prefix match scan feature
Currently, the API of the state store is based solely on a key-value data
structure. It lacks more advanced data structures, such as list-like ones,
which has required us to implement such data structures on our own whenever we
needed them. We have one in stream-stream join, and we were about to add
another one in native session window. Custom implementations of data
structures on top of the state store API tend to be complicated and have to
deal with multiple state stores.
We decided to enhance the state store API a bit to remove the need for native
session window to implement its own. The native session window operator will
just need to do a prefix scan on the group key to retrieve all sessions
belonging to that group key.
Since the feature becomes part of the state store API, state store providers
can optimize the implementation based on their own characteristics. (E.g. we
will implement this in the RocksDB state store provider by leveraging the fact
that RocksDB sorts keys by the natural order of their binary format.)
* Removal of getRange API
Before introducing this, we explored leveraging getRange, but it's quite hard
to implement efficiently while respecting its method contract. Spark always
calls the method with the parameters (None, None), and all the state store
providers (including the built-in ones) implement it as simply calling
iterator(), which does not respect the method contract. That said, we can
replace all getRange() usages with iterator() and remove the API to avoid any
confusion or concerns.
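For reference, a small sketch of the mechanical replacement described above, assuming a `store` value in scope; the getRange call shown reflects the pre-removal API:
```
// Before: the only call pattern that ever occurred in practice, and
// every provider implemented it as a full scan anyway.
val allPairsBefore = store.getRange(None, None)

// After: equivalent behavior with the remaining API.
val allPairsAfter = store.iterator()
```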
### Does this PR introduce _any_ user-facing change?
Yes, for end users and maintainers of 3rd party state store providers.
Maintainers will need to update their state store provider implementations to
adopt this change.
### How was this patch tested?
Added new UTs, and ran existing UTs to make sure the change doesn't break anything.