anishshri-db commented on code in PR #47445:
URL: https://github.com/apache/spark/pull/47445#discussion_r1687304001


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -86,24 +123,96 @@ object OperatorStateMetadataUtils {
     operatorStateMetadata.version match {
       case 1 =>
         Serialization.write(operatorStateMetadata.asInstanceOf[OperatorStateMetadataV1], out)
-
+      case 2 =>
+        Serialization.write(operatorStateMetadata.asInstanceOf[OperatorStateMetadataV2], out)
       case _ =>
         throw new IllegalArgumentException(s"Failed to serialize operator metadata with " +
           s"version=${operatorStateMetadata.version}")
     }
   }
 }
 
+object OperatorStateMetadataReader {
+  def createReader(
+      stateCheckpointPath: Path,
+      hadoopConf: Configuration,
+      version: Int): OperatorStateMetadataReader = {
+    version match {
+      case 1 =>
+        new OperatorStateMetadataV1Reader(stateCheckpointPath, hadoopConf)
+      case 2 =>
+        new OperatorStateMetadataV2Reader(stateCheckpointPath, hadoopConf)
+      case _ =>
+        throw new IllegalArgumentException(s"Failed to create reader for operator metadata " +
+          s"with version=$version")
+    }
+  }
+}
+
+object OperatorStateMetadataWriter {
+  def createWriter(
+      stateCheckpointPath: Path,
+      hadoopConf: Configuration,
+      version: Int,
+      currentBatchId: Option[Long] = None): OperatorStateMetadataWriter = {
+    version match {
+      case 1 =>
+        new OperatorStateMetadataV1Writer(stateCheckpointPath, hadoopConf)
+      case 2 =>
+        if (currentBatchId.isEmpty) {
+          throw new IllegalArgumentException("currentBatchId is required for version 2")
+        }
+        new OperatorStateMetadataV2Writer(stateCheckpointPath, hadoopConf, currentBatchId.get)
+      case _ =>
+          throw new IllegalArgumentException(s"Failed to create writer for operator metadata " +
+          s"with version=$version")
+    }
+  }
+}
+
+object OperatorStateMetadataV1 {
+  def metadataFilePath(stateCheckpointPath: Path): Path =
+    new Path(new Path(stateCheckpointPath, "_metadata"), "metadata")
+}
+
+object OperatorStateMetadataV2 {
+  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
+
+  @scala.annotation.nowarn
+  private implicit val manifest = Manifest
+    .classType[OperatorStateMetadataV2](implicitly[ClassTag[OperatorStateMetadataV2]].runtimeClass)
+
+  def metadataDirPath(stateCheckpointPath: Path): Path =
+    new Path(new Path(new Path(stateCheckpointPath, "_metadata"), "metadata"), "v2")
+
+  def metadataFilePath(stateCheckpointPath: Path, currentBatchId: Long): Path =
+    new Path(metadataDirPath(stateCheckpointPath), currentBatchId.toString)
+
+  def deserialize(in: BufferedReader): OperatorStateMetadata = {

Review Comment:
   Where do we use this `deserialize` method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to