HeartSaVioR commented on code in PR #53911:
URL: https://github.com/apache/spark/pull/53911#discussion_r2725991925
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -1709,6 +1873,83 @@ class NoPrefixKeyStateEncoder(
override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
throw new IllegalStateException("This encoder doesn't support prefix key!")
}
+
+ override def supportEventTime: Boolean = false
+
+ override def encodeKeyWithEventTime(row: UnsafeRow, eventTime: Long):
Array[Byte] = {
+ throw new IllegalStateException("This encoder doesn't support key with
event time!")
+ }
+
+ override def decodeKeyWithEventTime(keyBytes: Array[Byte]): (UnsafeRow,
Long) = {
+ throw new IllegalStateException("This encoder doesn't support key with
event time!")
+ }
+}
+
+class EventTimeAsPrefixStateEncoder(
+ dataEncoder: RocksDBDataEncoder,
+ keySchema: StructType,
+ useColumnFamilies: Boolean = false)
+ extends RocksDBKeyStateEncoder with Logging {
+
+ override def supportPrefixKeyScan: Boolean = false
+
+ override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
+ throw new IllegalStateException("This encoder doesn't support key without
event time!")
+ }
+
+ override def encodeKey(row: UnsafeRow): Array[Byte] = {
+ throw new IllegalStateException("This encoder doesn't support key without
event time!")
+ }
+
+ override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
+ throw new IllegalStateException("This encoder doesn't support key without
event time!")
+ }
+
+ // TODO: Revisit whether we need to mark this to true to support delete range
+ override def supportsDeleteRange: Boolean = false
+
+ override def supportEventTime: Boolean = true
+
+ override def encodeKeyWithEventTime(row: UnsafeRow, eventTime: Long):
Array[Byte] = {
+ dataEncoder.encodeKeyForEventTimeAsPrefix(row, eventTime)
Review Comment:
I think that should be the same as with prefix/range scan, right? I thought we
were consolidating the actual implementation in DataEncoder.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]