HeartSaVioR opened a new pull request #33038:
URL: https://github.com/apache/spark/pull/33038


   ### What changes were proposed in this pull request?
   
   This PR proposes to introduce a new feature, "prefix match scan", on the state store, which enables users of the state store (mostly stateful operators) to group keys into logical groups and scan the keys within the same group efficiently.
   
   This PR brings API changes, though they are limited to the developer API.
   
   * Registering the prefix key
   
   We propose to make an explicit change to the init() method of 
StateStoreProvider, as below:
   
   ```
   def init(
         stateStoreId: StateStoreId,
         keySchema: StructType,
         valueSchema: StructType,
         numColsPrefixKey: Int,
         storeConfs: StateStoreConf,
         hadoopConf: Configuration): Unit
   ```
   
   Please note that we also remove the unused parameter “keyIndexOrdinal”. That parameter is coupled with getRange(), which we remove as well; see below for the rationale.
   
   Here we provide the number of columns taken from the full key to project the prefix key. If the operator doesn’t leverage prefix match scan, the value can (and should) be 0: supporting prefix match scan may require the state store provider to change the underlying storage format, which could otherwise bring extra overhead.
   
   We would like to apply some restrictions on prefix key to simplify the 
functionality:
   
   * Prefix key is a part of the full key. It can’t be the same as the full key.
     * In other words, the full key is (prefix key + remaining parts), and both the prefix key and the remaining parts must have at least one column.
   * We always take the columns sequentially from the leftmost, like “seq.take(nums)” (see the sketch after this list).
   * We don’t allow reordering of the columns.
   * We only guarantee “equality” comparison against prefix keys, and don’t support prefix “range” scans.
     * We only support scanning the keys that match the given prefix key.
     * E.g. we don’t support a range scan from user A to user B, due to technical complexity. That’s the reason we can’t leverage the existing getRange API.
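
   To make the projection rule concrete, below is a minimal sketch (not code from this PR; the schema and column names are hypothetical) of deriving a prefix key schema from a full key schema:

   ```
   import org.apache.spark.sql.types._

   // Hypothetical full key schema for a session-window-like operator: the
   // leftmost column forms the prefix key, the rest are the remaining parts.
   val keySchema = new StructType()
     .add("groupKey", StringType)     // prefix key column
     .add("sessionStart", LongType)   // remaining part

   val numColsPrefixKey = 1

   // The prefix key is always the leftmost columns, like seq.take(n):
   val prefixKeySchema = StructType(keySchema.take(numColsPrefixKey))
   // prefixKeySchema now contains only the "groupKey" column
   ```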
   
   As mentioned, we want to make an explicit change to the init() method of StateStoreProvider, which breaks backward compatibility, on the assumption that 3rd party state store providers would need to update their code anyway to support prefix match scan. Given that the RocksDB state store provider is being donated to the OSS project and is planned to be available in Spark 3.2, the majority of users would migrate to the built-in state store providers, which would remedy the concern.
   
   * Scanning key-value pairs matched to the prefix key
   
   We propose to add a new method to the ReadStateStore (and StateStore by 
inheritance), as below:
   
   ```
   def prefixScan(prefixKey: UnsafeRow): Iterator[UnsafeRowPair]
   ```
   
   We require callers to pass a `prefixKey` that has the same schema as the registered prefix key schema. In other words, the schema of the parameter `prefixKey` should match the projection of the prefix key from the full key, based on the number of columns registered for the prefix key.
   
   The method contract is clear: the method returns an iterator over the key-value pairs whose prefix key matches the given prefix key. Callers should rely only on this contract, and should not expect any other characteristics derived from the specific details of a state store provider.
   
   From the caller’s point of view, the prefix key is only used for retrieving key-value pairs via prefix match scan. Callers should keep using the full key for CRUD operations.
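
   As an illustration of the contract, here is a hedged usage sketch; the function and parameter names are hypothetical, assuming a store initialized with numColsPrefixKey > 0 and a `groupKeyRow` built against the prefix key schema:

   ```
   import org.apache.spark.sql.catalyst.expressions.UnsafeRow
   import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, UnsafeRowPair}

   // Retrieve all values whose full keys share the given prefix key. The
   // returned UnsafeRowPair may be reused across iterations, so we copy the
   // rows before materializing them into a collection.
   def valuesForPrefix(store: ReadStateStore, groupKeyRow: UnsafeRow): Seq[UnsafeRow] = {
     store.prefixScan(groupKeyRow).map(_.value.copy()).toSeq
   }
   ```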
   
   Note that this PR also proposes a breaking change: the removal of getRange(), which has never been implemented properly and hence has never been called properly.
   
   ### Why are the changes needed?
   
   * Introducing prefix match scan feature
   
   Currently, the state store API is based only on a key-value data structure. It lacks more advanced data structures, such as list-like ones, which has required us to implement such structures on our own whenever we need them. We have one in stream-stream join, and we were about to add another in native session window. Custom data structure implementations on top of the state store API tend to be complicated and have to deal with multiple state stores.
   
   We decided to enhance the state store API a bit to remove the need for native session window to implement its own. The native session window operator will just need to do a prefix scan on the group key to retrieve all sessions belonging to that group key.
   
   Thanks to making this feature a part of the state store API, state store providers can optimize the implementation based on their own characteristics. (E.g. we will implement this in the RocksDB state store provider by leveraging the fact that RocksDB sorts keys by the natural order of their binary format.)
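
   To illustrate why that characteristic helps (a conceptual sketch only, not the RocksDB provider code; the helper names are hypothetical): with keys sorted by unsigned byte order, all keys matching a prefix form one contiguous run, so a prefix match scan reduces to seeking to the first candidate and iterating while the prefix still matches.

   ```
   // Unsigned lexicographic comparison, matching the natural binary ordering.
   def lexCompare(a: Array[Byte], b: Array[Byte]): Int = {
     val len = math.min(a.length, b.length)
     var i = 0
     while (i < len && (a(i) & 0xff) == (b(i) & 0xff)) i += 1
     if (i < len) (a(i) & 0xff) - (b(i) & 0xff) else a.length - b.length
   }

   // Given keys sorted by lexCompare: skip keys ordered before the prefix,
   // then take keys while the prefix still matches.
   def prefixScanSorted(sortedKeys: Seq[Array[Byte]], prefix: Array[Byte]): Seq[Array[Byte]] =
     sortedKeys
       .dropWhile(k => lexCompare(k.take(prefix.length), prefix) < 0)
       .takeWhile(_.startsWith(prefix))
   ```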
   
   * Removal of getRange API
   
   Before introducing this, we sought a way to leverage getRange, but it's quite hard to implement efficiently while respecting its method contract. Spark always calls the method with (None, None) parameters, and all the state store providers (including the built-in ones) implement it as just calling iterator(), which doesn't respect the method contract. Given that, we can replace all getRange() usages with iterator() and remove the API, removing any confusion/concerns.
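
   Concretely, the replacement at existing call sites is mechanical; a hedged before/after sketch (the wrapper function is illustrative, not an actual call site):

   ```
   import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, UnsafeRowPair}

   // Before (contract never honored; Spark only ever called it as a full scan):
   //   def allPairs(store: ReadStateStore): Iterator[UnsafeRowPair] =
   //     store.getRange(None, None)
   // After this PR, call sites use iterator() directly:
   def allPairs(store: ReadStateStore): Iterator[UnsafeRowPair] =
     store.iterator()
   ```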
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, for end users and maintainers of 3rd party state store providers: they will need to upgrade their state store provider implementations to adopt this change.
   
   ### How was this patch tested?
   
   Added new UTs, and ran existing UTs to make sure the change doesn't break anything.

