Ali Alsuliman has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/3280
Change subject: [ASTERIXDB-2516][COMP] Avoid writing field names & values when comparing records ...................................................................... [ASTERIXDB-2516][COMP] Avoid writing field names & values when comparing records - user model changes: no - storage format changes: no - interface changes: no Details: ARecordVisitablePointable writes field names, field tags, and field values before giving access to the record information. This is not ideal for comparison. A different record accessor is needed for comparison. Also, the field names should be sorted, which ARecordVisitablePointable does not provide. - avoid this writing when a pointable to the name & value can be obtained (especially when the field value already includes the tag) - use UTF8StringPointable cached values (string length, meta length) to compare instead of using the string comparator, which would recalculate these values - refactor some common code. Change-Id: I19ac95a91749b2983bf06f763e463521a97a261c --- D asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/CompareHashUtil.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AbstractAGenericBinaryComparator.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/LogicalComplexBinaryComparator.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/hash/AMurmurHash3BinaryHashFunctionFamily.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java A asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/RecordField.java A asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/SortedRecord.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ListObjectPool.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ObjectFactories.java M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java M hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java M hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java 15 files changed, 499 insertions(+), 424 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/80/3280/1 diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/CompareHashUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/CompareHashUtil.java deleted file mode 100644 index a42e5aa..0000000 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/CompareHashUtil.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.dataflow.data.nontagged; - -import static org.apache.asterix.om.types.ATypeTag.SERIALIZED_MISSING_TYPE_TAG; -import static org.apache.asterix.om.types.ATypeTag.VALUE_TYPE_MAPPING; - -import java.util.Comparator; -import java.util.List; -import java.util.PriorityQueue; - -import org.apache.asterix.om.pointables.base.DefaultOpenFieldType; -import org.apache.asterix.om.pointables.base.IVisitablePointable; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.IAType; -import org.apache.asterix.om.types.TypeTagUtil; -import org.apache.hyracks.api.dataflow.value.IBinaryComparator; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -public class CompareHashUtil { - - private CompareHashUtil() { - } - - public static Comparator<IVisitablePointable> createFieldNamesComp(IBinaryComparator stringComp) { - return new Comparator<IVisitablePointable>() { - @Override - public int compare(IVisitablePointable name1, IVisitablePointable name2) { - try { - return stringComp.compare(name1.getByteArray(), name1.getStartOffset() + 1, name1.getLength() - 1, - name2.getByteArray(), name2.getStartOffset() + 1, name2.getLength() - 1); - } catch (HyracksDataException e) { - throw new RuntimeException(e); - } - } - }; - } - - public static int addToHeap(List<IVisitablePointable> recordFNames, List<IVisitablePointable> recordFValues, - PriorityQueue<IVisitablePointable> names) { - // do not add fields whose value is missing, they don't exist in reality - int length = recordFNames.size(); - IVisitablePointable fieldValue; - int count = 0; - for (int i = 0; i < length; i++) { - fieldValue = recordFValues.get(i); - if (fieldValue.getByteArray()[fieldValue.getStartOffset()] != SERIALIZED_MISSING_TYPE_TAG) { - names.add(recordFNames.get(i)); - count++; - } - } - return count; - } - - public static int getIndex(List<IVisitablePointable> names, IVisitablePointable instance) { - 
int size = names.size(); - for (int i = 0; i < size; i++) { - if (instance == names.get(i)) { - return i; - } - } - throw new IllegalStateException(); - } - - public static IAType getType(ARecordType recordType, int fieldIdx, ATypeTag fieldTag) throws HyracksDataException { - IAType[] fieldTypes = recordType.getFieldTypes(); - if (fieldIdx >= fieldTypes.length) { - return fieldTag.isDerivedType() ? DefaultOpenFieldType.getDefaultOpenFieldType(fieldTag) - : TypeTagUtil.getBuiltinTypeByTag(fieldTag); - } - return fieldTypes[fieldIdx]; - } -} diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AbstractAGenericBinaryComparator.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AbstractAGenericBinaryComparator.java index 098c81f..6d7ad76 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AbstractAGenericBinaryComparator.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AbstractAGenericBinaryComparator.java @@ -18,23 +18,20 @@ */ package org.apache.asterix.dataflow.data.nontagged.comparators; -import static org.apache.asterix.om.types.ATypeTag.VALUE_TYPE_MAPPING; +import static org.apache.asterix.om.util.container.ObjectFactories.RECORD_FACTORY; +import static org.apache.asterix.om.util.container.ObjectFactories.STORAGE_FACTORY; +import static org.apache.asterix.om.util.container.ObjectFactories.VOID_FACTORY; import java.io.IOException; -import java.util.Comparator; -import java.util.List; -import java.util.PriorityQueue; import org.apache.asterix.dataflow.data.common.ListAccessorUtil; -import org.apache.asterix.dataflow.data.nontagged.CompareHashUtil; import org.apache.asterix.dataflow.data.nontagged.serde.ADateSerializerDeserializer; import org.apache.asterix.dataflow.data.nontagged.serde.ADateTimeSerializerDeserializer; import 
org.apache.asterix.dataflow.data.nontagged.serde.ADayTimeDurationSerializerDeserializer; import org.apache.asterix.dataflow.data.nontagged.serde.ATimeSerializerDeserializer; import org.apache.asterix.dataflow.data.nontagged.serde.AYearMonthDurationSerializerDeserializer; -import org.apache.asterix.om.pointables.ARecordVisitablePointable; -import org.apache.asterix.om.pointables.PointableAllocator; -import org.apache.asterix.om.pointables.base.IVisitablePointable; +import org.apache.asterix.om.pointables.nonvisitor.RecordField; +import org.apache.asterix.om.pointables.nonvisitor.SortedRecord; import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; @@ -43,9 +40,7 @@ import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.types.hierachy.ATypeHierarchy; import org.apache.asterix.om.types.hierachy.ATypeHierarchy.Domain; -import org.apache.asterix.om.util.container.IObjectPool; import org.apache.asterix.om.util.container.ListObjectPool; -import org.apache.asterix.om.util.container.ObjectFactories; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; @@ -61,64 +56,43 @@ */ abstract class AbstractAGenericBinaryComparator implements IBinaryComparator { - // BOOLEAN private final IBinaryComparator ascBoolComp = BooleanBinaryComparatorFactory.INSTANCE.createBinaryComparator(); - // STRING private final IBinaryComparator ascStrComp = new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY).createBinaryComparator(); - // BINARY private final IBinaryComparator ascByteArrayComp = new PointableBinaryComparatorFactory(ByteArrayPointable.FACTORY).createBinaryComparator(); - // RECTANGLE private final IBinaryComparator ascRectangleComp = 
ARectanglePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator(); - // CIRCLE private final IBinaryComparator ascCircleComp = ACirclePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator(); - // DURATION private final IBinaryComparator ascDurationComp = ADurationPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator(); - // INTERVAL private final IBinaryComparator ascIntervalComp = AIntervalAscPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator(); - // LINE private final IBinaryComparator ascLineComp = ALinePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator(); - // POINT private final IBinaryComparator ascPointComp = APointPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator(); - // POINT3D private final IBinaryComparator ascPoint3DComp = APoint3DPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator(); - // POLYGON private final IBinaryComparator ascPolygonComp = APolygonPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator(); - // UUID private final IBinaryComparator ascUUIDComp = AUUIDPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator(); - // RAW private final IBinaryComparator rawComp = RawBinaryComparatorFactory.INSTANCE.createBinaryComparator(); - // these fields can be null + // the type fields can be null protected final IAType leftType; protected final IAType rightType; - private final IObjectPool<IMutableValueStorage, Void> storageAllocator; - private final IObjectPool<IPointable, Void> voidPointableAllocator; - // used for record comparison, sorting field names - private final PointableAllocator recordAllocator; - private final IObjectPool<PriorityQueue<IVisitablePointable>, Void> heapAllocator; - private final Comparator<IVisitablePointable> fieldNamesComparator; + private final ListObjectPool<IMutableValueStorage, Void> storageAllocator = new ListObjectPool<>(STORAGE_FACTORY); + private final ListObjectPool<IPointable, Void> voidPointableAllocator = new 
ListObjectPool<>(VOID_FACTORY); + private final ListObjectPool<SortedRecord, ARecordType> recordPool = new ListObjectPool<>(RECORD_FACTORY); AbstractAGenericBinaryComparator(IAType leftType, IAType rightType) { // factory should have already made sure to get the actual type this.leftType = leftType; this.rightType = rightType; - this.storageAllocator = new ListObjectPool<>(ObjectFactories.STORAGE_FACTORY); - this.voidPointableAllocator = new ListObjectPool<>(ObjectFactories.VOID_FACTORY); - this.recordAllocator = new PointableAllocator(); - this.fieldNamesComparator = CompareHashUtil.createFieldNamesComp(ascStrComp); - this.heapAllocator = new ListObjectPool<>((type) -> new PriorityQueue<>(fieldNamesComparator)); } - protected int compare(IAType leftType, byte[] b1, int s1, int l1, IAType rightType, byte[] b2, int s2, int l2) + protected final int compare(IAType leftType, byte[] b1, int s1, int l1, IAType rightType, byte[] b2, int s2, int l2) throws HyracksDataException { if (b1[s1] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG) { return b2[s2] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG ? 0 : -1; @@ -132,16 +106,14 @@ } ATypeTag tag1 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b1[s1]); ATypeTag tag2 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b2[s2]); - // if one of tag is null, that means we are dealing with an empty byte array in one side. - // and, we don't need to continue. We just compare raw byte by byte. + // tag being null could mean several things among of which is that the passed args are not tagged if (tag1 == null || tag2 == null) { return rawComp.compare(b1, s1, l1, b2, s2, l2); } if (ATypeHierarchy.isCompatible(tag1, tag2) && ATypeHierarchy.getTypeDomain(tag1) == Domain.NUMERIC) { return ComparatorUtil.compareNumbers(tag1, b1, s1 + 1, tag2, b2, s2 + 1); } - // currently only numbers are compatible. if two tags are not compatible, we compare the tags. - // this is especially useful when we need to generate some order between any two types. 
+ // currently only numbers are compatible. if two tags are not compatible, we compare the tags to generate order if (tag1 != tag2) { return Byte.compare(b1[s1], b2[s2]); } @@ -191,7 +163,6 @@ case OBJECT: return compareRecords(leftType, b1, s1, l1, rightType, b2, s2, l2); default: - // we include typeTag in comparison to compare between two type to enforce some ordering return rawComp.compare(b1, s1, l1, b2, s2, l2); } } @@ -248,53 +219,34 @@ } ARecordType leftRecordType = (ARecordType) TypeComputeUtils.getActualTypeOrOpen(leftType, ATypeTag.OBJECT); ARecordType rightRecordType = (ARecordType) TypeComputeUtils.getActualTypeOrOpen(rightType, ATypeTag.OBJECT); - ARecordVisitablePointable leftRecord = recordAllocator.allocateRecordValue(leftRecordType); - ARecordVisitablePointable rightRecord = recordAllocator.allocateRecordValue(rightRecordType); - PriorityQueue<IVisitablePointable> leftNamesHeap = null, rightNamesHeap = null; + SortedRecord leftRecord = recordPool.allocate(leftRecordType); + SortedRecord rightRecord = recordPool.allocate(rightRecordType); + IPointable leftFieldValue = voidPointableAllocator.allocate(null); + IPointable rightFieldValue = voidPointableAllocator.allocate(null); + // TODO(ali): this is not ideal. 
should be removed when tagged pointables are introduced + ArrayBackedValueStorage leftStorage = (ArrayBackedValueStorage) storageAllocator.allocate(null); + ArrayBackedValueStorage rightStorage = (ArrayBackedValueStorage) storageAllocator.allocate(null); try { - leftRecord.set(b1, s1, l1); - rightRecord.set(b2, s2, l2); - List<IVisitablePointable> leftFieldsNames = leftRecord.getFieldNames(); - List<IVisitablePointable> rightFieldsNames = rightRecord.getFieldNames(); - List<IVisitablePointable> leftFieldsValues = leftRecord.getFieldValues(); - List<IVisitablePointable> rightFieldsValues = rightRecord.getFieldValues(); - leftNamesHeap = heapAllocator.allocate(null); - rightNamesHeap = heapAllocator.allocate(null); - leftNamesHeap.clear(); - rightNamesHeap.clear(); - int numLeftValuedFields = CompareHashUtil.addToHeap(leftFieldsNames, leftFieldsValues, leftNamesHeap); - int numRightValuedFields = CompareHashUtil.addToHeap(rightFieldsNames, rightFieldsValues, rightNamesHeap); - if (numLeftValuedFields == 0 && numRightValuedFields == 0) { - return 0; - } else if (numLeftValuedFields == 0) { - return -1; - } else if (numRightValuedFields == 0) { - return 1; - } - int result; - int leftFieldIdx, rightFieldIdx; + leftRecord.reset(b1, s1); + rightRecord.reset(b2, s2); IAType leftFieldType, rightFieldType; - IVisitablePointable leftFieldName, leftFieldValue, rightFieldName, rightFieldValue; - ATypeTag fieldTag; - while (!leftNamesHeap.isEmpty() && !rightNamesHeap.isEmpty()) { - leftFieldName = leftNamesHeap.poll(); - rightFieldName = rightNamesHeap.poll(); + RecordField leftField, rightField; + int result; + while (!leftRecord.isEmpty() && !rightRecord.isEmpty()) { + leftField = leftRecord.poll(); + rightField = rightRecord.poll(); // compare the names first - result = ascStrComp.compare(leftFieldName.getByteArray(), leftFieldName.getStartOffset() + 1, - leftFieldName.getLength() - 1, rightFieldName.getByteArray(), - rightFieldName.getStartOffset() + 1, 
rightFieldName.getLength() - 1); + result = RecordField.FIELD_NAME_COMP.compare(leftField, rightField); if (result != 0) { return result; } // then compare the values if the names are equal - leftFieldIdx = CompareHashUtil.getIndex(leftFieldsNames, leftFieldName); - rightFieldIdx = CompareHashUtil.getIndex(rightFieldsNames, rightFieldName); - leftFieldValue = leftFieldsValues.get(leftFieldIdx); - rightFieldValue = rightFieldsValues.get(rightFieldIdx); - fieldTag = VALUE_TYPE_MAPPING[leftFieldValue.getByteArray()[leftFieldValue.getStartOffset()]]; - leftFieldType = CompareHashUtil.getType(leftRecordType, leftFieldIdx, fieldTag); - fieldTag = VALUE_TYPE_MAPPING[rightFieldValue.getByteArray()[rightFieldValue.getStartOffset()]]; - rightFieldType = CompareHashUtil.getType(rightRecordType, rightFieldIdx, fieldTag); + leftStorage.reset(); + rightStorage.reset(); + leftRecord.getFieldValue(leftField, leftFieldValue, leftStorage); + rightRecord.getFieldValue(rightField, rightFieldValue, rightStorage); + leftFieldType = leftRecord.getFieldType(leftField); + rightFieldType = rightRecord.getFieldType(rightField); result = compare(leftFieldType, leftFieldValue.getByteArray(), leftFieldValue.getStartOffset(), leftFieldValue.getLength(), rightFieldType, rightFieldValue.getByteArray(), rightFieldValue.getStartOffset(), rightFieldValue.getLength()); @@ -302,17 +254,16 @@ return result; } } - - return Integer.compare(numLeftValuedFields, numRightValuedFields); + return Integer.compare(leftRecord.size(), rightRecord.size()); + } catch (IOException e) { + throw HyracksDataException.create(e); } finally { - recordAllocator.freeRecord(rightRecord); - recordAllocator.freeRecord(leftRecord); - if (rightNamesHeap != null) { - heapAllocator.free(rightNamesHeap); - } - if (leftNamesHeap != null) { - heapAllocator.free(leftNamesHeap); - } + recordPool.free(rightRecord); + recordPool.free(leftRecord); + voidPointableAllocator.free(rightFieldValue); + 
voidPointableAllocator.free(leftFieldValue); + storageAllocator.free(rightStorage); + storageAllocator.free(leftStorage); } } } diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/LogicalComplexBinaryComparator.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/LogicalComplexBinaryComparator.java index b83e248..aa902bb 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/LogicalComplexBinaryComparator.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/LogicalComplexBinaryComparator.java @@ -30,7 +30,6 @@ import org.apache.asterix.dataflow.data.common.ILogicalBinaryComparator; import org.apache.asterix.dataflow.data.common.ListAccessorUtil; -import org.apache.asterix.dataflow.data.nontagged.CompareHashUtil; import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; import org.apache.asterix.om.base.IAObject; import org.apache.asterix.om.pointables.ARecordVisitablePointable; @@ -44,6 +43,7 @@ import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.util.container.IObjectPool; import org.apache.asterix.om.util.container.ListObjectPool; +import org.apache.asterix.om.utils.RecordUtil; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IMutableValueStorage; @@ -275,8 +275,8 @@ if (leftFTag == ATypeTag.NULL || rightFTag == ATypeTag.NULL) { tempCompResult = Result.NULL; } else if (leftFTag.isDerivedType() && rightFTag.isDerivedType()) { - leftFieldType = CompareHashUtil.getType(leftRecordType, i, leftFTag); - rightFieldType = CompareHashUtil.getType(rightRecordType, k, rightFTag); + leftFieldType = RecordUtil.getType(leftRecordType, i, leftFTag); + rightFieldType = RecordUtil.getType(rightRecordType, k, rightFTag); tempCompResult = 
compareComplex(leftFieldType, leftFTag, leftFieldValue, rightFieldType, rightFTag, rightFieldValue); } else { diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/hash/AMurmurHash3BinaryHashFunctionFamily.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/hash/AMurmurHash3BinaryHashFunctionFamily.java index 2ef6e80..020fbd1 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/hash/AMurmurHash3BinaryHashFunctionFamily.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/hash/AMurmurHash3BinaryHashFunctionFamily.java @@ -18,19 +18,16 @@ */ package org.apache.asterix.dataflow.data.nontagged.hash; -import static org.apache.asterix.om.types.ATypeTag.VALUE_TYPE_MAPPING; +import static org.apache.asterix.om.util.container.ObjectFactories.RECORD_FACTORY; +import static org.apache.asterix.om.util.container.ObjectFactories.STORAGE_FACTORY; +import static org.apache.asterix.om.util.container.ObjectFactories.VOID_FACTORY; import java.io.DataOutput; import java.io.IOException; -import java.util.Comparator; -import java.util.List; -import java.util.PriorityQueue; import org.apache.asterix.dataflow.data.common.ListAccessorUtil; -import org.apache.asterix.dataflow.data.nontagged.CompareHashUtil; -import org.apache.asterix.om.pointables.ARecordVisitablePointable; -import org.apache.asterix.om.pointables.PointableAllocator; -import org.apache.asterix.om.pointables.base.IVisitablePointable; +import org.apache.asterix.om.pointables.nonvisitor.RecordField; +import org.apache.asterix.om.pointables.nonvisitor.SortedRecord; import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; @@ -41,16 +38,13 @@ import org.apache.asterix.om.types.hierachy.IntegerToDoubleTypeConvertComputer; import org.apache.asterix.om.util.container.IObjectPool; import 
org.apache.asterix.om.util.container.ListObjectPool; -import org.apache.asterix.om.util.container.ObjectFactories; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHash; -import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.api.IMutableValueStorage; import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; public class AMurmurHash3BinaryHashFunctionFamily implements IBinaryHashFunctionFamily { @@ -83,24 +77,15 @@ private final ArrayBackedValueStorage valueBuffer = new ArrayBackedValueStorage(); private final DataOutput valueOut = valueBuffer.getDataOutput(); - private final IObjectPool<IPointable, Void> voidPointableAllocator; - private final IObjectPool<IMutableValueStorage, Void> storageAllocator; + private final IObjectPool<IPointable, Void> voidPointableAllocator = new ListObjectPool<>(VOID_FACTORY); + private final IObjectPool<IMutableValueStorage, Void> storageAllocator = new ListObjectPool<>(STORAGE_FACTORY); + private final IObjectPool<SortedRecord, ARecordType> recordPool = new ListObjectPool<>(RECORD_FACTORY); private final IAType type; private final int seed; - // used for record hashing, sorting field names first - private final PointableAllocator recordAllocator; - private final IObjectPool<PriorityQueue<IVisitablePointable>, Void> heapAllocator; - private final Comparator<IVisitablePointable> fieldNamesComparator; private GenericHashFunction(IAType type, int seed) { this.type = type; this.seed = seed; - this.voidPointableAllocator = new ListObjectPool<>(ObjectFactories.VOID_FACTORY); - this.storageAllocator 
= new ListObjectPool<>(ObjectFactories.STORAGE_FACTORY); - this.recordAllocator = new PointableAllocator(); - this.fieldNamesComparator = CompareHashUtil.createFieldNamesComp( - new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY).createBinaryComparator()); - this.heapAllocator = new ListObjectPool<>((arg) -> new PriorityQueue<>(fieldNamesComparator)); } @Override @@ -178,35 +163,28 @@ return MurmurHash3BinaryHash.hash(bytes, offset, length, seed); } ARecordType recordType = (ARecordType) TypeComputeUtils.getActualTypeOrOpen(type, ATypeTag.OBJECT); - ARecordVisitablePointable record = recordAllocator.allocateRecordValue(recordType); - PriorityQueue<IVisitablePointable> namesHeap = heapAllocator.allocate(null); + SortedRecord record = recordPool.allocate(recordType); + IPointable fieldValue = voidPointableAllocator.allocate(null); + // TODO(ali): this is not ideal. should be removed when tagged pointables are introduced + ArrayBackedValueStorage storage = (ArrayBackedValueStorage) storageAllocator.allocate(null); try { - record.set(bytes, offset, length); - namesHeap.clear(); - List<IVisitablePointable> fieldsNames = record.getFieldNames(); - List<IVisitablePointable> fieldsValues = record.getFieldValues(); - CompareHashUtil.addToHeap(fieldsNames, fieldsValues, namesHeap); - IVisitablePointable fieldName, fieldValue; - IAType fieldType; - ATypeTag fieldTag; + record.reset(bytes, offset); int hash = seed; - int fieldIdx; - while (!namesHeap.isEmpty()) { - fieldName = namesHeap.poll(); - // TODO(ali): currently doing another lookup to find the target field index and get its value & type - fieldIdx = CompareHashUtil.getIndex(fieldsNames, fieldName); - fieldValue = fieldsValues.get(fieldIdx); - fieldTag = VALUE_TYPE_MAPPING[fieldValue.getByteArray()[fieldValue.getStartOffset()]]; - fieldType = CompareHashUtil.getType(recordType, fieldIdx, fieldTag); - hash ^= MurmurHash3BinaryHash.hash(fieldName.getByteArray(), fieldName.getStartOffset(), - 
fieldName.getLength(), seed) - ^ hash(fieldType, fieldValue.getByteArray(), fieldValue.getStartOffset(), - fieldValue.getLength()); + while (!record.isEmpty()) { + RecordField field = record.poll(); + storage.reset(); + record.getFieldValue(field, fieldValue, storage); + IAType fieldType = record.getFieldType(field); + hash ^= field.getName().hash() ^ hash(fieldType, fieldValue.getByteArray(), + fieldValue.getStartOffset(), fieldValue.getLength()); } return hash; + } catch (IOException e) { + throw HyracksDataException.create(e); } finally { - recordAllocator.freeRecord(record); - heapAllocator.free(namesHeap); + recordPool.free(record); + voidPointableAllocator.free(fieldValue); + storageAllocator.free(storage); } } } diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java index 7089fd6..193ff7d 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/ARecordVisitablePointable.java @@ -34,6 +34,7 @@ import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.util.container.IObjectFactory; import org.apache.asterix.om.utils.NonTaggedFormatUtil; +import org.apache.asterix.om.utils.RecordUtil; import org.apache.asterix.om.utils.ResettableByteArrayOutputStream; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.util.string.UTF8StringWriter; @@ -197,14 +198,12 @@ for (int fieldNumber = 0; fieldNumber < numberOfSchemaFields; fieldNumber++) { if (hasOptionalFields) { byte b1 = b[nullBitMapOffset + fieldNumber / 4]; - int p = 1 << (7 - 2 * (fieldNumber % 4)); - if ((b1 & p) == 0) { + if (RecordUtil.isNull(b1, fieldNumber)) { // set null value (including type tag inside) fieldValues.add(nullReference); continue; } - p = 1 << (7 - 2 * 
(fieldNumber % 4) - 1); - if ((b1 & p) == 0) { + if (RecordUtil.isMissing(b1, fieldNumber)) { // set missing value (including type tag inside) fieldValues.add(missingReference); continue; diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java index 4ffbf47..f981580 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java @@ -46,32 +46,15 @@ import com.fasterxml.jackson.databind.JsonNode; -/* +/** * This class interprets the binary data representation of a record. + * <pre> + * byte|int |byte |int |int |byte[] |int | | | |int |int |int| | | + * tag |length|expanded?|open off?|# closed fields|null bitmap?|f1 off|..|f1 val|..|# open fields|hash|off|..|name|val + * (24) -------------------------------------------------------^ + * </pre> * - * Record { - * byte tag; - * int length; - * byte isExpanded; - * int openOffset?; - * int numberOfClosedFields; - * byte[ceil (numberOfFields / 4)] nullBitMap; // 1 bit per field, "1" means field is Null for this record - * int[numberOfClosedFields] closedFieldOffset; - * IPointable[numberOfClosedFields] fieldValue; - * int numberOfOpenFields?; - * OpenFieldLookup[numberOfOpenFields] lookup; - * OpenField[numberOfOpenFields] openFields; - * } - * - * OpenFieldLookup { - * int hashCode; - * int Offset; - * } - * - * OpenField { - * StringPointable fieldName; - * IPointable fieldValue; - * } + * "length" is the length of the whole byte array. 
*/ public class ARecordPointable extends AbstractPointable { @@ -107,20 +90,14 @@ } - public static final IObjectFactory<IPointable, ATypeTag> ALLOCATOR = new IObjectFactory<IPointable, ATypeTag>() { - @Override - public IPointable create(ATypeTag type) { - return new ARecordPointable(); - } - }; - - private static final int TAG_SIZE = 1; - private static final int RECORD_LENGTH_SIZE = 4; - private static final int EXPANDED_SIZE = 1; - private static final int OPEN_OFFSET_SIZE = 4; - private static final int CLOSED_COUNT_SIZE = 4; - private static final int FIELD_OFFSET_SIZE = 4; - private static final int OPEN_COUNT_SIZE = 4; + public static final IObjectFactory<IPointable, ATypeTag> ALLOCATOR = type -> new ARecordPointable(); + static final int TAG_SIZE = 1; + static final int RECORD_LENGTH_SIZE = 4; + static final int EXPANDED_SIZE = 1; + static final int OPEN_OFFSET_SIZE = 4; + static final int CLOSED_COUNT_SIZE = 4; + static final int FIELD_OFFSET_SIZE = 4; + static final int OPEN_COUNT_SIZE = 4; private static final int OPEN_FIELD_HASH_SIZE = 4; private static final int OPEN_FIELD_OFFSET_SIZE = 4; private static final int OPEN_FIELD_HEADER = OPEN_FIELD_HASH_SIZE + OPEN_FIELD_OFFSET_SIZE; @@ -137,12 +114,8 @@ return BytePointable.getByte(bytes, getTagOffset()); } - public int getTagOffset() { + private int getTagOffset() { return start; - } - - public int getTagSize() { - return TAG_SIZE; } @Override @@ -150,34 +123,30 @@ return IntegerPointable.getInteger(bytes, getLengthOffset()); } - public int getLengthOffset() { - return getTagOffset() + getTagSize(); + private int getLengthOffset() { + return getTagOffset() + TAG_SIZE; } - public int getLengthSize() { - return RECORD_LENGTH_SIZE; - } - - public boolean isExpanded(ARecordType recordType) { + private boolean isExpanded(ARecordType recordType) { if (isOpen(recordType)) { - return BooleanPointable.getBoolean(bytes, getExpandedOffset(recordType)); + return BooleanPointable.getBoolean(bytes, 
getExpandedOffset()); } return false; } - public int getExpandedOffset(ARecordType recordType) { - return getLengthOffset() + getLengthSize(); + private int getExpandedOffset() { + return getLengthOffset() + RECORD_LENGTH_SIZE; } - public int getExpandedSize(ARecordType recordType) { + private int getExpandedSize(ARecordType recordType) { return isOpen(recordType) ? EXPANDED_SIZE : 0; } - public int getOpenPartOffset(ARecordType recordType) { - return getExpandedOffset(recordType) + getExpandedSize(recordType); + private int getOpenPartOffset(ARecordType recordType) { + return getExpandedOffset() + getExpandedSize(recordType); } - public int getOpenPartSize(ARecordType recordType) { + private int getOpenPartSize(ARecordType recordType) { return isExpanded(recordType) ? OPEN_OFFSET_SIZE : 0; } @@ -185,32 +154,28 @@ return IntegerPointable.getInteger(bytes, getClosedFieldCountOffset(recordType)); } - public int getClosedFieldCountOffset(ARecordType recordType) { + private int getClosedFieldCountOffset(ARecordType recordType) { return getOpenPartOffset(recordType) + getOpenPartSize(recordType); } - public int getClosedFieldCountSize(ARecordType recordType) { - return CLOSED_COUNT_SIZE; + private int getNullBitmapOffset(ARecordType recordType) { + return getClosedFieldCountOffset(recordType) + CLOSED_COUNT_SIZE; } - public int getNullBitmapOffset(ARecordType recordType) { - return getClosedFieldCountOffset(recordType) + getClosedFieldCountSize(recordType); - } - - public int getNullBitmapSize(ARecordType recordType) { + private int getNullBitmapSize(ARecordType recordType) { return RecordUtil.computeNullBitmapSize(recordType); } public boolean isClosedFieldNull(ARecordType recordType, int fieldId) { if (getNullBitmapSize(recordType) > 0) { - return (bytes[getNullBitmapOffset(recordType) + fieldId / 4] & (1 << (7 - 2 * (fieldId % 4)))) == 0; + return RecordUtil.isNull(bytes[getNullBitmapOffset(recordType) + fieldId / 4], fieldId); } return false; } - public boolean 
isClosedFieldMissing(ARecordType recordType, int fieldId) { + private boolean isClosedFieldMissing(ARecordType recordType, int fieldId) { if (getNullBitmapSize(recordType) > 0) { - return (bytes[getNullBitmapOffset(recordType) + fieldId / 4] & (1 << (7 - 2 * (fieldId % 4) - 1))) == 0; + return RecordUtil.isMissing(bytes[getNullBitmapOffset(recordType) + fieldId / 4], fieldId); } return false; } @@ -245,7 +210,7 @@ pointable.set(bytes, getClosedFieldOffset(recordType, fieldId), getClosedFieldSize(recordType, fieldId)); } - public String getClosedFieldName(ARecordType recordType, int fieldId) { + private String getClosedFieldName(ARecordType recordType, int fieldId) { return recordType.getFieldNames()[fieldId]; } @@ -288,11 +253,11 @@ return isExpanded(recordType) ? IntegerPointable.getInteger(bytes, getOpenFieldCountOffset(recordType)) : 0; } - public int getOpenFieldCountSize(ARecordType recordType) { + private int getOpenFieldCountSize(ARecordType recordType) { return isExpanded(recordType) ? 
OPEN_COUNT_SIZE : 0; } - public int getOpenFieldCountOffset(ARecordType recordType) { + private int getOpenFieldCountOffset(ARecordType recordType) { return start + IntegerPointable.getInteger(bytes, getOpenPartOffset(recordType)); } @@ -327,12 +292,12 @@ return fieldName; } - public int getOpenFieldNameSize(ARecordType recordType, int fieldId) { + private int getOpenFieldNameSize(ARecordType recordType, int fieldId) { int utfleng = UTF8StringUtil.getUTFLength(bytes, getOpenFieldNameOffset(recordType, fieldId)); return utfleng + UTF8StringUtil.getNumBytesToStoreLength(utfleng); } - public int getOpenFieldNameOffset(ARecordType recordType, int fieldId) { + private int getOpenFieldNameOffset(ARecordType recordType, int fieldId) { return getOpenFieldOffset(recordType, fieldId); } @@ -344,23 +309,15 @@ return IntegerPointable.getInteger(bytes, getOpenFieldHashOffset(recordType, fieldId)); } - public int getOpenFieldHashOffset(ARecordType recordType, int fieldId) { + private int getOpenFieldHashOffset(ARecordType recordType, int fieldId) { return getOpenFieldCountOffset(recordType) + getOpenFieldCountSize(recordType) + fieldId * OPEN_FIELD_HEADER; } - public int getOpenFieldHashSize(ARecordType recordType, int fieldId) { - return OPEN_FIELD_HASH_SIZE; - } - - public int getOpenFieldOffset(ARecordType recordType, int fieldId) { + private int getOpenFieldOffset(ARecordType recordType, int fieldId) { return start + IntegerPointable.getInteger(bytes, getOpenFieldOffsetOffset(recordType, fieldId)); } - public int getOpenFieldOffsetOffset(ARecordType recordType, int fieldId) { - return getOpenFieldHashOffset(recordType, fieldId) + getOpenFieldHashSize(recordType, fieldId); - } - - public int getOpenFieldOffsetSize(ARecordType recordType, int fieldId) { - return OPEN_FIELD_HASH_SIZE; + private int getOpenFieldOffsetOffset(ARecordType recordType, int fieldId) { + return getOpenFieldHashOffset(recordType, fieldId) + OPEN_FIELD_HASH_SIZE; } } diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/RecordField.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/RecordField.java new file mode 100644 index 0000000..bdd7326 --- /dev/null +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/RecordField.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.om.pointables.nonvisitor; + +import java.util.Comparator; + +import org.apache.asterix.om.types.ATypeTag; +import org.apache.hyracks.data.std.primitive.UTF8StringPointable; + +public final class RecordField { + + public static final Comparator<RecordField> FIELD_NAME_COMP = + (field1, field2) -> UTF8StringPointable.compare(field1.namePointable, field2.namePointable); + private UTF8StringPointable namePointable; + private ATypeTag valueTag; + private int index; + private int valueOffset; + private int valueLength; + + RecordField() { + } + + // for open field + final void set(UTF8StringPointable namePointable, int index, int valueOffset, int valueLength, ATypeTag valueTag) { + this.namePointable = namePointable; + set(index, valueOffset, valueLength, valueTag); + } + + // for closed field where the name is already set + final void set(int index, int valueOffset, int valueLength, ATypeTag valueTag) { + this.index = index; + this.valueOffset = valueOffset; + this.valueTag = valueTag; + this.valueLength = valueLength; + } + + public final UTF8StringPointable getName() { + return namePointable; + } + + final int getIndex() { + return index; + } + + final int getValueOffset() { + return valueOffset; + } + + final int getValueLength() { + return valueLength; + } + + final ATypeTag getValueTag() { + return valueTag; + } +} diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/SortedRecord.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/SortedRecord.java new file mode 100644 index 0000000..e3d5aec --- /dev/null +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/SortedRecord.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.om.pointables.nonvisitor; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.PriorityQueue; + +import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer; +import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.util.container.IObjectFactory; +import org.apache.asterix.om.util.container.IObjectPool; +import org.apache.asterix.om.util.container.ListObjectPool; +import org.apache.asterix.om.util.container.ObjectFactories; +import org.apache.asterix.om.utils.NonTaggedFormatUtil; +import org.apache.asterix.om.utils.RecordUtil; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.UTF8StringPointable; +import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; +import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream; +import org.apache.hyracks.util.string.UTF8StringWriter; + +public final class SortedRecord { + + private static final IObjectFactory<RecordField, Void> FIELD_FACTORY = type -> new RecordField(); + // TODO(ali): copied from PointableHelper for now. 
+ private static final byte[] NULL_BYTES = new byte[] { ATypeTag.SERIALIZED_NULL_TYPE_TAG }; + private static final byte[] MISSING_BYTES = new byte[] { ATypeTag.SERIALIZED_MISSING_TYPE_TAG }; + private final IObjectPool<UTF8StringPointable, Void> utf8Pool = new ListObjectPool<>(ObjectFactories.UTF8_FACTORY); + private final IObjectPool<RecordField, Void> recordFieldPool = new ListObjectPool<>(FIELD_FACTORY); + private final PriorityQueue<RecordField> sortedFields = new PriorityQueue<>(RecordField.FIELD_NAME_COMP); + private final ARecordType recordType; + private final IAType[] fieldTypes; + private final RecordField[] closedFields; + private final int numSchemaFields; + private final boolean hasOptionalFields; + private final int nullBitMapSize; + private byte[] bytes; + + public SortedRecord(ARecordType recordType) { + String[] fieldNames = recordType.getFieldNames(); + this.numSchemaFields = fieldNames.length; + this.recordType = recordType; + this.fieldTypes = recordType.getFieldTypes(); + this.hasOptionalFields = NonTaggedFormatUtil.hasOptionalField(recordType); + this.nullBitMapSize = RecordUtil.computeNullBitmapSize(hasOptionalFields, recordType); + this.closedFields = new RecordField[numSchemaFields]; + UTF8StringWriter utf8Writer = new UTF8StringWriter(); + ByteArrayAccessibleOutputStream byteArrayStream = new ByteArrayAccessibleOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayStream); + try { + // write each closed field into a pointable, create a new field and add to the list of closedFields + for (int i = 0; i < numSchemaFields; i++) { + int nameStart = dataOutputStream.size(); + utf8Writer.writeUTF8(fieldNames[i], dataOutputStream); + int nameEnd = dataOutputStream.size(); + UTF8StringPointable utf8Pointable = UTF8StringPointable.FACTORY.createPointable(); + utf8Pointable.set(byteArrayStream.getByteArray(), nameStart, nameEnd - nameStart); + RecordField field = new RecordField(); + field.set(utf8Pointable, -1, -1, -1, 
null); + closedFields[i] = field; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * This method should be called before using the record. It recalculates logical indices and offsets of fields, + * closed and open. It also populates the UTF-8 field names of the open fields. + */ + public final void reset(byte[] data, int start) throws HyracksDataException { + bytes = data; + reset(); + boolean isExpanded = false; + // advance to expanded byte if present + int cursor = start + ARecordPointable.TAG_SIZE + ARecordPointable.RECORD_LENGTH_SIZE; + if (recordType.isOpen()) { + isExpanded = bytes[cursor] == 1; + // advance to either open part offset or number of closed fields + cursor += ARecordPointable.EXPANDED_SIZE; + } + int openPartOffset = 0; + if (isExpanded) { + openPartOffset = start + AInt32SerializerDeserializer.getInt(bytes, cursor); + // advance to number of closed fields + cursor += ARecordPointable.OPEN_OFFSET_SIZE; + } + int fieldOffset; + int length; + int fieldIndex = 0; + ATypeTag tag; + // advance to where the field offsets are (or to the null bitmap if the schema has optional fields) + cursor += ARecordPointable.CLOSED_COUNT_SIZE; + int nullBitMapOffset = cursor; + int fieldsOffsets = cursor + nullBitMapSize; + // compute the offsets of each closed field value and whether it's missing or null + for (int i = 0; i < numSchemaFields; i++, fieldIndex++) { + fieldOffset = AInt32SerializerDeserializer.getInt(bytes, fieldsOffsets) + start; + tag = TypeComputeUtils.getActualType(fieldTypes[i]).getTypeTag(); + if (hasOptionalFields) { + byte nullBits = bytes[nullBitMapOffset + i / 4]; + if (RecordUtil.isNull(nullBits, i)) { + tag = ATypeTag.NULL; + } else if (RecordUtil.isMissing(nullBits, i)) { + tag = ATypeTag.MISSING; + } + } + length = NonTaggedFormatUtil.getFieldValueLength(bytes, fieldOffset, tag, false); + closedFields[i].set(fieldIndex, fieldOffset, length, tag); + if (tag != ATypeTag.MISSING) { + sortedFields.add(closedFields[i]); + } +
fieldsOffsets += ARecordPointable.FIELD_OFFSET_SIZE; + } + // then populate open fields info second, an open field has name + value (tagged) + if (isExpanded) { + int numberOpenFields = AInt32SerializerDeserializer.getInt(bytes, openPartOffset); + fieldOffset = openPartOffset + ARecordPointable.OPEN_COUNT_SIZE + (8 * numberOpenFields); + for (int i = 0; i < numberOpenFields; i++, fieldIndex++) { + // get a pointable to the field name + length = NonTaggedFormatUtil.getFieldValueLength(bytes, fieldOffset, ATypeTag.STRING, false); + UTF8StringPointable openFieldName = utf8Pool.allocate(null); + openFieldName.set(bytes, fieldOffset, length); + // move to the value + fieldOffset += length; + tag = ATypeTag.VALUE_TYPE_MAPPING[bytes[fieldOffset]]; + // +1 to account for the tag included since the field is open + length = NonTaggedFormatUtil.getFieldValueLength(bytes, fieldOffset, tag, true) + 1; + RecordField openField = recordFieldPool.allocate(null); + openField.set(openFieldName, fieldIndex, fieldOffset, length, tag); + sortedFields.add(openField); + // then skip the value to the next field name + fieldOffset += length; + } + } + } + + private void reset() { + sortedFields.clear(); + utf8Pool.reset(); + recordFieldPool.reset(); + } + + public final boolean isEmpty() { + return sortedFields.isEmpty(); + } + + public final RecordField poll() { + return sortedFields.poll(); + } + + public final int size() { + return sortedFields.size(); + } + + public final IAType getFieldType(RecordField field) throws HyracksDataException { + return RecordUtil.getType(recordType, field.getIndex(), field.getValueTag()); + } + + public final void getFieldValue(RecordField field, IPointable pointable, ArrayBackedValueStorage storage) + throws IOException { + int fieldIdx = field.getIndex(); + if (fieldIdx >= numSchemaFields) { + // open field + pointable.set(bytes, field.getValueOffset(), field.getValueLength()); + } else { + // closed field + if (field.getValueTag() == ATypeTag.MISSING) { 
+ pointable.set(MISSING_BYTES, 0, MISSING_BYTES.length); + } else if (field.getValueTag() == ATypeTag.NULL) { + pointable.set(NULL_BYTES, 0, NULL_BYTES.length); + } else { + // TODO(ali): this is not ideal. should not need to copy the tag when tagged pointables are introduced + int start = storage.getLength(); + storage.getDataOutput().writeByte(field.getValueTag().serialize()); + storage.getDataOutput().write(bytes, field.getValueOffset(), field.getValueLength()); + int end = storage.getLength(); + pointable.set(storage.getByteArray(), start, end - start); + } + } + } +} diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java index 941f59f..53ae805 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/ATypeTag.java @@ -122,7 +122,7 @@ VALUE_TYPE_MAPPING = typeList.toArray(new ATypeTag[typeList.size()]); } - ATypeTag(int value) { + private ATypeTag(int value) { this.value = (byte) value; } diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java index e4167ec..528a411 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/TypeTagUtil.java @@ -18,83 +18,59 @@ */ package org.apache.asterix.om.types; +import java.util.EnumMap; + import org.apache.asterix.om.utils.RecordUtil; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; public class TypeTagUtil { + private static final EnumMap<ATypeTag, IAType> TAGS_TO_TYPES = new EnumMap<>(ATypeTag.class); + + static { + TAGS_TO_TYPES.put(ATypeTag.TINYINT, BuiltinType.AINT8); + 
TAGS_TO_TYPES.put(ATypeTag.SMALLINT, BuiltinType.AINT16); + TAGS_TO_TYPES.put(ATypeTag.INTEGER, BuiltinType.AINT32); + TAGS_TO_TYPES.put(ATypeTag.BIGINT, BuiltinType.AINT64); + TAGS_TO_TYPES.put(ATypeTag.BINARY, BuiltinType.ABINARY); + TAGS_TO_TYPES.put(ATypeTag.BITARRAY, BuiltinType.ABITARRAY); + TAGS_TO_TYPES.put(ATypeTag.FLOAT, BuiltinType.AFLOAT); + TAGS_TO_TYPES.put(ATypeTag.DOUBLE, BuiltinType.ADOUBLE); + TAGS_TO_TYPES.put(ATypeTag.STRING, BuiltinType.ASTRING); + TAGS_TO_TYPES.put(ATypeTag.MISSING, BuiltinType.AMISSING); + TAGS_TO_TYPES.put(ATypeTag.NULL, BuiltinType.ANULL); + TAGS_TO_TYPES.put(ATypeTag.BOOLEAN, BuiltinType.ABOOLEAN); + TAGS_TO_TYPES.put(ATypeTag.DATETIME, BuiltinType.ADATETIME); + TAGS_TO_TYPES.put(ATypeTag.DATE, BuiltinType.ADATE); + TAGS_TO_TYPES.put(ATypeTag.TIME, BuiltinType.ATIME); + TAGS_TO_TYPES.put(ATypeTag.DURATION, BuiltinType.ADURATION); + TAGS_TO_TYPES.put(ATypeTag.POINT, BuiltinType.APOINT); + TAGS_TO_TYPES.put(ATypeTag.POINT3D, BuiltinType.APOINT3D); + TAGS_TO_TYPES.put(ATypeTag.TYPE, BuiltinType.ALL_TYPE); + TAGS_TO_TYPES.put(ATypeTag.ANY, BuiltinType.ANY); + TAGS_TO_TYPES.put(ATypeTag.LINE, BuiltinType.ALINE); + TAGS_TO_TYPES.put(ATypeTag.POLYGON, BuiltinType.APOLYGON); + TAGS_TO_TYPES.put(ATypeTag.CIRCLE, BuiltinType.ACIRCLE); + TAGS_TO_TYPES.put(ATypeTag.RECTANGLE, BuiltinType.ARECTANGLE); + TAGS_TO_TYPES.put(ATypeTag.INTERVAL, BuiltinType.AINTERVAL); + TAGS_TO_TYPES.put(ATypeTag.YEARMONTHDURATION, BuiltinType.AYEARMONTHDURATION); + TAGS_TO_TYPES.put(ATypeTag.DAYTIMEDURATION, BuiltinType.ADAYTIMEDURATION); + TAGS_TO_TYPES.put(ATypeTag.UUID, BuiltinType.AUUID); + TAGS_TO_TYPES.put(ATypeTag.OBJECT, RecordUtil.FULLY_OPEN_RECORD_TYPE); + // TODO: how come the item type in this instance is "null" + TAGS_TO_TYPES.put(ATypeTag.MULTISET, AUnorderedListType.FULLY_OPEN_UNORDEREDLIST_TYPE); + TAGS_TO_TYPES.put(ATypeTag.ARRAY, AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE); + TAGS_TO_TYPES.put(ATypeTag.GEOMETRY, BuiltinType.AGEOMETRY); + 
} + public static IAType getBuiltinTypeByTag(ATypeTag typeTag) throws HyracksDataException { - switch (typeTag) { - case TINYINT: - return BuiltinType.AINT8; - case SMALLINT: - return BuiltinType.AINT16; - case INTEGER: - return BuiltinType.AINT32; - case BIGINT: - return BuiltinType.AINT64; - case BINARY: - return BuiltinType.ABINARY; - case BITARRAY: - return BuiltinType.ABITARRAY; - case FLOAT: - return BuiltinType.AFLOAT; - case DOUBLE: - return BuiltinType.ADOUBLE; - case STRING: - return BuiltinType.ASTRING; - case MISSING: - return BuiltinType.AMISSING; - case NULL: - return BuiltinType.ANULL; - case BOOLEAN: - return BuiltinType.ABOOLEAN; - case DATETIME: - return BuiltinType.ADATETIME; - case DATE: - return BuiltinType.ADATE; - case TIME: - return BuiltinType.ATIME; - case DURATION: - return BuiltinType.ADURATION; - case POINT: - return BuiltinType.APOINT; - case POINT3D: - return BuiltinType.APOINT3D; - case TYPE: - return BuiltinType.ALL_TYPE; - case ANY: - return BuiltinType.ANY; - case LINE: - return BuiltinType.ALINE; - case POLYGON: - return BuiltinType.APOLYGON; - case CIRCLE: - return BuiltinType.ACIRCLE; - case RECTANGLE: - return BuiltinType.ARECTANGLE; - case INTERVAL: - return BuiltinType.AINTERVAL; - case YEARMONTHDURATION: - return BuiltinType.AYEARMONTHDURATION; - case DAYTIMEDURATION: - return BuiltinType.ADAYTIMEDURATION; - case UUID: - return BuiltinType.AUUID; - case OBJECT: - return RecordUtil.FULLY_OPEN_RECORD_TYPE; - case MULTISET: - // TODO: how come the item type in this instance is "null" - return AUnorderedListType.FULLY_OPEN_UNORDEREDLIST_TYPE; - case ARRAY: - return AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE; - case GEOMETRY: - return BuiltinType.AGEOMETRY; - default: - // TODO(tillw) should be an internal error - throw new HyracksDataException("Typetag " + typeTag + " is not a built-in type"); + IAType type = TAGS_TO_TYPES.get(typeTag); + if (type != null) { + return type; } + // TODO(tillw) should be an internal error + throw 
new HyracksDataException("Typetag " + typeTag + " is not a built-in type"); } public static boolean isType(ITupleReference tuple, int fieldIdx, byte tag) { diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ListObjectPool.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ListObjectPool.java index bdc1943..78c411a 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ListObjectPool.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ListObjectPool.java @@ -24,10 +24,8 @@ import java.util.List; /** - * Object pool backed by a list. - * The argument for creating E instances could be different. This class also - * considers arguments in object reusing, e.g., it reuses an E instances ONLY - * when the construction argument is "equal". + * Object pool backed by a list. The arguments used to create E instances may differ. This class also considers + * these arguments when reusing objects, i.e., it reuses an E instance ONLY when the construction argument is "equal".
*/ public class ListObjectPool<E, T> implements IObjectPool<E, T> { @@ -36,12 +34,12 @@ /** * cache of objects */ - private List<E> pool = new ArrayList<E>(); + private List<E> pool = new ArrayList<>(); /** * args that are used to create each element in the pool */ - private List<T> args = new ArrayList<T>(); + private List<T> args = new ArrayList<>(); /** * bits indicating which element is in use @@ -85,7 +83,7 @@ @Override public boolean free(E element) { - for (int i = pool.size() - 1; i >= 0; i--) { + for (int i = usedBits.length(); (i = usedBits.previousSetBit(i - 1)) >= 0;) { if (element == pool.get(i)) { usedBits.clear(i); return true; diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ObjectFactories.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ObjectFactories.java index 763e1d4..2c6e408 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ObjectFactories.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/container/ObjectFactories.java @@ -20,8 +20,11 @@ import java.util.BitSet; +import org.apache.asterix.om.pointables.nonvisitor.SortedRecord; +import org.apache.asterix.om.types.ARecordType; import org.apache.hyracks.data.std.api.IMutableValueStorage; import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; @@ -36,8 +39,10 @@ private ObjectFactories() { } - public static final IObjectFactory<IPointable, Void> VOID_FACTORY = (type) -> new VoidPointable(); + public static final IObjectFactory<IPointable, Void> VOID_FACTORY = type -> new VoidPointable(); public static final IObjectFactory<IMutableValueStorage, Void> STORAGE_FACTORY = - (type) -> new ArrayBackedValueStorage(); - public static final IObjectFactory<BitSet, Void> BIT_SET_FACTORY = (type) -> new 
BitSet(); + type -> new ArrayBackedValueStorage(); + public static final IObjectFactory<BitSet, Void> BIT_SET_FACTORY = type -> new BitSet(); + public static final IObjectFactory<UTF8StringPointable, Void> UTF8_FACTORY = type -> new UTF8StringPointable(); + public static final IObjectFactory<SortedRecord, ARecordType> RECORD_FACTORY = SortedRecord::new; } diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java index ec1c7cb..cdbe8d3 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/utils/RecordUtil.java @@ -20,9 +20,13 @@ import java.util.List; +import org.apache.asterix.om.pointables.base.DefaultOpenFieldType; import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.types.TypeTagUtil; import org.apache.commons.lang3.StringUtils; +import org.apache.hyracks.api.exceptions.HyracksDataException; public class RecordUtil { /** @@ -35,44 +39,52 @@ } /** - * Create a fully open record type with the passed name - * - * @param name - * @return - */ - public static ARecordType createOpenRecordType(String name) { - return new ARecordType(name, new String[0], new IAType[0], true); - } - - /** - * A util method that takes a field name and return a String representation for error messages - * - * @param field - * @return + * @param field a field name represented by a list + * @return string version of the field */ public static String toFullyQualifiedName(List<String> field) { return StringUtils.join(field, "."); } /** - * A util method that takes String array and return a String representation for error messages - * - * @param field - * @return + * @param names a hierarchy of entity names + * @return string version of the entity name to be qualified */ 
public static String toFullyQualifiedName(String... names) { return StringUtils.join(names, "."); } /** - * compute the null Bitmap size for the open fields + * Computes the null Bitmap size when the schema has optional fields (nullable/missable) * - * @param recordType - * the record type - * @return the size of the bitmap + * @param recordType the record type + * @return the size of the bitmap in number of bytes */ public static int computeNullBitmapSize(ARecordType recordType) { - return NonTaggedFormatUtil.hasOptionalField(recordType) - ? (int) Math.ceil(recordType.getFieldNames().length / 4.0) : 0; + return computeNullBitmapSize(NonTaggedFormatUtil.hasOptionalField(recordType), recordType); + } + + public static int computeNullBitmapSize(boolean hasOptionalField, ARecordType recordType) { + // each field needs 2 bits for MISSING, NULL, and VALUE. for 4 fields, 4*2=8 bits (1 byte), thus divide by 4. + return hasOptionalField ? (int) Math.ceil(recordType.getFieldTypes().length / 4.0) : 0; + } + + public static boolean isNull(byte nullMissingBits, int fieldIndex) { + int position = 1 << (7 - 2 * (fieldIndex % 4)); + return (nullMissingBits & position) == 0; + } + + public static boolean isMissing(byte nullMissingBits, int fieldIndex) { + int position = 1 << (7 - 2 * (fieldIndex % 4) - 1); + return (nullMissingBits & position) == 0; + } + + public static IAType getType(ARecordType recordType, int fieldIdx, ATypeTag fieldTag) throws HyracksDataException { + IAType[] fieldTypes = recordType.getFieldTypes(); + if (fieldIdx >= fieldTypes.length) { + return fieldTag.isDerivedType() ? 
DefaultOpenFieldType.getDefaultOpenFieldType(fieldTag) + : TypeTagUtil.getBuiltinTypeByTag(fieldTag); + } + return fieldTypes[fieldIdx]; } } diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java index f683615..4137581 100644 --- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java @@ -154,6 +154,14 @@ return UTF8StringUtil.compareTo(this.bytes, this.start, bytes, start); } + // TODO(ali): could use the normalized key, too. + // takes advantage of cached utf8 length and meta length + public static int compare(UTF8StringPointable pointable1, UTF8StringPointable pointable2) { + return UTF8StringUtil.compareTo(pointable1.bytes, pointable1.start + pointable1.metaLength, + pointable1.utf8Length, pointable2.bytes, pointable2.start + pointable2.metaLength, + pointable2.utf8Length); + } + @Override public int hash() { if (hashValue == 0) { diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java index 3b3e7b4..9aa2b5d 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java @@ -241,6 +241,12 @@ return compareTo(thisBytes, thisStart, thatBytes, thatStart, false, false); } + // the start and length of each are the ones calculated by UTF8StringPointable. 
caller should provide proper values + public static int compareTo(byte[] thisBytes, int thisStart, int thisLength, byte[] thatBytes, int thatStart, + int thatLength) { + return compareTo(thisBytes, thisStart, thisLength, thatBytes, thatStart, thatLength, false, false); + } + /** * This function provides the raw bytes-based comparison for UTF8 strings. * Note that the comparison may not deliver the correct ordering for certain languages that include 2 or 3 bytes characters. -- To view, visit https://asterix-gerrit.ics.uci.edu/3280 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I19ac95a91749b2983bf06f763e463521a97a261c Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Ali Alsuliman <[email protected]>
