anishshri-db commented on code in PR #43425:
URL: https://github.com/apache/spark/pull/43425#discussion_r1364319222


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, UnsafeRow}
+import org.apache.spark.sql.connector.read.PartitionReader
+import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
+import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProviderId}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+class StatePartitionReader(
+    storeConf: StateStoreConf,
+    hadoopConf: SerializableConfiguration,
+    partition: StateStoreInputPartition,
+    schema: StructType) extends PartitionReader[InternalRow] {
+
+  private val keySchema = SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType]
+  private val valueSchema = SchemaUtil.getSchemaAsDataType(schema, "value").asInstanceOf[StructType]
+
+  private lazy val store = {
+    val stateStoreId = StateStoreId(partition.stateCheckpointRootLocation,
+      partition.operatorId, partition.partition, partition.storeName)
+    val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
+
+    // TODO: This does not handle the case of session window aggregation; we don't have an
+    //  information whether the state store uses prefix scan or not. We will have to add such
+    //  information to determine the right encoder/decoder for the data.
+    StateStore.getReadOnly(stateStoreProviderId, keySchema, valueSchema,
+      numColsPrefixKey = 0, version = partition.batchId + 1, storeConf = storeConf,
+      hadoopConf = hadoopConf.value)
+  }
+
+  private lazy val iter = {
+    store.iterator().map(pair => unifyStateRowPair((pair.key, pair.value)))
+  }
+
+  private var current: InternalRow = _
+
+  override def next(): Boolean = {
+    if (iter.hasNext) {
+      current = iter.next()
+      true
+    } else {
+      current = null
+      false
+    }
+  }
+
+  private val joinedRow = new JoinedRow()
+
+  private def addMetadata(row: InternalRow): InternalRow = {
+    val metadataRow = new GenericInternalRow(
+      StateTable.METADATA_COLUMNS.map(_.name()).map {
+        case "_partition_id" => partition.partition.asInstanceOf[Any]
+      }.toArray
+    )
+    joinedRow.withLeft(row).withRight(metadataRow)
+  }
+
+  override def get(): InternalRow = addMetadata(current)
+
+  override def close(): Unit = {
+    current = null
+    store.abort()

Review Comment:
   Are we relying on the coordinator for this change?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util.UUID
+
+import scala.util.Try
+
+import org.apache.hadoop.fs.{Path, PathFilter}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder}
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues.JoinSideValues
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
+import org.apache.spark.sql.execution.streaming.state.StateStoreConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+class StateScanBuilder(
+    session: SparkSession,
+    schema: StructType,
+    stateCheckpointRootLocation: String,
+    batchId: Long,
+    operatorId: Long,
+    storeName: String,
+    joinSide: JoinSideValues,
+    stateStoreConf: StateStoreConf) extends ScanBuilder {
+  override def build(): Scan = new StateScan(session, schema, stateCheckpointRootLocation,
+    batchId, operatorId, storeName, joinSide, stateStoreConf)
+}
+
+class StateStoreInputPartition(
+    val partition: Int,
+    val queryId: UUID,
+    val stateCheckpointRootLocation: String,
+    val batchId: Long,
+    val operatorId: Long,
+    val storeName: String,
+    val joinSide: JoinSideValues) extends InputPartition
+
+class StateScan(
+    session: SparkSession,
+    schema: StructType,
+    stateCheckpointRootLocation: String,
+    batchId: Long,
+    operatorId: Long,
+    storeName: String,
+    joinSide: JoinSideValues,
+    stateStoreConf: StateStoreConf) extends Scan with Batch {
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val hadoopConfBroadcast = session.sparkContext.broadcast(
+    new SerializableConfiguration(session.sessionState.newHadoopConf()))
+
+  override def readSchema(): StructType = schema
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    val fs = stateCheckpointPartitionsLocation.getFileSystem(hadoopConfBroadcast.value.value)
+    val partitions = fs.listStatus(stateCheckpointPartitionsLocation, new PathFilter() {
+      override def accept(path: Path): Boolean = {
+        fs.isDirectory(path) && Try(path.getName.toInt).isSuccess && path.getName.toInt >= 0
+      }
+    })
+
+    if (partitions.headOption.isEmpty) {
+      Array.empty[InputPartition]

Review Comment:
   This essentially means this query is not stateful? Should we throw an exception in this case?
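   A rough sketch of the fail-fast alternative, assuming we would rather error out than return an empty scan; the exception type and message below are only illustrative, not part of the PR:
   ```scala
   if (partitions.headOption.isEmpty) {
     // Hypothetical: an empty partition listing most likely means the checkpoint
     // location / operatorId does not point at a stateful operator, so fail fast
     // instead of silently producing an empty result.
     throw new IllegalStateException(
       s"No state partitions found under $stateCheckpointPartitionsLocation; " +
         "check that the query is stateful and the checkpoint location is correct.")
   }
   ```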



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util.UUID
+
+import scala.util.Try
+
+import org.apache.hadoop.fs.{Path, PathFilter}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder}
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues.JoinSideValues
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
+import org.apache.spark.sql.execution.streaming.state.StateStoreConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+class StateScanBuilder(
+    session: SparkSession,
+    schema: StructType,
+    stateCheckpointRootLocation: String,
+    batchId: Long,
+    operatorId: Long,
+    storeName: String,
+    joinSide: JoinSideValues,
+    stateStoreConf: StateStoreConf) extends ScanBuilder {
+  override def build(): Scan = new StateScan(session, schema, stateCheckpointRootLocation,
+    batchId, operatorId, storeName, joinSide, stateStoreConf)
+}
+
+class StateStoreInputPartition(
+    val partition: Int,
+    val queryId: UUID,
+    val stateCheckpointRootLocation: String,
+    val batchId: Long,
+    val operatorId: Long,
+    val storeName: String,
+    val joinSide: JoinSideValues) extends InputPartition
+
+class StateScan(
+    session: SparkSession,
+    schema: StructType,
+    stateCheckpointRootLocation: String,
+    batchId: Long,
+    operatorId: Long,
+    storeName: String,
+    joinSide: JoinSideValues,
+    stateStoreConf: StateStoreConf) extends Scan with Batch {
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val hadoopConfBroadcast = session.sparkContext.broadcast(
+    new SerializableConfiguration(session.sessionState.newHadoopConf()))
+
+  override def readSchema(): StructType = schema
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    val fs = stateCheckpointPartitionsLocation.getFileSystem(hadoopConfBroadcast.value.value)
+    val partitions = fs.listStatus(stateCheckpointPartitionsLocation, new PathFilter() {
+      override def accept(path: Path): Boolean = {
+        fs.isDirectory(path) && Try(path.getName.toInt).isSuccess && path.getName.toInt >= 0
+      }
+    })
+
+    if (partitions.headOption.isEmpty) {
+      Array.empty[InputPartition]
+    } else {
+      // just a dummy query id because we are actually not running streaming query
+      val queryId = UUID.randomUUID()
+
+      val partitionsSorted = partitions.sortBy(fs => fs.getPath.getName.toInt)
+      val partitionNums = partitionsSorted.map(_.getPath.getName.toInt)
+      // assuming no same number - they're directories hence no same name
+      val head = partitionNums.head
+      val tail = partitionNums(partitionNums.length - 1)
+      assert(head == 0, "Partition should start with 0")
+      assert((tail - head + 1) == partitionNums.length,
+        s"No continuous partitions in state: ${partitionNums.mkString("Array(", ", ", ")")}")
+
+      partitionNums.map {
+        pn => new StateStoreInputPartition(pn, queryId, stateCheckpointRootLocation,
+          batchId, operatorId, storeName, joinSide)
+      }.toArray
+    }
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = joinSide match {
+    case JoinSideValues.left =>
+      val userFacingSchema = schema
+      val stateSchema = StreamStreamJoinStateHelper.readSchema(session,
+        stateCheckpointRootLocation, operatorId.toInt, LeftSide, excludeAuxColumns = false)
+      new StreamStreamJoinStatePartitionReaderFactory(stateStoreConf,
+        hadoopConfBroadcast.value, userFacingSchema, stateSchema)
+
+    case JoinSideValues.right =>
+      val userFacingSchema = schema
+      val stateSchema = StreamStreamJoinStateHelper.readSchema(session,
+        stateCheckpointRootLocation, operatorId.toInt, RightSide, excludeAuxColumns = false)
+      new StreamStreamJoinStatePartitionReaderFactory(stateStoreConf,
+        hadoopConfBroadcast.value, userFacingSchema, stateSchema)
+
+    case JoinSideValues.none =>
+      new StatePartitionReaderFactory(stateStoreConf, hadoopConfBroadcast.value, schema)
+  }
+
+  override def toBatch: Batch = this

Review Comment:
   What does this do exactly?
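   Is this just the usual DSv2 `Scan with Batch` pattern, where `toBatch` returns `this` so the same object also serves as the `Batch` that provides `planInputPartitions()` / `createReaderFactory()`? Roughly this (a sketch of my understanding, not code from the PR):
   ```scala
   import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan}
   import org.apache.spark.sql.types.StructType

   // Illustrative only: a Scan that is also its own Batch.
   class ExampleScan extends Scan with Batch {
     override def readSchema(): StructType = new StructType()
     override def planInputPartitions(): Array[InputPartition] = Array.empty[InputPartition]
     override def createReaderFactory(): PartitionReaderFactory =
       throw new UnsupportedOperationException("illustrative only")
     // Spark calls Scan.toBatch when planning a batch read; returning `this`
     // works because this class already implements Batch.
     override def toBatch: Batch = this
   }
   ```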



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReaderFactory.scala:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.execution.streaming.state.StateStoreConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+class StatePartitionReaderFactory(

Review Comment:
   Can we move this into the StatePartitionReader.scala file itself?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util.UUID
+
+import scala.util.Try
+
+import org.apache.hadoop.fs.{Path, PathFilter}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder}
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues.JoinSideValues
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
+import org.apache.spark.sql.execution.streaming.state.StateStoreConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+class StateScanBuilder(
+    session: SparkSession,
+    schema: StructType,
+    stateCheckpointRootLocation: String,
+    batchId: Long,
+    operatorId: Long,
+    storeName: String,
+    joinSide: JoinSideValues,
+    stateStoreConf: StateStoreConf) extends ScanBuilder {
+  override def build(): Scan = new StateScan(session, schema, stateCheckpointRootLocation,
+    batchId, operatorId, storeName, joinSide, stateStoreConf)
+}
+
+class StateStoreInputPartition(
+    val partition: Int,
+    val queryId: UUID,
+    val stateCheckpointRootLocation: String,
+    val batchId: Long,
+    val operatorId: Long,
+    val storeName: String,
+    val joinSide: JoinSideValues) extends InputPartition
+
+class StateScan(
+    session: SparkSession,
+    schema: StructType,
+    stateCheckpointRootLocation: String,
+    batchId: Long,
+    operatorId: Long,
+    storeName: String,
+    joinSide: JoinSideValues,
+    stateStoreConf: StateStoreConf) extends Scan with Batch {
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val hadoopConfBroadcast = session.sparkContext.broadcast(
+    new SerializableConfiguration(session.sessionState.newHadoopConf()))
+
+  override def readSchema(): StructType = schema
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    val fs = stateCheckpointPartitionsLocation.getFileSystem(hadoopConfBroadcast.value.value)
+    val partitions = fs.listStatus(stateCheckpointPartitionsLocation, new PathFilter() {
+      override def accept(path: Path): Boolean = {
+        fs.isDirectory(path) && Try(path.getName.toInt).isSuccess && path.getName.toInt >= 0
+      }
+    })
+
+    if (partitions.headOption.isEmpty) {
+      Array.empty[InputPartition]
+    } else {
+      // just a dummy query id because we are actually not running streaming query
+      val queryId = UUID.randomUUID()

Review Comment:
   Hmm - why do we need to pass a dummy queryId here?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util.UUID
+
+import scala.util.Try
+
+import org.apache.hadoop.fs.{Path, PathFilter}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder}
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues.JoinSideValues
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
+import org.apache.spark.sql.execution.streaming.state.StateStoreConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+class StateScanBuilder(
+    session: SparkSession,
+    schema: StructType,
+    stateCheckpointRootLocation: String,
+    batchId: Long,
+    operatorId: Long,
+    storeName: String,
+    joinSide: JoinSideValues,
+    stateStoreConf: StateStoreConf) extends ScanBuilder {
+  override def build(): Scan = new StateScan(session, schema, stateCheckpointRootLocation,
+    batchId, operatorId, storeName, joinSide, stateStoreConf)
+}
+
+class StateStoreInputPartition(

Review Comment:
   Can we add some small comments for these classes, or maybe just a file-level comment?
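   For example, something along these lines for `StateStoreInputPartition` (the wording is only a suggestion, not from the PR):
   ```scala
   /**
    * An [[InputPartition]] for the state data source: one instance per state store
    * partition, carrying everything an executor-side reader needs to locate and load
    * that partition's store for the requested batch (checkpoint root, operator id,
    * store name, join side, and a query run id).
    */
   class StateStoreInputPartition(
       val partition: Int,
       val queryId: UUID,
       val stateCheckpointRootLocation: String,
       val batchId: Long,
       val operatorId: Long,
       val storeName: String,
       val joinSide: JoinSideValues) extends InputPartition
   ```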



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.util.UUID
+
+import scala.util.Try
+
+import org.apache.hadoop.fs.{Path, PathFilter}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder}
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues
+import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues.JoinSideValues
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide}
+import org.apache.spark.sql.execution.streaming.state.StateStoreConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+class StateScanBuilder(
+    session: SparkSession,
+    schema: StructType,
+    stateCheckpointRootLocation: String,
+    batchId: Long,
+    operatorId: Long,
+    storeName: String,
+    joinSide: JoinSideValues,
+    stateStoreConf: StateStoreConf) extends ScanBuilder {
+  override def build(): Scan = new StateScan(session, schema, stateCheckpointRootLocation,
+    batchId, operatorId, storeName, joinSide, stateStoreConf)
+}
+
+class StateStoreInputPartition(
+    val partition: Int,
+    val queryId: UUID,
+    val stateCheckpointRootLocation: String,
+    val batchId: Long,
+    val operatorId: Long,
+    val storeName: String,
+    val joinSide: JoinSideValues) extends InputPartition
+
+class StateScan(
+    session: SparkSession,
+    schema: StructType,
+    stateCheckpointRootLocation: String,
+    batchId: Long,

Review Comment:
   Can we embed some of these params within a case class?
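   Rough sketch of the grouping I have in mind; the case class name and exact field split are only illustrative:
   ```scala
   // Hypothetical bundle for the parameters that are currently threaded unchanged
   // through StateScanBuilder, StateScan and StateStoreInputPartition.
   // JoinSideValues comes from this file's existing imports.
   case class StateSourceOptions(
       stateCheckpointRootLocation: String,
       batchId: Long,
       operatorId: Long,
       storeName: String,
       joinSide: JoinSideValues)
   ```
   Then `StateScanBuilder` / `StateScan` / `StateStoreInputPartition` could each take a single `StateSourceOptions` plus the schema and `StateStoreConf`, instead of five separate parameters.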



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

