HeartSaVioR commented on code in PR #47104: URL: https://github.com/apache/spark/pull/47104#discussion_r1668189431
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} +import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec} + +trait ColumnFamilySchemaUtils { + def getValueStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchema + + + def getListStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchema + Review Comment: nit: ditto ########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ########## @@ -2144,6 +2144,13 @@ object SQLConf { .timeConf(TimeUnit.MILLISECONDS) .createWithDefault(TimeUnit.MINUTES.toMillis(1)) // 1 minute + val STREAMING_TRANSFORM_WITH_STATE_OP_STATE_SCHEMA_VERSION = + buildConf("spark.sql.streaming.transformWithState.stateSchemaVersion") + .doc("The version of the state schema used by the transformWithState operator") + .version("4.0.0") + .intConf + .createWithDefault(3) Review Comment: Every operator has its own state schema version - version 2 of streaming aggregation and version 2 of stream-stream join are totally different one. That said, any reason to start this with 3? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} +import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec} + +trait ColumnFamilySchemaUtils { + def getValueStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchema + Review Comment: nit: let's not put unnecessary empty lines (multiple empty lines). one line is enough. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala: ########## @@ -131,17 +101,49 @@ class StateSchemaCompatibilityChecker( createSchemaFile(newKeySchema, newValueSchema) } else { // validate if the new schema is compatible with the existing schema - check(existingSchema.get, (newKeySchema, newValueSchema), ignoreValueSchema) + StateSchemaCompatibilityChecker. + check(existingSchema.get, (newKeySchema, newValueSchema), ignoreValueSchema) } } private def schemaFile(storeCpLocation: Path): Path = new Path(new Path(storeCpLocation, "_metadata"), "schema") } -object StateSchemaCompatibilityChecker { +object StateSchemaCompatibilityChecker extends Logging { val VERSION = 2 + Review Comment: nit: unnecessary empty line ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala: ########## @@ -246,7 +246,11 @@ case class StreamingSymmetricHashJoinExec( watermarkUsedForStateCleanup && watermarkHasChanged } - override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = { + override def validateAndMaybeEvolveStateSchema( + hadoopConf: Configuration, + batchId: Long, + stateSchemaVersion: Int + ): Array[String] = { Review Comment: nit: move this to one line up ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} +import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec} + +trait ColumnFamilySchemaUtils { + def getValueStateSchema[T]( Review Comment: nit: could be one liner? 
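For reference, a minimal sketch of what the one-line form of these two-parameter signatures could look like (`ColumnFamilySchema` is stubbed here only so the fragment compiles on its own; the real trait is defined elsewhere in this PR):

```scala
// Stub stand-in so the snippet compiles standalone; not the PR's real definition.
trait ColumnFamilySchema

// Hypothetical one-line form of the two-parameter methods from the diff above;
// both signatures fit comfortably within Spark's 100-character line limit.
trait ColumnFamilySchemaUtils {
  def getValueStateSchema[T](stateName: String, hasTtl: Boolean): ColumnFamilySchema
  def getListStateSchema[T](stateName: String, hasTtl: Boolean): ColumnFamilySchema
}
```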
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} +import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec} + +trait ColumnFamilySchemaUtils { + def getValueStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchema + + + def getListStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchema + + + def getMapStateSchema[K, V]( + stateName: String, + userKeyEnc: Encoder[K], + hasTtl: Boolean): ColumnFamilySchema +} + +object ColumnFamilySchemaUtilsV1 extends ColumnFamilySchemaUtils { + + def getValueStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchemaV1 = { + new ColumnFamilySchemaV1( + stateName, + KEY_ROW_SCHEMA, + if (hasTtl) { + VALUE_ROW_SCHEMA_WITH_TTL + } else { + VALUE_ROW_SCHEMA + }, + NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA)) + } + + def getListStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchemaV1 = { + new ColumnFamilySchemaV1( + stateName, + KEY_ROW_SCHEMA, + if (hasTtl) { + VALUE_ROW_SCHEMA_WITH_TTL + } else { + VALUE_ROW_SCHEMA + }, + NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA)) + } + + def getMapStateSchema[K, V]( + stateName: String, + userKeyEnc: Encoder[K], + hasTtl: Boolean): ColumnFamilySchemaV1 = { + new ColumnFamilySchemaV1( Review Comment: nit: doesn't need `new` as it's a case class ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} +import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec} + +trait ColumnFamilySchemaUtils { + def getValueStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchema + + + def getListStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchema + + + def getMapStateSchema[K, V]( + stateName: String, + userKeyEnc: Encoder[K], + hasTtl: Boolean): ColumnFamilySchema +} + +object ColumnFamilySchemaUtilsV1 extends ColumnFamilySchemaUtils { + + def getValueStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchemaV1 = { + new ColumnFamilySchemaV1( Review Comment: nit: doesn't need `new` as it's a case class ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io.{InputStream, OutputStream} +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.UUID + +import scala.io.{Source => IOSource} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion + +/** + * The StateSchemaV3File is used to write the schema of multiple column families. + * Right now, this is primarily used for the TransformWithState operator, which supports + * multiple column families to keep the data for multiple state variables. + * @param hadoopConf Hadoop configuration that is used to read / write metadata files. + * @param path Path to the directory that will be used for writing metadata. 
+ */ +class StateSchemaV3File( + hadoopConf: Configuration, + path: String) { + + val VERSION = 3 + + val metadataPath = new Path(path) + + protected val fileManager: CheckpointFileManager = + CheckpointFileManager.create(metadataPath, hadoopConf) + + if (!fileManager.exists(metadataPath)) { + fileManager.mkdirs(metadataPath) + } + + def deserialize(in: InputStream): List[ColumnFamilySchema] = { + val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines() + + if (!lines.hasNext) { + throw new IllegalStateException("Incomplete log file in the offset commit log") + } + + val version = lines.next().trim + validateVersion(version, VERSION) + + lines.map(ColumnFamilySchemaV1.fromJson).toList + } + + def serialize(schemas: List[ColumnFamilySchema], out: OutputStream): Unit = { + out.write(s"v${VERSION}".getBytes(UTF_8)) + out.write('\n') + out.write(schemas.map(_.json).mkString("\n").getBytes(UTF_8)) + } + + def addWithUUID(batchId: Long, metadata: List[ColumnFamilySchema]): Path = { + val batchMetadataPath = batchIdToPath(batchId) + val schemaFilePath = new Path(batchMetadataPath, UUID.randomUUID().toString) + fileManager.mkdirs(batchMetadataPath) + write(schemaFilePath, out => serialize(metadata, out)) + schemaFilePath + } + + def getWithPath(schemaFilePath: Path): List[ColumnFamilySchema] = { + deserialize(fileManager.open(schemaFilePath)) + } + + protected def write( + batchMetadataFile: Path, + fn: OutputStream => Unit): Unit = { + // Only write metadata when the batch has not yet been written Review Comment: We don't expect this to happen, given the addition of UUID. Even for the multiple retrials of the same batch, this will always create new file per trial. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io.{InputStream, OutputStream} +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.UUID + +import scala.io.{Source => IOSource} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion + +/** + * The StateSchemaV3File is used to write the schema of multiple column families. + * Right now, this is primarily used for the TransformWithState operator, which supports + * multiple column families to keep the data for multiple state variables. + * @param hadoopConf Hadoop configuration that is used to read / write metadata files. + * @param path Path to the directory that will be used for writing metadata. 
+ */ +class StateSchemaV3File( + hadoopConf: Configuration, + path: String) { + + val VERSION = 3 + + val metadataPath = new Path(path) + + protected val fileManager: CheckpointFileManager = + CheckpointFileManager.create(metadataPath, hadoopConf) + + if (!fileManager.exists(metadataPath)) { + fileManager.mkdirs(metadataPath) + } + + def deserialize(in: InputStream): List[ColumnFamilySchema] = { + val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines() + + if (!lines.hasNext) { + throw new IllegalStateException("Incomplete log file in the offset commit log") + } + + val version = lines.next().trim + validateVersion(version, VERSION) + + lines.map(ColumnFamilySchemaV1.fromJson).toList + } + + def serialize(schemas: List[ColumnFamilySchema], out: OutputStream): Unit = { Review Comment: same here ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io.{InputStream, OutputStream} +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.UUID + +import scala.io.{Source => IOSource} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion + +/** + * The StateSchemaV3File is used to write the schema of multiple column families. + * Right now, this is primarily used for the TransformWithState operator, which supports + * multiple column families to keep the data for multiple state variables. + * @param hadoopConf Hadoop configuration that is used to read / write metadata files. + * @param path Path to the directory that will be used for writing metadata. + */ +class StateSchemaV3File( + hadoopConf: Configuration, + path: String) { + + val VERSION = 3 + + val metadataPath = new Path(path) + + protected val fileManager: CheckpointFileManager = + CheckpointFileManager.create(metadataPath, hadoopConf) + + if (!fileManager.exists(metadataPath)) { + fileManager.mkdirs(metadataPath) + } + + def deserialize(in: InputStream): List[ColumnFamilySchema] = { Review Comment: descope (private or package private for testing) if this isn't an entry point for outside caller ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ########## @@ -313,3 +290,176 @@ class StatefulProcessorHandleImpl( } } } + +/** + * This DriverStatefulProcessorHandleImpl is used within TransformWithExec + * on the driver side to collect the columnFamilySchemas before any processing is + * actually done. 
We need this class because we can only collect the schemas after + * the StatefulProcessor is initialized. + */ +class DriverStatefulProcessorHandleImpl(timeMode: TimeMode) + extends StatefulProcessorHandleImplBase(timeMode) { + + private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1 + + // Because this is only happening on the driver side, there is only + // one task modifying and accessing this map at a time + private[sql] val columnFamilySchemas: mutable.Map[String, ColumnFamilySchema] = + new mutable.HashMap[String, ColumnFamilySchema]() + + def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = columnFamilySchemas.toMap + + /** + * Function to add the ValueState schema to the list of column family schemas. + * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * + * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable + * @tparam T - type of state variable + * @return - instance of ValueState of type T that can be used to store state persistently + */ + override def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T] = { + verifyStateVarOperations("get_value_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getValueStateSchema(stateName, false) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[ValueState[T]] + } + + /** + * Function to add the ValueStateWithTTL schema to the list of column family schemas. Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ########## @@ -313,3 +290,176 @@ class StatefulProcessorHandleImpl( } } } + +/** + * This DriverStatefulProcessorHandleImpl is used within TransformWithExec + * on the driver side to collect the columnFamilySchemas before any processing is + * actually done. We need this class because we can only collect the schemas after + * the StatefulProcessor is initialized. + */ +class DriverStatefulProcessorHandleImpl(timeMode: TimeMode) + extends StatefulProcessorHandleImplBase(timeMode) { + + private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1 + + // Because this is only happening on the driver side, there is only + // one task modifying and accessing this map at a time + private[sql] val columnFamilySchemas: mutable.Map[String, ColumnFamilySchema] = + new mutable.HashMap[String, ColumnFamilySchema]() + + def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = columnFamilySchemas.toMap + + /** Review Comment: nit: Is it the same with the method doc in base interface/class? If then you can just skip. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ########## @@ -313,3 +290,176 @@ class StatefulProcessorHandleImpl( } } } + +/** + * This DriverStatefulProcessorHandleImpl is used within TransformWithExec + * on the driver side to collect the columnFamilySchemas before any processing is + * actually done. We need this class because we can only collect the schemas after + * the StatefulProcessor is initialized. 
+ */ +class DriverStatefulProcessorHandleImpl(timeMode: TimeMode) + extends StatefulProcessorHandleImplBase(timeMode) { + + private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1 + + // Because this is only happening on the driver side, there is only + // one task modifying and accessing this map at a time + private[sql] val columnFamilySchemas: mutable.Map[String, ColumnFamilySchema] = + new mutable.HashMap[String, ColumnFamilySchema]() + + def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = columnFamilySchemas.toMap + + /** + * Function to add the ValueState schema to the list of column family schemas. + * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * + * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable + * @tparam T - type of state variable + * @return - instance of ValueState of type T that can be used to store state persistently + */ + override def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T] = { + verifyStateVarOperations("get_value_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getValueStateSchema(stateName, false) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[ValueState[T]] + } + + /** + * Function to add the ValueStateWithTTL schema to the list of column family schemas. + * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * + * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable + * @param ttlConfig - the ttl configuration (time to live duration etc.) + * @tparam T - type of state variable + * @return - instance of ValueState of type T that can be used to store state persistently + */ + override def getValueState[T]( + stateName: String, + valEncoder: Encoder[T], + ttlConfig: TTLConfig): ValueState[T] = { + verifyStateVarOperations("get_value_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getValueStateSchema(stateName, true) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[ValueState[T]] + } + + /** + * Function to add the ListState schema to the list of column family schemas. + * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * + * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable + * @tparam T - type of state variable + * @return - instance of ListState of type T that can be used to store state persistently + */ + override def getListState[T](stateName: String, valEncoder: Encoder[T]): ListState[T] = { + verifyStateVarOperations("get_list_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getListStateSchema(stateName, false) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[ListState[T]] + } + + /** + * Function to add the ListStateWithTTL schema to the list of column family schemas. + * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * + * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable + * @param ttlConfig - the ttl configuration (time to live duration etc.) 
+ * @tparam T - type of state variable + * @return - instance of ListState of type T that can be used to store state persistently + */ + override def getListState[T]( + stateName: String, + valEncoder: Encoder[T], + ttlConfig: TTLConfig): ListState[T] = { + verifyStateVarOperations("get_list_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getListStateSchema(stateName, true) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[ListState[T]] + } + + /** + * Function to add the MapState schema to the list of column family schemas. Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ########## @@ -92,6 +93,35 @@ case class TransformWithStateExec( } } + /** + * We initialize this processor handle in the driver to run the init function + * and fetch the schemas of the state variables initialized in this processor. + * @return a new instance of the driver processor handle + */ + private def getDriverProcessorHandle: DriverStatefulProcessorHandleImpl = { Review Comment: nit: let's use `()` explicitly if it's not just a property of class but doing some more thing ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ########## @@ -92,6 +93,35 @@ case class TransformWithStateExec( } } + /** + * We initialize this processor handle in the driver to run the init function + * and fetch the schemas of the state variables initialized in this processor. + * @return a new instance of the driver processor handle + */ + private def getDriverProcessorHandle: DriverStatefulProcessorHandleImpl = { + val driverProcessorHandle = new DriverStatefulProcessorHandleImpl(timeMode) + driverProcessorHandle.setHandleState(StatefulProcessorHandleState.PRE_INIT) + statefulProcessor.setHandle(driverProcessorHandle) + statefulProcessor.init(outputMode, timeMode) + driverProcessorHandle + } + + /** + * Fetching the columnFamilySchemas from the StatefulProcessorHandle + * after init is called. + */ + private def getColFamilySchemas(): Map[String, ColumnFamilySchema] = { + val driverProcessorHandle = getDriverProcessorHandle + val columnFamilySchemas = driverProcessorHandle.getColumnFamilySchemas + closeProcessorHandle() Review Comment: nit: Let's inline if it's only used here. It's just two lines. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ########## @@ -340,11 +370,47 @@ case class TransformWithStateExec( ) } - override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = { - // TODO: transformWithState is special because we don't have the schema of the state directly - // within the passed args. We need to gather this after running the init function - // within the stateful processor on the driver. This also requires a schema format change - // when recording this information persistently. + override def validateAndMaybeEvolveStateSchema( + hadoopConf: Configuration, + batchId: Long, + stateSchemaVersion: Int): + Array[String] = { + assert(stateSchemaVersion >= 3) + val newColumnFamilySchemas = getColFamilySchemas() + val schemaFile = new StateSchemaV3File( + hadoopConf, stateSchemaFilePath(StateStoreId.DEFAULT_STORE_NAME).toString) + // TODO: Read the schema path from the OperatorStateMetadata file Review Comment: Same, better to have explicit JIRA ticket for every TODO. 
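To make the driver-side flow discussed above concrete, here is a hedged, self-contained sketch of the PRE_INIT pattern: during the processor's `init()` on the driver, each `get*State` call only records a column family schema and returns a null handle (no state store exists at planning time), and the collected map is what `validateAndMaybeEvolveStateSchema` later persists. The names below are simplified stand-ins, not the PR's real classes:

```scala
import scala.collection.mutable

// Simplified stand-ins for DriverStatefulProcessorHandleImpl / ColumnFamilySchemaV1.
final case class CfSchemaSketch(stateName: String, hasTtl: Boolean)

class DriverHandleSketch {
  // driver-only: a single thread touches this map during planning
  private val schemas = mutable.HashMap.empty[String, CfSchemaSketch]

  def getValueState[T](stateName: String, hasTtl: Boolean): T = {
    schemas.put(stateName, CfSchemaSketch(stateName, hasTtl))
    null.asInstanceOf[T] // never dereferenced on the driver
  }

  def getColumnFamilySchemas: Map[String, CfSchemaSketch] = schemas.toMap
}

object DriverHandleSketch {
  def main(args: Array[String]): Unit = {
    val handle = new DriverHandleSketch
    handle.getValueState[AnyRef]("countState", hasTtl = false)
    handle.getValueState[AnyRef]("sessionState", hasTtl = true)
    // roughly the map that would then be written out to the state schema file
    println(handle.getColumnFamilySchemas)
  }
}
```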
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ########## @@ -187,23 +189,29 @@ class IncrementalExecution( } } - object WriteStatefulOperatorMetadataRule extends SparkPlanPartialRule { - override val rule: PartialFunction[SparkPlan, SparkPlan] = { - case stateStoreWriter: StateStoreWriter if isFirstBatch => - val metadata = stateStoreWriter.operatorStateMetadata() - val metadataWriter = new OperatorStateMetadataWriter(new Path( - checkpointLocation, stateStoreWriter.getStateInfo.operatorId.toString), hadoopConf) - metadataWriter.write(metadata) - stateStoreWriter - } - } - // Planning rule used to record the state schema for the first run and validate state schema // changes across query runs. - object StateSchemaValidationRule extends SparkPlanPartialRule { + object StateSchemaAndOperatorMetadataRule extends SparkPlanPartialRule { override val rule: PartialFunction[SparkPlan, SparkPlan] = { + // In the case of TransformWithStateExec, we want to collect this StateSchema + // filepath, and write this path out in the OperatorStateMetadata file case statefulOp: StatefulOperator if isFirstBatch => - statefulOp.validateAndMaybeEvolveStateSchema(hadoopConf) + val stateSchemaVersion = statefulOp match { + case _: TransformWithStateExec => sparkSession.sessionState.conf. + getConf(SQLConf.STREAMING_TRANSFORM_WITH_STATE_OP_STATE_SCHEMA_VERSION) + case _ => STATE_SCHEMA_DEFAULT_VERSION + } + val stateSchemaPaths = statefulOp. + validateAndMaybeEvolveStateSchema(hadoopConf, currentBatchId, stateSchemaVersion) + // write out the state schema paths to the metadata file + statefulOp match { + case stateStoreWriter: StateStoreWriter => + val metadata = stateStoreWriter.operatorStateMetadata() + // TODO: Populate metadata with stateSchemaPaths if metadata version is v2 Review Comment: Also what is the plan for non-transformWithState? Will they be stick to metadata version v1, or will we track the schema file for them as well? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ########## @@ -340,11 +370,47 @@ case class TransformWithStateExec( ) } - override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = { - // TODO: transformWithState is special because we don't have the schema of the state directly - // within the passed args. We need to gather this after running the init function - // within the stateful processor on the driver. This also requires a schema format change - // when recording this information persistently. + override def validateAndMaybeEvolveStateSchema( + hadoopConf: Configuration, + batchId: Long, + stateSchemaVersion: Int): + Array[String] = { Review Comment: nit: move one line up ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} +import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec} + +trait ColumnFamilySchemaUtils { + def getValueStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchema + + + def getListStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchema + + + def getMapStateSchema[K, V]( + stateName: String, + userKeyEnc: Encoder[K], + hasTtl: Boolean): ColumnFamilySchema +} + +object ColumnFamilySchemaUtilsV1 extends ColumnFamilySchemaUtils { + + def getValueStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchemaV1 = { + new ColumnFamilySchemaV1( + stateName, + KEY_ROW_SCHEMA, + if (hasTtl) { + VALUE_ROW_SCHEMA_WITH_TTL + } else { + VALUE_ROW_SCHEMA + }, + NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA)) + } + + def getListStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchemaV1 = { + new ColumnFamilySchemaV1( Review Comment: nit: doesn't need `new` as it's a case class ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala: ########## @@ -28,6 +33,57 @@ import org.apache.spark.util.Utils /** * Helper classes for reading/writing state schema. 
*/ +sealed trait ColumnFamilySchema extends Serializable { + def jsonValue: JValue + + def json: String + + def columnFamilyName: String +} + +case class ColumnFamilySchemaV1( + columnFamilyName: String, + keySchema: StructType, + valueSchema: StructType, + keyStateEncoderSpec: KeyStateEncoderSpec, + userKeyEncoder: Option[StructType] = None) extends ColumnFamilySchema { + def jsonValue: JValue = { + ("columnFamilyName" -> JString(columnFamilyName)) ~ + ("keySchema" -> JString(keySchema.json)) ~ + ("valueSchema" -> JString(valueSchema.json)) ~ + ("keyStateEncoderSpec" -> keyStateEncoderSpec.jsonValue) ~ + ("userKeyEncoder" -> userKeyEncoder.map(s => JString(s.json)).getOrElse(JNothing)) + } + + def json: String = { + compact(render(jsonValue)) + } +} + +object ColumnFamilySchemaV1 { + + /** + * Create a ColumnFamilySchemaV1 object from the Json string + * This function is to read the StateSchemaV3 file + */ + def fromJson(json: String): ColumnFamilySchema = { + implicit val formats: DefaultFormats.type = DefaultFormats + val colFamilyMap = JsonMethods.parse(json).extract[Map[String, Any]] + assert(colFamilyMap.isInstanceOf[Map[_, _]], + s"Expected Map but got ${colFamilyMap.getClass}") + val keySchema = StructType.fromString(colFamilyMap("keySchema").asInstanceOf[String]) + val valueSchema = StructType.fromString(colFamilyMap("valueSchema").asInstanceOf[String]) + new ColumnFamilySchemaV1( Review Comment: nit: doesn't need `new` as it's a case class ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala: ########## @@ -278,16 +282,45 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat SQLMetrics.createTimingMetric(sparkContext, desc) } -sealed trait KeyStateEncoderSpec +sealed trait KeyStateEncoderSpec { + def jsonValue: JValue + def json: String = compact(render(jsonValue)) +} -case class NoPrefixKeyStateEncoderSpec(keySchema: StructType) extends KeyStateEncoderSpec +object KeyStateEncoderSpec { + def fromJson(keySchema: StructType, m: Map[String, Any]): KeyStateEncoderSpec = { + // match on type + m("keyStateEncoderType").asInstanceOf[String] match { Review Comment: IMO, except the first routing, instantiation of the spec from json must happen in its own implementation. (companion object, for example) This way we can couple with the serialization and deserialization together. But I get that it will put more boilerplate code, I'll leave this as an area of preference. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io.{InputStream, OutputStream} +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.UUID + +import scala.io.{Source => IOSource} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion + +/** + * The StateSchemaV3File is used to write the schema of multiple column families. + * Right now, this is primarily used for the TransformWithState operator, which supports + * multiple column families to keep the data for multiple state variables. + * @param hadoopConf Hadoop configuration that is used to read / write metadata files. + * @param path Path to the directory that will be used for writing metadata. + */ +class StateSchemaV3File( Review Comment: What is the version policy for State schema file and ColumnFamilySchema? Do we expect bumping the version for state schema file when we need to bump the version for ColumnFamilySchema? Or will we consider this separately and provide separate versioning for ColumnFamilySchema? If we expect former, please explicitly call this out. If we expect latter, please put the version information for ColumnFamilySchema into the file, so that we can read based on its own versioning. Also, this is certainly diverged with previous versions. Is there any chance for us to generalize/normalize? E.g. read v1/v2 from file and instantiate v3, and vice versa (store scoped v3 - only default cf - to file as v1/v2). We would eventually want to see this to be combined rather than scattered. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} +import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec} + +trait ColumnFamilySchemaUtils { + def getValueStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchema + + + def getListStateSchema[T]( Review Comment: nit: could be one liner? 
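Returning to the versioning question on StateSchemaV3File a few comments above, a hedged sketch of one possible layout that keeps a file-format version header while also tagging each entry with its own `ColumnFamilySchema` version, so the two can evolve independently. This is one reading of the suggestion, not the PR's actual format, and the schema payload is reduced to a plain JSON string so the example stands alone:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

import scala.io.{Source => IOSource}

object SchemaV3FileSketch {
  val FILE_VERSION = 3 // constant kept on the object, per the companion-object comment

  // "v3" header line, then one "v<schemaVersion>\t<json>" line per column family schema
  def serialize(schemas: List[(Int, String)], out: OutputStream): Unit = {
    out.write(s"v$FILE_VERSION".getBytes(UTF_8))
    schemas.foreach { case (schemaVersion, json) =>
      out.write('\n')
      out.write(s"v$schemaVersion\t$json".getBytes(UTF_8))
    }
  }

  def deserialize(in: InputStream): List[(Int, String)] = {
    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
    require(lines.hasNext, "incomplete schema file")
    require(lines.next().trim == s"v$FILE_VERSION", "unexpected file version")
    lines.map { line =>
      val idx = line.indexOf('\t')
      val schemaVersion = line.substring(0, idx).stripPrefix("v").toInt
      (schemaVersion, line.substring(idx + 1)) // route to the matching schema reader
    }.toList
  }

  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    serialize(List((1, """{"columnFamilyName":"countState"}""")), out)
    println(deserialize(new ByteArrayInputStream(out.toByteArray)))
  }
}
```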
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ########## @@ -187,23 +189,29 @@ class IncrementalExecution( } } - object WriteStatefulOperatorMetadataRule extends SparkPlanPartialRule { - override val rule: PartialFunction[SparkPlan, SparkPlan] = { - case stateStoreWriter: StateStoreWriter if isFirstBatch => - val metadata = stateStoreWriter.operatorStateMetadata() - val metadataWriter = new OperatorStateMetadataWriter(new Path( - checkpointLocation, stateStoreWriter.getStateInfo.operatorId.toString), hadoopConf) - metadataWriter.write(metadata) - stateStoreWriter - } - } - // Planning rule used to record the state schema for the first run and validate state schema // changes across query runs. - object StateSchemaValidationRule extends SparkPlanPartialRule { + object StateSchemaAndOperatorMetadataRule extends SparkPlanPartialRule { override val rule: PartialFunction[SparkPlan, SparkPlan] = { + // In the case of TransformWithStateExec, we want to collect this StateSchema + // filepath, and write this path out in the OperatorStateMetadata file case statefulOp: StatefulOperator if isFirstBatch => - statefulOp.validateAndMaybeEvolveStateSchema(hadoopConf) + val stateSchemaVersion = statefulOp match { + case _: TransformWithStateExec => sparkSession.sessionState.conf. + getConf(SQLConf.STREAMING_TRANSFORM_WITH_STATE_OP_STATE_SCHEMA_VERSION) + case _ => STATE_SCHEMA_DEFAULT_VERSION + } + val stateSchemaPaths = statefulOp. + validateAndMaybeEvolveStateSchema(hadoopConf, currentBatchId, stateSchemaVersion) + // write out the state schema paths to the metadata file + statefulOp match { + case stateStoreWriter: StateStoreWriter => + val metadata = stateStoreWriter.operatorStateMetadata() + // TODO: Populate metadata with stateSchemaPaths if metadata version is v2 Review Comment: Let's be sure to file a JIRA ticket and put the ticket number here. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io.{InputStream, OutputStream} +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.UUID + +import scala.io.{Source => IOSource} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion + +/** + * The StateSchemaV3File is used to write the schema of multiple column families. 
+ * Right now, this is primarily used for the TransformWithState operator, which supports + * multiple column families to keep the data for multiple state variables. + * @param hadoopConf Hadoop configuration that is used to read / write metadata files. + * @param path Path to the directory that will be used for writing metadata. + */ +class StateSchemaV3File( + hadoopConf: Configuration, + path: String) { + + val VERSION = 3 + + val metadataPath = new Path(path) + + protected val fileManager: CheckpointFileManager = + CheckpointFileManager.create(metadataPath, hadoopConf) + + if (!fileManager.exists(metadataPath)) { + fileManager.mkdirs(metadataPath) + } + + def deserialize(in: InputStream): List[ColumnFamilySchema] = { + val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines() + + if (!lines.hasNext) { + throw new IllegalStateException("Incomplete log file in the offset commit log") + } + + val version = lines.next().trim + validateVersion(version, VERSION) + + lines.map(ColumnFamilySchemaV1.fromJson).toList + } + + def serialize(schemas: List[ColumnFamilySchema], out: OutputStream): Unit = { + out.write(s"v${VERSION}".getBytes(UTF_8)) + out.write('\n') + out.write(schemas.map(_.json).mkString("\n").getBytes(UTF_8)) + } + + def addWithUUID(batchId: Long, metadata: List[ColumnFamilySchema]): Path = { + val batchMetadataPath = batchIdToPath(batchId) + val schemaFilePath = new Path(batchMetadataPath, UUID.randomUUID().toString) + fileManager.mkdirs(batchMetadataPath) + write(schemaFilePath, out => serialize(metadata, out)) + schemaFilePath + } + + def getWithPath(schemaFilePath: Path): List[ColumnFamilySchema] = { + deserialize(fileManager.open(schemaFilePath)) + } + + protected def write( + batchMetadataFile: Path, + fn: OutputStream => Unit): Unit = { + // Only write metadata when the batch has not yet been written Review Comment: We don't expect this to happen, given the addition of UUID. Even for the multiple retrials of the same batch, this will always create new file per trial. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala: ########## @@ -201,5 +203,6 @@ object StateSchemaCompatibilityChecker { if (storeConf.stateSchemaCheckEnabled && result.isDefined) { throw result.get } + Array(checker.schemaFileLocation.toString) Review Comment: I guess this won't be used for v3 as the schema file location is meant to be UUID postfixed. Do I understand correctly? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io.{InputStream, OutputStream} +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.UUID + +import scala.io.{Source => IOSource} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion + +/** + * The StateSchemaV3File is used to write the schema of multiple column families. + * Right now, this is primarily used for the TransformWithState operator, which supports + * multiple column families to keep the data for multiple state variables. + * @param hadoopConf Hadoop configuration that is used to read / write metadata files. + * @param path Path to the directory that will be used for writing metadata. + */ +class StateSchemaV3File( + hadoopConf: Configuration, + path: String) { + + val VERSION = 3 + + val metadataPath = new Path(path) + + protected val fileManager: CheckpointFileManager = + CheckpointFileManager.create(metadataPath, hadoopConf) + + if (!fileManager.exists(metadataPath)) { + fileManager.mkdirs(metadataPath) + } + + def deserialize(in: InputStream): List[ColumnFamilySchema] = { + val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines() + + if (!lines.hasNext) { + throw new IllegalStateException("Incomplete log file in the offset commit log") + } + + val version = lines.next().trim + validateVersion(version, VERSION) + + lines.map(ColumnFamilySchemaV1.fromJson).toList + } + + def serialize(schemas: List[ColumnFamilySchema], out: OutputStream): Unit = { + out.write(s"v${VERSION}".getBytes(UTF_8)) + out.write('\n') + out.write(schemas.map(_.json).mkString("\n").getBytes(UTF_8)) + } + + def addWithUUID(batchId: Long, metadata: List[ColumnFamilySchema]): Path = { + val batchMetadataPath = batchIdToPath(batchId) Review Comment: Can we clarify why we need additional directory per each write? Have we considered about simply using files in the same directory? The pointers for valid schema files are available to checkpoint anyway, so I'm yet to understand what's the benefit of doing this - this adds more cost on deletion (if the storage requires us to delete the directory separately). ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaV3File.scala: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import java.io.{InputStream, OutputStream} +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.UUID + +import scala.io.{Source => IOSource} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.MetadataVersionUtil.validateVersion + +/** + * The StateSchemaV3File is used to write the schema of multiple column families. + * Right now, this is primarily used for the TransformWithState operator, which supports + * multiple column families to keep the data for multiple state variables. + * @param hadoopConf Hadoop configuration that is used to read / write metadata files. + * @param path Path to the directory that will be used for writing metadata. + */ +class StateSchemaV3File( + hadoopConf: Configuration, + path: String) { + + val VERSION = 3 Review Comment: Best practice for constants is to place them in the companion object. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ########## @@ -85,6 +85,8 @@ class IncrementalExecution( .map(SQLConf.SHUFFLE_PARTITIONS.valueConverter) .getOrElse(sparkSession.sessionState.conf.numShufflePartitions) + private val STATE_SCHEMA_DEFAULT_VERSION: Int = 2 Review Comment: Let's put the code comment for what this means and what operators are applied vs not applied. (For this case, can simply say `except transformWithState`) ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ########## @@ -313,3 +290,176 @@ class StatefulProcessorHandleImpl( } } } + +/** + * This DriverStatefulProcessorHandleImpl is used within TransformWithExec + * on the driver side to collect the columnFamilySchemas before any processing is + * actually done. We need this class because we can only collect the schemas after + * the StatefulProcessor is initialized. + */ +class DriverStatefulProcessorHandleImpl(timeMode: TimeMode) + extends StatefulProcessorHandleImplBase(timeMode) { + + private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1 + + // Because this is only happening on the driver side, there is only + // one task modifying and accessing this map at a time + private[sql] val columnFamilySchemas: mutable.Map[String, ColumnFamilySchema] = + new mutable.HashMap[String, ColumnFamilySchema]() + + def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = columnFamilySchemas.toMap + + /** + * Function to add the ValueState schema to the list of column family schemas. + * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * + * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable + * @tparam T - type of state variable + * @return - instance of ValueState of type T that can be used to store state persistently + */ + override def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T] = { + verifyStateVarOperations("get_value_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getValueStateSchema(stateName, false) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[ValueState[T]] + } + + /** + * Function to add the ValueStateWithTTL schema to the list of column family schemas. 
+ * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * + * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable + * @param ttlConfig - the ttl configuration (time to live duration etc.) + * @tparam T - type of state variable + * @return - instance of ValueState of type T that can be used to store state persistently + */ + override def getValueState[T]( + stateName: String, + valEncoder: Encoder[T], + ttlConfig: TTLConfig): ValueState[T] = { + verifyStateVarOperations("get_value_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getValueStateSchema(stateName, true) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[ValueState[T]] + } + + /** + * Function to add the ListState schema to the list of column family schemas. + * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * + * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable + * @tparam T - type of state variable + * @return - instance of ListState of type T that can be used to store state persistently + */ + override def getListState[T](stateName: String, valEncoder: Encoder[T]): ListState[T] = { + verifyStateVarOperations("get_list_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getListStateSchema(stateName, false) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[ListState[T]] + } + + /** + * Function to add the ListStateWithTTL schema to the list of column family schemas. Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala: ########## @@ -28,6 +33,57 @@ import org.apache.spark.util.Utils /** * Helper classes for reading/writing state schema. */ +sealed trait ColumnFamilySchema extends Serializable { + def jsonValue: JValue + + def json: String + + def columnFamilyName: String +} + +case class ColumnFamilySchemaV1( + columnFamilyName: String, + keySchema: StructType, + valueSchema: StructType, + keyStateEncoderSpec: KeyStateEncoderSpec, + userKeyEncoder: Option[StructType] = None) extends ColumnFamilySchema { Review Comment: I feel like userKeyEncoder is too coupled with transformWithState, but I couldn't imagine we would have more complicated case of stateful operator implementation, so sounds like OK to me. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL} +import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec} + +trait ColumnFamilySchemaUtils { + def getValueStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchema + + + def getListStateSchema[T]( + stateName: String, + hasTtl: Boolean): ColumnFamilySchema + + + def getMapStateSchema[K, V]( + stateName: String, + userKeyEnc: Encoder[K], + hasTtl: Boolean): ColumnFamilySchema +} + +object ColumnFamilySchemaUtilsV1 extends ColumnFamilySchemaUtils { + + def getValueStateSchema[T]( Review Comment: Will we have a corresponding metadata information for the operator? Here the schema information does not cover how state data source reader can read the state back. I assume we will have another code change for supporting state data source reader, but just wanted to double check. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ########## @@ -313,3 +290,176 @@ class StatefulProcessorHandleImpl( } } } + +/** + * This DriverStatefulProcessorHandleImpl is used within TransformWithExec + * on the driver side to collect the columnFamilySchemas before any processing is + * actually done. We need this class because we can only collect the schemas after + * the StatefulProcessor is initialized. + */ +class DriverStatefulProcessorHandleImpl(timeMode: TimeMode) + extends StatefulProcessorHandleImplBase(timeMode) { + + private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1 + + // Because this is only happening on the driver side, there is only + // one task modifying and accessing this map at a time + private[sql] val columnFamilySchemas: mutable.Map[String, ColumnFamilySchema] = + new mutable.HashMap[String, ColumnFamilySchema]() + + def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = columnFamilySchemas.toMap + + /** + * Function to add the ValueState schema to the list of column family schemas. + * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * + * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable + * @tparam T - type of state variable + * @return - instance of ValueState of type T that can be used to store state persistently + */ + override def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T] = { + verifyStateVarOperations("get_value_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getValueStateSchema(stateName, false) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[ValueState[T]] + } + + /** + * Function to add the ValueStateWithTTL schema to the list of column family schemas. + * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * + * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable + * @param ttlConfig - the ttl configuration (time to live duration etc.) 
+ * @tparam T - type of state variable + * @return - instance of ValueState of type T that can be used to store state persistently + */ + override def getValueState[T]( + stateName: String, + valEncoder: Encoder[T], + ttlConfig: TTLConfig): ValueState[T] = { + verifyStateVarOperations("get_value_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getValueStateSchema(stateName, true) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[ValueState[T]] + } + + /** + * Function to add the ListState schema to the list of column family schemas. + * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * + * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable + * @tparam T - type of state variable + * @return - instance of ListState of type T that can be used to store state persistently + */ + override def getListState[T](stateName: String, valEncoder: Encoder[T]): ListState[T] = { + verifyStateVarOperations("get_list_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getListStateSchema(stateName, false) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[ListState[T]] + } + + /** + * Function to add the ListStateWithTTL schema to the list of column family schemas. + * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * + * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable + * @param ttlConfig - the ttl configuration (time to live duration etc.) + * @tparam T - type of state variable + * @return - instance of ListState of type T that can be used to store state persistently + */ + override def getListState[T]( + stateName: String, + valEncoder: Encoder[T], + ttlConfig: TTLConfig): ListState[T] = { + verifyStateVarOperations("get_list_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getListStateSchema(stateName, true) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[ListState[T]] + } + + /** + * Function to add the MapState schema to the list of column family schemas. + * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * @param stateName - name of the state variable + * @param userKeyEnc - spark sql encoder for the map key + * @param valEncoder - spark sql encoder for the map value + * @tparam K - type of key for map state variable + * @tparam V - type of value for map state variable + * @return - instance of MapState of type [K,V] that can be used to store state persistently + */ + override def getMapState[K, V]( + stateName: String, + userKeyEnc: Encoder[K], + valEncoder: Encoder[V]): MapState[K, V] = { + verifyStateVarOperations("get_map_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getMapStateSchema(stateName, userKeyEnc, false) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[MapState[K, V]] + } + + /** + * Function to add the MapStateWithTTL schema to the list of column family schemas. 
Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ########## @@ -313,3 +290,176 @@ class StatefulProcessorHandleImpl( } } } + +/** + * This DriverStatefulProcessorHandleImpl is used within TransformWithExec + * on the driver side to collect the columnFamilySchemas before any processing is + * actually done. We need this class because we can only collect the schemas after + * the StatefulProcessor is initialized. + */ +class DriverStatefulProcessorHandleImpl(timeMode: TimeMode) + extends StatefulProcessorHandleImplBase(timeMode) { + + private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1 + + // Because this is only happening on the driver side, there is only + // one task modifying and accessing this map at a time + private[sql] val columnFamilySchemas: mutable.Map[String, ColumnFamilySchema] = + new mutable.HashMap[String, ColumnFamilySchema]() + + def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = columnFamilySchemas.toMap + + /** + * Function to add the ValueState schema to the list of column family schemas. + * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * + * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable + * @tparam T - type of state variable + * @return - instance of ValueState of type T that can be used to store state persistently + */ + override def getValueState[T](stateName: String, valEncoder: Encoder[T]): ValueState[T] = { + verifyStateVarOperations("get_value_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getValueStateSchema(stateName, false) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[ValueState[T]] + } + + /** + * Function to add the ValueStateWithTTL schema to the list of column family schemas. + * The user must ensure to call this function only within the `init()` method of the + * StatefulProcessor. + * + * @param stateName - name of the state variable + * @param valEncoder - SQL encoder for state variable + * @param ttlConfig - the ttl configuration (time to live duration etc.) + * @tparam T - type of state variable + * @return - instance of ValueState of type T that can be used to store state persistently + */ + override def getValueState[T]( + stateName: String, + valEncoder: Encoder[T], + ttlConfig: TTLConfig): ValueState[T] = { + verifyStateVarOperations("get_value_state", PRE_INIT) + val colFamilySchema = columnFamilySchemaUtils. + getValueStateSchema(stateName, true) + columnFamilySchemas.put(stateName, colFamilySchema) + null.asInstanceOf[ValueState[T]] + } + + /** + * Function to add the ListState schema to the list of column family schemas. Review Comment: ditto ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ########## @@ -340,11 +370,47 @@ case class TransformWithStateExec( ) } - override def validateAndMaybeEvolveStateSchema(hadoopConf: Configuration): Unit = { - // TODO: transformWithState is special because we don't have the schema of the state directly - // within the passed args. We need to gather this after running the init function - // within the stateful processor on the driver. This also requires a schema format change - // when recording this information persistently. 
+ override def validateAndMaybeEvolveStateSchema( + hadoopConf: Configuration, + batchId: Long, + stateSchemaVersion: Int): + Array[String] = { + assert(stateSchemaVersion >= 3) + val newColumnFamilySchemas = getColFamilySchemas() + val schemaFile = new StateSchemaV3File( + hadoopConf, stateSchemaFilePath(StateStoreId.DEFAULT_STORE_NAME).toString) + // TODO: Read the schema path from the OperatorStateMetadata file + // and validate it with the new schema + + // Write the new schema to the schema file + val schemaPath = schemaFile.addWithUUID(batchId, newColumnFamilySchemas.values.toList) + Array(schemaPath.toString) + } + + private def validateSchemas( + oldSchemas: List[ColumnFamilySchema], + newSchemas: Map[String, ColumnFamilySchema]): Unit = { + oldSchemas.foreach { case oldSchema: ColumnFamilySchemaV1 => + newSchemas.get(oldSchema.columnFamilyName).foreach { + case newSchema: ColumnFamilySchemaV1 => + StateSchemaCompatibilityChecker.check( + (oldSchema.keySchema, oldSchema.valueSchema), + (newSchema.keySchema, newSchema.valueSchema), + ignoreValueSchema = false + ) + } + } + } + + private def stateSchemaFilePath(storeName: String): Path = { Review Comment: nit: `Dir` path to have less confusion. ########## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala: ########## Review Comment: Can we have round-trip serde tests for all specs <-> json? Here we have only tests leveraging single spec and I don't think it's sufficient. You can put them to the separate test suite to avoid adding further complexity of test into here. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ########## @@ -313,3 +290,176 @@ class StatefulProcessorHandleImpl( } } } + +/** + * This DriverStatefulProcessorHandleImpl is used within TransformWithExec + * on the driver side to collect the columnFamilySchemas before any processing is + * actually done. We need this class because we can only collect the schemas after + * the StatefulProcessor is initialized. + */ +class DriverStatefulProcessorHandleImpl(timeMode: TimeMode) + extends StatefulProcessorHandleImplBase(timeMode) { + + private[sql] val columnFamilySchemaUtils = ColumnFamilySchemaUtilsV1 + + // Because this is only happening on the driver side, there is only + // one task modifying and accessing this map at a time + private[sql] val columnFamilySchemas: mutable.Map[String, ColumnFamilySchema] = + new mutable.HashMap[String, ColumnFamilySchema]() + + def getColumnFamilySchemas: Map[String, ColumnFamilySchema] = columnFamilySchemas.toMap + + /** Review Comment: Btw, I'm not sure users will try to do something except assigning the returning value to the field, but if they are ever doing something, this will lead to NPE. Probably need to document this in the method doc of base class? cc. @anishshri-db ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ########## @@ -92,6 +93,35 @@ case class TransformWithStateExec( } } + /** + * We initialize this processor handle in the driver to run the init function + * and fetch the schemas of the state variables initialized in this processor. 
+ * @return a new instance of the driver processor handle + */ + private def getDriverProcessorHandle: DriverStatefulProcessorHandleImpl = { + val driverProcessorHandle = new DriverStatefulProcessorHandleImpl(timeMode) + driverProcessorHandle.setHandleState(StatefulProcessorHandleState.PRE_INIT) + statefulProcessor.setHandle(driverProcessorHandle) + statefulProcessor.init(outputMode, timeMode) + driverProcessorHandle + } + + /** + * Fetching the columnFamilySchemas from the StatefulProcessorHandle + * after init is called. + */ + private def getColFamilySchemas(): Map[String, ColumnFamilySchema] = { + val driverProcessorHandle = getDriverProcessorHandle + val columnFamilySchemas = driverProcessorHandle.getColumnFamilySchemas + closeProcessorHandle() + columnFamilySchemas + } + + private def closeProcessorHandle(): Unit = { + statefulProcessor.close() Review Comment: It's not only closing the temporary processor handle, right? Will the statefulProcessor instance ever be reused (on the driver, or on executors via the serialized plan), or is it clearly isolated? It's probably better to make this clear so that future readers won't be confused.
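For illustration only, one way to address this comment could be a short scaladoc on the driver-side close path. This is a sketch, not part of the PR; the description of the executor-side lifecycle is an assumption that the author would need to confirm:

  /**
   * Closes the StatefulProcessor instance that was initialized on the driver solely to
   * collect column family schemas via DriverStatefulProcessorHandleImpl. This driver-side
   * instance is not reused for data processing; executors are assumed to re-initialize
   * the processor from the serialized plan (assumption to be confirmed in this PR).
   */
  private def closeProcessorHandle(): Unit = {
    statefulProcessor.close()
  }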
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]