micheal-o commented on code in PR #53911:
URL: https://github.com/apache/spark/pull/53911#discussion_r2725963696
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -48,6 +48,9 @@ sealed trait RocksDBKeyStateEncoder {
def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte]
def encodeKey(row: UnsafeRow): Array[Byte]
def decodeKey(keyBytes: Array[Byte]): UnsafeRow
+ def supportEventTime: Boolean
Review Comment:
Why not implement another class that extends the KeyStateEncoder and adds
these 3 methods?
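Something along these lines (sketch only; the trait name is a placeholder, and since the base trait is `sealed` it would have to live in the same file):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Keep RocksDBKeyStateEncoder unchanged and move the event-time methods into a
// sub-trait, so only encoders that actually support event time implement them.
trait RocksDBEventTimeKeyStateEncoder extends RocksDBKeyStateEncoder {
  def encodeKeyWithEventTime(row: UnsafeRow, eventTime: Long): Array[Byte]
  def decodeKeyWithEventTime(keyBytes: Array[Byte]): (UnsafeRow, Long)
}
```

Then `supportEventTime` wouldn't be needed at all; callers can pattern match on the sub-trait instead.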
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -480,6 +480,264 @@ private[sql] class RocksDBStateStoreProvider
new StateStoreIterator(iter, rocksDbIter.closeIfNeeded)
}
+ class RocksDBEventTimeAwareStateOperations(cfName: String)
+ extends EventTimeAwareStateOperations {
+
+ override val columnFamilyName: String = cfName
+
+ verifyColFamilyOperations("doEventTimeAwareStateOperations", columnFamilyName)
+
+ private val kvEncoder = keyValueEncoderMap.get(columnFamilyName)
+ private val keyEncoder = kvEncoder._1
+ private val valueEncoder = kvEncoder._2
+
+ require(keyEncoder.supportEventTime,
+ "EventTimeAwareStateOperations requires encoder supporting event time!")
+
+ override def get(key: UnsafeRow, eventTime: Long): UnsafeRow = {
Review Comment:
There is a lot of duplication between these funcs and the originals. In most of
them, the major difference is which encoder/decoder API is called. Can we make
them share common code, with each variant passing in how it wants to handle
encoding/decoding? This will help reduce duplication and ease code maintenance.
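For example (hypothetical helper; I'm assuming the existing byte-level `rocksDB.get(bytes, cfName)` accessor here):

```scala
// One shared body; each variant passes in only the already-encoded key bytes
// and the value decoder it wants to use.
private def getWithEncodedKey(
    encodedKey: Array[Byte],
    cfName: String,
    decodeValue: Array[Byte] => UnsafeRow): UnsafeRow = {
  val valueBytes = rocksDB.get(encodedKey, cfName)
  if (valueBytes == null) null else decodeValue(valueBytes)
}

// Plain variant:
//   getWithEncodedKey(keyEncoder.encodeKey(key), cfName, valueEncoder.decodeValue)
// Event-time variant:
//   getWithEncodedKey(keyEncoder.encodeKeyWithEventTime(key, eventTime), cfName,
//     valueEncoder.decodeValue)
```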
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -1709,6 +1873,83 @@ class NoPrefixKeyStateEncoder(
override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
throw new IllegalStateException("This encoder doesn't support prefix key!")
}
+
+ override def supportEventTime: Boolean = false
+
+ override def encodeKeyWithEventTime(row: UnsafeRow, eventTime: Long): Array[Byte] = {
+ throw new IllegalStateException("This encoder doesn't support key with event time!")
+ }
+
+ override def decodeKeyWithEventTime(keyBytes: Array[Byte]): (UnsafeRow, Long) = {
+ throw new IllegalStateException("This encoder doesn't support key with event time!")
+ }
+}
+
+class EventTimeAsPrefixStateEncoder(
+ dataEncoder: RocksDBDataEncoder,
+ keySchema: StructType,
+ useColumnFamilies: Boolean = false)
+ extends RocksDBKeyStateEncoder with Logging {
+
+ override def supportPrefixKeyScan: Boolean = false
+
+ override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
+ throw new IllegalStateException("This encoder doesn't support key without event time!")
+ }
+
+ override def encodeKey(row: UnsafeRow): Array[Byte] = {
+ throw new IllegalStateException("This encoder doesn't support key without event time!")
+ }
+
+ override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
+ throw new IllegalStateException("This encoder doesn't support key without event time!")
+ }
+
+ // TODO: Revisit whether we need to mark this to true to support delete range
+ override def supportsDeleteRange: Boolean = false
+
+ override def supportEventTime: Boolean = true
+
+ override def encodeKeyWithEventTime(row: UnsafeRow, eventTime: Long): Array[Byte] = {
+ dataEncoder.encodeKeyForEventTimeAsPrefix(row, eventTime)
+ }
+
+ override def decodeKeyWithEventTime(keyBytes: Array[Byte]): (UnsafeRow, Long) = {
+ dataEncoder.decodeKeyForEventTimeAsPrefix(keyBytes)
+ }
+}
+
+class EventTimeAsPostfixStateEncoder(
+ dataEncoder: RocksDBDataEncoder,
Review Comment:
ditto
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -328,6 +331,39 @@ trait DataEncoder {
*/
def encodePrefixKeyForRangeScan(row: UnsafeRow): Array[Byte]
+ /**
+ * Encodes a key and an event time, preserving prefix scan on the key and proper
+ * sort order by event time within the same key in RocksDB.
+ *
+ * This method handles the encoding as follows:
+ * - Encodes the key columns normally and puts them first
+ * - Appends the event time Long value in big-endian order as the last 8 bytes
+ *
+ * @param row An UnsafeRow denoting a key
+ * @param eventTime Long value representing the event time
+ * @return Serialized bytes that maintain prefix scan on the key and sort order by
+ * event time
+ * @throws UnsupportedOperationException if called on an encoder that doesn't
+ * support event time as postfix.
+ */
+ def encodeKeyForEventTimeAsPostfix(row: UnsafeRow, eventTime: Long): Array[Byte]
Review Comment:
This doesn't seem like it should be implemented at this layer. We can have
another Encoder class that adds the prefix/postfix after calling the
underlying encoder.
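For example, something like this wrapper (sketch only, assuming the underlying encoder exposes plain `encodeKey`/`decodeKey`):

```scala
import java.nio.ByteBuffer
import java.util.Arrays

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Appends the big-endian event time after calling the underlying encoder, so
// DataEncoder itself never needs to know about event time.
class EventTimePostfixDataEncoder(underlying: RocksDBDataEncoder) {
  def encodeKeyWithEventTime(row: UnsafeRow, eventTime: Long): Array[Byte] = {
    val keyBytes = underlying.encodeKey(row)
    ByteBuffer.allocate(keyBytes.length + 8)
      .put(keyBytes)
      .putLong(eventTime) // ByteBuffer is big-endian by default, preserving sort order
      .array()
  }

  def decodeKeyWithEventTime(bytes: Array[Byte]): (UnsafeRow, Long) = {
    val row = underlying.decodeKey(Arrays.copyOfRange(bytes, 0, bytes.length - 8))
    (row, ByteBuffer.wrap(bytes, bytes.length - 8, 8).getLong)
  }
}
```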
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -1709,6 +1873,83 @@ class NoPrefixKeyStateEncoder(
override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
throw new IllegalStateException("This encoder doesn't support prefix key!")
}
+
+ override def supportEventTime: Boolean = false
+
+ override def encodeKeyWithEventTime(row: UnsafeRow, eventTime: Long): Array[Byte] = {
+ throw new IllegalStateException("This encoder doesn't support key with event time!")
+ }
+
+ override def decodeKeyWithEventTime(keyBytes: Array[Byte]): (UnsafeRow, Long) = {
+ throw new IllegalStateException("This encoder doesn't support key with event time!")
+ }
+}
+
+class EventTimeAsPrefixStateEncoder(
+ dataEncoder: RocksDBDataEncoder,
+ keySchema: StructType,
+ useColumnFamilies: Boolean = false)
+ extends RocksDBKeyStateEncoder with Logging {
+
+ override def supportPrefixKeyScan: Boolean = false
+
+ override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
+ throw new IllegalStateException("This encoder doesn't support key without event time!")
+ }
+
+ override def encodeKey(row: UnsafeRow): Array[Byte] = {
+ throw new IllegalStateException("This encoder doesn't support key without event time!")
+ }
+
+ override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
+ throw new IllegalStateException("This encoder doesn't support key without event time!")
+ }
+
+ // TODO: Revisit whether we need to mark this to true to support delete range
+ override def supportsDeleteRange: Boolean = false
+
+ override def supportEventTime: Boolean = true
+
+ override def encodeKeyWithEventTime(row: UnsafeRow, eventTime: Long): Array[Byte] = {
+ dataEncoder.encodeKeyForEventTimeAsPrefix(row, eventTime)
Review Comment:
This class should just take in its own implementation of the data encoder that
adds the prefix/postfix, so that we don't need to add these APIs at the
`DataEncoder` level.
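i.e. the prefix counterpart of the wrapper sketched above (again just a sketch over the existing `encodeKey`/`decodeKey`):

```scala
import java.nio.ByteBuffer
import java.util.Arrays

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Big-endian event time first (so RocksDB orders entries by event time),
// followed by the unchanged key bytes from the underlying encoder.
class EventTimePrefixDataEncoder(underlying: RocksDBDataEncoder) {
  def encodeKeyWithEventTime(row: UnsafeRow, eventTime: Long): Array[Byte] = {
    val keyBytes = underlying.encodeKey(row)
    ByteBuffer.allocate(8 + keyBytes.length).putLong(eventTime).put(keyBytes).array()
  }

  def decodeKeyWithEventTime(bytes: Array[Byte]): (UnsafeRow, Long) = {
    val eventTime = ByteBuffer.wrap(bytes, 0, 8).getLong
    (underlying.decodeKey(Arrays.copyOfRange(bytes, 8, bytes.length)), eventTime)
  }
}
```

`EventTimeAsPrefixStateEncoder` would then just delegate to this wrapper, and `encodeKeyForEventTimeAsPrefix`/`decodeKeyForEventTimeAsPrefix` could be dropped from `DataEncoder`.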
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -346,6 +346,108 @@ trait StateStore extends ReadStateStore {
* Whether all updates have been committed
*/
def hasCommitted: Boolean
+
+ /** Initiate event-time aware state operations for the given column family. */
+ def initiateEventTimeAwareStateOperations(
+ columnFamilyName: String): EventTimeAwareStateOperations
+}
+
+/**
+ * The interface for event-time aware state operations, which are based on a
+ * (key, eventTime, value) data structure instead of (key, value).
+ */
+trait EventTimeAwareStateOperations {
Review Comment:
We want the APIs at the state store layer to be generic enough for
reusability. This implementation is too tied to event time, which is an
operator-level detail that shouldn't be coupled into the state store internals.
We have a `StateStoreRow` class that allows passing extra fields along with an
UnsafeRow; we use it for the row checksum too. So it might be better to extend
`StateStoreRow` and add an `eventTime` field.
Then you can expose new state store APIs that take `StateStoreRow`
instead of `UnsafeRow`. That would make the implementation generic enough to be
reused for other use cases in the future, without tying it too much to event
time.
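Roughly (sketch only; the actual `StateStoreRow` definition may differ from what I'm writing here):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// An extra optional field on the row wrapper, mirroring how extra per-row
// fields like the row checksum are carried today.
case class StateStoreRow(row: UnsafeRow, eventTime: Option[Long] = None)

// The new store-level APIs would take the wrapper instead of a raw UnsafeRow:
trait RowAwareStateOps {
  def get(key: StateStoreRow): UnsafeRow
  def put(key: StateStoreRow, value: UnsafeRow): Unit
  def remove(key: StateStoreRow): Unit
}
```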
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]