HeartSaVioR commented on a change in pull request #33038:
URL: https://github.com/apache/spark/pull/33038#discussion_r667570880
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##########
@@ -60,21 +60,17 @@ trait ReadStateStore {
   def get(key: UnsafeRow): UnsafeRow

   /**
-   * Get key value pairs with optional approximate `start` and `end` extents.
-   * If the State Store implementation maintains indices for the data based on the optional
-   * `keyIndexOrdinal` over fields `keySchema` (see `StateStoreProvider.init()`), then it can use
-   * `start` and `end` to make a best-effort scan over the data. Default implementation returns
-   * the full data scan iterator, which is correct but inefficient. Custom implementations must
-   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   * Return an iterator containing all the key-value pairs which match the given prefix key.
+   *
+   * Spark will provide a `numColsPrefixKey` greater than 0 in the `StateStoreProvider.init`
+   * method if the state store is responsible for handling prefix scan requests. The schema of
+   * the prefix key should be the same as the leftmost `numColsPrefixKey` columns of the key
+   * schema.
    *
-   * @param start UnsafeRow having the `keyIndexOrdinal` column set with appropriate starting value.
-   * @param end UnsafeRow having the `keyIndexOrdinal` column set with appropriate ending value.
-   * @return An iterator of key-value pairs that is guaranteed not miss any key between start and
-   *         end, both inclusive.
+   * This method is expected to throw an exception if Spark calls it without setting
+   * `numColsPrefixKey` to a value greater than 0.
    */
-  def getRange(start: Option[UnsafeRow], end: Option[UnsafeRow]): Iterator[UnsafeRowPair] = {
-    iterator()
-  }
+  def prefixScan(prefixKey: UnsafeRow): Iterator[UnsafeRowPair]
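To make the contract above concrete, here is a minimal sketch in plain Scala. It does not use Spark's actual `UnsafeRow`-based API; the `ToyStateStore` class, the `Seq[String]` keys, and the `Int` values are hypothetical stand-ins chosen to show how a store could match pairs on the leftmost `numColsPrefixKey` key columns, and how it could throw when prefix scan was not enabled:

```scala
import scala.collection.mutable

// Hypothetical toy store: keys are sequences of column values, values are Ints.
// This only illustrates the documented prefix-scan contract; Spark's real API
// works on UnsafeRow and is configured through StateStoreProvider.init.
final class ToyStateStore(numColsPrefixKey: Int) {
  private val data = mutable.LinkedHashMap.empty[Seq[String], Int]

  def put(key: Seq[String], value: Int): Unit = data.put(key, value)

  // Return all pairs whose leftmost numColsPrefixKey columns equal prefixKey.
  // Throws if prefix scan was not enabled, mirroring the documented expectation.
  def prefixScan(prefixKey: Seq[String]): Iterator[(Seq[String], Int)] = {
    if (numColsPrefixKey <= 0) {
      throw new UnsupportedOperationException(
        "prefix scan is not enabled for this store")
    }
    require(prefixKey.length == numColsPrefixKey,
      "prefix key schema must match the leftmost numColsPrefixKey key columns")
    data.iterator.filter { case (key, _) => key.take(numColsPrefixKey) == prefixKey }
  }
}
```

With `numColsPrefixKey = 1`, `prefixScan(Seq("a"))` would return every pair whose first key column is `"a"`, and a store built with `numColsPrefixKey = 0` would throw on any prefix scan call.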
Review comment:
MemoryStateStore is for test purposes. I skipped implementing this there because nothing currently exercises prefix scan against MemoryStateStore; we can add it whenever we need it, e.g. once we find it easier to test prefix scan against MemoryStateStore.

For 3rd-party implementations, I'd say supporting prefix scan is necessary to keep working with future Spark versions. They can still use the trick of throwing an exception that says "this state store provider doesn't support session window", but I'd rather not claim the feature is limited to session window: that would eventually no longer be true, and more and more features would become unusable with such state store providers.

In the documentation we can describe the trick and its consequences, but we probably can't update the doc every time to list which functionalities break when prefix scan is not supported. If we still want to allow 3rd-party implementations to support only a subset of features, I'll add a guide to the doc.
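The "trick" mentioned above might look like the following sketch. The `NoPrefixScanStore` class and its error message are hypothetical, not code from the PR; the point is that a provider opting out of prefix scan fails fast with a descriptive error instead of silently returning wrong results:

```scala
// Hypothetical 3rd-party store that declines to support prefix scan.
// Any feature that needs prefix scan (e.g. session window) then fails fast
// with a descriptive error rather than producing incorrect output.
class NoPrefixScanStore {
  def prefixScan(prefixKey: Any): Iterator[Nothing] =
    throw new UnsupportedOperationException(
      "This state store provider does not support prefix scan; " +
        "features relying on it (e.g. session window) cannot be used.")
}
```

As the comment argues, the drawback is that the set of features gated behind prefix scan will grow over time, so the error message would need to stay generic rather than naming session window alone.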
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]