anishshri-db commented on code in PR #43425: URL: https://github.com/apache/spark/pull/43425#discussion_r1364319222
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.state + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, UnsafeRow} +import org.apache.spark.sql.connector.read.PartitionReader +import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil +import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProviderId} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +class StatePartitionReader( + storeConf: StateStoreConf, + hadoopConf: SerializableConfiguration, + partition: StateStoreInputPartition, + schema: StructType) extends PartitionReader[InternalRow] { + + private val keySchema = SchemaUtil.getSchemaAsDataType(schema, "key").asInstanceOf[StructType] + private val valueSchema = SchemaUtil.getSchemaAsDataType(schema, "value").asInstanceOf[StructType] + + private lazy val store = { + val stateStoreId = 
StateStoreId(partition.stateCheckpointRootLocation, + partition.operatorId, partition.partition, partition.storeName) + val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId) + + // TODO: This does not handle the case of session window aggregation; we don't have an + // information whether the state store uses prefix scan or not. We will have to add such + // information to determine the right encoder/decoder for the data. + StateStore.getReadOnly(stateStoreProviderId, keySchema, valueSchema, + numColsPrefixKey = 0, version = partition.batchId + 1, storeConf = storeConf, + hadoopConf = hadoopConf.value) + } + + private lazy val iter = { + store.iterator().map(pair => unifyStateRowPair((pair.key, pair.value))) + } + + private var current: InternalRow = _ + + override def next(): Boolean = { + if (iter.hasNext) { + current = iter.next() + true + } else { + current = null + false + } + } + + private val joinedRow = new JoinedRow() + + private def addMetadata(row: InternalRow): InternalRow = { + val metadataRow = new GenericInternalRow( + StateTable.METADATA_COLUMNS.map(_.name()).map { + case "_partition_id" => partition.partition.asInstanceOf[Any] + }.toArray + ) + joinedRow.withLeft(row).withRight(metadataRow) + } + + override def get(): InternalRow = addMetadata(current) + + override def close(): Unit = { + current = null + store.abort() Review Comment: Are we relying on coordinator for this change ? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.state + +import java.util.UUID + +import scala.util.Try + +import org.apache.hadoop.fs.{Path, PathFilter} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder} +import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues +import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues.JoinSideValues +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide} +import org.apache.spark.sql.execution.streaming.state.StateStoreConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +class StateScanBuilder( + session: SparkSession, + schema: StructType, + stateCheckpointRootLocation: String, + batchId: Long, + operatorId: Long, + storeName: String, + joinSide: JoinSideValues, + stateStoreConf: StateStoreConf) extends ScanBuilder { + override def build(): Scan = new StateScan(session, schema, stateCheckpointRootLocation, + batchId, operatorId, storeName, joinSide, stateStoreConf) +} + +class StateStoreInputPartition( + val partition: Int, + val queryId: UUID, + val stateCheckpointRootLocation: String, + val batchId: Long, + val operatorId: Long, + val storeName: String, + val joinSide: JoinSideValues) extends InputPartition + +class StateScan( + session: SparkSession, + schema: StructType, + stateCheckpointRootLocation: String, + batchId: Long, + 
operatorId: Long, + storeName: String, + joinSide: JoinSideValues, + stateStoreConf: StateStoreConf) extends Scan with Batch { + + // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it + private val hadoopConfBroadcast = session.sparkContext.broadcast( + new SerializableConfiguration(session.sessionState.newHadoopConf())) + + override def readSchema(): StructType = schema + + override def planInputPartitions(): Array[InputPartition] = { + val fs = stateCheckpointPartitionsLocation.getFileSystem(hadoopConfBroadcast.value.value) + val partitions = fs.listStatus(stateCheckpointPartitionsLocation, new PathFilter() { + override def accept(path: Path): Boolean = { + fs.isDirectory(path) && Try(path.getName.toInt).isSuccess && path.getName.toInt >= 0 + } + }) + + if (partitions.headOption.isEmpty) { + Array.empty[InputPartition] Review Comment: This essentially means this query is not stateful ? Should we throw an exception in this case ? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.datasources.v2.state + +import java.util.UUID + +import scala.util.Try + +import org.apache.hadoop.fs.{Path, PathFilter} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder} +import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues +import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues.JoinSideValues +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide} +import org.apache.spark.sql.execution.streaming.state.StateStoreConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +class StateScanBuilder( + session: SparkSession, + schema: StructType, + stateCheckpointRootLocation: String, + batchId: Long, + operatorId: Long, + storeName: String, + joinSide: JoinSideValues, + stateStoreConf: StateStoreConf) extends ScanBuilder { + override def build(): Scan = new StateScan(session, schema, stateCheckpointRootLocation, + batchId, operatorId, storeName, joinSide, stateStoreConf) +} + +class StateStoreInputPartition( + val partition: Int, + val queryId: UUID, + val stateCheckpointRootLocation: String, + val batchId: Long, + val operatorId: Long, + val storeName: String, + val joinSide: JoinSideValues) extends InputPartition + +class StateScan( + session: SparkSession, + schema: StructType, + stateCheckpointRootLocation: String, + batchId: Long, + operatorId: Long, + storeName: String, + joinSide: JoinSideValues, + stateStoreConf: StateStoreConf) extends Scan with Batch { + + // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it + private val hadoopConfBroadcast = session.sparkContext.broadcast( + new SerializableConfiguration(session.sessionState.newHadoopConf())) + + override def readSchema(): StructType = schema + + override def 
planInputPartitions(): Array[InputPartition] = { + val fs = stateCheckpointPartitionsLocation.getFileSystem(hadoopConfBroadcast.value.value) + val partitions = fs.listStatus(stateCheckpointPartitionsLocation, new PathFilter() { + override def accept(path: Path): Boolean = { + fs.isDirectory(path) && Try(path.getName.toInt).isSuccess && path.getName.toInt >= 0 + } + }) + + if (partitions.headOption.isEmpty) { + Array.empty[InputPartition] + } else { + // just a dummy query id because we are actually not running streaming query + val queryId = UUID.randomUUID() + + val partitionsSorted = partitions.sortBy(fs => fs.getPath.getName.toInt) + val partitionNums = partitionsSorted.map(_.getPath.getName.toInt) + // assuming no same number - they're directories hence no same name + val head = partitionNums.head + val tail = partitionNums(partitionNums.length - 1) + assert(head == 0, "Partition should start with 0") + assert((tail - head + 1) == partitionNums.length, + s"No continuous partitions in state: ${partitionNums.mkString("Array(", ", ", ")")}") + + partitionNums.map { + pn => new StateStoreInputPartition(pn, queryId, stateCheckpointRootLocation, + batchId, operatorId, storeName, joinSide) + }.toArray + } + } + + override def createReaderFactory(): PartitionReaderFactory = joinSide match { + case JoinSideValues.left => + val userFacingSchema = schema + val stateSchema = StreamStreamJoinStateHelper.readSchema(session, + stateCheckpointRootLocation, operatorId.toInt, LeftSide, excludeAuxColumns = false) + new StreamStreamJoinStatePartitionReaderFactory(stateStoreConf, + hadoopConfBroadcast.value, userFacingSchema, stateSchema) + + case JoinSideValues.right => + val userFacingSchema = schema + val stateSchema = StreamStreamJoinStateHelper.readSchema(session, + stateCheckpointRootLocation, operatorId.toInt, RightSide, excludeAuxColumns = false) + new StreamStreamJoinStatePartitionReaderFactory(stateStoreConf, + hadoopConfBroadcast.value, userFacingSchema, stateSchema) + + 
case JoinSideValues.none => + new StatePartitionReaderFactory(stateStoreConf, hadoopConfBroadcast.value, schema) + } + + override def toBatch: Batch = this Review Comment: What does this do exactly ? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReaderFactory.scala: ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.state + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} +import org.apache.spark.sql.execution.streaming.state.StateStoreConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +class StatePartitionReaderFactory( Review Comment: Can we move this within the StatePartitionReader.scala file itself ? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.state + +import java.util.UUID + +import scala.util.Try + +import org.apache.hadoop.fs.{Path, PathFilter} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder} +import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues +import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues.JoinSideValues +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide} +import org.apache.spark.sql.execution.streaming.state.StateStoreConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +class StateScanBuilder( + session: SparkSession, + schema: StructType, + stateCheckpointRootLocation: String, + batchId: Long, + operatorId: Long, + storeName: String, + joinSide: JoinSideValues, + stateStoreConf: StateStoreConf) extends ScanBuilder { + override def build(): Scan = new StateScan(session, schema, stateCheckpointRootLocation, + batchId, operatorId, storeName, joinSide, stateStoreConf) +} + +class StateStoreInputPartition( + val partition: Int, + val queryId: UUID, + val 
stateCheckpointRootLocation: String, + val batchId: Long, + val operatorId: Long, + val storeName: String, + val joinSide: JoinSideValues) extends InputPartition + +class StateScan( + session: SparkSession, + schema: StructType, + stateCheckpointRootLocation: String, + batchId: Long, + operatorId: Long, + storeName: String, + joinSide: JoinSideValues, + stateStoreConf: StateStoreConf) extends Scan with Batch { + + // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it + private val hadoopConfBroadcast = session.sparkContext.broadcast( + new SerializableConfiguration(session.sessionState.newHadoopConf())) + + override def readSchema(): StructType = schema + + override def planInputPartitions(): Array[InputPartition] = { + val fs = stateCheckpointPartitionsLocation.getFileSystem(hadoopConfBroadcast.value.value) + val partitions = fs.listStatus(stateCheckpointPartitionsLocation, new PathFilter() { + override def accept(path: Path): Boolean = { + fs.isDirectory(path) && Try(path.getName.toInt).isSuccess && path.getName.toInt >= 0 + } + }) + + if (partitions.headOption.isEmpty) { + Array.empty[InputPartition] + } else { + // just a dummy query id because we are actually not running streaming query + val queryId = UUID.randomUUID() Review Comment: Hmm - why do we need to pass dummy queryId here ? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.state + +import java.util.UUID + +import scala.util.Try + +import org.apache.hadoop.fs.{Path, PathFilter} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder} +import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues +import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues.JoinSideValues +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide} +import org.apache.spark.sql.execution.streaming.state.StateStoreConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +class StateScanBuilder( + session: SparkSession, + schema: StructType, + stateCheckpointRootLocation: String, + batchId: Long, + operatorId: Long, + storeName: String, + joinSide: JoinSideValues, + stateStoreConf: StateStoreConf) extends ScanBuilder { + override def build(): Scan = new StateScan(session, schema, stateCheckpointRootLocation, + batchId, operatorId, storeName, joinSide, stateStoreConf) +} + +class StateStoreInputPartition( Review Comment: Can we add some small comments for these classes or maybe just a file level comment ? 
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.v2.state + +import java.util.UUID + +import scala.util.Try + +import org.apache.hadoop.fs.{Path, PathFilter} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, ScanBuilder} +import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues +import org.apache.spark.sql.execution.datasources.v2.state.StateDataSourceV2.JoinSideValues.JoinSideValues +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide} +import org.apache.spark.sql.execution.streaming.state.StateStoreConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +class StateScanBuilder( + session: SparkSession, + schema: StructType, + stateCheckpointRootLocation: String, + batchId: Long, + operatorId: Long, + storeName: String, + joinSide: JoinSideValues, + stateStoreConf: StateStoreConf) extends ScanBuilder { 
+ override def build(): Scan = new StateScan(session, schema, stateCheckpointRootLocation, + batchId, operatorId, storeName, joinSide, stateStoreConf) +} + +class StateStoreInputPartition( + val partition: Int, + val queryId: UUID, + val stateCheckpointRootLocation: String, + val batchId: Long, + val operatorId: Long, + val storeName: String, + val joinSide: JoinSideValues) extends InputPartition + +class StateScan( + session: SparkSession, + schema: StructType, + stateCheckpointRootLocation: String, + batchId: Long, Review Comment: Can we embed some of these params within a case class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
