anishshri-db commented on code in PR #43961:
URL: https://github.com/apache/spark/pull/43961#discussion_r1440222266
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -153,12 +265,70 @@ class StateStoreChangelogReader(
val valueSize = input.readInt()
if (valueSize < 0) {
// A deletion record
- (keyBuffer, null)
+ (RecordType.DELETE_RECORD, keyBuffer, null,
StateStore.DEFAULT_COL_FAMILY_NAME)
} else {
val valueBuffer = new Array[Byte](valueSize)
ByteStreams.readFully(input, valueBuffer, 0, valueSize)
// A put record.
- (keyBuffer, valueBuffer)
+ (RecordType.PUT_RECORD, keyBuffer, valueBuffer,
StateStore.DEFAULT_COL_FAMILY_NAME)
+ }
+ }
+ }
+}
+
+/**
+ * Read an iterator of change record from the changelog file.
+ * A record is represented by ByteArrayPair(recordType: RecordType.Value,
+ * key: Array[Byte], value: Array[Byte], colFamilyName: String)
+ * A put record is returned as a ByteArrayPair(recordType, key, value,
colFamilyName)
+ * A delete record is return as a ByteArrayPair(recordType, key, null,
colFamilyName)
+ */
+class StateStoreChangelogReaderV2(
+ fm: CheckpointFileManager,
+ fileToRead: Path,
+ compressionCodec: CompressionCodec)
+ extends StateStoreChangelogReader(fm, fileToRead, compressionCodec) {
+
+ private def parseBuffer(input: DataInputStream): Array[Byte] = {
+ val blockSize = input.readInt()
+ val blockBuffer = new Array[Byte](blockSize)
+ ByteStreams.readFully(input, blockBuffer, 0, blockSize)
+ blockBuffer
+ }
+
+ override def getNext(): (RecordType.Value, Array[Byte], Array[Byte], String)
= {
+ val recordTypeSize = input.readInt()
+ // A -1 key size mean end of file.
+ if (recordTypeSize == -1) {
+ finished = true
+ null
+ } else if (recordTypeSize < 0) {
+ throw new IOException(
+ s"Error reading streaming state file $fileToRead: " +
+ s"record type size cannot be $recordTypeSize")
+ } else {
+ val recordTypeBuffer = new Array[Byte](recordTypeSize)
+ ByteStreams.readFully(input, recordTypeBuffer, 0, recordTypeSize)
Review Comment:
I just used the already-existing pattern from `StateStoreChangelog`. It seems like
it's based on Google's Guava package -
https://github.com/google/guava/blob/master/guava/src/com/google/common/io/ByteStreams.java
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]