HeartSaVioR commented on code in PR #48148:
URL: https://github.com/apache/spark/pull/48148#discussion_r1770741812


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -676,27 +681,83 @@ class RocksDBStateStoreChangeDataReader(
     endVersion: Long,
     compressionCodec: CompressionCodec,
     keyValueEncoderMap:
-      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder)])
+      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder)],
+    colFamilyNameOpt: Option[String] = None)
   extends StateStoreChangeDataReader(
-    fm, stateLocation, startVersion, endVersion, compressionCodec) {
+    fm, stateLocation, startVersion, endVersion, compressionCodec, 
colFamilyNameOpt) {
 
   override protected var changelogSuffix: String = "changelog"
 
-  override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
-    val reader = currentChangelogReader()
-    if (reader == null) {
-      return null
+  private def getColFamilyIdBytes: Option[Array[Byte]] = {
+    if (colFamilyNameOpt.isDefined) {
+      val colFamilyName = colFamilyNameOpt.get
+      if (!keyValueEncoderMap.containsKey(colFamilyName)) {
+        throw new IllegalStateException(
+          s"Column family $colFamilyName not found in the key value encoder 
map")
+      }
+      Some(keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes())
+    } else {
+      None
     }
-    val (recordType, keyArray, valueArray) = reader.next()
-    // Todo: does not support multiple virtual column families
-    val (rocksDBKeyStateEncoder, rocksDBValueStateEncoder) =
-      keyValueEncoderMap.get(StateStore.DEFAULT_COL_FAMILY_NAME)
-    val keyRow = rocksDBKeyStateEncoder.decodeKey(keyArray)
-    if (valueArray == null) {
-      (recordType, keyRow, null, currentChangelogVersion - 1)
+  }
+
+  private val colFamilyIdBytesOpt: Option[Array[Byte]] = getColFamilyIdBytes
+
+  override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
+    if (colFamilyIdBytesOpt.isDefined) {
+      // If we are reading records for a particular column family, the 
corresponding vcf id
+      // will be encoded in the key byte array. We need to extract that and 
compare for the
+      // expected column family id. If it matches, we return the record. If 
not, we move to
+      // the next record. Note that this has be handled across multiple 
changelog files and we
+      // rely on the currentChangelogReader to move to the next changelog file 
when needed.
+      var currRecord: (RecordType.Value, Array[Byte], Array[Byte]) = null
+      var currEncoder: (RocksDBKeyStateEncoder, RocksDBValueStateEncoder) = 
null
+
+      breakable {

Review Comment:
   I think we barely use `breakable` - why not set `currRecord` and 
`currEncoder` only when the record fulfills the criteria (vcf ID), and use them 
as the loop exit condition?
   
   I know that means either defining two more variables (inside the loop) or 
resetting the variables when they don't fulfill the criteria, but let's avoid 
using a feature we don't seem to prefer.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -676,27 +681,83 @@ class RocksDBStateStoreChangeDataReader(
     endVersion: Long,
     compressionCodec: CompressionCodec,
     keyValueEncoderMap:
-      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder)])
+      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder)],
+    colFamilyNameOpt: Option[String] = None)
   extends StateStoreChangeDataReader(
-    fm, stateLocation, startVersion, endVersion, compressionCodec) {
+    fm, stateLocation, startVersion, endVersion, compressionCodec, 
colFamilyNameOpt) {
 
   override protected var changelogSuffix: String = "changelog"
 
-  override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
-    val reader = currentChangelogReader()
-    if (reader == null) {
-      return null
+  private def getColFamilyIdBytes: Option[Array[Byte]] = {
+    if (colFamilyNameOpt.isDefined) {
+      val colFamilyName = colFamilyNameOpt.get
+      if (!keyValueEncoderMap.containsKey(colFamilyName)) {
+        throw new IllegalStateException(
+          s"Column family $colFamilyName not found in the key value encoder 
map")
+      }
+      Some(keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes())
+    } else {
+      None
     }
-    val (recordType, keyArray, valueArray) = reader.next()
-    // Todo: does not support multiple virtual column families
-    val (rocksDBKeyStateEncoder, rocksDBValueStateEncoder) =
-      keyValueEncoderMap.get(StateStore.DEFAULT_COL_FAMILY_NAME)
-    val keyRow = rocksDBKeyStateEncoder.decodeKey(keyArray)
-    if (valueArray == null) {
-      (recordType, keyRow, null, currentChangelogVersion - 1)
+  }
+
+  private val colFamilyIdBytesOpt: Option[Array[Byte]] = getColFamilyIdBytes
+
+  override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
+    if (colFamilyIdBytesOpt.isDefined) {
+      // If we are reading records for a particular column family, the 
corresponding vcf id
+      // will be encoded in the key byte array. We need to extract that and 
compare for the
+      // expected column family id. If it matches, we return the record. If 
not, we move to
+      // the next record. Note that this has be handled across multiple 
changelog files and we
+      // rely on the currentChangelogReader to move to the next changelog file 
when needed.
+      var currRecord: (RecordType.Value, Array[Byte], Array[Byte]) = null
+      var currEncoder: (RocksDBKeyStateEncoder, RocksDBValueStateEncoder) = 
null
+
+      breakable {
+        while (true) {
+          val reader = currentChangelogReader()
+          if (reader == null) {
+            return null
+          }
+
+          currRecord = reader.next()
+          currEncoder = keyValueEncoderMap.get(colFamilyNameOpt
+            .getOrElse(StateStore.DEFAULT_COL_FAMILY_NAME))
+
+          val colFamilyIdBytes: Array[Byte] = colFamilyIdBytesOpt.get
+
+          val keyArrayWithColFamilyId = new Array[Byte](colFamilyIdBytes.size)
+          Array.copy(currRecord._2, 0, keyArrayWithColFamilyId, 0, 
colFamilyIdBytes.size)

Review Comment:
   I think this copy is unnecessary; `Arrays.equals` has an overload which 
takes an index range for each array.
   
   ```
   public static boolean equals(byte[] a, int aFromIndex, int aToIndex, byte[] 
b, int bFromIndex, int bToIndex)
   ```
   
   This was added in JDK 9.
   
   Even if there were no such method, I'd say it's worth having a method to 
manually compare elements without copying.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -676,27 +681,83 @@ class RocksDBStateStoreChangeDataReader(
     endVersion: Long,
     compressionCodec: CompressionCodec,
     keyValueEncoderMap:
-      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder)])
+      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder)],
+    colFamilyNameOpt: Option[String] = None)
   extends StateStoreChangeDataReader(
-    fm, stateLocation, startVersion, endVersion, compressionCodec) {
+    fm, stateLocation, startVersion, endVersion, compressionCodec, 
colFamilyNameOpt) {
 
   override protected var changelogSuffix: String = "changelog"
 
-  override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
-    val reader = currentChangelogReader()
-    if (reader == null) {
-      return null
+  private def getColFamilyIdBytes: Option[Array[Byte]] = {
+    if (colFamilyNameOpt.isDefined) {
+      val colFamilyName = colFamilyNameOpt.get
+      if (!keyValueEncoderMap.containsKey(colFamilyName)) {
+        throw new IllegalStateException(
+          s"Column family $colFamilyName not found in the key value encoder 
map")
+      }
+      Some(keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes())
+    } else {
+      None
     }
-    val (recordType, keyArray, valueArray) = reader.next()
-    // Todo: does not support multiple virtual column families
-    val (rocksDBKeyStateEncoder, rocksDBValueStateEncoder) =
-      keyValueEncoderMap.get(StateStore.DEFAULT_COL_FAMILY_NAME)
-    val keyRow = rocksDBKeyStateEncoder.decodeKey(keyArray)
-    if (valueArray == null) {
-      (recordType, keyRow, null, currentChangelogVersion - 1)
+  }
+
+  private val colFamilyIdBytesOpt: Option[Array[Byte]] = getColFamilyIdBytes
+
+  override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
+    if (colFamilyIdBytesOpt.isDefined) {
+      // If we are reading records for a particular column family, the 
corresponding vcf id
+      // will be encoded in the key byte array. We need to extract that and 
compare for the
+      // expected column family id. If it matches, we return the record. If 
not, we move to
+      // the next record. Note that this has be handled across multiple 
changelog files and we
+      // rely on the currentChangelogReader to move to the next changelog file 
when needed.
+      var currRecord: (RecordType.Value, Array[Byte], Array[Byte]) = null
+      var currEncoder: (RocksDBKeyStateEncoder, RocksDBValueStateEncoder) = 
null
+
+      breakable {
+        while (true) {
+          val reader = currentChangelogReader()
+          if (reader == null) {
+            return null
+          }
+
+          currRecord = reader.next()
+          currEncoder = keyValueEncoderMap.get(colFamilyNameOpt
+            .getOrElse(StateStore.DEFAULT_COL_FAMILY_NAME))
+
+          val colFamilyIdBytes: Array[Byte] = colFamilyIdBytesOpt.get
+
+          val keyArrayWithColFamilyId = new Array[Byte](colFamilyIdBytes.size)
+          Array.copy(currRecord._2, 0, keyArrayWithColFamilyId, 0, 
colFamilyIdBytes.size)
+
+          if (java.util.Arrays.equals(keyArrayWithColFamilyId, 
colFamilyIdBytes)) {
+            break()
+          }
+        }
+      }
+
+      val keyRow = currEncoder._1.decodeKey(currRecord._2)
+      if (currRecord._3 == null) {
+        (currRecord._1, keyRow, null, currentChangelogVersion - 1)
+      } else {
+        val valueRow = currEncoder._2.decodeValue(currRecord._3)
+        (currRecord._1, keyRow, valueRow, currentChangelogVersion - 1)
+      }
     } else {
-      val valueRow = rocksDBValueStateEncoder.decodeValue(valueArray)
-      (recordType, keyRow, valueRow, currentChangelogVersion - 1)
+      val reader = currentChangelogReader()
+      if (reader == null) {
+        return null
+      }
+      val (recordType, keyArray, valueArray) = reader.next()

Review Comment:
   Looks like this line and part of the next line are the only differences 
between the two branches. Once we figure out the current record and the 
encoder/decoder, everything else is the same. Shall we consider deduplicating 
the code?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -676,27 +681,83 @@ class RocksDBStateStoreChangeDataReader(
     endVersion: Long,
     compressionCodec: CompressionCodec,
     keyValueEncoderMap:
-      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder)])
+      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder)],
+    colFamilyNameOpt: Option[String] = None)
   extends StateStoreChangeDataReader(
-    fm, stateLocation, startVersion, endVersion, compressionCodec) {
+    fm, stateLocation, startVersion, endVersion, compressionCodec, 
colFamilyNameOpt) {
 
   override protected var changelogSuffix: String = "changelog"
 
-  override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
-    val reader = currentChangelogReader()
-    if (reader == null) {
-      return null
+  private def getColFamilyIdBytes: Option[Array[Byte]] = {
+    if (colFamilyNameOpt.isDefined) {
+      val colFamilyName = colFamilyNameOpt.get
+      if (!keyValueEncoderMap.containsKey(colFamilyName)) {
+        throw new IllegalStateException(
+          s"Column family $colFamilyName not found in the key value encoder 
map")
+      }
+      Some(keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes())
+    } else {
+      None
     }
-    val (recordType, keyArray, valueArray) = reader.next()
-    // Todo: does not support multiple virtual column families
-    val (rocksDBKeyStateEncoder, rocksDBValueStateEncoder) =
-      keyValueEncoderMap.get(StateStore.DEFAULT_COL_FAMILY_NAME)
-    val keyRow = rocksDBKeyStateEncoder.decodeKey(keyArray)
-    if (valueArray == null) {
-      (recordType, keyRow, null, currentChangelogVersion - 1)
+  }
+
+  private val colFamilyIdBytesOpt: Option[Array[Byte]] = getColFamilyIdBytes
+
+  override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
+    if (colFamilyIdBytesOpt.isDefined) {
+      // If we are reading records for a particular column family, the 
corresponding vcf id
+      // will be encoded in the key byte array. We need to extract that and 
compare for the
+      // expected column family id. If it matches, we return the record. If 
not, we move to
+      // the next record. Note that this has be handled across multiple 
changelog files and we
+      // rely on the currentChangelogReader to move to the next changelog file 
when needed.
+      var currRecord: (RecordType.Value, Array[Byte], Array[Byte]) = null
+      var currEncoder: (RocksDBKeyStateEncoder, RocksDBValueStateEncoder) = 
null
+
+      breakable {
+        while (true) {
+          val reader = currentChangelogReader()
+          if (reader == null) {
+            return null
+          }
+
+          currRecord = reader.next()
+          currEncoder = keyValueEncoderMap.get(colFamilyNameOpt

Review Comment:
   This does not need to be initialized inside the loop - it's never changed in 
the loop and, furthermore, is never used in the loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to