chenhao-db commented on code in PR #48779:
URL: https://github.com/apache/spark/pull/48779#discussion_r1831601108
##########
common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.types.variant;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class to implement shredding a Variant value.
+ */
+public class VariantShreddingWriter {
+
+  // Interface to build up a shredded result. Callers should implement a ShreddedResultBuilder
+  // to create an empty result. The castShredded method will call one or more of the add* methods
+  // to populate it.
+  public interface ShreddedResult {
+    // Create an array. The elements are the result of shredding each element.
+    void addArray(VariantSchema schema, List<ShreddedResult> array);
+    // Create an object. The values are the result of shredding each field, order by the index in
+    // objectSchema. Missing fields are populated with an empty result.
+    void addObject(VariantSchema schema, ShreddedResult[] values);
+    void addVariantValue(VariantSchema schema, byte[] result);
+    // Add a scalar to typed_value. The type of Object depends on the scalarSchema in the shredding
+    // schema.
+    void addScalar(VariantSchema schema, Object result);
+    void addMetadata(VariantSchema schema, byte[] result);
+  }
+
+  public interface ShreddedResultBuilder {
+    ShreddedResult createEmpty(VariantSchema schema);
+
+    // If true, we will shred decimals to a different scale or to integers, as long as they are
+    // numerically equivalent. Similarly, integers will be allowed to shred to decimals.
+    boolean allowNumericScaleChanges();
+  }
+
+  /**
+   * Converts an input variant into shredded components. Returns the shredded result, as well
+   * as the original Variant with shredded fields removed.
+   * `dataType` must be a valid shredding schema, as described in common/variant/shredding.md.
+   */
+  public static ShreddedResult castShredded(
+      Variant v,
+      VariantSchema schema,
+      ShreddedResultBuilder builder) {
+    VariantUtil.Type variantType = v.getType();
+    ShreddedResult result = builder.createEmpty(schema);
+
+    if (schema.topLevelMetadataIdx >= 0) {
+      result.addMetadata(schema, v.getMetadata());
+    }
+
+    if (schema.arraySchema != null && variantType == VariantUtil.Type.ARRAY) {
+      // The array element is always a struct containing untyped and typed fields.
+      VariantSchema elementSchema = schema.arraySchema;
+      int size = v.arraySize();
+      ArrayList<ShreddedResult> array = new ArrayList<>(size);
+      for (int i = 0; i < size; ++i) {
+        ShreddedResult shreddedArray = castShredded(v.getElementAtIndex(i), elementSchema, builder);
+        array.add(shreddedArray);
+      }
+      result.addArray(schema, array);
+    } else if (schema.objectSchema != null && variantType == VariantUtil.Type.OBJECT) {
+      Map<String, VariantSchema.ObjectField> objectSchema = schema.objectSchema;
+      int maxSize = Math.min(v.objectSize(), objectSchema.size());
+      ShreddedResult[] shreddedValues = new ShreddedResult[objectSchema.size()];
+
+      // Create a variantBuilder for any mismatched fields.
+      VariantBuilder variantBuilder = new VariantBuilder(false);
+      ArrayList<VariantBuilder.FieldEntry> fieldEntries = new ArrayList<>();
+      // Keep track of which schema fields we actually found in the Variant value.
+      Set<String> presentKeys = new HashSet<String>();

Review Comment:
   I think it is more efficient to use the nullness of `shreddedValues[i]` to determine whether a field is present. You may need an ordered list of fields in `VariantSchema` to use this approach.

##########
common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java:
##########
+  public interface ShreddedResult {
+    // Create an array. The elements are the result of shredding each element.
+    void addArray(VariantSchema schema, List<ShreddedResult> array);

Review Comment:
   I think using an array here may be more efficient.
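A minimal standalone sketch of the nullness-based presence check suggested in the first comment above, assuming `VariantSchema` can expose an ordered list of its object fields. The `orderedFields`, `fieldIndexByKey`, and `FieldResult` names below are hypothetical stand-ins, not the real Spark types; the point is only that a `null` slot in the result array already marks a missing field, so the separate `HashSet` of present keys can go away.

```java
import java.util.List;
import java.util.Map;

public class NullnessPresenceSketch {
  // Hypothetical stand-in for a shredded field result.
  record FieldResult(String description) {}

  static FieldResult[] shredObject(List<String> orderedFields,
                                   Map<String, Integer> fieldIndexByKey,
                                   List<String> inputKeys) {
    FieldResult[] shreddedValues = new FieldResult[orderedFields.size()];
    for (String key : inputKeys) {
      Integer idx = fieldIndexByKey.get(key);
      if (idx != null) {
        // Field present in both the input and the shredding schema.
        shreddedValues[idx] = new FieldResult(key);
      }
      // else: not in the schema; it would go to the untyped/variant value.
    }
    // A null slot means the schema field was absent from the input, so fill it
    // with an "empty" child result. The ordered field list maps slot i back to
    // its schema field, so no HashSet of present keys is needed.
    for (int i = 0; i < shreddedValues.length; i++) {
      if (shreddedValues[i] == null) {
        shreddedValues[i] = new FieldResult(orderedFields.get(i) + " (missing)");
      }
    }
    return shreddedValues;
  }

  public static void main(String[] args) {
    List<String> orderedFields = List.of("a", "b", "c");
    Map<String, Integer> fieldIndexByKey = Map.of("a", 0, "b", 1, "c", 2);
    for (FieldResult r : shredObject(orderedFields, fieldIndexByKey, List.of("b", "x"))) {
      System.out.println(r.description()); // a (missing), b, c (missing)
    }
  }
}
```

The second comment is presumably the same kind of change at the interface level: passing a `ShreddedResult[]` instead of a `List<ShreddedResult>` to `addArray` would avoid the intermediate `ArrayList` per array value.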
##########
common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java:
##########
+    } else if (schema.objectSchema != null && variantType == VariantUtil.Type.OBJECT) {
+      Map<String, VariantSchema.ObjectField> objectSchema = schema.objectSchema;
+      int maxSize = Math.min(v.objectSize(), objectSchema.size());
+      ShreddedResult[] shreddedValues = new ShreddedResult[objectSchema.size()];
+
+      // Create a variantBuilder for any mismatched fields.
+      VariantBuilder variantBuilder = new VariantBuilder(false);
+      ArrayList<VariantBuilder.FieldEntry> fieldEntries = new ArrayList<>();
+      // Keep track of which schema fields we actually found in the Variant value.
+      Set<String> presentKeys = new HashSet<String>();
+      int start = variantBuilder.getWritePos();
+      for (int i = 0; i < v.objectSize(); ++i) {
+        Variant.ObjectField field = v.getFieldAtIndex(i);
+        VariantSchema.ObjectField keySchema = objectSchema.get(field.key);
+        if (keySchema != null) {
+          // The field exists in the shredding schema. Recursively shred, and write the result.
+          ShreddedResult shreddedField = castShredded(field.value, keySchema.schema, builder);
+          shreddedValues[keySchema.idx] = shreddedField;
+          presentKeys.add(field.key);
+        } else {
+          // The field is not shredded. Put it in the untyped_value column.
+          int id = v.getDictionaryIdAtIndex(i);
+          fieldEntries.add(
+              new VariantBuilder.FieldEntry(field.key, id, variantBuilder.getWritePos() - start));
+          variantBuilder.appendVariant(field.value);
+        }
+      }
+      if (presentKeys.size() != objectSchema.size()) {
+        // Set missing fields to non-null with all fields set to null.
+        // Iterate over objectSchema key-value pairs and add them to the result.
+        for (Map.Entry<String, VariantSchema.ObjectField> entry : objectSchema.entrySet()) {
+          String key = entry.getKey();
+          if (!presentKeys.contains(key)) {
+            ShreddedResult emptyChild = builder.createEmpty(entry.getValue().schema);
+            shreddedValues[entry.getValue().idx] = emptyChild;
+          }
+        }
+      }
+      result.addObject(schema, shreddedValues);
+      if (variantBuilder.getWritePos() != start) {
+        // We added something to the untyped value.
+        variantBuilder.finishWritingObject(start, fieldEntries);
+        result.addVariantValue(schema, variantBuilder.valueWithoutMetadata());
+      }
+    } else if (schema.scalarSchema != null) {
+      VariantSchema.ScalarType scalarType = schema.scalarSchema;
+      Object typedValue = tryTypedShred(v, variantType, scalarType, builder);
+      if (typedValue != null) {
+        // Store the typed value.
+        result.addScalar(schema, typedValue);
+      } else {
+        VariantBuilder variantBuilder = new VariantBuilder(false);
+        variantBuilder.appendVariant(v);
+        result.addVariantValue(schema, v.getValue());
+      }
+    } else {
+      // Store in untyped.
+      result.addVariantValue(schema, v.getValue());
+    }
+    return result;
+  }
+
+  /**
+   * Tries to cast a Variant into a typed value. If the cast fails, returns null.
+   *
+   * @param v
+   * @param variantType The Variant Type of v
+   * @param targetType The target type
+   * @return The scalar value, or null if the cast is not valid.
+   */
+  private static Object tryTypedShred(
+      Variant v,
+      VariantUtil.Type variantType,
+      VariantSchema.ScalarType targetType,
+      ShreddedResultBuilder builder) {
+    switch (variantType) {
+      case LONG:
+        if (targetType instanceof VariantSchema.IntegralType) {

Review Comment:
   I think we can use the new Java syntax (pattern matching for `instanceof`):
   ```
   if (targetType instanceof VariantSchema.IntegralType integralType) {
   ```
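For reference, a small self-contained example of the pattern matching for `instanceof` suggested here (finalized in Java 16): the pattern variable replaces the explicit cast that the plain `instanceof` form needs inside the branch. The `ScalarLike`/`IntegralLike` classes below are placeholders, not the actual `VariantSchema` types.

```java
public class InstanceofPatternDemo {
  // Placeholder types standing in for the schema's scalar/integral types.
  static class ScalarLike {}
  static class IntegralLike extends ScalarLike {
    int maxBits() { return 64; }
  }

  static int maxBitsOrMinusOne(ScalarLike targetType) {
    // Pre-Java 16 style: an instanceof test followed by an explicit cast.
    // if (targetType instanceof IntegralLike) {
    //   IntegralLike integralType = (IntegralLike) targetType;
    //   return integralType.maxBits();
    // }

    // Java 16+ pattern matching: the pattern variable is declared in the test
    // itself and is in scope wherever the test is known to be true.
    if (targetType instanceof IntegralLike integralType) {
      return integralType.maxBits();
    }
    return -1;
  }

  public static void main(String[] args) {
    System.out.println(maxBitsOrMinusOne(new IntegralLike())); // 64
    System.out.println(maxBitsOrMinusOne(new ScalarLike()));   // -1
  }
}
```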
##########
common/variant/src/main/java/org/apache/spark/types/variant/VariantShreddingWriter.java:
##########
+    } else if (schema.objectSchema != null && variantType == VariantUtil.Type.OBJECT) {
+      Map<String, VariantSchema.ObjectField> objectSchema = schema.objectSchema;
+      int maxSize = Math.min(v.objectSize(), objectSchema.size());

Review Comment:
   Unused.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
