HeartSaVioR commented on code in PR #53911:
URL: https://github.com/apache/spark/pull/53911#discussion_r2791748956


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -1713,6 +1715,206 @@ class NoPrefixKeyStateEncoder(
   }
 }
 
+object TimestampKeyStateEncoder {
+  val INTERNAL_TIMESTAMP_COLUMN_NAME = "__event_time"
+
+  val SIGN_MASK_FOR_LONG: Long = 0x8000000000000000L
+
+  def finalKeySchema(keySchema: StructType): StructType = {
+    StructType(keySchema.fields)
+      .add(name = "__event_time", dataType = LongType, nullable = false)

Review Comment:
   ```suggestion
         .add(name = INTERNAL_TIMESTAMP_COLUMN_NAME, dataType = LongType, 
nullable = false)
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -356,6 +358,7 @@ private[sql] class RocksDBStateStoreProvider
         colFamilyName)
     }
 
+

Review Comment:
   ```suggestion
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -314,6 +314,7 @@ private[sql] class RocksDBStateStoreProvider
       }
     }
 
+

Review Comment:
   ```suggestion
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -331,6 +332,7 @@ private[sql] class RocksDBStateStoreProvider
       rocksDB.merge(keyEncoder.encodeKey(key), 
valueEncoder.encodeValue(value), colFamilyName)
     }
 
+

Review Comment:
   ```suggestion
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -1121,6 +1192,8 @@ object RocksDBStateStoreProvider {
   private val AVRO_ENCODER_LIFETIME_HOURS = 1L
   private val DEFAULT_SCHEMA_IDS = StateSchemaInfo(0, 0)
 
+  type KeyValueEncoder = (RocksDBKeyStateEncoder, RocksDBValueStateEncoder, 
Short)

Review Comment:
   ```suggestion
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -1121,6 +1192,8 @@ object RocksDBStateStoreProvider {
   private val AVRO_ENCODER_LIFETIME_HOURS = 1L
   private val DEFAULT_SCHEMA_IDS = StateSchemaInfo(0, 0)
 
+  type KeyValueEncoder = (RocksDBKeyStateEncoder, RocksDBValueStateEncoder, 
Short)
+

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to