micheal-o commented on code in PR #53287: URL: https://github.com/apache/spark/pull/53287#discussion_r2582809230
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionWriter.scala:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.v2.state
+
+import java.io.IOException
+
+import scala.collection.MapView
+import scala.collection.immutable.HashMap
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
+import org.apache.spark.sql.execution.streaming.state.{KeyStateEncoderSpec, StateStore, StateStoreConf, StateStoreId, StateStoreProvider, StateStoreProviderId}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A writer that can directly write binary data to the streaming state store.
+ *
+ * This writer expects input rows with the same schema produced by
+ * StatePartitionAllColumnFamiliesReader:
+ * (partition_key, key_bytes, value_bytes, column_family_name)
+ *
+ * The writer creates a fresh (empty) state store instance for the target commit version
+ * instead of loading previous partition data. After writing all rows for the partition, it will
+ * commit all changes as a snapshot
+ */
+class StatePartitionAllColumnFamiliesWriter(
+    storeConf: StateStoreConf,
+    hadoopConf: Configuration,
+    partition: StateStoreInputPartition,

Review Comment:
   Don't use `StateStoreInputPartition`. We won't have it since we are not writing with a data source at the moment. Let the params that you need be passed in explicitly.
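   For illustration, "let the params be passed in explicitly" could look roughly like the sketch below. This is not code from the PR; the parameter names are hypothetical, promoting only the pieces of `StateStoreInputPartition` that the writer actually dereferences in the diff (checkpoint location, operator id, partition id, store name, batch id, query/run id).

   ```
   // Sketch only (not from the PR): hypothetical parameters replacing the
   // StateStoreInputPartition dependency with exactly what the writer uses.
   class StatePartitionAllColumnFamiliesWriter(
       storeConf: StateStoreConf,
       hadoopConf: Configuration,
       stateCheckpointLocation: String,  // was partition.sourceOptions.stateCheckpointLocation
       operatorId: Long,                 // was partition.sourceOptions.operatorId
       partitionId: Int,                 // was partition.partition
       storeName: String,                // was partition.sourceOptions.storeName
       commitVersion: Long,              // was partition.sourceOptions.batchId + 1
       columnFamilyToSchemaMap: HashMap[String, (StructType, StructType)],
       keyStateEncoderSpec: KeyStateEncoderSpec) {

     private val (defaultKeySchema, defaultValueSchema) =
       columnFamilyToSchemaMap(StateStore.DEFAULT_COL_FAMILY_NAME)

     protected lazy val provider: StateStoreProvider = {
       // Same construction as in the diff, but built from the explicit params.
       val stateStoreId =
         StateStoreId(stateCheckpointLocation, operatorId, partitionId, storeName)
       StateStoreProvider.createAndInit(
         StateStoreProviderId(stateStoreId, java.util.UUID.randomUUID()),
         defaultKeySchema, defaultValueSchema, keyStateEncoderSpec,
         useColumnFamilies = false, storeConf, hadoopConf,
         useMultipleValuesPerKey = false, stateSchemaProvider = None)
     }
   }
   ```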
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionWriter.scala:
##########
@@ -0,0 +1,136 @@
+class StatePartitionAllColumnFamiliesWriter(
+    storeConf: StateStoreConf,
+    hadoopConf: Configuration,
+    partition: StateStoreInputPartition,
+    columnFamilyToSchemaMap: HashMap[String, (StructType, StructType)],

Review Comment:
   Instead of the tuple, you can use `StateStoreColFamilySchema`
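   A rough sketch of the `StateStoreColFamilySchema`-based signature is below. The `keySchema`/`valueSchema` field names (and the idea that the schema object also carries the key state encoder spec, which a later comment alludes to) are assumptions, not confirmed in this thread.

   ```
   // Sketch only: assumes StateStoreColFamilySchema exposes keySchema / valueSchema
   // for each column family.
   class StatePartitionAllColumnFamiliesWriter(
       storeConf: StateStoreConf,
       hadoopConf: Configuration,
       columnFamilyToSchemaMap: HashMap[String, StateStoreColFamilySchema]) {

     private val defaultColFamilySchema = columnFamilyToSchemaMap.getOrElse(
       StateStore.DEFAULT_COL_FAMILY_NAME,
       throw new IllegalArgumentException(
         s"Column family ${StateStore.DEFAULT_COL_FAMILY_NAME} not found in schema map"))

     private val defaultKeySchema: StructType = defaultColFamilySchema.keySchema
     private val defaultValueSchema: StructType = defaultColFamilySchema.valueSchema

     // Length lookups no longer unpack a tuple, just read the schema object per family.
     private val columnFamilyToKeySchemaLenMap: MapView[String, Int] =
       columnFamilyToSchemaMap.view.mapValues(_.keySchema.length)
     private val columnFamilyToValueSchemaLenMap: MapView[String, Int] =
       columnFamilyToSchemaMap.view.mapValues(_.valueSchema.length)
   }
   ```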
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionWriter.scala:
##########
@@ -0,0 +1,136 @@
+  private lazy val stateStore: StateStore = {
+    val stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) {
+      Some(java.util.UUID.randomUUID.toString)
+    } else {
+      None
+    }
+    val version = partition.sourceOptions.batchId + 1
+    // Create empty store to avoid loading old partition data during repartitioning

Review Comment:
   nit: rephrase to `...old partition data since we are rewriting the store e.g. during repartitioning`
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionWriter.scala:
##########
@@ -0,0 +1,136 @@
+    // Extract raw bytes and column family name from the record
+    val keyBytes = record.getBinary(1)
+    val valueBytes = record.getBinary(2)
+    val colFamilyName = record.getString(3)
+
+    // Reconstruct UnsafeRow objects from the raw bytes
+    // The bytes are in UnsafeRow memory format from StatePartitionReaderAllColumnFamilies
+    val keyRow = new UnsafeRow(columnFamilyToKeySchemaLenMap(colFamilyName))
+    keyRow.pointTo(keyBytes, keyBytes.length)
+
+    val valueRow = new UnsafeRow(columnFamilyToValueSchemaLenMap(colFamilyName))
+    valueRow.pointTo(valueBytes, valueBytes.length)
+
+    // Use StateStore API which handles proper RocksDB encoding (version byte, checksums, etc.)

Review Comment:
   nit: remove this comment. Not necessary

########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionWriter.scala:
##########
@@ -0,0 +1,136 @@
+  protected lazy val provider: StateStoreProvider = {
+    val stateStoreId = StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
+      partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
+    val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)

Review Comment:
   you can use a new UUID here for runId, since this isn't a streaming query run
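   A minimal sketch of that change (the surrounding lines are taken from the diff; only the run id argument changes):

   ```
   // Sketch: use a fresh UUID as the run id, since this writer does not run inside
   // a streaming query and there is no queryId to reuse.
   val stateStoreId = StateStoreId(partition.sourceOptions.stateCheckpointLocation.toString,
     partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
   val stateStoreProviderId = StateStoreProviderId(stateStoreId, java.util.UUID.randomUUID())
   ```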
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -731,6 +731,91 @@ class RocksDB(
     this
   }
 
+  /**
+   * Create an empty RocksDB state store at the specified version without loading previous data.
+   *
+   * This method is useful when state will be completely rewritten and
+   * does not need to load previous states
+   *
+   * @param targetVersion The version to initialize the empty store at (must be >= 0)
+   * @param stateStoreCkptId Optional checkpoint ID (required if checkpoint IDs are enabled)
+   * @param readOnly Whether to open the store in read-only mode
+   * @return A RocksDB instance with an empty state at the target version
+   */
+  def loadEmpty(

Review Comment:
   We shouldn't use a separate function. Let's implement this within the normal load func to avoid duplicating code. You can implement for only `loadWithoutCheckpointId` for now. We will do for `loadWithCheckpointId` when we add checkpoint v2 support.
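   Very roughly, the suggestion is a flag on the existing load path rather than a new entry point. Everything below is illustrative only: the actual `load` / `loadWithoutCheckpointId` signatures in `RocksDB.scala` are not shown in this thread, so the parameter lists here are assumptions.

   ```
   // Illustrative shape only, not the real RocksDB.scala code.
   def load(
       version: Long,
       stateStoreCkptId: Option[String] = None,
       readOnly: Boolean = false,
       loadEmpty: Boolean = false): RocksDB = {
     if (stateStoreCkptId.isDefined) {
       // Checkpoint v2 path: loadEmpty support deferred to a follow-up PR.
       loadWithCheckpointId(version, stateStoreCkptId, readOnly)
     } else {
       // loadWithoutCheckpointId would skip restoring the previous snapshot/changelog
       // when loadEmpty is true and start from an empty working instance at `version`.
       loadWithoutCheckpointId(version, readOnly, loadEmpty)
     }
     this
   }
   ```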
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionWriter.scala:
##########
@@ -0,0 +1,136 @@
+class StatePartitionAllColumnFamiliesWriter(
+    storeConf: StateStoreConf,
+    hadoopConf: Configuration,
+    partition: StateStoreInputPartition,
+    columnFamilyToSchemaMap: HashMap[String, (StructType, StructType)],
+    keyStateEncoderSpec: KeyStateEncoderSpec) {

Review Comment:
   this would already be in the `StateStoreColFamilySchema` above
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionWriter.scala:
##########
@@ -0,0 +1,136 @@
+  private lazy val stateStore: StateStore = {
+    val stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) {
+      Some(java.util.UUID.randomUUID.toString)

Review Comment:
   This isn't correct. Let's set checkpointId to None for now. We will handle that in a subsequent PR to support checkpoint v2.
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionWriter.scala:
##########
@@ -0,0 +1,136 @@
+    val version = partition.sourceOptions.batchId + 1
+    // Create empty store to avoid loading old partition data during repartitioning
+    // Use loadEmpty=true to create a fresh state store without loading previous versions
+    // We create the empty store AT version, and the next commit will
+    // produce version + 1
+    provider.getStore(
+      version,
+      stateStoreCkptId,
+      forceSnapshotOnCommit = true,

Review Comment:
   Actually, instead, let's turn off changelog checkpointing in the db when `loadEmpty`, so that we would only generate a snapshot and no changelog. Because the new state version marks a new beginning, and we don't want to mistakenly apply the new changelog to older state versions. Let's make sure we have a test for this.
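   One way to realize that is sketched below, under the assumption that the RocksDB conf exposes an `enableChangelogCheckpointing` flag and that commit falls back to uploading a full snapshot when it is off.

   ```
   // Sketch: when the store is loaded empty for a rewrite, behave as if changelog
   // checkpointing were disabled so commit() uploads only a full snapshot.
   val shouldUseChangelog = conf.enableChangelogCheckpointing && !loadEmpty
   ```

   A test could then commit through the rewrite path and assert that only a snapshot file exists for the new version and that no changelog file was produced; the exact files to check depend on the RocksDB file manager's layout.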
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionWriter.scala:
##########
@@ -0,0 +1,136 @@
+  private def putRaw(rawRecord: Row): Unit = {
+    val record = rowConverter(rawRecord).asInstanceOf[InternalRow]
+    // Validate record schema
+    if (record.numFields != 4) {

Review Comment:
   just make this an assertion
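   i.e. something along these lines, keeping the diagnostic message from the original check (sketch only):

   ```
   // Sketch: an assertion instead of throwing IOException for a malformed row.
   assert(record.numFields == 4,
     s"Invalid record schema: expected 4 fields (partition_key, key_bytes, value_bytes, " +
       s"column_family_name), got ${record.numFields}")
   ```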
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionWriter.scala:
##########
@@ -0,0 +1,136 @@
+  // The function that writes and commits data to state store. It takes in rows with schema
+  // - partition_key, StructType
+  // - key_bytes, BinaryType
+  // - value_bytes, BinaryType
+  // - column_family_name, StringType
+  def put(partition: Iterator[Row]): Unit = {
+    partition.foreach(row => putRaw(row))

Review Comment:
   wrap this in try and abort the store, if there is an error e.g.
   ```
   try {
     partition.foreach(row => putRaw(row))
     stateStore.commit()
   } finally {
     if (!stateStore.hasCommitted) {
       stateStore.abort()
     }
   }
   ```
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionWriter.scala:
##########
@@ -0,0 +1,136 @@
+  def put(partition: Iterator[Row]): Unit = {

Review Comment:
   nit: `rows` instead of `partition`

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
