HeartSaVioR commented on a change in pull request #32788:
URL: https://github.com/apache/spark/pull/32788#discussion_r646231048
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaReader.scala
##########
@@ -0,0 +1,51 @@
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.hadoop.fs.FSDataInputStream
+
+import org.apache.spark.sql.types.StructType
+
+sealed trait SchemaReader {
+  final def read(inputStream: FSDataInputStream): (StructType, StructType) =

Review comment:
   Would you mind if I ask the rationale for having two different methods that do the same thing? What are the benefits here, compared to just defining the interface method and letting SchemaV*Reader implement it?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaV2Writer.scala
##########
@@ -0,0 +1,68 @@
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.StringReader
+
+import org.apache.hadoop.fs.FSDataOutputStream
+
+import org.apache.spark.sql.types.StructType
+
+class SchemaV2Writer extends SchemaWriter {

Review comment:
   Shall we do the same with the read path? And why not consolidate the reader and writer into one? It would be much easier to check whether the read path and write path for a specific version are implemented properly (compatibly) if they live in the same class. Right now they are scattered across two different classes in two different files.

   I guess the intent is to keep SchemaV1Writer out of the production code path (leaving it only for the test path), but personally I don't consider that a huge risk. If we really want to avoid it, we could simply throw an exception for the V1 write path.
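To make the two suggestions above concrete, here is a minimal sketch — not code from this PR, and the SchemaFileFormat* names are made up — of declaring the interface methods directly on the trait and consolidating the read and write paths of one version in one place, with the V1 write path simply throwing:

package org.apache.spark.sql.execution.streaming.state

import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream}

import org.apache.spark.sql.types.StructType

// Hypothetical consolidated per-version format: the read and write paths for a
// given version live in the same class, and read/write are abstract on the
// trait rather than delegating through readSchema/writeSchema.
sealed trait SchemaFileFormat {
  def version: Int
  def read(in: FSDataInputStream): (StructType, StructType)
  def write(keySchema: StructType, valueSchema: StructType, out: FSDataOutputStream): Unit
}

object SchemaFileFormatV1 extends SchemaFileFormat {
  override val version: Int = 1

  override def read(in: FSDataInputStream): (StructType, StructType) =
    (StructType.fromString(in.readUTF()), StructType.fromString(in.readUTF()))

  // Keeps V1 effectively read-only in the production path by failing fast.
  override def write(
      keySchema: StructType,
      valueSchema: StructType,
      out: FSDataOutputStream): Unit =
    throw new UnsupportedOperationException("Writing schema file version 1 is not supported")
}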
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaReader.scala
##########
@@ -0,0 +1,51 @@
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.hadoop.fs.FSDataInputStream
+
+import org.apache.spark.sql.types.StructType
+
+sealed trait SchemaReader {
+  final def read(inputStream: FSDataInputStream): (StructType, StructType) =
+    readSchema(inputStream)
+  protected def readSchema(inputStream: FSDataInputStream): (StructType, StructType)
+}
+
+class SchemaV1Reader extends SchemaReader {
+  def readSchema(inputStream: FSDataInputStream): (StructType, StructType) = {
+    val keySchemaStr = inputStream.readUTF()
+    val valueSchemaStr = inputStream.readUTF()
+    (StructType.fromString(keySchemaStr), StructType.fromString(valueSchemaStr))
+  }
+}
+
+class SchemaV2Reader extends SchemaReader {
+  def readSchema(inputStream: FSDataInputStream): (StructType, StructType) = {
+    val buf = new StringBuilder

Review comment:
   It'd be nice if we just wrote it out at once (e.g. calling getBytes() and writing the length followed by the byte array), but then I'd worry about backward compatibility, so this approach looks OK. (That was the reason I picked writeUTF at the time, though I may have overthought it.)
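For reference, a rough sketch of the "write at once" alternative mentioned above: a length prefix followed by raw UTF-8 bytes, which avoids the 65535-byte limit of writeUTF. The helper object and method names are hypothetical, and, as the comment notes, this layout would not be readable from schema files already written with writeUTF:

import java.nio.charset.StandardCharsets

import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream}

// Hypothetical helpers, not part of the PR.
object LengthPrefixedSchemaIO {
  def writeJson(json: String, out: FSDataOutputStream): Unit = {
    val bytes = json.getBytes(StandardCharsets.UTF_8)
    out.writeInt(bytes.length)  // 4-byte length prefix instead of writeUTF chunking
    out.write(bytes)
  }

  def readJson(in: FSDataInputStream): String = {
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }
}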
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
##########
@@ -71,20 +72,16 @@
   private def schemasCompatible(storedSchema: StructType, schema: StructType): Boolean =
     DataType.equalsIgnoreNameAndCompatibleNullability(storedSchema, schema)

-  private def readSchemaFile(): (StructType, StructType) = {
+  // Visible for testing
+  private[sql] def readSchemaFile(): (StructType, StructType) = {
     val inStream = fm.open(schemaFileLocation)
     try {
       val versionStr = inStream.readUTF()
-      // Currently we only support version 1, which we can simplify the version validation and
-      // the parse logic.
       val version = MetadataVersionUtil.validateVersion(versionStr,
         StateSchemaCompatibilityChecker.VERSION)
-      require(version == 1)
-
-      val keySchemaStr = inStream.readUTF()
-      val valueSchemaStr = inStream.readUTF()
-
-      (StructType.fromString(keySchemaStr), StructType.fromString(valueSchemaStr))
+      require(1 <= version && version <= StateSchemaCompatibilityChecker.VERSION)

Review comment:
   `MetadataVersionUtil.validateVersion` already does this check. We should have the check in the reader factory as well, so this one is somewhat redundant.
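If `validateVersion` indeed rejects anything outside the supported range, the method body could reduce to roughly the following sketch (the factory method name and the finally block are made up for illustration; the actual PR may differ):

private[sql] def readSchemaFile(): (StructType, StructType) = {
  val inStream = fm.open(schemaFileLocation)
  try {
    val versionStr = inStream.readUTF()
    // Assumed to throw for versions outside [1, StateSchemaCompatibilityChecker.VERSION],
    // making an extra require unnecessary.
    val version = MetadataVersionUtil.validateVersion(versionStr,
      StateSchemaCompatibilityChecker.VERSION)
    // Hypothetical factory method name.
    SchemaReaderFactory.getSchemaReader(version).read(inStream)
  } finally {
    inStream.close()
  }
}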
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaV2Writer.scala
##########
@@ -0,0 +1,68 @@
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.StringReader
+
+import org.apache.hadoop.fs.FSDataOutputStream
+
+import org.apache.spark.sql.types.StructType
+
+class SchemaV2Writer extends SchemaWriter {
+
+  val version = SchemaV2Writer.VERSION
+
+  def writeSchema(
+      keySchema: StructType,
+      valueSchema: StructType,
+      outputStream: FSDataOutputStream): Unit = {
+    val buf = new Array[Char](SchemaV2Writer.MAX_UTF_CHUNK_SIZE)
+
+    // DataOutputStream.writeUTF can't write a string at once
+    // if the size exceeds 65535 (2^16 - 1) bytes.
+    // So a key as well as a value consist of multiple chunks in schema version 2.
+    val keySchemaJson = keySchema.json
+    val numKeyChunks =
+      (keySchemaJson.length -1) / SchemaV2Writer.MAX_UTF_CHUNK_SIZE + 1

Review comment:
   nit: add a space between `-` and `1`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaWriter.scala
##########
@@ -0,0 +1,43 @@
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.hadoop.fs.FSDataOutputStream
+
+import org.apache.spark.sql.types.StructType
+
+trait SchemaWriter {
+  val version: Int
+
+  final def write(

Review comment:
   Unlike the read path, this makes sense here, as we want to be consistent about writing the version.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaReaderFactory.scala
##########
@@ -0,0 +1,27 @@
+package org.apache.spark.sql.execution.streaming.state
+
+object SchemaReaderFactory {

Review comment:
   Looks like the pattern we've used for such cases is to define an object with the same name as the trait and place it in the same file.

   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithStateExecHelper.scala

   The FlatMapGroupsWithStateExecHelper case is slightly different (the factory and the trait have different names), but they are still placed in the same file, and the trait is even placed under the object.
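A short sketch of the convention referenced above, with a made-up factory method name: the factory becomes an object named after the trait, placed in SchemaReader.scala next to the trait and its implementations, instead of living in a separate SchemaReaderFactory file:

// Hypothetical: defined in SchemaReader.scala alongside the sealed trait.
object SchemaReader {
  def createSchemaReader(version: Int): SchemaReader = version match {
    case 1 => new SchemaV1Reader
    case 2 => new SchemaV2Reader
    case v => throw new IllegalArgumentException(s"Unsupported schema file version: $v")
  }
}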
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]