This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new e7c1700  [SPARK-35602][SS] Update state schema to be able to accept long length JSON
e7c1700 is described below

commit e7c1700368b2cba4df50d35144a4f13879956ca8
Author: Kousuke Saruta <saru...@oss.nttdata.com>
AuthorDate: Wed Jun 9 10:09:57 2021 +0900

    [SPARK-35602][SS] Update state schema to be able to accept long length JSON
    
    ### What changes were proposed in this pull request?
    
    This PR fixes an issue where neither the key nor the value of the state schema can accept long (>65535 bytes) JSON.
    To solve the problem explained below, the JSON-represented schema is divided into chunks of at most 65535 bytes, and each chunk is written with `DataOutputStream.writeUTF`.
    
    As the solution changes the format of the schema, the version also changes from `1` to `2`, but the old version schema is still accepted to ensure backward compatibility.
    
    ### Why are the changes needed?
    
    In the current implementation, writing the state schema fails with a `UTFDataFormatException` if the length of the schema exceeds 65535 bytes.
    This is due to a limitation of `DataOutputStream.writeUTF`:
    `writeUTF` writes a 2-byte length field first, so the maximum content length is limited to `2^16 - 1` = `65535` bytes.
    
https://docs.oracle.com/javase/8/docs/api/java/io/DataOutputStream.html#writeUTF-java.lang.String-
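    
    A minimal sketch that reproduces the limitation (illustration only, not part of this patch):
    
    ```scala
    import java.io.{ByteArrayOutputStream, DataOutputStream}
    
    val out = new DataOutputStream(new ByteArrayOutputStream())
    // Any string whose encoded length exceeds 65535 bytes makes writeUTF fail.
    out.writeUTF("a" * 65536)  // throws java.io.UTFDataFormatException
    ```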
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New tests.
    
    Closes #32788 from sarutak/fix-UTFDataFormatException.
    
    Authored-by: Kousuke Saruta <saru...@oss.nttdata.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 93a9dc479c098ef0989d64f38c2157f20ec4f32d)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../execution/streaming/state/SchemaHelper.scala   | 147 +++++++++++++++++++++
 .../state/StateSchemaCompatibilityChecker.scala    |  34 ++---
 .../StateSchemaCompatibilityCheckerSuite.scala     |  35 +++++
 3 files changed, 200 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala
new file mode 100644
index 0000000..2eef3d9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.StringReader
+
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream}
+
+import org.apache.spark.sql.execution.streaming.MetadataVersionUtil
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+/**
+ * Helper classes for reading/writing state schema.
+ */
+object SchemaHelper {
+
+  sealed trait SchemaReader {
+    def read(inputStream: FSDataInputStream): (StructType, StructType)
+  }
+
+  object SchemaReader {
+    def createSchemaReader(versionStr: String): SchemaReader = {
+      val version = MetadataVersionUtil.validateVersion(versionStr,
+        StateSchemaCompatibilityChecker.VERSION)
+      version match {
+        case 1 => new SchemaV1Reader
+        case 2 => new SchemaV2Reader
+      }
+    }
+  }
+
+  class SchemaV1Reader extends SchemaReader {
+    def read(inputStream: FSDataInputStream): (StructType, StructType) = {
+      val keySchemaStr = inputStream.readUTF()
+      val valueSchemaStr = inputStream.readUTF()
+      (StructType.fromString(keySchemaStr), StructType.fromString(valueSchemaStr))
+    }
+  }
+
+  class SchemaV2Reader extends SchemaReader {
+    def read(inputStream: FSDataInputStream): (StructType, StructType) = {
+      val buf = new StringBuilder
+      val numKeyChunks = inputStream.readInt()
+      (0 until numKeyChunks).foreach(_ => buf.append(inputStream.readUTF()))
+      val keySchemaStr = buf.toString()
+
+      buf.clear()
+      val numValueChunks = inputStream.readInt()
+      (0 until numValueChunks).foreach(_ => buf.append(inputStream.readUTF()))
+      val valueSchemaStr = buf.toString()
+      (StructType.fromString(keySchemaStr), StructType.fromString(valueSchemaStr))
+    }
+  }
+
+  trait SchemaWriter {
+    val version: Int
+
+    final def write(
+        keySchema: StructType,
+        valueSchema: StructType,
+        outputStream: FSDataOutputStream): Unit = {
+      writeVersion(outputStream)
+      writeSchema(keySchema, valueSchema, outputStream)
+    }
+
+    private def writeVersion(outputStream: FSDataOutputStream): Unit = {
+      outputStream.writeUTF(s"v${version}")
+    }
+
+    protected def writeSchema(
+        keySchema: StructType,
+        valueSchema: StructType,
+        outputStream: FSDataOutputStream): Unit
+  }
+
+  object SchemaWriter {
+    def createSchemaWriter(version: Int): SchemaWriter = {
+      version match {
+        case 1 if Utils.isTesting => new SchemaV1Writer
+        case 2 => new SchemaV2Writer
+      }
+    }
+  }
+
+  class SchemaV1Writer extends SchemaWriter {
+    val version: Int = 1
+
+    def writeSchema(
+        keySchema: StructType,
+        valueSchema: StructType,
+        outputStream: FSDataOutputStream): Unit = {
+      outputStream.writeUTF(keySchema.json)
+      outputStream.writeUTF(valueSchema.json)
+    }
+  }
+
+  class SchemaV2Writer extends SchemaWriter {
+    val version: Int = 2
+
+    // 2^16 - 1 bytes
+    final val MAX_UTF_CHUNK_SIZE = 65535
+
+    def writeSchema(
+        keySchema: StructType,
+        valueSchema: StructType,
+        outputStream: FSDataOutputStream): Unit = {
+      val buf = new Array[Char](MAX_UTF_CHUNK_SIZE)
+
+      // DataOutputStream.writeUTF can't write a string at once
+      // if the size exceeds 65535 (2^16 - 1) bytes.
+      // So a key as well as a value consist of multiple chunks in schema version 2.
+      val keySchemaJson = keySchema.json
+      val numKeyChunks = (keySchemaJson.length - 1) / MAX_UTF_CHUNK_SIZE + 1
+      val keyStringReader = new StringReader(keySchemaJson)
+      outputStream.writeInt(numKeyChunks)
+      (0 until numKeyChunks).foreach { _ =>
+        val numRead = keyStringReader.read(buf, 0, MAX_UTF_CHUNK_SIZE)
+        outputStream.writeUTF(new String(buf, 0, numRead))
+      }
+
+      val valueSchemaJson = valueSchema.json
+      val numValueChunks = (valueSchemaJson.length - 1) / MAX_UTF_CHUNK_SIZE + 1
+      val valueStringReader = new StringReader(valueSchemaJson)
+      outputStream.writeInt(numValueChunks)
+      (0 until numValueChunks).foreach { _ =>
+        val numRead = valueStringReader.read(buf, 0, MAX_UTF_CHUNK_SIZE)
+        outputStream.writeUTF(new String(buf, 0, numRead))
+      }
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 4ac12c0..20625e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -21,7 +21,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MetadataVersionUtil}
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader, SchemaWriter}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -34,6 +35,8 @@ class StateSchemaCompatibilityChecker(
   private val storeCpLocation = providerId.storeId.storeCheckpointLocation()
   private val fm = CheckpointFileManager.create(storeCpLocation, hadoopConf)
   private val schemaFileLocation = schemaFile(storeCpLocation)
+  private val schemaWriter =
+    SchemaWriter.createSchemaWriter(StateSchemaCompatibilityChecker.VERSION)
 
   fm.mkdirs(schemaFileLocation.getParent)
 
@@ -71,20 +74,13 @@ class StateSchemaCompatibilityChecker(
   private def schemasCompatible(storedSchema: StructType, schema: StructType): Boolean =
     DataType.equalsIgnoreNameAndCompatibleNullability(storedSchema, schema)
 
-  private def readSchemaFile(): (StructType, StructType) = {
+  // Visible for testing
+  private[sql] def readSchemaFile(): (StructType, StructType) = {
     val inStream = fm.open(schemaFileLocation)
     try {
       val versionStr = inStream.readUTF()
-      // Currently we only support version 1, which we can simplify the version validation and
-      // the parse logic.
-      val version = MetadataVersionUtil.validateVersion(versionStr,
-        StateSchemaCompatibilityChecker.VERSION)
-      require(version == 1)
-
-      val keySchemaStr = inStream.readUTF()
-      val valueSchemaStr = inStream.readUTF()
-
-      (StructType.fromString(keySchemaStr), StructType.fromString(valueSchemaStr))
+      val schemaReader = SchemaReader.createSchemaReader(versionStr)
+      schemaReader.read(inStream)
     } catch {
       case e: Throwable =>
         logError(s"Fail to read schema file from $schemaFileLocation", e)
@@ -95,11 +91,17 @@ class StateSchemaCompatibilityChecker(
   }
 
   private def createSchemaFile(keySchema: StructType, valueSchema: StructType): Unit = {
+    createSchemaFile(keySchema, valueSchema, schemaWriter)
+  }
+
+  // Visible for testing
+  private[sql] def createSchemaFile(
+      keySchema: StructType,
+      valueSchema: StructType,
+      schemaWriter: SchemaWriter): Unit = {
     val outStream = fm.createAtomic(schemaFileLocation, overwriteIfPossible = false)
     try {
-      outStream.writeUTF(s"v${StateSchemaCompatibilityChecker.VERSION}")
-      outStream.writeUTF(keySchema.json)
-      outStream.writeUTF(valueSchema.json)
+      schemaWriter.write(keySchema, valueSchema, outStream)
       outStream.close()
     } catch {
       case e: Throwable =>
@@ -114,5 +116,5 @@ class StateSchemaCompatibilityChecker(
 }
 
 object StateSchemaCompatibilityChecker {
-  val VERSION = 1
+  val VERSION = 2
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
index 4eb7603b..a9cc90c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
@@ -47,6 +47,22 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
     .add(StructField("value2", StringType, nullable = true))
     .add(StructField("value3", structSchema, nullable = true))
 
+  private val longKeySchema = new StructType()
+    .add(StructField("key" + "1" * 64 * 1024, IntegerType, nullable = true))
+    .add(StructField("key" + "2" * 64 * 1024, StringType, nullable = true))
+    .add(StructField("key" + "3" * 64 * 1024, structSchema, nullable = true))
+
+  private val longValueSchema = new StructType()
+    .add(StructField("value" + "1" * 64 * 1024, IntegerType, nullable = true))
+    .add(StructField("value" + "2" * 64 * 1024, StringType, nullable = true))
+    .add(StructField("value" + "3" * 64 * 1024, structSchema, nullable = true))
+
+  private val keySchema65535Bytes = new StructType()
+    .add(StructField("k" * (65535 - 87), IntegerType, nullable = true))
+
+  private val valueSchema65535Bytes = new StructType()
+    .add(StructField("v" * (65535 - 87), IntegerType, nullable = true))
+
   test("adding field to key should fail") {
     val fieldAddedKeySchema = keySchema.add(StructField("newKey", IntegerType))
     verifyException(keySchema, valueSchema, fieldAddedKeySchema, valueSchema)
@@ -161,6 +177,25 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
     verifySuccess(keySchema, valueSchema, keySchema, fieldNameChangedValueSchema)
   }
 
+  test("SPARK-35602: checking for long length schema") {
+    verifySuccess(longKeySchema, longValueSchema, longKeySchema, longValueSchema)
+    verifySuccess(
+      keySchema65535Bytes, valueSchema65535Bytes, keySchema65535Bytes, valueSchema65535Bytes)
+  }
+
+  test("SPARK-35602: checking for compatibility with schema version 1") {
+    val dir = newDir()
+    val queryId = UUID.randomUUID()
+    val providerId = StateStoreProviderId(
+      StateStoreId(dir, opId, partitionId), queryId)
+    val checker = new StateSchemaCompatibilityChecker(providerId, hadoopConf)
+    checker.createSchemaFile(keySchema, valueSchema,
+      SchemaHelper.SchemaWriter.createSchemaWriter(1))
+    val (resultKeySchema, resultValueSchema) = checker.readSchemaFile()
+
+    assert((resultKeySchema, resultValueSchema) === (keySchema, valueSchema))
+  }
+
   private def applyNewSchemaToNestedFieldInKey(newNestedSchema: StructType): StructType = {
     applyNewSchemaToNestedField(keySchema, newNestedSchema, "key3")
   }
