anishshri-db commented on code in PR #47445:
URL: https://github.com/apache/spark/pull/47445#discussion_r1687305345


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -124,22 +233,96 @@ class OperatorStateMetadataWriter(stateCheckpointPath: 
Path, hadoopConf: Configu
 }
 
 /**
- * Read OperatorStateMetadata from the state checkpoint directory.
+ * Read OperatorStateMetadata from the state checkpoint directory. This class 
will only be
+ * used to read OperatorStateMetadataV1.
+ * OperatorStateMetadataV2 will be read by the OperatorStateMetadataLog.
  */
-class OperatorStateMetadataReader(stateCheckpointPath: Path, hadoopConf: 
Configuration) {
+class OperatorStateMetadataV1Reader(
+    stateCheckpointPath: Path,
+    hadoopConf: Configuration) extends OperatorStateMetadataReader {
+  override def version: Int = 1
 
-  private val metadataFilePath = 
OperatorStateMetadataUtils.metadataFilePath(stateCheckpointPath)
+  private val metadataFilePath = 
OperatorStateMetadataV1.metadataFilePath(stateCheckpointPath)
 
   private lazy val fm = CheckpointFileManager.create(stateCheckpointPath, 
hadoopConf)
 
-  def read(): OperatorStateMetadata = {
+  def read(): Option[OperatorStateMetadata] = {
     val inputStream = fm.open(metadataFilePath)
     val inputReader =
       new BufferedReader(new InputStreamReader(inputStream, 
StandardCharsets.UTF_8))
     try {
       val versionStr = inputReader.readLine()
       val version = MetadataVersionUtil.validateVersion(versionStr, 1)
-      OperatorStateMetadataUtils.deserialize(version, inputReader)
+      Some(OperatorStateMetadataUtils.deserialize(version, inputReader))
+    } finally {
+      inputStream.close()
+    }
+  }
+}
+
+class OperatorStateMetadataV2Writer(

Review Comment:
   Isn't this redundant here? Could we combine the v1/v2 interfaces here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to