jingz-db commented on code in PR #47107:
URL: https://github.com/apache/spark/pull/47107#discussion_r1664944074


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -491,52 +522,80 @@ class RangeKeyScanStateEncoder(
     writer.getRow()
   }
 
-  override def encodeKey(row: UnsafeRow): Array[Byte] = {
+  override def encodeKey(row: UnsafeRow, vcfId: Option[Short]): Array[Byte] = {
+    val hasVirtualColFamilyPrefix: Boolean = vcfId.isDefined
     // This prefix key has the columns specified by orderingOrdinals
     val prefixKey = extractPrefixKey(row)
     val rangeScanKeyEncoded = encodeUnsafeRow(encodePrefixKeyForRangeScan(prefixKey))
 
+    val offSetForColFamilyPrefix =
+      if (hasVirtualColFamilyPrefix) VIRTUAL_COL_FAMILY_PREFIX_BYTES else 0
+
     val result = if (orderingOrdinals.length < keySchema.length) {
       val remainingEncoded = encodeUnsafeRow(remainingKeyProjection(row))
-      val encodedBytes = new Array[Byte](rangeScanKeyEncoded.length + remainingEncoded.length + 4)
-      Platform.putInt(encodedBytes, Platform.BYTE_ARRAY_OFFSET, rangeScanKeyEncoded.length)
+      val encodedBytes = new Array[Byte](rangeScanKeyEncoded.length +

Review Comment:
   Refactored into a function in the base object.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -491,52 +522,80 @@ class RangeKeyScanStateEncoder(
     writer.getRow()
   }
 
-  override def encodeKey(row: UnsafeRow): Array[Byte] = {
+  override def encodeKey(row: UnsafeRow, vcfId: Option[Short]): Array[Byte] = {
+    val hasVirtualColFamilyPrefix: Boolean = vcfId.isDefined
     // This prefix key has the columns specified by orderingOrdinals
     val prefixKey = extractPrefixKey(row)
     val rangeScanKeyEncoded = encodeUnsafeRow(encodePrefixKeyForRangeScan(prefixKey))
 
+    val offSetForColFamilyPrefix =
+      if (hasVirtualColFamilyPrefix) VIRTUAL_COL_FAMILY_PREFIX_BYTES else 0
+
     val result = if (orderingOrdinals.length < keySchema.length) {
       val remainingEncoded = encodeUnsafeRow(remainingKeyProjection(row))
-      val encodedBytes = new Array[Byte](rangeScanKeyEncoded.length + remainingEncoded.length + 4)
-      Platform.putInt(encodedBytes, Platform.BYTE_ARRAY_OFFSET, rangeScanKeyEncoded.length)
+      val encodedBytes = new Array[Byte](rangeScanKeyEncoded.length +

Review Comment:
   Refactored into a function in the base object



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to