cashmand commented on code in PR #48779: URL: https://github.com/apache/spark/pull/48779#discussion_r1838613234
########## common/variant/src/main/java/org/apache/spark/types/variant/VariantSchema.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.types.variant; + +import java.util.HashMap; +import java.util.Map; + +/** + * Defines a valid shredding schema, as described in + * https://github.com/apache/parquet-format/blob/master/VariantShredding.md. + * A shredding schema contains a value and optional typed_value field. + * If a typed_value is an array or struct, it recursively contain its own shredding schema for + * elements and fields, respectively. + * The schema also contains a metadata field at the top level, but not in recursively shredded + * fields. + */ +public class VariantSchema { + + // Represents one field of an object in the shredding schema. 
+ public static final class ObjectField { + public final String fieldName; + public final VariantSchema schema; + + public ObjectField(String fieldName, VariantSchema schema) { + this.fieldName = fieldName; + this.schema = schema; + } + + @Override + public String toString() { + return "ObjectField{" + + "fieldName=" + fieldName + + ", schema=" + schema + + '}'; + } + } + + public abstract static class ScalarType { + } + + public static final class StringType extends ScalarType { + } + + public enum IntegralSize { + BYTE, SHORT, INT, LONG + } + + public static final class IntegralType extends ScalarType { + public final IntegralSize size; + + public IntegralType(IntegralSize size) { + this.size = size; + } + } + + public static final class FloatType extends ScalarType { + } + + public static final class DoubleType extends ScalarType { + } + + public static final class BooleanType extends ScalarType { + } + + public static final class BinaryType extends ScalarType { + } + + public static final class DecimalType extends ScalarType { + public final int precision; + public final int scale; + + public DecimalType(int precision, int scale) { + this.precision = precision; + this.scale = scale; + } + } + + public static final class DateType extends ScalarType { + } + + public static final class TimestampType extends ScalarType { + } + + public static final class TimestampNTZType extends ScalarType { + } + + public final int typedIdx; Review Comment: Okay, I'll add a comment. I don't really use them in the `common/variant` code. For spark, they represent the position of the corresponding value within the `InternalRow`. ########## common/variant/src/main/java/org/apache/spark/types/variant/VariantSchema.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.types.variant; + +import java.util.HashMap; +import java.util.Map; + +/** + * Defines a valid shredding schema, as described in + * https://github.com/apache/parquet-format/blob/master/VariantShredding.md. + * A shredding schema contains a value and optional typed_value field. + * If a typed_value is an array or struct, it recursively contain its own shredding schema for + * elements and fields, respectively. + * The schema also contains a metadata field at the top level, but not in recursively shredded + * fields. + */ +public class VariantSchema { + + // Represents one field of an object in the shredding schema. 
+ public static final class ObjectField { + public final String fieldName; + public final VariantSchema schema; + + public ObjectField(String fieldName, VariantSchema schema) { + this.fieldName = fieldName; + this.schema = schema; + } + + @Override + public String toString() { + return "ObjectField{" + + "fieldName=" + fieldName + + ", schema=" + schema + + '}'; + } + } + + public abstract static class ScalarType { + } + + public static final class StringType extends ScalarType { + } + + public enum IntegralSize { + BYTE, SHORT, INT, LONG + } + + public static final class IntegralType extends ScalarType { + public final IntegralSize size; + + public IntegralType(IntegralSize size) { + this.size = size; + } + } + + public static final class FloatType extends ScalarType { + } + + public static final class DoubleType extends ScalarType { + } + + public static final class BooleanType extends ScalarType { + } + + public static final class BinaryType extends ScalarType { + } + + public static final class DecimalType extends ScalarType { + public final int precision; + public final int scale; + + public DecimalType(int precision, int scale) { + this.precision = precision; + this.scale = scale; + } + } + + public static final class DateType extends ScalarType { + } + + public static final class TimestampType extends ScalarType { + } + + public static final class TimestampNTZType extends ScalarType { + } + + public final int typedIdx; + public final int variantIdx; + public final int topLevelMetadataIdx; Review Comment: Yes, I'll add a comment. ########## common/utils/src/main/resources/error/error-conditions.json: ########## @@ -3319,6 +3319,12 @@ ], "sqlState" : "22023" }, + "INVALID_VARIANT_SCHEMA" : { Review Comment: I can't really think of a different invalid schema, but I guess it doesn't hurt to rename it to be a bit clearer. 
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.GenericArrayData +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types._ +import org.apache.spark.types.variant._ +import org.apache.spark.unsafe.types._ + +case object SparkShreddingUtils { + def buildVariantSchema(schema: DataType): VariantSchema = { + schema match { + case s: StructType => buildVariantSchema(s, topLevel = true) + case _ => throw QueryCompilationErrors.invalidVariantSchema(schema) + } + } + + /** + * Given an expected schema of a Variant value, returns a suitable schema for shredding, by + * inserting appropriate intermediate value/typed_value fields at each level. 
+ * For example, to represent the JSON {"a": 1, "b": "hello"}, + * the schema struct<a: int, b: string> could be passed into this function, and it would return + * the shredding scheme: + * struct< + * metadata: binary, + * value: binary, + * typed_value: struct< + * a: struct<typed_value: int, value: binary>, + * b: struct<typed_value: string, value: binary>>> + * + */ + def variantShreddingSchema(dataType: DataType, isTopLevel: Boolean = true): StructType = { + val fields = dataType match { + case ArrayType(elementType, containsNull) => + val arrayShreddingSchema = + ArrayType(variantShreddingSchema(elementType, false), containsNull) + Seq( + StructField("value", BinaryType, nullable = true), + StructField("typed_value", arrayShreddingSchema, nullable = true) + ) + case StructType(fields) => + val objectShreddingSchema = StructType(fields.map(f => + f.copy(dataType = variantShreddingSchema(f.dataType, false)))) + Seq( + StructField("value", BinaryType, nullable = true), + StructField("typed_value", objectShreddingSchema, nullable = true) + ) + case VariantType => + // For Variant, we don't need a typed column + Seq( + StructField("value", BinaryType, nullable = true) + ) + case _: NumericType | BooleanType | _: StringType | BinaryType | _: DatetimeType => + Seq( + StructField("value", BinaryType, nullable = true), + StructField("typed_value", dataType, nullable = true) + ) + case _ => + // No other types have a corresponding shreddings schema. + throw QueryCompilationErrors.invalidVariantSchema(dataType) + } + + if (isTopLevel) { + StructType(StructField("metadata", BinaryType, nullable = true) +: fields) + } else { + StructType(fields) + } + } + + /* + * Given a Spark schema that represents a valid shredding schema (e.g. constructed by + * SparkShreddingUtils.variantShreddingSchema), return the corresponding VariantSchema. 
+ */ + private def buildVariantSchema(schema: StructType, topLevel: Boolean): VariantSchema = { + var typedIdx = -1 + var variantIdx = -1 + var topLevelMetadataIdx = -1 + var scalarSchema: VariantSchema.ScalarType = null + var objectSchema: Array[VariantSchema.ObjectField] = null + var arraySchema: VariantSchema = null + + schema.fields.zipWithIndex.foreach { case (f, i) => + f.name match { + case "typed_value" => + if (typedIdx != -1) { + throw QueryCompilationErrors.invalidVariantSchema(schema) + } + typedIdx = i + f.dataType match { + case StructType(fields) => + objectSchema = + new Array[VariantSchema.ObjectField](fields.length) + fields.zipWithIndex.foreach { case (field, fieldIdx) => + field.dataType match { + case s: StructType => + val fieldSchema = buildVariantSchema(s, topLevel = false) + objectSchema(fieldIdx) = new VariantSchema.ObjectField(field.name, fieldSchema) + case _ => throw QueryCompilationErrors.invalidVariantSchema(schema) + } + } + case ArrayType(elementType, _) => + elementType match { + case s: StructType => arraySchema = buildVariantSchema(s, topLevel = false) + case _ => throw QueryCompilationErrors.invalidVariantSchema(schema) + } + case t => scalarSchema = (t match { + case BooleanType => new VariantSchema.BooleanType + case ByteType => new VariantSchema.IntegralType(VariantSchema.IntegralSize.BYTE) + case ShortType => new VariantSchema.IntegralType(VariantSchema.IntegralSize.SHORT) + case IntegerType => new VariantSchema.IntegralType(VariantSchema.IntegralSize.INT) + case LongType => new VariantSchema.IntegralType(VariantSchema.IntegralSize.LONG) + case FloatType => new VariantSchema.FloatType + case DoubleType => new VariantSchema.DoubleType + case StringType => new VariantSchema.StringType + case BinaryType => new VariantSchema.BinaryType + case DateType => new VariantSchema.DateType + case TimestampType => new VariantSchema.TimestampType + case TimestampNTZType => new VariantSchema.TimestampNTZType + case d: DecimalType => new 
VariantSchema.DecimalType(d.precision, d.scale) + case _ => throw QueryCompilationErrors.invalidVariantSchema(schema) + }) + } + case "value" => + if (variantIdx != -1 || f.dataType != BinaryType) { + throw QueryCompilationErrors.invalidVariantSchema(schema) + } + variantIdx = i + case "metadata" => + if (topLevelMetadataIdx != -1 || f.dataType != BinaryType) { + throw QueryCompilationErrors.invalidVariantSchema(schema) + } + topLevelMetadataIdx = i + case _ => throw QueryCompilationErrors.invalidVariantSchema(schema) + } + } + + if (topLevel != (topLevelMetadataIdx >= 0)) { Review Comment: Yes, it's just for validation. In the long term, I wasn't sure if we'd necessarily use `variantShreddingSchema`, or if data sources would directly provide the full shredding schema, so I think it's good to validate here. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.GenericArrayData +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types._ +import org.apache.spark.types.variant._ +import org.apache.spark.unsafe.types._ + +case object SparkShreddingUtils { + def buildVariantSchema(schema: DataType): VariantSchema = { + schema match { + case s: StructType => buildVariantSchema(s, topLevel = true) + case _ => throw QueryCompilationErrors.invalidVariantSchema(schema) + } + } + + /** + * Given an expected schema of a Variant value, returns a suitable schema for shredding, by + * inserting appropriate intermediate value/typed_value fields at each level. + * For example, to represent the JSON {"a": 1, "b": "hello"}, + * the schema struct<a: int, b: string> could be passed into this function, and it would return + * the shredding scheme: + * struct< + * metadata: binary, + * value: binary, + * typed_value: struct< + * a: struct<typed_value: int, value: binary>, + * b: struct<typed_value: string, value: binary>>> + * + */ + def variantShreddingSchema(dataType: DataType, isTopLevel: Boolean = true): StructType = { + val fields = dataType match { + case ArrayType(elementType, containsNull) => + val arrayShreddingSchema = + ArrayType(variantShreddingSchema(elementType, false), containsNull) + Seq( + StructField("value", BinaryType, nullable = true), + StructField("typed_value", arrayShreddingSchema, nullable = true) + ) + case StructType(fields) => + val objectShreddingSchema = StructType(fields.map(f => + f.copy(dataType = variantShreddingSchema(f.dataType, false)))) + Seq( + StructField("value", BinaryType, nullable = true), + StructField("typed_value", objectShreddingSchema, nullable = true) + ) + case VariantType => + // For Variant, we don't need a typed column + Seq( + 
StructField("value", BinaryType, nullable = true) + ) + case _: NumericType | BooleanType | _: StringType | BinaryType | _: DatetimeType => + Seq( + StructField("value", BinaryType, nullable = true), + StructField("typed_value", dataType, nullable = true) + ) + case _ => + // No other types have a corresponding shreddings schema. + throw QueryCompilationErrors.invalidVariantSchema(dataType) + } + + if (isTopLevel) { + StructType(StructField("metadata", BinaryType, nullable = true) +: fields) + } else { + StructType(fields) + } + } + + /* + * Given a Spark schema that represents a valid shredding schema (e.g. constructed by + * SparkShreddingUtils.variantShreddingSchema), return the corresponding VariantSchema. + */ + private def buildVariantSchema(schema: StructType, topLevel: Boolean): VariantSchema = { + var typedIdx = -1 + var variantIdx = -1 + var topLevelMetadataIdx = -1 + var scalarSchema: VariantSchema.ScalarType = null + var objectSchema: Array[VariantSchema.ObjectField] = null + var arraySchema: VariantSchema = null + + schema.fields.zipWithIndex.foreach { case (f, i) => + f.name match { + case "typed_value" => + if (typedIdx != -1) { + throw QueryCompilationErrors.invalidVariantSchema(schema) + } + typedIdx = i + f.dataType match { + case StructType(fields) => + objectSchema = + new Array[VariantSchema.ObjectField](fields.length) + fields.zipWithIndex.foreach { case (field, fieldIdx) => + field.dataType match { + case s: StructType => + val fieldSchema = buildVariantSchema(s, topLevel = false) + objectSchema(fieldIdx) = new VariantSchema.ObjectField(field.name, fieldSchema) + case _ => throw QueryCompilationErrors.invalidVariantSchema(schema) + } + } + case ArrayType(elementType, _) => + elementType match { + case s: StructType => arraySchema = buildVariantSchema(s, topLevel = false) + case _ => throw QueryCompilationErrors.invalidVariantSchema(schema) + } + case t => scalarSchema = (t match { + case BooleanType => new VariantSchema.BooleanType + case 
ByteType => new VariantSchema.IntegralType(VariantSchema.IntegralSize.BYTE) + case ShortType => new VariantSchema.IntegralType(VariantSchema.IntegralSize.SHORT) + case IntegerType => new VariantSchema.IntegralType(VariantSchema.IntegralSize.INT) + case LongType => new VariantSchema.IntegralType(VariantSchema.IntegralSize.LONG) + case FloatType => new VariantSchema.FloatType + case DoubleType => new VariantSchema.DoubleType + case StringType => new VariantSchema.StringType + case BinaryType => new VariantSchema.BinaryType + case DateType => new VariantSchema.DateType + case TimestampType => new VariantSchema.TimestampType + case TimestampNTZType => new VariantSchema.TimestampNTZType + case d: DecimalType => new VariantSchema.DecimalType(d.precision, d.scale) + case _ => throw QueryCompilationErrors.invalidVariantSchema(schema) + }) + } + case "value" => + if (variantIdx != -1 || f.dataType != BinaryType) { + throw QueryCompilationErrors.invalidVariantSchema(schema) + } + variantIdx = i + case "metadata" => + if (topLevelMetadataIdx != -1 || f.dataType != BinaryType) { + throw QueryCompilationErrors.invalidVariantSchema(schema) + } + topLevelMetadataIdx = i + case _ => throw QueryCompilationErrors.invalidVariantSchema(schema) + } + } + + if (topLevel != (topLevelMetadataIdx >= 0)) { + throw QueryCompilationErrors.invalidVariantSchema(schema) + } + new VariantSchema(typedIdx, variantIdx, topLevelMetadataIdx, schema.fields.length, + scalarSchema, objectSchema, arraySchema) + } + + class SparkShreddedResult(schema: VariantSchema) extends VariantShreddingWriter.ShreddedResult { + // Result is stored as an InternalRow. + val row = new GenericInternalRow(schema.numFields) + + override def addArray(schema: VariantSchema, Review Comment: Oh, that's a good point. I don't think it's necessary to pass the schema into the methods. I'll try to remove it. 
########## common/variant/src/main/java/org/apache/spark/types/variant/VariantBuilder.java: ########## @@ -107,6 +107,13 @@ public Variant result() { return new Variant(Arrays.copyOfRange(writeBuffer, 0, writePos), metadata); } + // Return the variant value only, without metadata. Review Comment: Yes, either is fine, although the expectation is that it's run instead of `result()`. I'll add a comment. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.GenericArrayData +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types._ +import org.apache.spark.types.variant._ +import org.apache.spark.unsafe.types._ + +case object SparkShreddingUtils { + def buildVariantSchema(schema: DataType): VariantSchema = { + schema match { + case s: StructType => buildVariantSchema(s, topLevel = true) + case _ => throw QueryCompilationErrors.invalidVariantSchema(schema) + } + } + + /** + * Given an expected schema of a Variant value, returns a suitable schema for shredding, by Review Comment: `VariantSchema` is a concept in the `common/variant` code, and represents a shredding schema (possibly one that is nested as an array element or object field in a larger shredding schema). `StructType` is a Spark type. It happens to be used as the Spark implementation to represent both `VariantSchema` and a shredded object in such a schema. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.GenericArrayData +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types._ +import org.apache.spark.types.variant._ +import org.apache.spark.unsafe.types._ + +case object SparkShreddingUtils { + def buildVariantSchema(schema: DataType): VariantSchema = { + schema match { + case s: StructType => buildVariantSchema(s, topLevel = true) + case _ => throw QueryCompilationErrors.invalidVariantSchema(schema) + } + } + + /** + * Given an expected schema of a Variant value, returns a suitable schema for shredding, by + * inserting appropriate intermediate value/typed_value fields at each level. + * For example, to represent the JSON {"a": 1, "b": "hello"}, + * the schema struct<a: int, b: string> could be passed into this function, and it would return + * the shredding scheme: + * struct< + * metadata: binary, + * value: binary, + * typed_value: struct< + * a: struct<typed_value: int, value: binary>, + * b: struct<typed_value: string, value: binary>>> + * + */ + def variantShreddingSchema(dataType: DataType, isTopLevel: Boolean = true): StructType = { + val fields = dataType match { + case ArrayType(elementType, containsNull) => + val arrayShreddingSchema = + ArrayType(variantShreddingSchema(elementType, false), containsNull) + Seq( + StructField("value", BinaryType, nullable = true), Review Comment: Okay, I'll put it in VariantaSchema, since it's part of the spec. ########## common/variant/src/main/java/org/apache/spark/types/variant/VariantSchema.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.types.variant; + +import java.util.HashMap; +import java.util.Map; + +/** + * Defines a valid shredding schema, as described in + * https://github.com/apache/parquet-format/blob/master/VariantShredding.md. + * A shredding schema contains a value and optional typed_value field. + * If a typed_value is an array or struct, it recursively contain its own shredding schema for + * elements and fields, respectively. + * The schema also contains a metadata field at the top level, but not in recursively shredded + * fields. + */ +public class VariantSchema { + + // Represents one field of an object in the shredding schema. 
+ public static final class ObjectField { + public final String fieldName; + public final VariantSchema schema; + + public ObjectField(String fieldName, VariantSchema schema) { + this.fieldName = fieldName; + this.schema = schema; + } + + @Override + public String toString() { + return "ObjectField{" + + "fieldName=" + fieldName + + ", schema=" + schema + + '}'; + } + } + + public abstract static class ScalarType { + } + + public static final class StringType extends ScalarType { + } + + public enum IntegralSize { + BYTE, SHORT, INT, LONG + } + + public static final class IntegralType extends ScalarType { + public final IntegralSize size; + + public IntegralType(IntegralSize size) { + this.size = size; + } + } + + public static final class FloatType extends ScalarType { + } + + public static final class DoubleType extends ScalarType { + } + + public static final class BooleanType extends ScalarType { + } + + public static final class BinaryType extends ScalarType { + } + + public static final class DecimalType extends ScalarType { + public final int precision; + public final int scale; + + public DecimalType(int precision, int scale) { + this.precision = precision; + this.scale = scale; + } + } + + public static final class DateType extends ScalarType { + } + + public static final class TimestampType extends ScalarType { + } + + public static final class TimestampNTZType extends ScalarType { + } + + public final int typedIdx; + public final int variantIdx; + public final int topLevelMetadataIdx; + public final int numFields; + + public final ScalarType scalarSchema; + public final ObjectField[] objectSchema; + // Map for fast lookup of object fields by name. + public final Map<String, Integer> objectSchemaMap; Review Comment: Yes. ########## common/variant/src/main/java/org/apache/spark/types/variant/VariantSchema.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.types.variant; + +import java.util.HashMap; +import java.util.Map; + +/** + * Defines a valid shredding schema, as described in + * https://github.com/apache/parquet-format/blob/master/VariantShredding.md. + * A shredding schema contains a value and optional typed_value field. + * If a typed_value is an array or struct, it recursively contain its own shredding schema for + * elements and fields, respectively. + * The schema also contains a metadata field at the top level, but not in recursively shredded + * fields. + */ +public class VariantSchema { + + // Represents one field of an object in the shredding schema. 
+ public static final class ObjectField { + public final String fieldName; + public final VariantSchema schema; + + public ObjectField(String fieldName, VariantSchema schema) { + this.fieldName = fieldName; + this.schema = schema; + } + + @Override + public String toString() { + return "ObjectField{" + + "fieldName=" + fieldName + + ", schema=" + schema + + '}'; + } + } + + public abstract static class ScalarType { + } + + public static final class StringType extends ScalarType { + } + + public enum IntegralSize { + BYTE, SHORT, INT, LONG + } + + public static final class IntegralType extends ScalarType { + public final IntegralSize size; + + public IntegralType(IntegralSize size) { + this.size = size; + } + } + + public static final class FloatType extends ScalarType { + } + + public static final class DoubleType extends ScalarType { + } + + public static final class BooleanType extends ScalarType { + } + + public static final class BinaryType extends ScalarType { + } + + public static final class DecimalType extends ScalarType { + public final int precision; + public final int scale; + + public DecimalType(int precision, int scale) { + this.precision = precision; + this.scale = scale; + } + } + + public static final class DateType extends ScalarType { + } + + public static final class TimestampType extends ScalarType { + } + + public static final class TimestampNTZType extends ScalarType { + } + + public final int typedIdx; + public final int variantIdx; + public final int topLevelMetadataIdx; + public final int numFields; Review Comment: It will always be the first thing - i.e. 1, 2 or 3 depending on how many of `value`, `typed_value` and `metadata` are in the schema. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.GenericArrayData +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.types._ +import org.apache.spark.types.variant._ +import org.apache.spark.unsafe.types._ + +case object SparkShreddingUtils { + def buildVariantSchema(schema: DataType): VariantSchema = { + schema match { + case s: StructType => buildVariantSchema(s, topLevel = true) + case _ => throw QueryCompilationErrors.invalidVariantSchema(schema) + } + } + + /** + * Given an expected schema of a Variant value, returns a suitable schema for shredding, by + * inserting appropriate intermediate value/typed_value fields at each level. 
+ * For example, to represent the JSON {"a": 1, "b": "hello"}, + * the schema struct<a: int, b: string> could be passed into this function, and it would return + * the shredding schema: + * struct< + * metadata: binary, + * value: binary, + * typed_value: struct< + * a: struct<typed_value: int, value: binary>, + * b: struct<typed_value: string, value: binary>>> + * + */ + def variantShreddingSchema(dataType: DataType, isTopLevel: Boolean = true): StructType = { + val fields = dataType match { + case ArrayType(elementType, containsNull) => + val arrayShreddingSchema = + ArrayType(variantShreddingSchema(elementType, false), containsNull) + Seq( + StructField("value", BinaryType, nullable = true), + StructField("typed_value", arrayShreddingSchema, nullable = true) + ) + case StructType(fields) => + val objectShreddingSchema = StructType(fields.map(f => + f.copy(dataType = variantShreddingSchema(f.dataType, false)))) + Seq( + StructField("value", BinaryType, nullable = true), + StructField("typed_value", objectShreddingSchema, nullable = true) + ) + case VariantType => + // For Variant, we don't need a typed column + Seq( + StructField("value", BinaryType, nullable = true) + ) + case _: NumericType | BooleanType | _: StringType | BinaryType | _: DatetimeType => + Seq( + StructField("value", BinaryType, nullable = true), + StructField("typed_value", dataType, nullable = true) + ) + case _ => + // No other types have a corresponding shredding schema. + throw QueryCompilationErrors.invalidVariantSchema(dataType) + } + + if (isTopLevel) { + StructType(StructField("metadata", BinaryType, nullable = true) +: fields) Review Comment: You're right, I'll change it. ########## common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.types.variant; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.ArrayList; + +/** + * Class to implement shredding a Variant value. + */ +public class VariantShreddingWriter { + + // Interface to build up a shredded result. Callers should implement a ShreddedResultBuilder + // to create an empty result. The castShredded method will call one or more of the add* methods + // to populate it. + public interface ShreddedResult { + // Create an array. The elements are the result of shredding each element. + void addArray(VariantSchema schema, ShreddedResult[] array); + // Create an object. The values are the result of shredding each field, ordered by the index in + // objectSchema. Missing fields are populated with an empty result. + void addObject(VariantSchema schema, ShreddedResult[] values); + void addVariantValue(VariantSchema schema, byte[] result); + // Add a scalar to typed_value. The type of Object depends on the scalarSchema in the shredding + // schema.
+ void addScalar(VariantSchema schema, Object result); + void addMetadata(VariantSchema schema, byte[] result); + } + + public interface ShreddedResultBuilder { + ShreddedResult createEmpty(VariantSchema schema); + + // If true, we will shred decimals to a different scale or to integers, as long as they are + // numerically equivalent. Similarly, integers will be allowed to shred to decimals. + boolean allowNumericScaleChanges(); + } + + /** + * Converts an input variant into shredded components. Returns the shredded result, as well + * as the original Variant with shredded fields removed. + * `dataType` must be a valid shredding schema, as described in common/variant/shredding.md. + */ + public static ShreddedResult castShredded( + Variant v, + VariantSchema schema, + ShreddedResultBuilder builder) { + VariantUtil.Type variantType = v.getType(); + ShreddedResult result = builder.createEmpty(schema); + + if (schema.topLevelMetadataIdx >= 0) { + result.addMetadata(schema, v.getMetadata()); + } + + if (schema.arraySchema != null && variantType == VariantUtil.Type.ARRAY) { + // The array element is always a struct containing untyped and typed fields. + VariantSchema elementSchema = schema.arraySchema; + int size = v.arraySize(); + ShreddedResult[] array = new ShreddedResult[size]; + for (int i = 0; i < size; ++i) { + ShreddedResult shreddedArray = castShredded(v.getElementAtIndex(i), elementSchema, builder); + array[i] = shreddedArray; + } + result.addArray(schema, array); + } else if (schema.objectSchema != null && variantType == VariantUtil.Type.OBJECT) { + VariantSchema.ObjectField[] objectSchema = schema.objectSchema; + ShreddedResult[] shreddedValues = new ShreddedResult[objectSchema.length]; + + // Create a variantBuilder for any mismatched fields. Review Comment: The latter. I'll reword it to clarify. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
