gene-db commented on code in PR #48779:
URL: https://github.com/apache/spark/pull/48779#discussion_r1838456489


##########
common/variant/src/main/java/org/apache/spark/types/variant/Variant.java:
##########
@@ -193,6 +193,18 @@ public ObjectField getFieldAtIndex(int index) {
     });
   }
 
+  // Get the dictionary ID for the object field at the `index` slot. Throws 
malformedVariatn if

Review Comment:
   ```suggestion
     // Get the dictionary ID for the object field at the `index` slot. Throws 
malformedVariant if
   ```



##########
common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java:
##########
@@ -413,6 +420,17 @@ private void appendVariantImpl(byte[] value, byte[] 
metadata, int pos) {
     }
   }
 
+  // Append the variant value without rewriting or creating any metadata. This 
is used when
+  // building an object during shredding, where there is a fixed pre-existing 
metadata that
+  // all shredded values will refer to.
+  public void shallowAppendVariant(Variant v) {
+    int size = valueSize(v.value, v.pos);

Review Comment:
   This looks basically like the code in 
https://github.com/apache/spark/pull/48779/files#diff-397d59a94179cafec72d53368f75ffdb825315cb839a87a789db4521650659c0R414
   
   I wonder if we could pull out that logic, and have both locations call the 
common function?



##########
common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java:
##########
@@ -107,6 +107,13 @@ public Variant result() {
     return new Variant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata);
   }
 
+  // Return the variant value only, without metadata.

Review Comment:
   Is this ok to run before `result()`, and/or after `result()`?



##########
common/variant/src/main/java/org/apache/spark/types/variant/VariantSchema.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.types.variant;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Defines a valid shredding schema, as described in
+ * https://github.com/apache/parquet-format/blob/master/VariantShredding.md.
+ * A shredding schema contains a value and optional typed_value field.
+ * If a typed_value is an array or struct, it recursively contain its own 
shredding schema for
+ * elements and fields, respectively.
+ * The schema also contains a metadata field at the top level, but not in 
recursively shredded
+ * fields.
+ */
+public class VariantSchema {
+
+  // Represents one field of an object in the shredding schema.
+  public static final class ObjectField {
+    public final String fieldName;
+    public final VariantSchema schema;
+
+    public ObjectField(String fieldName, VariantSchema schema) {
+      this.fieldName = fieldName;
+      this.schema = schema;
+    }
+
+    @Override
+    public String toString() {
+      return "ObjectField{" +
+          "fieldName=" + fieldName +
+          ", schema=" + schema +
+          '}';
+    }
+  }
+
+  public abstract static class ScalarType {
+  }
+
+  public static final class StringType extends ScalarType {
+  }
+
+  public enum IntegralSize {
+    BYTE, SHORT, INT, LONG
+  }
+
+  public static final class IntegralType extends ScalarType {
+    public final IntegralSize size;
+
+    public IntegralType(IntegralSize size) {
+      this.size = size;
+    }
+  }
+
+  public static final class FloatType extends ScalarType {
+  }
+
+  public static final class DoubleType extends ScalarType {
+  }
+
+  public static final class BooleanType extends ScalarType {
+  }
+
+  public static final class BinaryType extends ScalarType {
+  }
+
+  public static final class DecimalType extends ScalarType {
+    public final int precision;
+    public final int scale;
+
+    public DecimalType(int precision, int scale) {
+      this.precision = precision;
+      this.scale = scale;
+    }
+  }
+
+  public static final class DateType extends ScalarType {
+  }
+
+  public static final class TimestampType extends ScalarType {
+  }
+
+  public static final class TimestampNTZType extends ScalarType {
+  }
+
+  public final int typedIdx;
+  public final int variantIdx;
+  public final int topLevelMetadataIdx;
+  public final int numFields;

Review Comment:
   Can you add a comment on what this represents? If a schema has `value` and 
`typed_value` and `metadata`, is `numFields` = 3? For an object `typed_value`, 
will it be number of object fields?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types._
+import org.apache.spark.types.variant._
+import org.apache.spark.unsafe.types._
+
+case object SparkShreddingUtils {
+  def buildVariantSchema(schema: DataType): VariantSchema = {
+    schema match {
+      case s: StructType => buildVariantSchema(s, topLevel = true)
+      case _ => throw QueryCompilationErrors.invalidVariantSchema(schema)
+    }
+  }
+
+  /**
+   * Given an expected schema of a Variant value, returns a suitable schema 
for shredding, by
+   * inserting appropriate intermediate value/typed_value fields at each level.
+   * For example, to represent the JSON {"a": 1, "b": "hello"},
+   * the schema struct<a: int, b: string> could be passed into this function, 
and it would return
+   * the shredding scheme:
+   * struct<
+   *  metadata: binary,
+   *  value: binary,
+   *  typed_value: struct<
+   *   a: struct<typed_value: int, value: binary>,
+   *   b: struct<typed_value: string, value: binary>>>
+   *
+   */
+  def variantShreddingSchema(dataType: DataType, isTopLevel: Boolean = true): 
StructType = {
+    val fields = dataType match {
+      case ArrayType(elementType, containsNull) =>
+        val arrayShreddingSchema =
+          ArrayType(variantShreddingSchema(elementType, false), containsNull)
+        Seq(
+          StructField("value", BinaryType, nullable = true),
+          StructField("typed_value", arrayShreddingSchema, nullable = true)
+        )
+      case StructType(fields) =>
+        val objectShreddingSchema = StructType(fields.map(f =>
+            f.copy(dataType = variantShreddingSchema(f.dataType, false))))
+        Seq(
+          StructField("value", BinaryType, nullable = true),
+          StructField("typed_value", objectShreddingSchema, nullable = true)
+        )
+      case VariantType =>
+        // For Variant, we don't need a typed column
+        Seq(
+          StructField("value", BinaryType, nullable = true)
+        )
+      case _: NumericType | BooleanType | _: StringType | BinaryType | _: 
DatetimeType =>
+        Seq(
+          StructField("value", BinaryType, nullable = true),
+          StructField("typed_value", dataType, nullable = true)
+        )
+      case _ =>
+        // No other types have a corresponding shreddings schema.
+        throw QueryCompilationErrors.invalidVariantSchema(dataType)
+    }
+
+    if (isTopLevel) {
+      StructType(StructField("metadata", BinaryType, nullable = true) +: 
fields)
+    } else {
+      StructType(fields)
+    }
+  }
+
+  /*
+   * Given a Spark schema that represents a valid shredding schema (e.g. 
constructed by
+   * SparkShreddingUtils.variantShreddingSchema), return the corresponding 
VariantSchema.
+   */
+  private def buildVariantSchema(schema: StructType, topLevel: Boolean): 
VariantSchema = {
+    var typedIdx = -1
+    var variantIdx = -1
+    var topLevelMetadataIdx = -1
+    var scalarSchema: VariantSchema.ScalarType = null
+    var objectSchema: Array[VariantSchema.ObjectField] = null
+    var arraySchema: VariantSchema = null
+
+    schema.fields.zipWithIndex.foreach { case (f, i) =>
+      f.name match {
+        case "typed_value" =>
+          if (typedIdx != -1) {
+            throw QueryCompilationErrors.invalidVariantSchema(schema)
+          }
+          typedIdx = i
+          f.dataType match {
+            case StructType(fields) =>
+              objectSchema =
+                  new Array[VariantSchema.ObjectField](fields.length)
+              fields.zipWithIndex.foreach { case (field, fieldIdx) =>
+                field.dataType match {
+                  case s: StructType =>
+                    val fieldSchema = buildVariantSchema(s, topLevel = false)
+                    objectSchema(fieldIdx) = new 
VariantSchema.ObjectField(field.name, fieldSchema)
+                  case _ => throw 
QueryCompilationErrors.invalidVariantSchema(schema)
+                }
+              }
+            case ArrayType(elementType, _) =>
+              elementType match {
+                case s: StructType => arraySchema = buildVariantSchema(s, 
topLevel = false)
+                case _ => throw 
QueryCompilationErrors.invalidVariantSchema(schema)
+              }
+            case t => scalarSchema = (t match {
+              case BooleanType => new VariantSchema.BooleanType
+              case ByteType => new 
VariantSchema.IntegralType(VariantSchema.IntegralSize.BYTE)
+              case ShortType => new 
VariantSchema.IntegralType(VariantSchema.IntegralSize.SHORT)
+              case IntegerType => new 
VariantSchema.IntegralType(VariantSchema.IntegralSize.INT)
+              case LongType => new 
VariantSchema.IntegralType(VariantSchema.IntegralSize.LONG)
+              case FloatType => new VariantSchema.FloatType
+              case DoubleType => new VariantSchema.DoubleType
+              case StringType => new VariantSchema.StringType
+              case BinaryType => new VariantSchema.BinaryType
+              case DateType => new VariantSchema.DateType
+              case TimestampType => new VariantSchema.TimestampType
+              case TimestampNTZType => new VariantSchema.TimestampNTZType
+              case d: DecimalType => new 
VariantSchema.DecimalType(d.precision, d.scale)
+              case _ => throw 
QueryCompilationErrors.invalidVariantSchema(schema)
+            })
+          }
+        case "value" =>
+          if (variantIdx != -1 || f.dataType != BinaryType) {
+            throw QueryCompilationErrors.invalidVariantSchema(schema)
+          }
+          variantIdx = i
+        case "metadata" =>
+          if (topLevelMetadataIdx != -1 || f.dataType != BinaryType) {
+            throw QueryCompilationErrors.invalidVariantSchema(schema)
+          }
+          topLevelMetadataIdx = i
+        case _ => throw QueryCompilationErrors.invalidVariantSchema(schema)
+      }
+    }
+
+    if (topLevel != (topLevelMetadataIdx >= 0)) {
+      throw QueryCompilationErrors.invalidVariantSchema(schema)
+    }
+    new VariantSchema(typedIdx, variantIdx, topLevelMetadataIdx, 
schema.fields.length,
+      scalarSchema, objectSchema, arraySchema)
+  }
+
+  class SparkShreddedResult(schema: VariantSchema) extends 
VariantShreddingWriter.ShreddedResult {
+    // Result is stored as an InternalRow.
+    val row = new GenericInternalRow(schema.numFields)
+
+    override def addArray(schema: VariantSchema,

Review Comment:
   Will this `schema` be different from the `class SparkShreddedResult(schema: 
VariantSchema)` schema? Must this function `schema` be one of the sub-schemas 
of the class schema? I'm not sure how the class schema and the parameter 
schemas are different/same or how they are used.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types._
+import org.apache.spark.types.variant._
+import org.apache.spark.unsafe.types._
+
+case object SparkShreddingUtils {
+  def buildVariantSchema(schema: DataType): VariantSchema = {
+    schema match {
+      case s: StructType => buildVariantSchema(s, topLevel = true)
+      case _ => throw QueryCompilationErrors.invalidVariantSchema(schema)
+    }
+  }
+
+  /**
+   * Given an expected schema of a Variant value, returns a suitable schema 
for shredding, by
+   * inserting appropriate intermediate value/typed_value fields at each level.
+   * For example, to represent the JSON {"a": 1, "b": "hello"},
+   * the schema struct<a: int, b: string> could be passed into this function, 
and it would return
+   * the shredding scheme:
+   * struct<
+   *  metadata: binary,
+   *  value: binary,
+   *  typed_value: struct<
+   *   a: struct<typed_value: int, value: binary>,
+   *   b: struct<typed_value: string, value: binary>>>
+   *
+   */
+  def variantShreddingSchema(dataType: DataType, isTopLevel: Boolean = true): 
StructType = {
+    val fields = dataType match {
+      case ArrayType(elementType, containsNull) =>
+        val arrayShreddingSchema =
+          ArrayType(variantShreddingSchema(elementType, false), containsNull)
+        Seq(
+          StructField("value", BinaryType, nullable = true),
+          StructField("typed_value", arrayShreddingSchema, nullable = true)
+        )
+      case StructType(fields) =>
+        val objectShreddingSchema = StructType(fields.map(f =>
+            f.copy(dataType = variantShreddingSchema(f.dataType, false))))
+        Seq(
+          StructField("value", BinaryType, nullable = true),
+          StructField("typed_value", objectShreddingSchema, nullable = true)
+        )
+      case VariantType =>
+        // For Variant, we don't need a typed column
+        Seq(
+          StructField("value", BinaryType, nullable = true)
+        )
+      case _: NumericType | BooleanType | _: StringType | BinaryType | _: 
DatetimeType =>
+        Seq(
+          StructField("value", BinaryType, nullable = true),
+          StructField("typed_value", dataType, nullable = true)
+        )
+      case _ =>
+        // No other types have a corresponding shreddings schema.
+        throw QueryCompilationErrors.invalidVariantSchema(dataType)
+    }
+
+    if (isTopLevel) {
+      StructType(StructField("metadata", BinaryType, nullable = true) +: 
fields)

Review Comment:
   Is the top-level metadata nullable? I thought the metadata always had some 
bytes.



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -3319,6 +3319,12 @@
     ],
     "sqlState" : "22023"
   },
+  "INVALID_VARIANT_SCHEMA" : {

Review Comment:
   NIT: Should we name this `INVALID_VARIANT_SHREDDING_SCHEMA`? Or, is a 
shredding schema the only type of Variant schema that can exist?



##########
common/variant/src/main/java/org/apache/spark/types/variant/VariantSchema.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.types.variant;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Defines a valid shredding schema, as described in
+ * https://github.com/apache/parquet-format/blob/master/VariantShredding.md.
+ * A shredding schema contains a value and optional typed_value field.
+ * If a typed_value is an array or struct, it recursively contain its own 
shredding schema for
+ * elements and fields, respectively.
+ * The schema also contains a metadata field at the top level, but not in 
recursively shredded
+ * fields.
+ */
+public class VariantSchema {
+
+  // Represents one field of an object in the shredding schema.
+  public static final class ObjectField {
+    public final String fieldName;
+    public final VariantSchema schema;
+
+    public ObjectField(String fieldName, VariantSchema schema) {
+      this.fieldName = fieldName;
+      this.schema = schema;
+    }
+
+    @Override
+    public String toString() {
+      return "ObjectField{" +
+          "fieldName=" + fieldName +
+          ", schema=" + schema +
+          '}';
+    }
+  }
+
+  public abstract static class ScalarType {
+  }
+
+  public static final class StringType extends ScalarType {
+  }
+
+  public enum IntegralSize {
+    BYTE, SHORT, INT, LONG
+  }
+
+  public static final class IntegralType extends ScalarType {
+    public final IntegralSize size;
+
+    public IntegralType(IntegralSize size) {
+      this.size = size;
+    }
+  }
+
+  public static final class FloatType extends ScalarType {
+  }
+
+  public static final class DoubleType extends ScalarType {
+  }
+
+  public static final class BooleanType extends ScalarType {
+  }
+
+  public static final class BinaryType extends ScalarType {
+  }
+
+  public static final class DecimalType extends ScalarType {
+    public final int precision;
+    public final int scale;
+
+    public DecimalType(int precision, int scale) {
+      this.precision = precision;
+      this.scale = scale;
+    }
+  }
+
+  public static final class DateType extends ScalarType {
+  }
+
+  public static final class TimestampType extends ScalarType {
+  }
+
+  public static final class TimestampNTZType extends ScalarType {
+  }
+
+  public final int typedIdx;
+  public final int variantIdx;
+  public final int topLevelMetadataIdx;

Review Comment:
   Will this only be valid at the very top level `VariantSchema` instance?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types._
+import org.apache.spark.types.variant._
+import org.apache.spark.unsafe.types._
+
+case object SparkShreddingUtils {
+  def buildVariantSchema(schema: DataType): VariantSchema = {
+    schema match {
+      case s: StructType => buildVariantSchema(s, topLevel = true)
+      case _ => throw QueryCompilationErrors.invalidVariantSchema(schema)
+    }
+  }
+
+  /**
+   * Given an expected schema of a Variant value, returns a suitable schema 
for shredding, by
+   * inserting appropriate intermediate value/typed_value fields at each level.
+   * For example, to represent the JSON {"a": 1, "b": "hello"},
+   * the schema struct<a: int, b: string> could be passed into this function, 
and it would return
+   * the shredding scheme:

Review Comment:
   Should this be?
   ```suggestion
      * the shredding schema:
   ```



##########
common/variant/src/main/java/org/apache/spark/types/variant/VariantSchema.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.types.variant;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Defines a valid shredding schema, as described in
+ * https://github.com/apache/parquet-format/blob/master/VariantShredding.md.
+ * A shredding schema contains a value and optional typed_value field.
+ * If a typed_value is an array or struct, it recursively contain its own 
shredding schema for
+ * elements and fields, respectively.
+ * The schema also contains a metadata field at the top level, but not in 
recursively shredded
+ * fields.
+ */
+public class VariantSchema {
+
+  // Represents one field of an object in the shredding schema.
+  public static final class ObjectField {
+    public final String fieldName;
+    public final VariantSchema schema;
+
+    public ObjectField(String fieldName, VariantSchema schema) {
+      this.fieldName = fieldName;
+      this.schema = schema;
+    }
+
+    @Override
+    public String toString() {
+      return "ObjectField{" +
+          "fieldName=" + fieldName +
+          ", schema=" + schema +
+          '}';
+    }
+  }
+
+  public abstract static class ScalarType {
+  }
+
+  public static final class StringType extends ScalarType {
+  }
+
+  public enum IntegralSize {
+    BYTE, SHORT, INT, LONG
+  }
+
+  public static final class IntegralType extends ScalarType {
+    public final IntegralSize size;
+
+    public IntegralType(IntegralSize size) {
+      this.size = size;
+    }
+  }
+
+  public static final class FloatType extends ScalarType {
+  }
+
+  public static final class DoubleType extends ScalarType {
+  }
+
+  public static final class BooleanType extends ScalarType {
+  }
+
+  public static final class BinaryType extends ScalarType {
+  }
+
+  public static final class DecimalType extends ScalarType {
+    public final int precision;
+    public final int scale;
+
+    public DecimalType(int precision, int scale) {
+      this.precision = precision;
+      this.scale = scale;
+    }
+  }
+
+  public static final class DateType extends ScalarType {
+  }
+
+  public static final class TimestampType extends ScalarType {
+  }
+
+  public static final class TimestampNTZType extends ScalarType {
+  }
+
+  public final int typedIdx;
+  public final int variantIdx;
+  public final int topLevelMetadataIdx;
+  public final int numFields;
+
+  public final ScalarType scalarSchema;
+  public final ObjectField[] objectSchema;
+  // Map for fast lookup of object fields by name.
+  public final Map<String, Integer> objectSchemaMap;

Review Comment:
   Does the value `Integer` index into `objectSchema`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types._
+import org.apache.spark.types.variant._
+import org.apache.spark.unsafe.types._
+
+case object SparkShreddingUtils {
+  def buildVariantSchema(schema: DataType): VariantSchema = {
+    schema match {
+      case s: StructType => buildVariantSchema(s, topLevel = true)
+      case _ => throw QueryCompilationErrors.invalidVariantSchema(schema)
+    }
+  }
+
+  /**
+   * Given an expected schema of a Variant value, returns a suitable schema 
for shredding, by
+   * inserting appropriate intermediate value/typed_value fields at each level.
+   * For example, to represent the JSON {"a": 1, "b": "hello"},
+   * the schema struct<a: int, b: string> could be passed into this function, 
and it would return
+   * the shredding scheme:
+   * struct<
+   *  metadata: binary,
+   *  value: binary,
+   *  typed_value: struct<
+   *   a: struct<typed_value: int, value: binary>,
+   *   b: struct<typed_value: string, value: binary>>>
+   *
+   */
+  def variantShreddingSchema(dataType: DataType, isTopLevel: Boolean = true): 
StructType = {
+    val fields = dataType match {
+      case ArrayType(elementType, containsNull) =>
+        val arrayShreddingSchema =
+          ArrayType(variantShreddingSchema(elementType, false), containsNull)
+        Seq(
+          StructField("value", BinaryType, nullable = true),

Review Comment:
   NIT: We should make these field names (`value`, `typed_value`, `metadata`) 
as constants in `SparkShreddingUtils`.



##########
common/variant/src/main/java/org/apache/spark/types/variant/VariantSchema.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.types.variant;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Defines a valid shredding schema, as described in
+ * https://github.com/apache/parquet-format/blob/master/VariantShredding.md.
+ * A shredding schema contains a value and optional typed_value field.
+ * If a typed_value is an array or struct, it recursively contain its own 
shredding schema for
+ * elements and fields, respectively.
+ * The schema also contains a metadata field at the top level, but not in 
recursively shredded
+ * fields.
+ */
+public class VariantSchema {
+
+  // Represents one field of an object in the shredding schema.
+  public static final class ObjectField {
+    public final String fieldName;
+    public final VariantSchema schema;
+
+    public ObjectField(String fieldName, VariantSchema schema) {
+      this.fieldName = fieldName;
+      this.schema = schema;
+    }
+
+    @Override
+    public String toString() {
+      return "ObjectField{" +
+          "fieldName=" + fieldName +
+          ", schema=" + schema +
+          '}';
+    }
+  }
+
+  public abstract static class ScalarType {
+  }
+
+  public static final class StringType extends ScalarType {
+  }
+
+  public enum IntegralSize {
+    BYTE, SHORT, INT, LONG
+  }
+
+  public static final class IntegralType extends ScalarType {
+    public final IntegralSize size;
+
+    public IntegralType(IntegralSize size) {
+      this.size = size;
+    }
+  }
+
+  public static final class FloatType extends ScalarType {
+  }
+
+  public static final class DoubleType extends ScalarType {
+  }
+
+  public static final class BooleanType extends ScalarType {
+  }
+
+  public static final class BinaryType extends ScalarType {
+  }
+
+  public static final class DecimalType extends ScalarType {
+    public final int precision;
+    public final int scale;
+
+    public DecimalType(int precision, int scale) {
+      this.precision = precision;
+      this.scale = scale;
+    }
+  }
+
+  public static final class DateType extends ScalarType {
+  }
+
+  public static final class TimestampType extends ScalarType {
+  }
+
+  public static final class TimestampNTZType extends ScalarType {
+  }
+
+  public final int typedIdx;

Review Comment:
   Can you comment on what these indexes represent? What do they into? What are 
valid values for these?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types._
+import org.apache.spark.types.variant._
+import org.apache.spark.unsafe.types._
+
+case object SparkShreddingUtils {
+  def buildVariantSchema(schema: DataType): VariantSchema = {
+    schema match {
+      case s: StructType => buildVariantSchema(s, topLevel = true)
+      case _ => throw QueryCompilationErrors.invalidVariantSchema(schema)
+    }
+  }
+
+  /**
+   * Given an expected schema of a Variant value, returns a suitable schema 
for shredding, by
+   * inserting appropriate intermediate value/typed_value fields at each level.
+   * For example, to represent the JSON {"a": 1, "b": "hello"},
+   * the schema struct<a: int, b: string> could be passed into this function, 
and it would return
+   * the shredding scheme:
+   * struct<
+   *  metadata: binary,
+   *  value: binary,
+   *  typed_value: struct<
+   *   a: struct<typed_value: int, value: binary>,
+   *   b: struct<typed_value: string, value: binary>>>
+   *
+   */
+  def variantShreddingSchema(dataType: DataType, isTopLevel: Boolean = true): 
StructType = {
+    val fields = dataType match {
+      case ArrayType(elementType, containsNull) =>
+        val arrayShreddingSchema =
+          ArrayType(variantShreddingSchema(elementType, false), containsNull)
+        Seq(
+          StructField("value", BinaryType, nullable = true),
+          StructField("typed_value", arrayShreddingSchema, nullable = true)
+        )
+      case StructType(fields) =>
+        val objectShreddingSchema = StructType(fields.map(f =>
+            f.copy(dataType = variantShreddingSchema(f.dataType, false))))
+        Seq(
+          StructField("value", BinaryType, nullable = true),
+          StructField("typed_value", objectShreddingSchema, nullable = true)
+        )
+      case VariantType =>
+        // For Variant, we don't need a typed column
+        Seq(
+          StructField("value", BinaryType, nullable = true)
+        )
+      case _: NumericType | BooleanType | _: StringType | BinaryType | _: 
DatetimeType =>
+        Seq(
+          StructField("value", BinaryType, nullable = true),
+          StructField("typed_value", dataType, nullable = true)
+        )
+      case _ =>
+        // No other types have a corresponding shreddings schema.
+        throw QueryCompilationErrors.invalidVariantSchema(dataType)
+    }
+
+    if (isTopLevel) {
+      StructType(StructField("metadata", BinaryType, nullable = true) +: 
fields)
+    } else {
+      StructType(fields)
+    }
+  }
+
+  /*
+   * Given a Spark schema that represents a valid shredding schema (e.g. 
constructed by
+   * SparkShreddingUtils.variantShreddingSchema), return the corresponding 
VariantSchema.
+   */
+  private def buildVariantSchema(schema: StructType, topLevel: Boolean): 
VariantSchema = {
+    var typedIdx = -1
+    var variantIdx = -1
+    var topLevelMetadataIdx = -1
+    var scalarSchema: VariantSchema.ScalarType = null
+    var objectSchema: Array[VariantSchema.ObjectField] = null
+    var arraySchema: VariantSchema = null
+
+    schema.fields.zipWithIndex.foreach { case (f, i) =>
+      f.name match {
+        case "typed_value" =>
+          if (typedIdx != -1) {
+            throw QueryCompilationErrors.invalidVariantSchema(schema)
+          }
+          typedIdx = i
+          f.dataType match {
+            case StructType(fields) =>
+              objectSchema =
+                  new Array[VariantSchema.ObjectField](fields.length)
+              fields.zipWithIndex.foreach { case (field, fieldIdx) =>
+                field.dataType match {
+                  case s: StructType =>
+                    val fieldSchema = buildVariantSchema(s, topLevel = false)
+                    objectSchema(fieldIdx) = new 
VariantSchema.ObjectField(field.name, fieldSchema)
+                  case _ => throw 
QueryCompilationErrors.invalidVariantSchema(schema)
+                }
+              }
+            case ArrayType(elementType, _) =>
+              elementType match {
+                case s: StructType => arraySchema = buildVariantSchema(s, 
topLevel = false)
+                case _ => throw 
QueryCompilationErrors.invalidVariantSchema(schema)
+              }
+            case t => scalarSchema = (t match {
+              case BooleanType => new VariantSchema.BooleanType
+              case ByteType => new 
VariantSchema.IntegralType(VariantSchema.IntegralSize.BYTE)
+              case ShortType => new 
VariantSchema.IntegralType(VariantSchema.IntegralSize.SHORT)
+              case IntegerType => new 
VariantSchema.IntegralType(VariantSchema.IntegralSize.INT)
+              case LongType => new 
VariantSchema.IntegralType(VariantSchema.IntegralSize.LONG)
+              case FloatType => new VariantSchema.FloatType
+              case DoubleType => new VariantSchema.DoubleType
+              case StringType => new VariantSchema.StringType
+              case BinaryType => new VariantSchema.BinaryType
+              case DateType => new VariantSchema.DateType
+              case TimestampType => new VariantSchema.TimestampType
+              case TimestampNTZType => new VariantSchema.TimestampNTZType
+              case d: DecimalType => new 
VariantSchema.DecimalType(d.precision, d.scale)
+              case _ => throw 
QueryCompilationErrors.invalidVariantSchema(schema)
+            })
+          }
+        case "value" =>
+          if (variantIdx != -1 || f.dataType != BinaryType) {
+            throw QueryCompilationErrors.invalidVariantSchema(schema)
+          }
+          variantIdx = i
+        case "metadata" =>
+          if (topLevelMetadataIdx != -1 || f.dataType != BinaryType) {
+            throw QueryCompilationErrors.invalidVariantSchema(schema)
+          }
+          topLevelMetadataIdx = i
+        case _ => throw QueryCompilationErrors.invalidVariantSchema(schema)
+      }
+    }
+
+    if (topLevel != (topLevelMetadataIdx >= 0)) {

Review Comment:
   We are translating the `StructType` to the `VariantSchema`, so we don't 
actually need `topLevel`. Are we using `topLevel` only for validation of 
`variantShreddingSchema`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types._
+import org.apache.spark.types.variant._
+import org.apache.spark.unsafe.types._
+
+case object SparkShreddingUtils {
+  def buildVariantSchema(schema: DataType): VariantSchema = {
+    schema match {
+      case s: StructType => buildVariantSchema(s, topLevel = true)
+      case _ => throw QueryCompilationErrors.invalidVariantSchema(schema)
+    }
+  }
+
+  /**
+   * Given an expected schema of a Variant value, returns a suitable schema 
for shredding, by

Review Comment:
   What is the difference between a shredding schema that is a `StructType` and 
a shredding schema that is a `VariantSchema`? It is confusing because I see 
both in this file.



##########
common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.types.variant;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+
+/**
+ * Class to implement shredding a Variant value.
+ */
+public class VariantShreddingWriter {
+
+  // Interface to build up a shredded result. Callers should implement a 
ShreddedResultBuilder
+  // to create an empty result. The castShredded method will call one or more 
of the add* methods
+  // to populate it.
+  public interface ShreddedResult {
+    // Create an array. The elements are the result of shredding each element.
+    void addArray(VariantSchema schema, ShreddedResult[] array);
+    // Create an object. The values are the result of shredding each field, 
order by the index in
+    // objectSchema. Missing fields are populated with an empty result.
+    void addObject(VariantSchema schema, ShreddedResult[] values);
+    void addVariantValue(VariantSchema schema, byte[] result);
+    // Add a scalar to typed_value. The type of Object depends on the 
scalarSchema in the shredding
+    // schema.
+    void addScalar(VariantSchema schema, Object result);
+    void addMetadata(VariantSchema schema, byte[] result);
+  }
+
+  public interface ShreddedResultBuilder {
+    ShreddedResult createEmpty(VariantSchema schema);
+
+    // If true, we will shred decimals to a different scale or to integers, as 
long as they are
+    // numerically equivalent. Similarly, integers will be allowed to shred to 
decimals.
+    boolean allowNumericScaleChanges();
+  }
+
+  /**
+   * Converts an input variant into shredded components. Returns the shredded 
result, as well
+   * as the original Variant with shredded fields removed.
+   * `dataType` must be a valid shredding schema, as described in 
common/variant/shredding.md.

Review Comment:
   Should we point to the parquet spec doc?



##########
common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.types.variant;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+
+/**
+ * Class to implement shredding a Variant value.
+ */
+public class VariantShreddingWriter {
+
+  // Interface to build up a shredded result. Callers should implement a 
ShreddedResultBuilder
+  // to create an empty result. The castShredded method will call one or more 
of the add* methods
+  // to populate it.
+  public interface ShreddedResult {
+    // Create an array. The elements are the result of shredding each element.
+    void addArray(VariantSchema schema, ShreddedResult[] array);
+    // Create an object. The values are the result of shredding each field, 
order by the index in
+    // objectSchema. Missing fields are populated with an empty result.
+    void addObject(VariantSchema schema, ShreddedResult[] values);
+    void addVariantValue(VariantSchema schema, byte[] result);
+    // Add a scalar to typed_value. The type of Object depends on the 
scalarSchema in the shredding
+    // schema.
+    void addScalar(VariantSchema schema, Object result);
+    void addMetadata(VariantSchema schema, byte[] result);
+  }
+
+  public interface ShreddedResultBuilder {
+    ShreddedResult createEmpty(VariantSchema schema);
+
+    // If true, we will shred decimals to a different scale or to integers, as 
long as they are
+    // numerically equivalent. Similarly, integers will be allowed to shred to 
decimals.
+    boolean allowNumericScaleChanges();
+  }
+
+  /**
+   * Converts an input variant into shredded components. Returns the shredded 
result, as well
+   * as the original Variant with shredded fields removed.
+   * `dataType` must be a valid shredding schema, as described in 
common/variant/shredding.md.
+   */
+  public static ShreddedResult castShredded(
+          Variant v,
+          VariantSchema schema,
+          ShreddedResultBuilder builder) {
+    VariantUtil.Type variantType = v.getType();
+    ShreddedResult result = builder.createEmpty(schema);
+
+    if (schema.topLevelMetadataIdx >= 0) {
+      result.addMetadata(schema, v.getMetadata());
+    }
+
+    if (schema.arraySchema != null && variantType == VariantUtil.Type.ARRAY) {
+      // The array element is always a struct containing untyped and typed 
fields.
+      VariantSchema elementSchema = schema.arraySchema;
+      int size = v.arraySize();
+      ShreddedResult[] array = new ShreddedResult[size];
+      for (int i = 0; i < size; ++i) {
+        ShreddedResult shreddedArray = castShredded(v.getElementAtIndex(i), 
elementSchema, builder);
+        array[i] = shreddedArray;
+      }
+      result.addArray(schema, array);
+    } else if (schema.objectSchema != null && variantType == 
VariantUtil.Type.OBJECT) {
+      VariantSchema.ObjectField[] objectSchema = schema.objectSchema;
+      ShreddedResult[] shreddedValues = new 
ShreddedResult[objectSchema.length];
+
+      // Create a variantBuilder for any mismatched fields.

Review Comment:
   What exactly does "mismatched fields" mean? Could it mean type mismatches?
   
   Or, is this just for any variant fields which are not found in the shredding 
schema?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to