anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480575858


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -249,4 +259,109 @@ class NoPrefixKeyStateEncoder(keySchema: StructType, valueSchema: StructType)
   override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
     throw new IllegalStateException("This encoder doesn't support prefix key!")
   }
+
+  override def supportsMultipleValuesPerKey: Boolean = false
+
+  override def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] = {
+    throw new UnsupportedOperationException("encoder does not support multiple values per key")
+  }
+}
+
+/**
+ * Supports encoding multiple values per key in RocksDB.
+ * A single value is encoded in the format below, where first value is number of bytes
+ * in actual encodedUnsafeRow followed by the encoded value itself.
+ *
+ * |---size(bytes)--|--unsafeRowEncodedBytes--|
+ *
+ * Multiple values are separated by a delimiter character.
+ *
+ * This encoder supports RocksDB StringAppendOperator merge operator. Values encoded can be
+ * merged in RocksDB using merge operation, and all merged values can be read using decodeValues
+ * operation.
+ */
+class MultiValuedStateEncoder(keySchema: StructType, valueSchema: StructType)
+  extends RocksDBStateEncoder with Logging {
+
+  import RocksDBStateEncoder._
+
+  // Reusable objects
+  private val keyRow = new UnsafeRow(keySchema.size)
+  private val valueRow = new UnsafeRow(valueSchema.size)
+  private val rowTuple = new UnsafeRowPair()
+
+  override def supportPrefixKeyScan: Boolean = false
+
+  override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
+    throw new IllegalStateException("This encoder doesn't support prefix key!")
+  }
+
+  override def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
+    throw new IllegalStateException("This encoder doesn't support prefix key!")
+  }
+
+  override def encodeKey(row: UnsafeRow): Array[Byte] = {
+    encodeUnsafeRow(row)
+  }
+
+  override def encodeValue(row: UnsafeRow): Array[Byte] = {
+    val bytes = encodeUnsafeRow(row)
+    val numBytes = bytes.length
+
+    val encodedBytes = new Array[Byte](java.lang.Integer.BYTES + bytes.length)
+    Platform.putInt(encodedBytes, Platform.BYTE_ARRAY_OFFSET, numBytes)
+    Platform.copyMemory(bytes, Platform.BYTE_ARRAY_OFFSET,
+      encodedBytes, java.lang.Integer.BYTES + Platform.BYTE_ARRAY_OFFSET, bytes.length)
+
+    encodedBytes
+  }
+
+  override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
+    decodeToUnsafeRow(keyBytes, keyRow)
+  }
+
+  override def decodeValue(valueBytes: Array[Byte]): UnsafeRow = {
+    if (valueBytes == null) {
+      null
+    } else {
+      val numBytes = Platform.getInt(valueBytes, Platform.BYTE_ARRAY_OFFSET)
+      val encodedValue = new Array[Byte](numBytes)
+      Platform.copyMemory(valueBytes, java.lang.Integer.BYTES + Platform.BYTE_ARRAY_OFFSET,
+        encodedValue, Platform.BYTE_ARRAY_OFFSET, numBytes)
+      decodeToUnsafeRow(encodedValue, valueRow)
+    }
+  }
+
+  override def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] = {
+    if (valueBytes == null) {
+      Seq().iterator
+    } else {
+      new Iterator[UnsafeRow] {
+        private var pos: Int = Platform.BYTE_ARRAY_OFFSET
+        private val maxPos = Platform.BYTE_ARRAY_OFFSET + valueBytes.length
+
+        override def hasNext: Boolean = {
+          pos < maxPos
+        }
+
+        override def next(): UnsafeRow = {
+          val numBytes = Platform.getInt(valueBytes, pos)
+
+          pos += java.lang.Integer.BYTES
+          val encodedValue = new Array[Byte](numBytes)
+          Platform.copyMemory(valueBytes, pos,
+            encodedValue, Platform.BYTE_ARRAY_OFFSET, numBytes)
+
+          pos += numBytes
+          pos += 1 // eat the delimiter character
+          decodeToUnsafeRow(encodedValue, valueRow)
+        }
+      }
+    }
+  }
+  override def supportsMultipleValuesPerKey: Boolean = true

Review Comment:
   Nit: can we add a newline here, before this override?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to