anishshri-db commented on code in PR #43961:
URL: https://github.com/apache/spark/pull/43961#discussion_r1430640213


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -133,11 +219,30 @@ class StateStoreChangelogReader(
     case f: FileNotFoundException =>
       throw 
QueryExecutionErrors.failedToReadStreamingStateFileError(fileToRead, f)
   }
-  private val input: DataInputStream = decompressStream(sourceStream)
+  protected val input: DataInputStream = decompressStream(sourceStream)
 
   def close(): Unit = { if (input != null) input.close() }
 
-  override def getNext(): (Array[Byte], Array[Byte]) = {
+  override def getNext(): (RecordType.Value, Array[Byte], Array[Byte], String) 
= {
+    throw new UnsupportedOperationException("Iterator operations not supported 
on base " +
+      "changelog reader implementation")
+  }
+}
+
+/**
+ * Read an iterator of change records from the changelog file.
+ * A record is represented by a tuple (recordType: RecordType.Value,
+ *  key: Array[Byte], value: Array[Byte], colFamilyName: String).
+ * A put record is returned as a tuple (recordType, key, value, colFamilyName).
+ * A delete record is returned as a tuple (recordType, key, null, colFamilyName).
+ */
+class StateStoreChangelogReaderV1(
+    fm: CheckpointFileManager,
+    fileToRead: Path,
+    compressionCodec: CompressionCodec) extends StateStoreChangelogReader(fm, 
fileToRead,

Review Comment:
   Done



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -153,12 +258,70 @@ class StateStoreChangelogReader(
       val valueSize = input.readInt()
       if (valueSize < 0) {
         // A deletion record
-        (keyBuffer, null)
+        (RecordType.DELETE_RECORD, keyBuffer, null, 
StateStore.DEFAULT_COL_FAMILY_NAME)
       } else {
         val valueBuffer = new Array[Byte](valueSize)
         ByteStreams.readFully(input, valueBuffer, 0, valueSize)
         // A put record.
-        (keyBuffer, valueBuffer)
+        (RecordType.PUT_RECORD, keyBuffer, valueBuffer, 
StateStore.DEFAULT_COL_FAMILY_NAME)
+      }
+    }
+  }
+}
+
+/**
+ * Read an iterator of change records from the changelog file.
+ * A record is represented by a tuple (recordType: RecordType.Value,
+ *  key: Array[Byte], value: Array[Byte], colFamilyName: String).
+ * A put record is returned as a tuple (recordType, key, value, colFamilyName).
+ * A delete record is returned as a tuple (recordType, key, null, colFamilyName).
+ */
+class StateStoreChangelogReaderV2(
+    fm: CheckpointFileManager,
+    fileToRead: Path,
+    compressionCodec: CompressionCodec) extends StateStoreChangelogReader(fm, 
fileToRead,

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to