HeartSaVioR commented on a change in pull request #32788:
URL: https://github.com/apache/spark/pull/32788#discussion_r646231048



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaReader.scala
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.hadoop.fs.FSDataInputStream
+
+import org.apache.spark.sql.types.StructType
+
+sealed trait SchemaReader {
+  final def read(inputStream: FSDataInputStream): (StructType, StructType) =

Review comment:
       Would you mind explaining the rationale for having two different methods that do the same thing? What is the benefit compared to simply defining the interface method and letting SchemaV*Reader implement it?
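
   For example, this is roughly the simpler shape I have in mind (just a sketch, reusing the V1 reader from this file):

```scala
import org.apache.hadoop.fs.FSDataInputStream

import org.apache.spark.sql.types.StructType

// Sketch: the trait declares read() directly and each versioned reader
// implements it, without the final read() -> protected readSchema() hop.
sealed trait SchemaReader {
  def read(inputStream: FSDataInputStream): (StructType, StructType)
}

class SchemaV1Reader extends SchemaReader {
  override def read(inputStream: FSDataInputStream): (StructType, StructType) = {
    val keySchemaStr = inputStream.readUTF()
    val valueSchemaStr = inputStream.readUTF()
    (StructType.fromString(keySchemaStr), StructType.fromString(valueSchemaStr))
  }
}
```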

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaV2Writer.scala
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.StringReader
+
+import org.apache.hadoop.fs.FSDataOutputStream
+
+import org.apache.spark.sql.types.StructType
+
+class SchemaV2Writer extends SchemaWriter {

Review comment:
       Shall we do the same with the read path?
   
   Also, why not consolidate the reader and writer into one class? It would be much easier to check that the read path and write path are implemented compatibly for a specific version if they live in the same class. Right now they are scattered across two different classes in two different files.
   
   I guess the intent is to keep SchemaV1Writer out of the production code path (leaving it only for the test path), but personally I don't consider that a huge risk. If we really want to avoid it, we could simply throw an exception for the V1 write path.
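
   Roughly what I'm imagining, as a sketch (the names are made up):

```scala
import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream}

import org.apache.spark.sql.types.StructType

// Sketch: one class per schema file version owns both directions, so the
// read/write compatibility of a given version can be reviewed in one place.
sealed trait SchemaReaderWriter {
  val version: Int
  def read(inputStream: FSDataInputStream): (StructType, StructType)
  def write(
      keySchema: StructType,
      valueSchema: StructType,
      outputStream: FSDataOutputStream): Unit
}

class SchemaV1ReaderWriter extends SchemaReaderWriter {
  val version = 1

  def read(inputStream: FSDataInputStream): (StructType, StructType) = {
    val keySchemaStr = inputStream.readUTF()
    val valueSchemaStr = inputStream.readUTF()
    (StructType.fromString(keySchemaStr), StructType.fromString(valueSchemaStr))
  }

  def write(
      keySchema: StructType,
      valueSchema: StructType,
      outputStream: FSDataOutputStream): Unit = {
    // If we want to keep V1 write-only for tests, this could just throw instead.
    outputStream.writeUTF(s"v$version")
    outputStream.writeUTF(keySchema.json)
    outputStream.writeUTF(valueSchema.json)
  }
}
```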

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaReader.scala
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.hadoop.fs.FSDataInputStream
+
+import org.apache.spark.sql.types.StructType
+
+sealed trait SchemaReader {
+  final def read(inputStream: FSDataInputStream): (StructType, StructType) =
+    readSchema(inputStream)
+  protected def readSchema(inputStream: FSDataInputStream): (StructType, StructType)
+}
+
+class SchemaV1Reader extends SchemaReader {
+  def readSchema(inputStream: FSDataInputStream): (StructType, StructType) = {
+    val keySchemaStr = inputStream.readUTF()
+    val valueSchemaStr = inputStream.readUTF()
+    (StructType.fromString(keySchemaStr), StructType.fromString(valueSchemaStr))
+  }
+}
+
+class SchemaV2Reader extends SchemaReader {
+  def readSchema(inputStream: FSDataInputStream): (StructType, StructType) = {
+    val buf = new StringBuilder

Review comment:
       It'd be nice if we could just write the schema at once (e.g. calling getBytes() and writing the length followed by the byte array), but I'd worry about backward compatibility in that case, so this approach looks OK.
   (That was the reason I picked writeUTF at the time, but I may have overthought it.)
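
   For reference, the write-at-once variant I had in mind is roughly the following (sketch only; it would be a different on-disk layout, which is exactly the compatibility concern):

```scala
import java.nio.charset.StandardCharsets

import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream}

// Sketch: length-prefixed raw UTF-8 bytes instead of chunked writeUTF.
// This avoids writeUTF's 65535-byte limit, but it is not compatible with
// the existing writeUTF-based schema files.
object LengthPrefixedJson {
  def write(json: String, out: FSDataOutputStream): Unit = {
    val bytes = json.getBytes(StandardCharsets.UTF_8)
    out.writeInt(bytes.length)
    out.write(bytes)
  }

  def read(in: FSDataInputStream): String = {
    val length = in.readInt()
    val bytes = new Array[Byte](length)
    in.readFully(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }
}
```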

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
##########
@@ -71,20 +72,16 @@ class StateSchemaCompatibilityChecker(
   private def schemasCompatible(storedSchema: StructType, schema: StructType): Boolean =
     DataType.equalsIgnoreNameAndCompatibleNullability(storedSchema, schema)
 
-  private def readSchemaFile(): (StructType, StructType) = {
+  // Visible for testing
+  private[sql] def readSchemaFile(): (StructType, StructType) = {
     val inStream = fm.open(schemaFileLocation)
     try {
       val versionStr = inStream.readUTF()
-      // Currently we only support version 1, which we can simplify the version validation and
-      // the parse logic.
       val version = MetadataVersionUtil.validateVersion(versionStr,
         StateSchemaCompatibilityChecker.VERSION)
-      require(version == 1)
-
-      val keySchemaStr = inStream.readUTF()
-      val valueSchemaStr = inStream.readUTF()
-
-      (StructType.fromString(keySchemaStr), StructType.fromString(valueSchemaStr))
+      require(1 <= version && version <= StateSchemaCompatibilityChecker.VERSION)

Review comment:
       `MetadataVersionUtil.validateVersion` already does this check. We should have the version check in the reader factory as well, so this `require` is somewhat redundant.
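
   i.e. if the factory rejects unknown versions anyway, this body could shrink to roughly the following (sketch; `SchemaReaderFactory.getReader` is just a placeholder name for whatever the factory method ends up being called):

```scala
// Sketch: validateVersion already guarantees 0 < version <= VERSION,
// so no extra require() is needed before dispatching to the reader.
val versionStr = inStream.readUTF()
val version = MetadataVersionUtil.validateVersion(versionStr,
  StateSchemaCompatibilityChecker.VERSION)
SchemaReaderFactory.getReader(version).read(inStream)
```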

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaV2Writer.scala
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.StringReader
+
+import org.apache.hadoop.fs.FSDataOutputStream
+
+import org.apache.spark.sql.types.StructType
+
+class SchemaV2Writer extends SchemaWriter {
+
+  val version = SchemaV2Writer.VERSION
+
+  def writeSchema(
+      keySchema: StructType,
+      valueSchema: StructType,
+      outputStream: FSDataOutputStream): Unit = {
+    val buf = new Array[Char](SchemaV2Writer.MAX_UTF_CHUNK_SIZE)
+
+    // DataOutputStream.writeUTF can't write a string at once
+    // if the size exceeds 65535 (2^16 - 1) bytes.
+    // So a key as well as a value consist of multiple chunks in schema version 2.
+    val keySchemaJson = keySchema.json
+    val numKeyChunks =
+      (keySchemaJson.length -1) / SchemaV2Writer.MAX_UTF_CHUNK_SIZE + 1

Review comment:
       nit: add a space between `-` and `1`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaWriter.scala
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.hadoop.fs.FSDataOutputStream
+
+import org.apache.spark.sql.types.StructType
+
+trait SchemaWriter {
+  val version: Int
+
+  final def write(

Review comment:
       Unlike the read path, this makes sense here, since we want to be consistent about how the version is written.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaReaderFactory.scala
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+object SchemaReaderFactory {

Review comment:
       The pattern we've used for such cases is to define an object with the same name as the trait and place it in the same file:
   
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/FlatMapGroupsWithStateExecHelper.scala
   
   The case of FlatMapGroupsWithStateExecHelper is slightly different (the factory and the trait have different names), but they are still placed in the same file, with the trait even defined inside the object.
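
   In other words, something along these lines in SchemaReader.scala itself (sketch; the factory method name is just for illustration):

```scala
import org.apache.hadoop.fs.FSDataInputStream

import org.apache.spark.sql.types.StructType

sealed trait SchemaReader {
  def read(inputStream: FSDataInputStream): (StructType, StructType)
}

// Same file and same name as the trait, following the pattern in the files above.
object SchemaReader {
  def apply(version: Int): SchemaReader = version match {
    case 1 => new SchemaV1Reader
    case 2 => new SchemaV2Reader
    case _ =>
      throw new IllegalArgumentException(s"Unsupported schema file version: $version")
  }
}
```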




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
