anishshri-db commented on code in PR #47104:
URL: https://github.com/apache/spark/pull/47104#discussion_r1659387824


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -278,25 +282,63 @@ case class StateStoreCustomTimingMetric(name: String, 
desc: String) extends Stat
     SQLMetrics.createTimingMetric(sparkContext, desc)
 }
 
-sealed trait KeyStateEncoderSpec
+sealed trait KeyStateEncoderSpec {
+  def jsonValue: JsonAST.JObject
+  def json: String = compact(render(jsonValue))
+}
+
+object KeyStateEncoderSpec {
+  def fromJson(m: Map[String, Any]): KeyStateEncoderSpec = {
+    // match on type
+    val keySchema = StructType.fromString(m("keySchema").asInstanceOf[String])
+    m("keyStateEncoderType").asInstanceOf[String] match {
+      case "NoPrefixKeyStateEncoderSpec" =>
+        NoPrefixKeyStateEncoderSpec(keySchema)
+      case "RangeKeyScanStateEncoderSpec" =>
+        val orderingOrdinals = m("orderingOrdinals").
+          asInstanceOf[List[_]].map(_.asInstanceOf[Int])
+        RangeKeyScanStateEncoderSpec(keySchema, orderingOrdinals)
+      case "PrefixKeyScanStateEncoderSpec" =>
+        val numColsPrefixKey = m("numColsPrefixKey").asInstanceOf[Int]
+        PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey)
+    }
+  }
+}
 
-case class NoPrefixKeyStateEncoderSpec(keySchema: StructType) extends 
KeyStateEncoderSpec
+case class NoPrefixKeyStateEncoderSpec(keySchema: StructType) extends 
KeyStateEncoderSpec {
+  override def jsonValue: JsonAST.JObject = {
+    ("keyStateEncoderType" -> JString("NoPrefixKeyStateEncoderSpec")) ~
+      ("keySchema" -> JString(keySchema.json))
+  }
+}
 
 case class PrefixKeyScanStateEncoderSpec(
     keySchema: StructType,
     numColsPrefixKey: Int) extends KeyStateEncoderSpec {
   if (numColsPrefixKey == 0 || numColsPrefixKey >= keySchema.length) {
     throw 
StateStoreErrors.incorrectNumOrderingColsForPrefixScan(numColsPrefixKey.toString)
   }
+
+  override def jsonValue: JsonAST.JObject = {
+    ("keyStateEncoderType" -> JString("PrefixKeyScanStateEncoderSpec")) ~
+      ("keySchema" -> JString(keySchema.json)) ~
+      ("numColsPrefixKey" -> JInt(numColsPrefixKey))
+  }
 }
 
 /** Encodes rows so that they can be range-scanned based on orderingOrdinals */
 case class RangeKeyScanStateEncoderSpec(
-    keySchema: StructType,
-    orderingOrdinals: Seq[Int]) extends KeyStateEncoderSpec {
+                                         keySchema: StructType,

Review Comment:
   nit: unintentional maybe ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -75,7 +76,9 @@ trait StatefulOperator extends SparkPlan {
 
   // Function used to record state schema for the first time and validate it 
against proposed
   // schema changes in the future. Runs as part of a planning rule on the 
driver.
-  def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit
+  // Returns the schema file path for operators that write this to the 
metadata file,
+  // otherwise None
+  def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration, batchId: 
Long): Option[String]

Review Comment:
   for join - a single string might not be enough right ? we need a path for 
each store name ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -64,15 +77,12 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: 
SparkSession, path:
   val metadataPath = new Path(path)
 
   protected val fileManager =
-    CheckpointFileManager.create(metadataPath, 
sparkSession.sessionState.newHadoopConf())
+    CheckpointFileManager.create(metadataPath, hadoopConf)
 
   if (!fileManager.exists(metadataPath)) {
     fileManager.mkdirs(metadataPath)
   }
 
-  protected val metadataCacheEnabled: Boolean

Review Comment:
   Why do we need this change ?
   



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -202,8 +202,19 @@ class IncrementalExecution(
   // changes across query runs.
   object StateSchemaValidationRule extends SparkPlanPartialRule {
     override val rule: PartialFunction[SparkPlan, SparkPlan] = {
+      // In the case of TransformWithStateExec, we want to collect this 
StateSchema
+      // filepath, and write this path out in the OperatorStateMetadata file
+      case tws: TransformWithStateExec if isFirstBatch =>
+        val stateSchemaPath =
+          tws.validateAndMaybeEvolveStateSchema(hadoopConf, currentBatchId)
+        // At this point, stateInfo should always be defined
+        tws.stateInfo match {
+            case Some(stateInfo) =>
+                tws.copy(stateInfo = Some(stateInfo.copy(stateSchemaPath = 
stateSchemaPath)))

Review Comment:
   Would it be easier to just combine the operator metadata rule and this one ? 
that way the state schema paths are directly available ? i think we could 
probably just do both for `StateStoreWriter` ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateSchemaV3File.scala:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming

Review Comment:
   could we move this file under the `state` directory ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala:
##########
@@ -48,9 +49,21 @@ import org.apache.spark.util.ArrayImplicits._
  * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they 
don't guarantee listing
  * files in a directory always shows the latest files.
  */
-class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, 
path: String)
+class HDFSMetadataLog[T <: AnyRef : ClassTag](
+    hadoopConf: Configuration,
+    path: String,

Review Comment:
   can we add a comment for the args maybe explaining how they are used ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to