Heri Ramampiaro has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1203
Change subject: This change adds fixes for possible race conditions or sporadic test failures caused by the remove-fields function. ...................................................................... This change adds fixes for possible race conditions or sporadic test failures caused by the remove-fields function. Change-Id: I39324fcfb10a93b599816920007d48c753da11c4 --- M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsDescriptor.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsEvalFactory.java 4 files changed, 60 insertions(+), 53 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/03/1203/1 diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java index 24e66f2..c3eb8bb 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/PointableHelper.java @@ -20,18 +20,13 @@ import java.io.DataOutput; import java.io.IOException; - import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.om.pointables.base.IVisitablePointable; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.EnumDeserializer; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.api.dataflow.value.IBinaryComparator; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.api.IMutableValueStorage; import org.apache.hyracks.data.std.api.IValueReference; -import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.util.string.UTF8StringWriter; /** @@ -42,22 +37,10 @@ */ public class PointableHelper { - private static final IBinaryComparator STRING_BINARY_COMPARATOR = PointableBinaryComparatorFactory.of( - UTF8StringPointable.FACTORY).createBinaryComparator(); private final UTF8StringWriter utf8Writer; public PointableHelper() { utf8Writer = new UTF8StringWriter(); - } - - public static int compareStringBinValues(IValueReference a, IValueReference b) throws HyracksDataException { - // start+1 and len-1 due to type tag ignore (only interested in String value) - return STRING_BINARY_COMPARATOR.compare(a.getByteArray(), a.getStartOffset() + 1, a.getLength() - 1, - b.getByteArray(), b.getStartOffset() + 1, b.getLength() - 1); - } - - public static boolean isEqual(IValueReference a, IValueReference b) throws HyracksDataException { - return (compareStringBinValues(a, b) == 0); } public static boolean byteArrayEqual(IValueReference valueRef1, IValueReference valueRef2) { diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java index 2f54c9e..f2af253 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java @@ -45,7 +45,12 @@ import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; @@ -82,40 +87,40 @@ public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) throws AlgebricksException { return new IScalarEvaluatorFactory() { - - private static final long serialVersionUID = 1L; - @Override public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException { - final PointableAllocator pa = new PointableAllocator(); - final IVisitablePointable vp0 = pa.allocateRecordValue(inRecType0); - final IVisitablePointable vp1 = pa.allocateRecordValue(inRecType1); - - final IPointable argPtr0 = new VoidPointable(); - final IPointable argPtr1 = new VoidPointable(); - - final IScalarEvaluator eval0 = args[0].createScalarEvaluator(ctx); - final IScalarEvaluator eval1 = args[1].createScalarEvaluator(ctx); - - final List<RecordBuilder> rbStack = new ArrayList<>(); - - final ArrayBackedValueStorage tabvs = new ArrayBackedValueStorage(); - return new IScalarEvaluator() { + private final PointableAllocator pa = new PointableAllocator(); + private final IVisitablePointable vp0 = pa.allocateRecordValue(inRecType0); + private final IVisitablePointable vp1 = pa.allocateRecordValue(inRecType1); + + private final IPointable inputArg0 = new VoidPointable(); + private final IPointable inputArg1 = new VoidPointable(); + + private final IScalarEvaluator eval0 = args[0].createScalarEvaluator(ctx); + private final IScalarEvaluator eval1 = args[1].createScalarEvaluator(ctx); + + private final List<RecordBuilder> rbStack = new ArrayList<>(); + + private final ArrayBackedValueStorage tabvs = new ArrayBackedValueStorage(); + private final RuntimeRecordTypeInfo runtimeRecordTypeInfo = new RuntimeRecordTypeInfo(); private final DeepEqualAssessor deepEqualAssesor = new DeepEqualAssessor(); private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage(); private DataOutput out = resultStorage.getDataOutput(); + private final IBinaryComparator stringBinComparator = PointableBinaryComparatorFactory.of( + UTF8StringPointable.FACTORY).createBinaryComparator(); + @Override public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException { resultStorage.reset(); - eval0.evaluate(tuple, argPtr0); - eval1.evaluate(tuple, argPtr1); + eval0.evaluate(tuple, inputArg0); + eval1.evaluate(tuple, inputArg1); - vp0.set(argPtr0); - vp1.set(argPtr1); + vp0.set(inputArg0); + vp1.set(inputArg1); ARecordVisitablePointable rp0 = (ARecordVisitablePointable) vp0; ARecordVisitablePointable rp1 = (ARecordVisitablePointable) vp1; @@ -150,7 +155,7 @@ IVisitablePointable rightValue = rightRecord.getFieldValues().get(j); IVisitablePointable rightType = rightRecord.getFieldTypeTags().get(j); // Check if same fieldname - if (PointableHelper.isEqual(leftName, rightName) + if (compareStringBinValues(leftName, rightName) && !deepEqualAssesor.isEqual(leftValue, rightValue)) { //Field was found on the right and are subrecords, merge them if (PointableHelper.sameType(ATypeTag.RECORD, rightType) @@ -230,6 +235,14 @@ } } + private boolean compareStringBinValues(IValueReference a, IValueReference b) + throws HyracksDataException { + // start+1 and len-1 due to type tag ignore (only interested in String value) + int diff = stringBinComparator.compare(a.getByteArray(), a.getStartOffset() + 1, + a.getLength() - 1, b.getByteArray(), b.getStartOffset() + 1, b.getLength() - 1); + return (diff == 0); + } + }; } }; diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsDescriptor.java index b7e719d..56edc5c 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsDescriptor.java @@ -30,7 +30,6 @@ import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; public class RecordRemoveFieldsDescriptor extends AbstractScalarFunctionDynamicDescriptor { - private static final long serialVersionUID = 1L; public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { @Override public IFunctionDescriptor createFunctionDescriptor() { diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsEvalFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsEvalFactory.java index 68865c3..6c5803e 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsEvalFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordRemoveFieldsEvalFactory.java @@ -44,14 +44,17 @@ import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.api.IValueReference; +import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; -class RecordRemoveFieldsEvalFactory implements IScalarEvaluatorFactory { - private static final long serialVersionUID = 1L; +public class RecordRemoveFieldsEvalFactory implements IScalarEvaluatorFactory { private IScalarEvaluatorFactory inputRecordEvalFactory; private IScalarEvaluatorFactory removeFieldPathsFactory; private ARecordType requiredRecType; @@ -71,16 +74,15 @@ @Override public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException { - - final PointableAllocator pa = new PointableAllocator(); - final IVisitablePointable vp0 = pa.allocateRecordValue(inputRecType); - final IVisitablePointable vp1 = pa.allocateListValue(inputListType); - final IPointable inputArg0 = new VoidPointable(); - final IPointable inputArg1 = new VoidPointable(); - final IScalarEvaluator eval0 = inputRecordEvalFactory.createScalarEvaluator(ctx); - final IScalarEvaluator eval1 = removeFieldPathsFactory.createScalarEvaluator(ctx); - return new IScalarEvaluator() { + private final PointableAllocator pa = new PointableAllocator(); + private final IVisitablePointable vp0 = pa.allocateRecordValue(inputRecType); + private final IVisitablePointable vp1 = pa.allocateListValue(inputListType); + private final IPointable inputArg0 = new VoidPointable(); + private final IPointable inputArg1 = new VoidPointable(); + private final IScalarEvaluator eval0 = inputRecordEvalFactory.createScalarEvaluator(ctx); + private final IScalarEvaluator eval1 = removeFieldPathsFactory.createScalarEvaluator(ctx); + private final RuntimeRecordTypeInfo runtimeRecordTypeInfo = new RuntimeRecordTypeInfo(); private final List<RecordBuilder> rbStack = new ArrayList<>(); @@ -89,6 +91,9 @@ private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage(); private DataOutput out = resultStorage.getDataOutput(); + + private final IBinaryComparator stringBinComparator = PointableBinaryComparatorFactory.of( + UTF8StringPointable.FACTORY).createBinaryComparator(); @Override public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException { @@ -202,7 +207,7 @@ boolean match = true; Iterator<IVisitablePointable> fpi = recordPath.iterator(); for (int j = inputPathItems.size() - 1; j >= 0; j--) { - match &= PointableHelper.isEqual(inputPathItems.get(j), fpi.next()); + match &= compareStringBinValues(inputPathItems.get(j), fpi.next()); if (!match) { break; } @@ -212,13 +217,20 @@ } } } else { - if (PointableHelper.isEqual(recordPath.getFirst(), item)) { + if (compareStringBinValues(recordPath.getFirst(), item)) { return false; } } } return true; } + + private boolean compareStringBinValues(IValueReference a, IValueReference b) throws HyracksDataException { + // start+1 and len-1 due to type tag ignore (only interested in String value) + int diff = stringBinComparator.compare(a.getByteArray(), a.getStartOffset() + 1, a.getLength() - 1, + b.getByteArray(), b.getStartOffset() + 1, b.getLength() - 1); + return (diff == 0); + } }; } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1203 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I39324fcfb10a93b599816920007d48c753da11c4 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Heri Ramampiaro <heri...@gmail.com>