chenhao-db commented on code in PR #48851: URL: https://github.com/apache/spark/pull/48851#discussion_r1864423631
########## common/variant/src/main/java/org/apache/spark/types/variant/ShreddingUtils.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.types.variant; + +import java.math.BigDecimal; +import java.util.ArrayList; + +import static org.apache.spark.types.variant.VariantUtil.*; + +public class ShreddingUtils { + // Interface to read from a shredded result. It essentially has the same interface and semantics + // as Spark's `SpecializedGetters`, but we need a new interface to avoid the dependency. + public interface ShreddedRow { + boolean isNullAt(int ordinal); + boolean getBoolean(int ordinal); + byte getByte(int ordinal); + short getShort(int ordinal); + int getInt(int ordinal); + long getLong(int ordinal); + float getFloat(int ordinal); + double getDouble(int ordinal); + BigDecimal getDecimal(int ordinal, int precision, int scale); + String getString(int ordinal); + byte[] getBinary(int ordinal); + ShreddedRow getStruct(int ordinal, int numFields); + ShreddedRow getArray(int ordinal); + int numElements(); + } + + public static Variant rebuild(ShreddedRow row, VariantSchema schema) { + if (schema.topLevelMetadataIdx < 0 || row.isNullAt(schema.topLevelMetadataIdx)) { + throw malformedVariant(); + } + byte[] metadata = row.getBinary(schema.topLevelMetadataIdx); + if (schema.variantIdx >= 0 && schema.typedIdx < 0) { + // The variant is unshredded. We are not required to do anything special, but we can have an + // optimization to avoid `rebuild`. + if (row.isNullAt(schema.variantIdx)) { + throw malformedVariant(); + } + return new Variant(row.getBinary(schema.variantIdx), metadata); + } + VariantBuilder builder = new VariantBuilder(false); + rebuild(row, metadata, schema, builder); + return builder.result(); + } + + // Rebuild a variant value from the shredded data according to the reconstruction algorithm in + // https://github.com/apache/parquet-format/blob/master/VariantShredding.md. + // Append the result to `builder`. + private static void rebuild(ShreddedRow row, byte[] metadata, VariantSchema schema, + VariantBuilder builder) { + int typedIdx = schema.typedIdx; + int variantIdx = schema.variantIdx; + if (typedIdx >= 0 && !row.isNullAt(typedIdx)) { + if (schema.scalarSchema != null) { + VariantSchema.ScalarType scalar = schema.scalarSchema; + if (scalar instanceof VariantSchema.StringType) { + builder.appendString(row.getString(typedIdx)); + } else if (scalar instanceof VariantSchema.IntegralType) { + VariantSchema.IntegralType it = (VariantSchema.IntegralType) scalar; + long value = 0; + switch (it.size) { + case BYTE: + value = row.getByte(typedIdx); + break; + case SHORT: + value = row.getShort(typedIdx); + break; + case INT: + value = row.getInt(typedIdx); + break; + case LONG: + value = row.getLong(typedIdx); + break; + } + builder.appendLong(value); + } else if (scalar instanceof VariantSchema.FloatType) { + builder.appendFloat(row.getFloat(typedIdx)); + } else if (scalar instanceof VariantSchema.DoubleType) { + builder.appendDouble(row.getDouble(typedIdx)); + } else if (scalar instanceof VariantSchema.BooleanType) { + builder.appendBoolean(row.getBoolean(typedIdx)); + } else if (scalar instanceof VariantSchema.BinaryType) { + builder.appendBinary(row.getBinary(typedIdx)); + } else if (scalar instanceof VariantSchema.DecimalType) { + VariantSchema.DecimalType dt = (VariantSchema.DecimalType) scalar; + builder.appendDecimal(row.getDecimal(typedIdx, dt.precision, dt.scale)); + } else if (scalar instanceof VariantSchema.DateType) { + builder.appendDate(row.getInt(typedIdx)); + } else if (scalar instanceof VariantSchema.TimestampType) { + builder.appendTimestamp(row.getLong(typedIdx)); + } else { + assert scalar instanceof VariantSchema.TimestampNTZType; + builder.appendTimestampNtz(row.getLong(typedIdx)); + } + } else if (schema.arraySchema != null) { + VariantSchema elementSchema = schema.arraySchema; + ShreddedRow array = row.getArray(typedIdx); + int start = builder.getWritePos(); + ArrayList<Integer> offsets = new ArrayList<>(array.numElements()); + for (int i = 0; i < array.numElements(); i++) { + offsets.add(builder.getWritePos() - start); + rebuild(array.getStruct(i, elementSchema.numFields), metadata, elementSchema, builder); + } + builder.finishWritingArray(start, offsets); + } else { Review Comment: Similar to the previous one. If we really ever reaches the "error handling" part, it means other parts of the implementation is wrong. A malformed variant can never cause that. ########## common/variant/src/main/java/org/apache/spark/types/variant/ShreddingUtils.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.types.variant; + +import java.math.BigDecimal; +import java.util.ArrayList; + +import static org.apache.spark.types.variant.VariantUtil.*; + +public class ShreddingUtils { + // Interface to read from a shredded result. It essentially has the same interface and semantics + // as Spark's `SpecializedGetters`, but we need a new interface to avoid the dependency. + public interface ShreddedRow { + boolean isNullAt(int ordinal); + boolean getBoolean(int ordinal); + byte getByte(int ordinal); + short getShort(int ordinal); + int getInt(int ordinal); + long getLong(int ordinal); + float getFloat(int ordinal); + double getDouble(int ordinal); + BigDecimal getDecimal(int ordinal, int precision, int scale); + String getString(int ordinal); + byte[] getBinary(int ordinal); + ShreddedRow getStruct(int ordinal, int numFields); + ShreddedRow getArray(int ordinal); + int numElements(); + } + + public static Variant rebuild(ShreddedRow row, VariantSchema schema) { + if (schema.topLevelMetadataIdx < 0 || row.isNullAt(schema.topLevelMetadataIdx)) { + throw malformedVariant(); + } + byte[] metadata = row.getBinary(schema.topLevelMetadataIdx); + if (schema.variantIdx >= 0 && schema.typedIdx < 0) { + // The variant is unshredded. We are not required to do anything special, but we can have an + // optimization to avoid `rebuild`. + if (row.isNullAt(schema.variantIdx)) { + throw malformedVariant(); + } + return new Variant(row.getBinary(schema.variantIdx), metadata); + } + VariantBuilder builder = new VariantBuilder(false); + rebuild(row, metadata, schema, builder); + return builder.result(); + } + + // Rebuild a variant value from the shredded data according to the reconstruction algorithm in + // https://github.com/apache/parquet-format/blob/master/VariantShredding.md. + // Append the result to `builder`. + private static void rebuild(ShreddedRow row, byte[] metadata, VariantSchema schema, + VariantBuilder builder) { + int typedIdx = schema.typedIdx; + int variantIdx = schema.variantIdx; + if (typedIdx >= 0 && !row.isNullAt(typedIdx)) { + if (schema.scalarSchema != null) { + VariantSchema.ScalarType scalar = schema.scalarSchema; + if (scalar instanceof VariantSchema.StringType) { + builder.appendString(row.getString(typedIdx)); + } else if (scalar instanceof VariantSchema.IntegralType) { + VariantSchema.IntegralType it = (VariantSchema.IntegralType) scalar; + long value = 0; + switch (it.size) { + case BYTE: + value = row.getByte(typedIdx); + break; + case SHORT: + value = row.getShort(typedIdx); + break; + case INT: + value = row.getInt(typedIdx); + break; + case LONG: + value = row.getLong(typedIdx); + break; + } + builder.appendLong(value); + } else if (scalar instanceof VariantSchema.FloatType) { + builder.appendFloat(row.getFloat(typedIdx)); + } else if (scalar instanceof VariantSchema.DoubleType) { + builder.appendDouble(row.getDouble(typedIdx)); + } else if (scalar instanceof VariantSchema.BooleanType) { + builder.appendBoolean(row.getBoolean(typedIdx)); + } else if (scalar instanceof VariantSchema.BinaryType) { + builder.appendBinary(row.getBinary(typedIdx)); + } else if (scalar instanceof VariantSchema.DecimalType) { + VariantSchema.DecimalType dt = (VariantSchema.DecimalType) scalar; + builder.appendDecimal(row.getDecimal(typedIdx, dt.precision, dt.scale)); + } else if (scalar instanceof VariantSchema.DateType) { + builder.appendDate(row.getInt(typedIdx)); + } else if (scalar instanceof VariantSchema.TimestampType) { + builder.appendTimestamp(row.getLong(typedIdx)); + } else { + assert scalar instanceof VariantSchema.TimestampNTZType; + builder.appendTimestampNtz(row.getLong(typedIdx)); + } + } else if (schema.arraySchema != null) { + VariantSchema elementSchema = schema.arraySchema; + ShreddedRow array = row.getArray(typedIdx); + int start = builder.getWritePos(); + ArrayList<Integer> offsets = new ArrayList<>(array.numElements()); + for (int i = 0; i < array.numElements(); i++) { + offsets.add(builder.getWritePos() - start); + rebuild(array.getStruct(i, elementSchema.numFields), metadata, elementSchema, builder); + } + builder.finishWritingArray(start, offsets); + } else { + ShreddedRow object = row.getStruct(typedIdx, schema.objectSchema.length); + ArrayList<VariantBuilder.FieldEntry> fields = new ArrayList<>(); + int start = builder.getWritePos(); + for (int fieldIdx = 0; fieldIdx < schema.objectSchema.length; ++fieldIdx) { + String fieldName = schema.objectSchema[fieldIdx].fieldName; + VariantSchema fieldSchema = schema.objectSchema[fieldIdx].schema; + ShreddedRow fieldValue = object.getStruct(fieldIdx, fieldSchema.numFields); + // If the field doesn't have non-null `typed_value` or `value`, it is missing. + if ((fieldSchema.typedIdx >= 0 && !fieldValue.isNullAt(fieldSchema.typedIdx)) || + (fieldSchema.variantIdx >= 0 && !fieldValue.isNullAt(fieldSchema.variantIdx))) { + int id = builder.addKey(fieldName); + fields.add(new VariantBuilder.FieldEntry(fieldName, id, builder.getWritePos() - start)); + rebuild(fieldValue, metadata, fieldSchema, builder); + } + } + if (variantIdx >= 0 && !row.isNullAt(variantIdx)) { + // Add the leftover fields in the variant binary. + Variant v = new Variant(row.getBinary(variantIdx), metadata); + if (v.getType() != VariantUtil.Type.OBJECT) throw malformedVariant(); + for (int i = 0; i < v.objectSize(); ++i) { + Variant.ObjectField field = v.getFieldAtIndex(i); + int id = builder.addKey(field.key); + fields.add(new VariantBuilder.FieldEntry(field.key, id, builder.getWritePos() - start)); + builder.appendVariant(field.value); + } + } + builder.finishWritingObject(start, fields); + } + } else if (variantIdx >= 0 && !row.isNullAt(variantIdx)) { + builder.appendVariant(new Variant(row.getBinary(variantIdx), metadata)); Review Comment: This is when `typed_value` doesn't exist or is null. Though, I thought the code is pretty self-descriptive. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala: ########## @@ -19,12 +19,32 @@ package org.apache.spark.sql.execution.datasources.parquet import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.GenericArrayData +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types._ import org.apache.spark.types.variant._ import org.apache.spark.unsafe.types._ +case class SparkShreddedRow(row: SpecializedGetters) extends ShreddingUtils.ShreddedRow { + override def isNullAt(ordinal: Int): Boolean = row.isNullAt(ordinal) + override def getBoolean(ordinal: Int): Boolean = row.getBoolean(ordinal) + override def getByte(ordinal: Int): Byte = row.getByte(ordinal) + override def getShort(ordinal: Int): Short = row.getShort(ordinal) + override def getInt(ordinal: Int): Int = row.getInt(ordinal) + override def getLong(ordinal: Int): Long = row.getLong(ordinal) + override def getFloat(ordinal: Int): Float = row.getFloat(ordinal) + override def getDouble(ordinal: Int): Double = row.getDouble(ordinal) + override def getDecimal(ordinal: Int, precision: Int, scale: Int): java.math.BigDecimal = + row.getDecimal(ordinal, precision, scale).toJavaBigDecimal + override def getString(ordinal: Int): String = row.getUTF8String(ordinal).toString + override def getBinary(ordinal: Int): Array[Byte] = row.getBinary(ordinal) + override def getStruct(ordinal: Int, numFields: Int): SparkShreddedRow = + SparkShreddedRow(row.getStruct(ordinal, numFields)) + override def getArray(ordinal: Int): SparkShreddedRow = + SparkShreddedRow(row.getArray(ordinal)) + override def numElements(): Int = row.asInstanceOf[ArrayData].numElements() Review Comment: The caller should have checked the row represents an array before calling it. It is not limited to `numElements`, but all access functions have similar assumptions. This should never cause an error even if the input is somehow corrupted. ########## common/variant/src/main/java/org/apache/spark/types/variant/ShreddingUtils.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.types.variant; + +import java.math.BigDecimal; +import java.util.ArrayList; + +import static org.apache.spark.types.variant.VariantUtil.*; + +public class ShreddingUtils { + // Interface to read from a shredded result. It essentially has the same interface and semantics + // as Spark's `SpecializedGetters`, but we need a new interface to avoid the dependency. + public interface ShreddedRow { + boolean isNullAt(int ordinal); + boolean getBoolean(int ordinal); + byte getByte(int ordinal); + short getShort(int ordinal); + int getInt(int ordinal); + long getLong(int ordinal); + float getFloat(int ordinal); + double getDouble(int ordinal); + BigDecimal getDecimal(int ordinal, int precision, int scale); + String getString(int ordinal); + byte[] getBinary(int ordinal); + ShreddedRow getStruct(int ordinal, int numFields); + ShreddedRow getArray(int ordinal); + int numElements(); + } + + public static Variant rebuild(ShreddedRow row, VariantSchema schema) { + if (schema.topLevelMetadataIdx < 0 || row.isNullAt(schema.topLevelMetadataIdx)) { + throw malformedVariant(); + } + byte[] metadata = row.getBinary(schema.topLevelMetadataIdx); + if (schema.variantIdx >= 0 && schema.typedIdx < 0) { + // The variant is unshredded. We are not required to do anything special, but we can have an + // optimization to avoid `rebuild`. + if (row.isNullAt(schema.variantIdx)) { + throw malformedVariant(); + } + return new Variant(row.getBinary(schema.variantIdx), metadata); + } + VariantBuilder builder = new VariantBuilder(false); + rebuild(row, metadata, schema, builder); + return builder.result(); + } + + // Rebuild a variant value from the shredded data according to the reconstruction algorithm in + // https://github.com/apache/parquet-format/blob/master/VariantShredding.md. + // Append the result to `builder`. + private static void rebuild(ShreddedRow row, byte[] metadata, VariantSchema schema, + VariantBuilder builder) { + int typedIdx = schema.typedIdx; + int variantIdx = schema.variantIdx; + if (typedIdx >= 0 && !row.isNullAt(typedIdx)) { + if (schema.scalarSchema != null) { + VariantSchema.ScalarType scalar = schema.scalarSchema; + if (scalar instanceof VariantSchema.StringType) { + builder.appendString(row.getString(typedIdx)); + } else if (scalar instanceof VariantSchema.IntegralType) { + VariantSchema.IntegralType it = (VariantSchema.IntegralType) scalar; + long value = 0; + switch (it.size) { + case BYTE: + value = row.getByte(typedIdx); + break; + case SHORT: + value = row.getShort(typedIdx); + break; + case INT: + value = row.getInt(typedIdx); + break; + case LONG: + value = row.getLong(typedIdx); + break; + } + builder.appendLong(value); + } else if (scalar instanceof VariantSchema.FloatType) { + builder.appendFloat(row.getFloat(typedIdx)); + } else if (scalar instanceof VariantSchema.DoubleType) { + builder.appendDouble(row.getDouble(typedIdx)); + } else if (scalar instanceof VariantSchema.BooleanType) { + builder.appendBoolean(row.getBoolean(typedIdx)); + } else if (scalar instanceof VariantSchema.BinaryType) { + builder.appendBinary(row.getBinary(typedIdx)); + } else if (scalar instanceof VariantSchema.DecimalType) { + VariantSchema.DecimalType dt = (VariantSchema.DecimalType) scalar; + builder.appendDecimal(row.getDecimal(typedIdx, dt.precision, dt.scale)); + } else if (scalar instanceof VariantSchema.DateType) { + builder.appendDate(row.getInt(typedIdx)); + } else if (scalar instanceof VariantSchema.TimestampType) { + builder.appendTimestamp(row.getLong(typedIdx)); + } else { + assert scalar instanceof VariantSchema.TimestampNTZType; + builder.appendTimestampNtz(row.getLong(typedIdx)); + } + } else if (schema.arraySchema != null) { + VariantSchema elementSchema = schema.arraySchema; + ShreddedRow array = row.getArray(typedIdx); + int start = builder.getWritePos(); + ArrayList<Integer> offsets = new ArrayList<>(array.numElements()); + for (int i = 0; i < array.numElements(); i++) { + offsets.add(builder.getWritePos() - start); + rebuild(array.getStruct(i, elementSchema.numFields), metadata, elementSchema, builder); + } + builder.finishWritingArray(start, offsets); + } else { + ShreddedRow object = row.getStruct(typedIdx, schema.objectSchema.length); + ArrayList<VariantBuilder.FieldEntry> fields = new ArrayList<>(); + int start = builder.getWritePos(); + for (int fieldIdx = 0; fieldIdx < schema.objectSchema.length; ++fieldIdx) { + String fieldName = schema.objectSchema[fieldIdx].fieldName; + VariantSchema fieldSchema = schema.objectSchema[fieldIdx].schema; + ShreddedRow fieldValue = object.getStruct(fieldIdx, fieldSchema.numFields); + // If the field doesn't have non-null `typed_value` or `value`, it is missing. + if ((fieldSchema.typedIdx >= 0 && !fieldValue.isNullAt(fieldSchema.typedIdx)) || + (fieldSchema.variantIdx >= 0 && !fieldValue.isNullAt(fieldSchema.variantIdx))) { + int id = builder.addKey(fieldName); + fields.add(new VariantBuilder.FieldEntry(fieldName, id, builder.getWritePos() - start)); + rebuild(fieldValue, metadata, fieldSchema, builder); + } + } + if (variantIdx >= 0 && !row.isNullAt(variantIdx)) { + // Add the leftover fields in the variant binary. + Variant v = new Variant(row.getBinary(variantIdx), metadata); + if (v.getType() != VariantUtil.Type.OBJECT) throw malformedVariant(); + for (int i = 0; i < v.objectSize(); ++i) { + Variant.ObjectField field = v.getFieldAtIndex(i); + int id = builder.addKey(field.key); + fields.add(new VariantBuilder.FieldEntry(field.key, id, builder.getWritePos() - start)); Review Comment: Added. ########## common/variant/src/main/java/org/apache/spark/types/variant/ShreddingUtils.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.types.variant; + +import java.math.BigDecimal; +import java.util.ArrayList; + +import static org.apache.spark.types.variant.VariantUtil.*; + +public class ShreddingUtils { + // Interface to read from a shredded result. It essentially has the same interface and semantics + // as Spark's `SpecializedGetters`, but we need a new interface to avoid the dependency. + public interface ShreddedRow { + boolean isNullAt(int ordinal); + boolean getBoolean(int ordinal); + byte getByte(int ordinal); + short getShort(int ordinal); + int getInt(int ordinal); + long getLong(int ordinal); + float getFloat(int ordinal); + double getDouble(int ordinal); + BigDecimal getDecimal(int ordinal, int precision, int scale); + String getString(int ordinal); + byte[] getBinary(int ordinal); + ShreddedRow getStruct(int ordinal, int numFields); + ShreddedRow getArray(int ordinal); + int numElements(); + } + + public static Variant rebuild(ShreddedRow row, VariantSchema schema) { + if (schema.topLevelMetadataIdx < 0 || row.isNullAt(schema.topLevelMetadataIdx)) { + throw malformedVariant(); + } + byte[] metadata = row.getBinary(schema.topLevelMetadataIdx); + if (schema.variantIdx >= 0 && schema.typedIdx < 0) { + // The variant is unshredded. We are not required to do anything special, but we can have an + // optimization to avoid `rebuild`. + if (row.isNullAt(schema.variantIdx)) { + throw malformedVariant(); + } + return new Variant(row.getBinary(schema.variantIdx), metadata); + } + VariantBuilder builder = new VariantBuilder(false); + rebuild(row, metadata, schema, builder); + return builder.result(); + } + + // Rebuild a variant value from the shredded data according to the reconstruction algorithm in + // https://github.com/apache/parquet-format/blob/master/VariantShredding.md. + // Append the result to `builder`. + private static void rebuild(ShreddedRow row, byte[] metadata, VariantSchema schema, + VariantBuilder builder) { + int typedIdx = schema.typedIdx; + int variantIdx = schema.variantIdx; + if (typedIdx >= 0 && !row.isNullAt(typedIdx)) { + if (schema.scalarSchema != null) { + VariantSchema.ScalarType scalar = schema.scalarSchema; + if (scalar instanceof VariantSchema.StringType) { + builder.appendString(row.getString(typedIdx)); + } else if (scalar instanceof VariantSchema.IntegralType) { + VariantSchema.IntegralType it = (VariantSchema.IntegralType) scalar; + long value = 0; + switch (it.size) { + case BYTE: + value = row.getByte(typedIdx); + break; + case SHORT: + value = row.getShort(typedIdx); + break; + case INT: + value = row.getInt(typedIdx); + break; + case LONG: + value = row.getLong(typedIdx); + break; + } + builder.appendLong(value); + } else if (scalar instanceof VariantSchema.FloatType) { + builder.appendFloat(row.getFloat(typedIdx)); + } else if (scalar instanceof VariantSchema.DoubleType) { + builder.appendDouble(row.getDouble(typedIdx)); + } else if (scalar instanceof VariantSchema.BooleanType) { + builder.appendBoolean(row.getBoolean(typedIdx)); + } else if (scalar instanceof VariantSchema.BinaryType) { + builder.appendBinary(row.getBinary(typedIdx)); + } else if (scalar instanceof VariantSchema.DecimalType) { + VariantSchema.DecimalType dt = (VariantSchema.DecimalType) scalar; + builder.appendDecimal(row.getDecimal(typedIdx, dt.precision, dt.scale)); + } else if (scalar instanceof VariantSchema.DateType) { + builder.appendDate(row.getInt(typedIdx)); + } else if (scalar instanceof VariantSchema.TimestampType) { + builder.appendTimestamp(row.getLong(typedIdx)); + } else { Review Comment: It doesn't need to be. `scalar` must be one of them, which is guaranteed before the execution and doesn't depend on the input data. ########## sql/core/src/test/scala/org/apache/spark/sql/VariantShreddingSuite.scala: ########## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.File +import java.sql.{Date, Timestamp} +import java.time.LocalDateTime + +import org.apache.spark.sql.execution.datasources.parquet.ParquetTest +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ +import org.apache.spark.types.variant._ + +class VariantShreddingSuite extends QueryTest with SharedSparkSession with ParquetTest { + // Make a variant value binary by parsing a JSON string. + def value(s: String): Array[Byte] = VariantBuilder.parseJson(s, false).getValue + + // Make a variant metadata binary that includes a set of keys. + def metadata(keys: Seq[String]): Array[Byte] = { + val builder = new VariantBuilder(false) + keys.foreach(builder.addKey) + builder.result().getMetadata + } + + // Make a variant value binary by parsing a JSON string on a pre-defined metadata key set. + def shreddedValue(s: String, metadataKeys: Seq[String]): Array[Byte] = { + val builder = new VariantBuilder(false) + metadataKeys.foreach(builder.addKey) + builder.appendVariant(VariantBuilder.parseJson(s, false)) + builder.result().getValue + } + + def checkRead(path: File, expected: Seq[String]): Unit = { + withAllParquetReaders { + val df = spark.read.schema("v variant").parquet(path.getAbsolutePath) + .selectExpr("to_json(v)") + checkAnswer(df, expected.map(Row(_))) + } + } + + test("scalar types rebuild") { + val scalarTypes = Array( + BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, + TimestampType, TimestampNTZType, DateType, + StringType, BinaryType, + DecimalType(9, 3), DecimalType(18, 6), DecimalType(22, 9)) + val obj = StructType(scalarTypes.zipWithIndex.map { case (t, i) => + StructField(i.toString, StructType(Array(StructField("typed_value", t)))) + }) + val v = StructType(Array( + StructField("typed_value", obj), + StructField("value", BinaryType), + StructField("metadata", BinaryType) + )) + + val values = Seq[Any]( + true, 1.toByte, 2.toShort, 3, 4L, 5.5F, 6.6, + new Timestamp(7), LocalDateTime.of(1, 1, 1, 0, 0, 8, 0), new Date(9), + "str10", Array[Byte](11), + Decimal("12.12"), Decimal("13.13"), Decimal("14.14")) + val row = Row(Row.fromSeq(values.map(Row(_))), null, + metadata(scalarTypes.indices.map(_.toString))) + + withTempPath { path => + spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row(row))), + StructType(Array(StructField("v", v)))) + .write.parquet(path.getAbsolutePath) + for (tz <- Seq("Etc/UTC", "America/Los_Angeles")) { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> tz) { + val timestamp = if (tz == "Etc/UTC") { + "1970-01-01 00:00:00.007+00:00" + } else { + "1969-12-31 16:00:00.007-08:00" + } + checkRead(path, Seq( + """{"0":true,"1":1,"10":"str10","11":"Cw==","12":12.12,"13":13.13,"14":14.14,""" + + s""""2":2,"3":3,"4":4,"5":5.5,"6":6.6,"7":"$timestamp",""" + + """"8":"0001-01-01 00:00:08","9":"1969-12-31"}""")) + } + } + } + } + + test("object rebuild") { + val schema = StructType.fromDDL("v struct<metadata binary, value binary, " + + "typed_value struct<b struct<typed_value int, value binary>, " + + "d struct<typed_value int, value binary>>>") + withTempPath { path => + spark.createDataFrame(spark.sparkContext.parallelize(Seq( + Row(metadata(Seq("b", "d")), null, Row(Row(1, null), Row(null, null))), + Row(metadata(Seq("b", "d")), null, Row(Row(1, null), Row(null, value("null")))), + Row(metadata(Seq("a", "b", "c", "d")), + shreddedValue("""{"a": 1, "c": 3}""", Seq("a", "b", "c", "d")), + Row(Row(2, null), Row(null, value("4")))), Review Comment: There is no difference. Putting 4 in variant value is also valid input data, though the writer may not write such data. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
