gene-db commented on code in PR #48851: URL: https://github.com/apache/spark/pull/48851#discussion_r1857230795
########## common/variant/src/main/java/org/apache/spark/types/variant/ShreddingUtils.java: ##########
```
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.types.variant;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+
+import static org.apache.spark.types.variant.VariantUtil.*;
+
+public class ShreddingUtils {
+  // Interface to read from a shredded result. It essentially has the same interface and semantics
+  // as Spark's `SpecializedGetters`, but we need a new interface to avoid the dependency.
+  public interface ShreddedRow {
+    boolean isNullAt(int ordinal);
+    boolean getBoolean(int ordinal);
+    byte getByte(int ordinal);
+    short getShort(int ordinal);
+    int getInt(int ordinal);
+    long getLong(int ordinal);
+    float getFloat(int ordinal);
+    double getDouble(int ordinal);
+    BigDecimal getDecimal(int ordinal, int precision, int scale);
+    String getString(int ordinal);
+    byte[] getBinary(int ordinal);
+    ShreddedRow getStruct(int ordinal, int numFields);
+    ShreddedRow getArray(int ordinal);
+    int numElements();
+  }
+
+  public static Variant rebuild(ShreddedRow row, VariantSchema schema) {
+    if (schema.topLevelMetadataIdx < 0 || row.isNullAt(schema.topLevelMetadataIdx)) {
+      throw malformedVariant();
+    }
+    byte[] metadata = row.getBinary(schema.topLevelMetadataIdx);
+    if (schema.variantIdx >= 0 && schema.typedIdx < 0) {
+      // The variant is unshredded. We are not required to do anything special, but we can have an
+      // optimization to avoid `rebuild`.
+      if (row.isNullAt(schema.variantIdx)) {
+        throw malformedVariant();
+      }
+      return new Variant(row.getBinary(schema.variantIdx), metadata);
+    }
+    VariantBuilder builder = new VariantBuilder(false);
+    rebuild(row, metadata, schema, builder);
+    return builder.result();
+  }
+
+  // Rebuild a variant value from the shredded data according to the reconstruction algorithm in
+  // https://github.com/apache/parquet-format/blob/master/VariantShredding.md.
+  // Append the result to `builder`.
+  private static void rebuild(ShreddedRow row, byte[] metadata, VariantSchema schema,
+      VariantBuilder builder) {
+    int typedIdx = schema.typedIdx;
+    int variantIdx = schema.variantIdx;
+    if (typedIdx >= 0 && !row.isNullAt(typedIdx)) {
+      if (schema.scalarSchema != null) {
+        VariantSchema.ScalarType scalar = schema.scalarSchema;
+        if (scalar instanceof VariantSchema.StringType) {
+          builder.appendString(row.getString(typedIdx));
+        } else if (scalar instanceof VariantSchema.IntegralType) {
+          VariantSchema.IntegralType it = (VariantSchema.IntegralType) scalar;
+          long value = 0;
+          switch (it.size) {
+            case BYTE:
+              value = row.getByte(typedIdx);
+              break;
+            case SHORT:
+              value = row.getShort(typedIdx);
+              break;
+            case INT:
+              value = row.getInt(typedIdx);
+              break;
+            case LONG:
+              value = row.getLong(typedIdx);
+              break;
+          }
+          builder.appendLong(value);
+        } else if (scalar instanceof VariantSchema.FloatType) {
+          builder.appendFloat(row.getFloat(typedIdx));
+        } else if (scalar instanceof VariantSchema.DoubleType) {
+          builder.appendDouble(row.getDouble(typedIdx));
+        } else if (scalar instanceof VariantSchema.BooleanType) {
+          builder.appendBoolean(row.getBoolean(typedIdx));
+        } else if (scalar instanceof VariantSchema.BinaryType) {
+          builder.appendBinary(row.getBinary(typedIdx));
+        } else if (scalar instanceof VariantSchema.DecimalType) {
+          VariantSchema.DecimalType dt = (VariantSchema.DecimalType) scalar;
+          builder.appendDecimal(row.getDecimal(typedIdx, dt.precision, dt.scale));
+        } else if (scalar instanceof VariantSchema.DateType) {
+          builder.appendDate(row.getInt(typedIdx));
+        } else if (scalar instanceof VariantSchema.TimestampType) {
+          builder.appendTimestamp(row.getLong(typedIdx));
+        } else {
```

Review Comment:
Can this be
```
} else if (scalar instanceof VariantSchema.TimestampNTZType) {
  builder.appendTimestampNtz(row.getLong(typedIdx));
} else {
  // error handling
}
```
Should the error handling ultimately throw a malformed variant exception? Right now, it just crashes?
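A minimal sketch of the shape being suggested, assuming the existing `malformedVariant()` helper is the intended error path for an unrecognized scalar type (all names are taken from the diff above):

```java
        } else if (scalar instanceof VariantSchema.TimestampType) {
          builder.appendTimestamp(row.getLong(typedIdx));
        } else if (scalar instanceof VariantSchema.TimestampNTZType) {
          builder.appendTimestampNtz(row.getLong(typedIdx));
        } else {
          // Unrecognized scalar type in the shredding schema. Throwing here instead of relying on
          // an `assert` keeps the failure mode consistent even when assertions are disabled.
          throw malformedVariant();
        }
```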
########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkShreddingUtils.scala: ##########
```
@@ -19,12 +19,32 @@ package org.apache.spark.sql.execution.datasources.parquet
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.types.variant._
 import org.apache.spark.unsafe.types._
+case class SparkShreddedRow(row: SpecializedGetters) extends ShreddingUtils.ShreddedRow {
+  override def isNullAt(ordinal: Int): Boolean = row.isNullAt(ordinal)
+  override def getBoolean(ordinal: Int): Boolean = row.getBoolean(ordinal)
+  override def getByte(ordinal: Int): Byte = row.getByte(ordinal)
+  override def getShort(ordinal: Int): Short = row.getShort(ordinal)
+  override def getInt(ordinal: Int): Int = row.getInt(ordinal)
+  override def getLong(ordinal: Int): Long = row.getLong(ordinal)
+  override def getFloat(ordinal: Int): Float = row.getFloat(ordinal)
+  override def getDouble(ordinal: Int): Double = row.getDouble(ordinal)
+  override def getDecimal(ordinal: Int, precision: Int, scale: Int): java.math.BigDecimal =
+    row.getDecimal(ordinal, precision, scale).toJavaBigDecimal
+  override def getString(ordinal: Int): String = row.getUTF8String(ordinal).toString
+  override def getBinary(ordinal: Int): Array[Byte] = row.getBinary(ordinal)
+  override def getStruct(ordinal: Int, numFields: Int): SparkShreddedRow =
+    SparkShreddedRow(row.getStruct(ordinal, numFields))
+  override def getArray(ordinal: Int): SparkShreddedRow =
+    SparkShreddedRow(row.getArray(ordinal))
+  override def numElements(): Int = row.asInstanceOf[ArrayData].numElements()
```

Review Comment:
How is `row` guaranteed to be `ArrayData`?
########## common/variant/src/main/java/org/apache/spark/types/variant/ShreddingUtils.java: ##########
```
+        } else {
+          assert scalar instanceof VariantSchema.TimestampNTZType;
+          builder.appendTimestampNtz(row.getLong(typedIdx));
+        }
+      } else if (schema.arraySchema != null) {
+        VariantSchema elementSchema = schema.arraySchema;
+        ShreddedRow array = row.getArray(typedIdx);
+        int start = builder.getWritePos();
+        ArrayList<Integer> offsets = new ArrayList<>(array.numElements());
+        for (int i = 0; i < array.numElements(); i++) {
+          offsets.add(builder.getWritePos() - start);
+          rebuild(array.getStruct(i, elementSchema.numFields), metadata, elementSchema, builder);
+        }
+        builder.finishWritingArray(start, offsets);
+      } else {
+        ShreddedRow object = row.getStruct(typedIdx, schema.objectSchema.length);
+        ArrayList<VariantBuilder.FieldEntry> fields = new ArrayList<>();
+        int start = builder.getWritePos();
+        for (int fieldIdx = 0; fieldIdx < schema.objectSchema.length; ++fieldIdx) {
+          String fieldName = schema.objectSchema[fieldIdx].fieldName;
+          VariantSchema fieldSchema = schema.objectSchema[fieldIdx].schema;
+          ShreddedRow fieldValue = object.getStruct(fieldIdx, fieldSchema.numFields);
+          // If the field doesn't have non-null `typed_value` or `value`, it is missing.
+          if ((fieldSchema.typedIdx >= 0 && !fieldValue.isNullAt(fieldSchema.typedIdx)) ||
+              (fieldSchema.variantIdx >= 0 && !fieldValue.isNullAt(fieldSchema.variantIdx))) {
+            int id = builder.addKey(fieldName);
+            fields.add(new VariantBuilder.FieldEntry(fieldName, id, builder.getWritePos() - start));
+            rebuild(fieldValue, metadata, fieldSchema, builder);
+          }
+        }
+        if (variantIdx >= 0 && !row.isNullAt(variantIdx)) {
+          // Add the leftover fields in the variant binary.
+          Variant v = new Variant(row.getBinary(variantIdx), metadata);
+          if (v.getType() != VariantUtil.Type.OBJECT) throw malformedVariant();
+          for (int i = 0; i < v.objectSize(); ++i) {
+            Variant.ObjectField field = v.getFieldAtIndex(i);
+            int id = builder.addKey(field.key);
+            fields.add(new VariantBuilder.FieldEntry(field.key, id, builder.getWritePos() - start));
```

Review Comment:
Don't we have to check to see if there are no duplicate fields in the variant blob? The spec says the shredded field must overwrite the encoded blob field.
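One possible shape for such a check, sketched against the loop above. `shreddedKeys` is a hypothetical local introduced only for the sketch, and whether a duplicate should be silently dropped (shredded value wins) or rejected as `malformedVariant()` is exactly the open question:

```java
        java.util.HashSet<String> shreddedKeys = new java.util.HashSet<>();  // hypothetical helper
        for (int fieldIdx = 0; fieldIdx < schema.objectSchema.length; ++fieldIdx) {
          String fieldName = schema.objectSchema[fieldIdx].fieldName;
          VariantSchema fieldSchema = schema.objectSchema[fieldIdx].schema;
          ShreddedRow fieldValue = object.getStruct(fieldIdx, fieldSchema.numFields);
          if ((fieldSchema.typedIdx >= 0 && !fieldValue.isNullAt(fieldSchema.typedIdx)) ||
              (fieldSchema.variantIdx >= 0 && !fieldValue.isNullAt(fieldSchema.variantIdx))) {
            shreddedKeys.add(fieldName);
            // ... addKey / FieldEntry / recursive rebuild exactly as in the diff ...
          }
        }
        if (variantIdx >= 0 && !row.isNullAt(variantIdx)) {
          Variant v = new Variant(row.getBinary(variantIdx), metadata);
          if (v.getType() != VariantUtil.Type.OBJECT) throw malformedVariant();
          for (int i = 0; i < v.objectSize(); ++i) {
            Variant.ObjectField field = v.getFieldAtIndex(i);
            if (shreddedKeys.contains(field.key)) {
              // Shredded fields take precedence over fields in the blob, per the shredding spec;
              // alternatively a duplicate could be rejected as malformedVariant().
              continue;
            }
            int id = builder.addKey(field.key);
            fields.add(new VariantBuilder.FieldEntry(field.key, id, builder.getWritePos() - start));
            builder.appendVariant(field.value);
          }
        }
```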
########## common/variant/src/main/java/org/apache/spark/types/variant/ShreddingUtils.java: ##########
```
+          rebuild(array.getStruct(i, elementSchema.numFields), metadata, elementSchema, builder);
+        }
+        builder.finishWritingArray(start, offsets);
+      } else {
```

Review Comment:
Is this the object case? Should we explicitly check for
```
} else if (schema.objectSchema != null) {
  ...
} else {
  // error handling
}
```
And the error handling should be malformed variant, right?
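Spelled out, the suggested structure might look roughly like this, assuming (as elsewhere in this file) that `malformedVariant()` is the error path when none of the scalar/array/object schemas is set:

```java
      } else if (schema.arraySchema != null) {
        // ... array case as above ...
        builder.finishWritingArray(start, offsets);
      } else if (schema.objectSchema != null) {
        // ... object case ...
      } else {
        // A non-null typed_value whose schema is neither scalar, array, nor object:
        // treat it as a malformed variant instead of falling through implicitly.
        throw malformedVariant();
      }
```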
########## common/variant/src/main/java/org/apache/spark/types/variant/ShreddingUtils.java: ##########
```
+            builder.appendVariant(field.value);
+          }
+        }
+        builder.finishWritingObject(start, fields);
+      }
+    } else if (variantIdx >= 0 && !row.isNullAt(variantIdx)) {
+      builder.appendVariant(new Variant(row.getBinary(variantIdx), metadata));
```

Review Comment:
We should add a comment here. This is when there is no `typed_value`, and only the `value` parquet column?
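For example, the comment being asked for could read something like this (wording is only a suggestion):

```java
    } else if (variantIdx >= 0 && !row.isNullAt(variantIdx)) {
      // `typed_value` is absent or null at this position, so the data is carried entirely by the
      // `value` column; rebuild it directly from that variant binary.
      builder.appendVariant(new Variant(row.getBinary(variantIdx), metadata));
```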
