anishshri-db commented on code in PR #43425:
URL: https://github.com/apache/spark/pull/43425#discussion_r1364331798


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util.UUID
+
+import scala.util.Try
+
+import org.apache.hadoop.fs.{Path, PathFilter}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder}
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues.JoinSideValues
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
+import org.apache.spark.sql.execution.streaming.state.StateStoreConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+class StateScanBuilder(
+    session: SparkSession,
+    schema: StructType,
+    stateCheckpointRootLocation: String,
+    batchId: Long,
+    operatorId: Long,
+    storeName: String,
+    joinSide: JoinSideValues,
+    stateStoreConf: StateStoreConf) extends ScanBuilder {
+  override def build(): Scan = new StateScan(session, schema, stateCheckpointRootLocation,
+    batchId, operatorId, storeName, joinSide, stateStoreConf)
+}
+
+class StateStoreInputPartition(
+    val partition: Int,
+    val queryId: UUID,
+    val stateCheckpointRootLocation: String,
+    val batchId: Long,
+    val operatorId: Long,
+    val storeName: String,
+    val joinSide: JoinSideValues) extends InputPartition
+
+class StateScan(
+    session: SparkSession,
+    schema: StructType,
+    stateCheckpointRootLocation: String,
+    batchId: Long,
+    operatorId: Long,
+    storeName: String,
+    joinSide: JoinSideValues,
+    stateStoreConf: StateStoreConf) extends Scan with Batch {
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val hadoopConfBroadcast = session.sparkContext.broadcast(
+    new SerializableConfiguration(session.sessionState.newHadoopConf()))
+
+  override def readSchema(): StructType = schema
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    val fs = stateCheckpointPartitionsLocation.getFileSystem(hadoopConfBroadcast.value.value)
+    val partitions = fs.listStatus(stateCheckpointPartitionsLocation, new PathFilter() {
+      override def accept(path: Path): Boolean = {
+        fs.isDirectory(path) && Try(path.getName.toInt).isSuccess && path.getName.toInt >= 0
+      }
+    })
+
+    if (partitions.headOption.isEmpty) {
+      Array.empty[InputPartition]
+    } else {
+      // just a dummy query id because we are actually not running streaming query
+      val queryId = UUID.randomUUID()
+
+      val partitionsSorted = partitions.sortBy(fs => fs.getPath.getName.toInt)
+      val partitionNums = partitionsSorted.map(_.getPath.getName.toInt)
+      // assuming no same number - they're directories hence no same name
+      val head = partitionNums.head
+      val tail = partitionNums(partitionNums.length - 1)
+      assert(head == 0, "Partition should start with 0")
+      assert((tail - head + 1) == partitionNums.length,
+        s"No continuous partitions in state: 
${partitionNums.mkString("Array(", ", ", ")")}")
+
+      partitionNums.map {
+        pn => new StateStoreInputPartition(pn, queryId, stateCheckpointRootLocation,
+          batchId, operatorId, storeName, joinSide)
+      }.toArray
+    }
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = joinSide match {
+    case JoinSideValues.left =>
+      val userFacingSchema = schema
+      val stateSchema = StreamStreamJoinStateHelper.readSchema(session,
+        stateCheckpointRootLocation, operatorId.toInt, LeftSide, excludeAuxColumns = false)
+      new StreamStreamJoinStatePartitionReaderFactory(stateStoreConf,
+        hadoopConfBroadcast.value, userFacingSchema, stateSchema)
+
+    case JoinSideValues.right =>
+      val userFacingSchema = schema
+      val stateSchema = StreamStreamJoinStateHelper.readSchema(session,
+        stateCheckpointRootLocation, operatorId.toInt, RightSide, excludeAuxColumns = false)
+      new StreamStreamJoinStatePartitionReaderFactory(stateStoreConf,
+        hadoopConfBroadcast.value, userFacingSchema, stateSchema)
+
+    case JoinSideValues.none =>
+      new StatePartitionReaderFactory(stateStoreConf, hadoopConfBroadcast.value, schema)
+  }
+
+  override def toBatch: Batch = this
+
+  // FIXME: show more configs?

Review Comment:
   Which other configs are you referring to ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util.UUID
+
+import scala.util.Try
+
+import org.apache.hadoop.fs.{Path, PathFilter}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder}
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues.JoinSideValues
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
+import org.apache.spark.sql.execution.streaming.state.StateStoreConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+class StateScanBuilder(
+    session: SparkSession,
+    schema: StructType,
+    stateCheckpointRootLocation: String,
+    batchId: Long,
+    operatorId: Long,
+    storeName: String,
+    joinSide: JoinSideValues,
+    stateStoreConf: StateStoreConf) extends ScanBuilder {
+  override def build(): Scan = new StateScan(session, schema, stateCheckpointRootLocation,
+    batchId, operatorId, storeName, joinSide, stateStoreConf)
+}
+
+class StateStoreInputPartition(
+    val partition: Int,
+    val queryId: UUID,
+    val stateCheckpointRootLocation: String,
+    val batchId: Long,
+    val operatorId: Long,
+    val storeName: String,
+    val joinSide: JoinSideValues) extends InputPartition
+
+class StateScan(
+    session: SparkSession,
+    schema: StructType,
+    stateCheckpointRootLocation: String,
+    batchId: Long,
+    operatorId: Long,
+    storeName: String,
+    joinSide: JoinSideValues,
+    stateStoreConf: StateStoreConf) extends Scan with Batch {
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val hadoopConfBroadcast = session.sparkContext.broadcast(
+    new SerializableConfiguration(session.sessionState.newHadoopConf()))
+
+  override def readSchema(): StructType = schema
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    val fs = stateCheckpointPartitionsLocation.getFileSystem(hadoopConfBroadcast.value.value)
+    val partitions = fs.listStatus(stateCheckpointPartitionsLocation, new PathFilter() {
+      override def accept(path: Path): Boolean = {
+        fs.isDirectory(path) && Try(path.getName.toInt).isSuccess && path.getName.toInt >= 0
+      }
+    })
+
+    if (partitions.headOption.isEmpty) {
+      Array.empty[InputPartition]
+    } else {
+      // just a dummy query id because we are actually not running streaming query
+      val queryId = UUID.randomUUID()
+
+      val partitionsSorted = partitions.sortBy(fs => fs.getPath.getName.toInt)
+      val partitionNums = partitionsSorted.map(_.getPath.getName.toInt)
+      // assuming no same number - they're directories hence no same name
+      val head = partitionNums.head
+      val tail = partitionNums(partitionNums.length - 1)
+      assert(head == 0, "Partition should start with 0")
+      assert((tail - head + 1) == partitionNums.length,
+        s"No continuous partitions in state: 
${partitionNums.mkString("Array(", ", ", ")")}")
+
+      partitionNums.map {
+        pn => new StateStoreInputPartition(pn, queryId, stateCheckpointRootLocation,
+          batchId, operatorId, storeName, joinSide)
+      }.toArray
+    }
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = joinSide match {
+    case JoinSideValues.left =>
+      val userFacingSchema = schema
+      val stateSchema = StreamStreamJoinStateHelper.readSchema(session,
+        stateCheckpointRootLocation, operatorId.toInt, LeftSide, excludeAuxColumns = false)
+      new StreamStreamJoinStatePartitionReaderFactory(stateStoreConf,
+        hadoopConfBroadcast.value, userFacingSchema, stateSchema)
+
+    case JoinSideValues.right =>
+      val userFacingSchema = schema
+      val stateSchema = StreamStreamJoinStateHelper.readSchema(session,
+        stateCheckpointRootLocation, operatorId.toInt, RightSide, excludeAuxColumns = false)
+      new StreamStreamJoinStatePartitionReaderFactory(stateStoreConf,
+        hadoopConfBroadcast.value, userFacingSchema, stateSchema)
+
+    case JoinSideValues.none =>
+      new StatePartitionReaderFactory(stateStoreConf, hadoopConfBroadcast.value, schema)
+  }
+
+  override def toBatch: Batch = this
+
+  // FIXME: show more configs?
+  override def description(): String = s"StateScan " +
+    s"[stateCkptLocation=$stateCheckpointRootLocation]" +
+    s"[batchId=$batchId][operatorId=$operatorId][storeName=$storeName]" +
+    s"[joinSide=$joinSide]"

Review Comment:
   maybe log this only if join is involved ?
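   For illustration, a shape like this could work if we only want the join side in the description when it is actually set (a rough sketch reusing the existing fields):

   ```scala
   override def description(): String = {
     val desc = s"StateScan " +
       s"[stateCkptLocation=$stateCheckpointRootLocation]" +
       s"[batchId=$batchId][operatorId=$operatorId][storeName=$storeName]"
     // append the join side only when the caller actually specified one
     if (joinSide != JoinSideValues.none) desc + s"[joinSide=$joinSide]" else desc
   }
   ```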



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability}
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
+import org.apache.spark.sql.execution.streaming.state.StateStoreConf
+import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class StateTable(
+    session: SparkSession,
+    override val schema: StructType,
+    stateCheckpointLocation: String,
+    batchId: Long,
+    operatorId: Int,
+    storeName: String,
+    joinSide: JoinSideValues,
+    stateConf: StateStoreConf)
+  extends Table with SupportsRead with SupportsMetadataColumns {
+
+  import StateTable._
+
+  if (!isValidSchema(schema)) {
+    throw new AnalysisException("The fields of schema should be 'key' and 'value', " +

Review Comment:
   Could we also log checkpointLoc and batchId/operatorId maybe ?
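   For example, keeping the existing check but enriching the message (just a sketch; the exact wording can be whatever fits):

   ```scala
   if (!isValidSchema(schema)) {
     throw new AnalysisException("The fields of schema should be 'key' and 'value', " +
       "and each field should have corresponding fields (they should be a StructType). " +
       s"checkpointLocation=$stateCheckpointLocation, batchId=$batchId, operatorId=$operatorId")
   }
   ```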



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -185,6 +187,51 @@ class SymmetricHashJoinStateManager(
     }
   }
 
+  def iterator: Iterator[KeyToValuePair] = {

Review Comment:
   nit: Add function comment ?
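   Something along these lines maybe (the wording below is only a guess at the semantics, please adjust):

   ```scala
   /**
    * Returns an iterator over all key-value pairs currently stored in this join state
    * manager, emitting one [[KeyToValuePair]] per stored value.
    */
   def iterator: Iterator[KeyToValuePair] = {
   ```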



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReaderFactory.scala:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.execution.streaming.state.StateStoreConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+class StreamStreamJoinStatePartitionReaderFactory(

Review Comment:
   Any reason to keep it in a separate file or can we combine with file above ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.connector.read.PartitionReader
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
+import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{JoinSide, LeftSide, RightSide}
+import org.apache.spark.sql.execution.streaming.state.{StateStoreConf, SymmetricHashJoinStateManager}
+import org.apache.spark.sql.types.{BooleanType, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+class StreamStreamJoinStatePartitionReader(
+    storeConf: StateStoreConf,
+    hadoopConf: SerializableConfiguration,
+    partition: StateStoreInputPartition,
+    userFacingSchema: StructType,
+    stateSchema: StructType) extends PartitionReader[InternalRow] with Logging {
+
+  private val keySchema = SchemaUtil.getSchemaAsDataType(stateSchema, "key")
+    .asInstanceOf[StructType]
+  private val valueSchema = SchemaUtil.getSchemaAsDataType(stateSchema, "value")
+    .asInstanceOf[StructType]
+
+  private val userFacingValueSchema = SchemaUtil.getSchemaAsDataType(userFacingSchema, "value")
+    .asInstanceOf[StructType]
+
+  private val joinSide: JoinSide = partition.joinSide match {
+    case JoinSideValues.left => LeftSide
+    case JoinSideValues.right => RightSide
+    case JoinSideValues.none =>
+      throw new IllegalStateException("Unexpected join side for stream-stream 
read!")
+  }
+
+  private val (inputAttributes, formatVersion) = {

Review Comment:
   Can we add some comment explaining what this block is doing ?
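   For illustration, a comment could look roughly like this, assuming the block is inferring the state format version from whether the value schema carries a trailing `matched` boolean column (the `BooleanType` import above hints at that; please correct if that's not what it does):

   ```scala
   // Determine the stream-stream join state format version and the set of attributes that
   // were actually written to the state store: format version 2 appends a "matched" boolean
   // column to the value row, while format version 1 stores only the user-visible columns.
   private val (inputAttributes, formatVersion) = {
   ```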



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability}
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
+import org.apache.spark.sql.execution.streaming.state.StateStoreConf
+import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class StateTable(
+    session: SparkSession,
+    override val schema: StructType,
+    stateCheckpointLocation: String,
+    batchId: Long,
+    operatorId: Int,
+    storeName: String,
+    joinSide: JoinSideValues,
+    stateConf: StateStoreConf)
+  extends Table with SupportsRead with SupportsMetadataColumns {
+
+  import StateTable._
+
+  if (!isValidSchema(schema)) {
+    throw new AnalysisException("The fields of schema should be 'key' and 'value', " +
+      "and each field should have corresponding fields (they should be a StructType)")
+  }
+
+  override def name(): String =
+    s"state-table-ckpt-$stateCheckpointLocation-batch-$batchId-operator-$operatorId-" +
+      s"store-$storeName-joinside-$joinSide-stateconf-$stateConf"
+
+  override def capabilities(): util.Set[TableCapability] = CAPABILITY
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
+    new StateScanBuilder(session, schema, stateCheckpointLocation, batchId, operatorId, storeName,
+      joinSide, stateConf)
+
+  // FIXME: pop more critical configurations from stateConf?

Review Comment:
   Can you expand on this a bit? Do we want to remove some settings from storeConf?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util.UUID
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinSide
+import org.apache.spark.sql.execution.streaming.state.{StateSchemaCompatibilityChecker, StateStore, StateStoreId, StateStoreProviderId, SymmetricHashJoinStateManager}
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+

Review Comment:
   Nit: extra newline ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util.UUID
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinSide
+import org.apache.spark.sql.execution.streaming.state.{StateSchemaCompatibilityChecker, StateStore, StateStoreId, StateStoreProviderId, SymmetricHashJoinStateManager}
+import org.apache.spark.sql.types.{BooleanType, StructType}
+
+
+object StreamStreamJoinStateHelper {
+  def readSchema(
+      session: SparkSession,
+      stateCheckpointLocation: String,
+      operatorId: Int,
+      side: JoinSide,
+      excludeAuxColumns: Boolean = true): StructType = {
+    val (keySchema, valueSchema) = readKeyValueSchema(session, stateCheckpointLocation,
+      operatorId, side, excludeAuxColumns)
+
+    new StructType()
+      .add("key", keySchema)
+      .add("value", valueSchema)
+  }
+
+  def readKeyValueSchema(
+      session: SparkSession,
+      stateCheckpointLocation: String,
+      operatorId: Int,
+      side: JoinSide,
+      excludeAuxColumns: Boolean = true): (StructType, StructType) = {
+
+    // KeyToNumValuesType, KeyWithIndexToValueType
+    val storeNames = SymmetricHashJoinStateManager.allStateStoreNames(side).toList
+
+    val partitionId = StateStore.PARTITION_ID_TO_CHECK_SCHEMA
+    val storeIdForKeyToNumValues = new StateStoreId(stateCheckpointLocation, operatorId,
+      partitionId, storeNames(0))
+    val providerIdForKeyToNumValues = new StateStoreProviderId(storeIdForKeyToNumValues,
+      UUID.randomUUID())
+
+    val storeIdForKeyWithIndexToValue = new StateStoreId(stateCheckpointLocation,
+      operatorId, partitionId, storeNames(1))
+    val providerIdForKeyWithIndexToValue = new StateStoreProviderId(storeIdForKeyWithIndexToValue,
+      UUID.randomUUID())
+
+    val newHadoopConf = session.sessionState.newHadoopConf()
+
+    val manager = new StateSchemaCompatibilityChecker(providerIdForKeyToNumValues, newHadoopConf)
+    val (keySchema, _) = manager.readSchemaFile()
+
+    val manager2 = new StateSchemaCompatibilityChecker(providerIdForKeyWithIndexToValue,
+      newHadoopConf)
+    val (_, valueSchema) = manager2.readSchemaFile()
+
+    val maybeMatchedColumn = valueSchema.last
+
+    if (excludeAuxColumns

Review Comment:
   Which auxiliary columns are we referring to here? Can you give an example maybe?
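   To make the question concrete: is the auxiliary column the trailing `matched` boolean that `maybeMatchedColumn = valueSchema.last` above seems to look for? A tiny self-contained illustration of what I assume gets dropped (column names are made up):

   ```scala
   import org.apache.spark.sql.types.{BooleanType, LongType, StructType}

   // hypothetical value schema of one join side: user columns plus a trailing "matched" flag
   val valueSchema = new StructType()
     .add("amount", LongType)
     .add("matched", BooleanType)

   // with excludeAuxColumns = true the trailing auxiliary column would be dropped
   val userFacingValueSchema = StructType(valueSchema.dropRight(1))
   ```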



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


