anishshri-db commented on code in PR #53911:
URL: https://github.com/apache/spark/pull/53911#discussion_r2800762982
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -1713,6 +1716,206 @@ class NoPrefixKeyStateEncoder(
}
}
+/**
+ * The singleton instance to provide utility-like methods for key state
encoders which include
+ * timestamp, specifically [[TimestampAsPrefixKeyStateEncoder]] and
+ * [[TimestampAsPostfixKeyStateEncoder]].
+ */
+object TimestampKeyStateEncoder {
+ private val INTERNAL_TIMESTAMP_COLUMN_NAME = "__event_time"
+
+ def keySchemaWithTimestamp(keySchema: StructType): StructType = {
+ StructType(keySchema.fields)
+ .add(name = INTERNAL_TIMESTAMP_COLUMN_NAME, dataType = LongType,
nullable = false)
+ }
+
+ def getAttachTimestampProjection(keyWithoutTimestampSchema: StructType):
UnsafeProjection = {
+ val refs = keyWithoutTimestampSchema.zipWithIndex.map(x =>
+ BoundReference(x._2, x._1.dataType, x._1.nullable))
+ UnsafeProjection.create(
+ refs :+ Literal(0L), // placeholder for timestamp
+ DataTypeUtils.toAttributes(StructType(keyWithoutTimestampSchema)))
+ }
+
+ def getDetachTimestampProjection(keyWithTimestampSchema: StructType):
UnsafeProjection = {
+ val refs = keyWithTimestampSchema.zipWithIndex.dropRight(1).map(x =>
+ BoundReference(x._2, x._1.dataType, x._1.nullable))
+ UnsafeProjection.create(refs)
+ }
+
+ def attachTimestamp(
+ attachTimestampProjection: UnsafeProjection,
+ keyWithTimestampSchema: StructType,
+ key: UnsafeRow,
+ timestamp: Long): UnsafeRow = {
+ val rowWithTimestamp = attachTimestampProjection(key)
+ rowWithTimestamp.setLong(keyWithTimestampSchema.length - 1, timestamp)
+ rowWithTimestamp
+ }
+
+ def extractTimestamp(key: UnsafeRow): Long = {
+ key.getLong(key.numFields - 1)
+ }
+}
+
+/**
+ * The abstract base class for key state encoders which include timestamp,
specifically
+ * [[TimestampAsPrefixKeyStateEncoder]] and
[[TimestampAsPostfixKeyStateEncoder]].
+ */
+abstract class TimestampKeyStateEncoder(
+ dataEncoder: RocksDBDataEncoder,
+ keySchema: StructType)
+ extends RocksDBKeyStateEncoder with Logging {
+
+ protected val detachTimestampProjection: UnsafeProjection =
+ TimestampKeyStateEncoder.getDetachTimestampProjection(keySchema)
+
+ protected val attachTimestampProjection: UnsafeProjection =
+ TimestampKeyStateEncoder.getAttachTimestampProjection(
+ StructType(keySchema.fields.dropRight(1)))
+
+ protected def decodeKey(keyBytes: Array[Byte], startPos: Int): UnsafeRow = {
+ val rowBytesLength = keyBytes.length - 8
+ val rowBytes = new Array[Byte](rowBytesLength)
+ Platform.copyMemory(
+ keyBytes, Platform.BYTE_ARRAY_OFFSET + startPos,
+ rowBytes, Platform.BYTE_ARRAY_OFFSET,
+ rowBytesLength
+ )
+ // The encoded row does not include the timestamp (it's stored separately),
+ // so decode with keySchema.length - 1 fields.
+ dataEncoder.decodeToUnsafeRow(rowBytes, keySchema.length - 1)
+ }
+
+ // NOTE: We reuse the ByteBuffer to avoid allocating a new one for every
encoding/decoding,
+ // which means the encoder is not thread-safe. Built-in operators do not
access the encoder in
+ // multiple threads, but if we are concerned about thread-safety in the
future, we can maintain
+ // the thread-local of ByteBuffer to retain the reusability of the instance
while avoiding
+ // thread-safety issue. We do not use position - we always put/get at offset
0.
+ private val buffForBigEndianLong =
ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN)
+
+ private val SIGN_MASK_FOR_LONG: Long = 0x8000000000000000L
+
+ protected def encodeTimestamp(timestamp: Long): Array[Byte] = {
+ // Flip the sign bit to ensure correct lexicographical ordering, even for
negative timestamps.
+ // We should flip the sign bit back when decoding the timestamp.
+ val signFlippedTimestamp = timestamp ^ SIGN_MASK_FOR_LONG
+ buffForBigEndianLong.putLong(0, signFlippedTimestamp)
+ buffForBigEndianLong.array()
+ }
+
+ protected def decodeTimestamp(keyBytes: Array[Byte], startPos: Int): Long = {
+ buffForBigEndianLong.put(0, keyBytes, startPos, 8)
+ val signFlippedTimestamp = buffForBigEndianLong.getLong(0)
+ // Flip the sign bit back to get the original timestamp.
+ signFlippedTimestamp ^ SIGN_MASK_FOR_LONG
+ }
+
+ protected def attachTimestamp(key: UnsafeRow, timestamp: Long): UnsafeRow = {
+ TimestampKeyStateEncoder.attachTimestamp(attachTimestampProjection,
keySchema, key, timestamp)
+ }
+
+ protected def detachTimestamp(key: UnsafeRow): UnsafeRow = {
+ detachTimestampProjection(key)
+ }
+
+ def extractTimestamp(key: UnsafeRow): Long = {
+ TimestampKeyStateEncoder.extractTimestamp(key)
+ }
+}
+
+/**
+ * Encodes row with timestamp as prefix of the key, so that they can be
scanned based on
+ * timestamp ordering.
+ *
+ * The encoder expects the provided key schema to have [original key
fields..., timestamp field].
+ * The key has to be conformed to this schema when putting/getting from the
state store. The schema
Review Comment:
nit: grammar — "has to be conformed to this schema" should read `has to conform to this schema`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]