HeartSaVioR commented on code in PR #48148:
URL: https://github.com/apache/spark/pull/48148#discussion_r1770741812
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -676,27 +681,83 @@ class RocksDBStateStoreChangeDataReader(
endVersion: Long,
compressionCodec: CompressionCodec,
keyValueEncoderMap:
- ConcurrentHashMap[String, (RocksDBKeyStateEncoder,
RocksDBValueStateEncoder)])
+ ConcurrentHashMap[String, (RocksDBKeyStateEncoder,
RocksDBValueStateEncoder)],
+ colFamilyNameOpt: Option[String] = None)
extends StateStoreChangeDataReader(
- fm, stateLocation, startVersion, endVersion, compressionCodec) {
+ fm, stateLocation, startVersion, endVersion, compressionCodec,
colFamilyNameOpt) {
override protected var changelogSuffix: String = "changelog"
- override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
- val reader = currentChangelogReader()
- if (reader == null) {
- return null
+ private def getColFamilyIdBytes: Option[Array[Byte]] = {
+ if (colFamilyNameOpt.isDefined) {
+ val colFamilyName = colFamilyNameOpt.get
+ if (!keyValueEncoderMap.containsKey(colFamilyName)) {
+ throw new IllegalStateException(
+ s"Column family $colFamilyName not found in the key value encoder
map")
+ }
+ Some(keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes())
+ } else {
+ None
}
- val (recordType, keyArray, valueArray) = reader.next()
- // Todo: does not support multiple virtual column families
- val (rocksDBKeyStateEncoder, rocksDBValueStateEncoder) =
- keyValueEncoderMap.get(StateStore.DEFAULT_COL_FAMILY_NAME)
- val keyRow = rocksDBKeyStateEncoder.decodeKey(keyArray)
- if (valueArray == null) {
- (recordType, keyRow, null, currentChangelogVersion - 1)
+ }
+
+ private val colFamilyIdBytesOpt: Option[Array[Byte]] = getColFamilyIdBytes
+
+ override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
+ if (colFamilyIdBytesOpt.isDefined) {
+ // If we are reading records for a particular column family, the
corresponding vcf id
+ // will be encoded in the key byte array. We need to extract that and
compare for the
+ // expected column family id. If it matches, we return the record. If
not, we move to
+ // the next record. Note that this has be handled across multiple
changelog files and we
+ // rely on the currentChangelogReader to move to the next changelog file
when needed.
+ var currRecord: (RecordType.Value, Array[Byte], Array[Byte]) = null
+ var currEncoder: (RocksDBKeyStateEncoder, RocksDBValueStateEncoder) =
null
+
+ breakable {
Review Comment:
I think we barely use `breakable` - why not set `currRecord` and `currEncoder`
when they fulfill the criteria (vcf ID), and then use them as the condition for
exiting the loop?
I know that means either defining two more variables (inside the loop) or
clearing the variables when they don't fulfill the criteria, but let's avoid
using a feature we don't seem to prefer.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -676,27 +681,83 @@ class RocksDBStateStoreChangeDataReader(
endVersion: Long,
compressionCodec: CompressionCodec,
keyValueEncoderMap:
- ConcurrentHashMap[String, (RocksDBKeyStateEncoder,
RocksDBValueStateEncoder)])
+ ConcurrentHashMap[String, (RocksDBKeyStateEncoder,
RocksDBValueStateEncoder)],
+ colFamilyNameOpt: Option[String] = None)
extends StateStoreChangeDataReader(
- fm, stateLocation, startVersion, endVersion, compressionCodec) {
+ fm, stateLocation, startVersion, endVersion, compressionCodec,
colFamilyNameOpt) {
override protected var changelogSuffix: String = "changelog"
- override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
- val reader = currentChangelogReader()
- if (reader == null) {
- return null
+ private def getColFamilyIdBytes: Option[Array[Byte]] = {
+ if (colFamilyNameOpt.isDefined) {
+ val colFamilyName = colFamilyNameOpt.get
+ if (!keyValueEncoderMap.containsKey(colFamilyName)) {
+ throw new IllegalStateException(
+ s"Column family $colFamilyName not found in the key value encoder
map")
+ }
+ Some(keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes())
+ } else {
+ None
}
- val (recordType, keyArray, valueArray) = reader.next()
- // Todo: does not support multiple virtual column families
- val (rocksDBKeyStateEncoder, rocksDBValueStateEncoder) =
- keyValueEncoderMap.get(StateStore.DEFAULT_COL_FAMILY_NAME)
- val keyRow = rocksDBKeyStateEncoder.decodeKey(keyArray)
- if (valueArray == null) {
- (recordType, keyRow, null, currentChangelogVersion - 1)
+ }
+
+ private val colFamilyIdBytesOpt: Option[Array[Byte]] = getColFamilyIdBytes
+
+ override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
+ if (colFamilyIdBytesOpt.isDefined) {
+ // If we are reading records for a particular column family, the
corresponding vcf id
+ // will be encoded in the key byte array. We need to extract that and
compare for the
+ // expected column family id. If it matches, we return the record. If
not, we move to
+ // the next record. Note that this has be handled across multiple
changelog files and we
+ // rely on the currentChangelogReader to move to the next changelog file
when needed.
+ var currRecord: (RecordType.Value, Array[Byte], Array[Byte]) = null
+ var currEncoder: (RocksDBKeyStateEncoder, RocksDBValueStateEncoder) =
null
+
+ breakable {
+ while (true) {
+ val reader = currentChangelogReader()
+ if (reader == null) {
+ return null
+ }
+
+ currRecord = reader.next()
+ currEncoder = keyValueEncoderMap.get(colFamilyNameOpt
+ .getOrElse(StateStore.DEFAULT_COL_FAMILY_NAME))
+
+ val colFamilyIdBytes: Array[Byte] = colFamilyIdBytesOpt.get
+
+ val keyArrayWithColFamilyId = new Array[Byte](colFamilyIdBytes.size)
+ Array.copy(currRecord._2, 0, keyArrayWithColFamilyId, 0,
colFamilyIdBytes.size)
Review Comment:
I think it's unnecessary; `Arrays.equals` has an overloaded method which
accepts a "range":
```
public static boolean equals(byte[] a, int aFromIndex, int aToIndex, byte[]
b, int bFromIndex, int bToIndex)
```
This was added in JDK 9.
Even if there weren't such a method, I'd say it would be worth writing a method
that manually compares elements without copying.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -676,27 +681,83 @@ class RocksDBStateStoreChangeDataReader(
endVersion: Long,
compressionCodec: CompressionCodec,
keyValueEncoderMap:
- ConcurrentHashMap[String, (RocksDBKeyStateEncoder,
RocksDBValueStateEncoder)])
+ ConcurrentHashMap[String, (RocksDBKeyStateEncoder,
RocksDBValueStateEncoder)],
+ colFamilyNameOpt: Option[String] = None)
extends StateStoreChangeDataReader(
- fm, stateLocation, startVersion, endVersion, compressionCodec) {
+ fm, stateLocation, startVersion, endVersion, compressionCodec,
colFamilyNameOpt) {
override protected var changelogSuffix: String = "changelog"
- override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
- val reader = currentChangelogReader()
- if (reader == null) {
- return null
+ private def getColFamilyIdBytes: Option[Array[Byte]] = {
+ if (colFamilyNameOpt.isDefined) {
+ val colFamilyName = colFamilyNameOpt.get
+ if (!keyValueEncoderMap.containsKey(colFamilyName)) {
+ throw new IllegalStateException(
+ s"Column family $colFamilyName not found in the key value encoder
map")
+ }
+ Some(keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes())
+ } else {
+ None
}
- val (recordType, keyArray, valueArray) = reader.next()
- // Todo: does not support multiple virtual column families
- val (rocksDBKeyStateEncoder, rocksDBValueStateEncoder) =
- keyValueEncoderMap.get(StateStore.DEFAULT_COL_FAMILY_NAME)
- val keyRow = rocksDBKeyStateEncoder.decodeKey(keyArray)
- if (valueArray == null) {
- (recordType, keyRow, null, currentChangelogVersion - 1)
+ }
+
+ private val colFamilyIdBytesOpt: Option[Array[Byte]] = getColFamilyIdBytes
+
+ override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
+ if (colFamilyIdBytesOpt.isDefined) {
+ // If we are reading records for a particular column family, the
corresponding vcf id
+ // will be encoded in the key byte array. We need to extract that and
compare for the
+ // expected column family id. If it matches, we return the record. If
not, we move to
+ // the next record. Note that this has be handled across multiple
changelog files and we
+ // rely on the currentChangelogReader to move to the next changelog file
when needed.
+ var currRecord: (RecordType.Value, Array[Byte], Array[Byte]) = null
+ var currEncoder: (RocksDBKeyStateEncoder, RocksDBValueStateEncoder) =
null
+
+ breakable {
+ while (true) {
+ val reader = currentChangelogReader()
+ if (reader == null) {
+ return null
+ }
+
+ currRecord = reader.next()
+ currEncoder = keyValueEncoderMap.get(colFamilyNameOpt
+ .getOrElse(StateStore.DEFAULT_COL_FAMILY_NAME))
+
+ val colFamilyIdBytes: Array[Byte] = colFamilyIdBytesOpt.get
+
+ val keyArrayWithColFamilyId = new Array[Byte](colFamilyIdBytes.size)
+ Array.copy(currRecord._2, 0, keyArrayWithColFamilyId, 0,
colFamilyIdBytes.size)
+
+ if (java.util.Arrays.equals(keyArrayWithColFamilyId,
colFamilyIdBytes)) {
+ break()
+ }
+ }
+ }
+
+ val keyRow = currEncoder._1.decodeKey(currRecord._2)
+ if (currRecord._3 == null) {
+ (currRecord._1, keyRow, null, currentChangelogVersion - 1)
+ } else {
+ val valueRow = currEncoder._2.decodeValue(currRecord._3)
+ (currRecord._1, keyRow, valueRow, currentChangelogVersion - 1)
+ }
} else {
- val valueRow = rocksDBValueStateEncoder.decodeValue(valueArray)
- (recordType, keyRow, valueRow, currentChangelogVersion - 1)
+ val reader = currentChangelogReader()
+ if (reader == null) {
+ return null
+ }
+ val (recordType, keyArray, valueArray) = reader.next()
Review Comment:
Looks like this line and part of the next line are the only difference between
the two branches. Once we figure out the current record and the
encoder/decoder, everything else would be the same. Shall we consider
deduplicating the code?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -676,27 +681,83 @@ class RocksDBStateStoreChangeDataReader(
endVersion: Long,
compressionCodec: CompressionCodec,
keyValueEncoderMap:
- ConcurrentHashMap[String, (RocksDBKeyStateEncoder,
RocksDBValueStateEncoder)])
+ ConcurrentHashMap[String, (RocksDBKeyStateEncoder,
RocksDBValueStateEncoder)],
+ colFamilyNameOpt: Option[String] = None)
extends StateStoreChangeDataReader(
- fm, stateLocation, startVersion, endVersion, compressionCodec) {
+ fm, stateLocation, startVersion, endVersion, compressionCodec,
colFamilyNameOpt) {
override protected var changelogSuffix: String = "changelog"
- override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
- val reader = currentChangelogReader()
- if (reader == null) {
- return null
+ private def getColFamilyIdBytes: Option[Array[Byte]] = {
+ if (colFamilyNameOpt.isDefined) {
+ val colFamilyName = colFamilyNameOpt.get
+ if (!keyValueEncoderMap.containsKey(colFamilyName)) {
+ throw new IllegalStateException(
+ s"Column family $colFamilyName not found in the key value encoder
map")
+ }
+ Some(keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes())
+ } else {
+ None
}
- val (recordType, keyArray, valueArray) = reader.next()
- // Todo: does not support multiple virtual column families
- val (rocksDBKeyStateEncoder, rocksDBValueStateEncoder) =
- keyValueEncoderMap.get(StateStore.DEFAULT_COL_FAMILY_NAME)
- val keyRow = rocksDBKeyStateEncoder.decodeKey(keyArray)
- if (valueArray == null) {
- (recordType, keyRow, null, currentChangelogVersion - 1)
+ }
+
+ private val colFamilyIdBytesOpt: Option[Array[Byte]] = getColFamilyIdBytes
+
+ override def getNext(): (RecordType.Value, UnsafeRow, UnsafeRow, Long) = {
+ if (colFamilyIdBytesOpt.isDefined) {
+ // If we are reading records for a particular column family, the
corresponding vcf id
+ // will be encoded in the key byte array. We need to extract that and
compare for the
+ // expected column family id. If it matches, we return the record. If
not, we move to
+ // the next record. Note that this has be handled across multiple
changelog files and we
+ // rely on the currentChangelogReader to move to the next changelog file
when needed.
+ var currRecord: (RecordType.Value, Array[Byte], Array[Byte]) = null
+ var currEncoder: (RocksDBKeyStateEncoder, RocksDBValueStateEncoder) =
null
+
+ breakable {
+ while (true) {
+ val reader = currentChangelogReader()
+ if (reader == null) {
+ return null
+ }
+
+ currRecord = reader.next()
+ currEncoder = keyValueEncoderMap.get(colFamilyNameOpt
Review Comment:
This does not need to be initialized inside the loop - it's never changed in
the loop, and furthermore, it's never even used in the loop.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]