Hussain Towaileb has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3370 )
Change subject: [WIP] Fix upsert error when the autogenerated UUID key is already present in the record
......................................................................
[WIP] Fix upsert error when the autogenerated UUID key is already present in the record
Change-Id: I22100d3ff29864b8bfd54b0decb183e5056fdb4a
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordMergeTypeComputer.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeEvaluator.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeIgnoreDuplicatesDescriptor.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
7 files changed, 364 insertions(+), 184 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/70/3370/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
index f1b20d8..a1a788f 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
@@ -233,7 +233,7 @@
recordMergeFnArgs.add(new MutableObject<>(rec0));
recordMergeFnArgs.add(new MutableObject<>(rec1));
AbstractFunctionCallExpression recordMergeFn = new
ScalarFunctionCallExpression(
- FunctionUtil.getFunctionInfo(BuiltinFunctions.RECORD_MERGE),
recordMergeFnArgs);
+
FunctionUtil.getFunctionInfo(BuiltinFunctions.RECORD_MERGE_UUID_IGNORE_DUPLICATE),
recordMergeFnArgs);
recordMergeFn.setSourceLocation(sourceLoc);
return recordMergeFn;
}
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index df2a868..cf4861e 100644
---
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -254,6 +254,8 @@
// objects
public static final FunctionIdentifier RECORD_MERGE =
new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"object-merge", 2);
+    /**
+     * Internal variant of {@link #RECORD_MERGE} used by IntroduceAutogenerateIDRule: when both records carry the same
+     * field with conflicting values, the conflict is resolved in favor of the left record instead of failing. It needs
+     * its own name; reusing "object-merge" with arity 2 would make it the very same identifier as RECORD_MERGE.
+     */
+    public static final FunctionIdentifier RECORD_MERGE_UUID_IGNORE_DUPLICATE =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "object-merge-ignore-duplicates", 2);
public static final FunctionIdentifier RECORD_CONCAT =
new FunctionIdentifier(FunctionConstants.ASTERIX_NS,
"object-concat", FunctionIdentifier.VARARGS);
public static final FunctionIdentifier RECORD_CONCAT_STRICT =
@@ -2181,6 +2183,7 @@
// objects
addFunction(RECORD_MERGE, RecordMergeTypeComputer.INSTANCE, true);
+ addFunction(RECORD_MERGE_UUID_IGNORE_DUPLICATE,
RecordMergeTypeComputer.INSTANCE_UUID_IGNORE_DUPLICATE, true);
addFunction(RECORD_CONCAT, OpenARecordTypeComputer.INSTANCE, true);
addPrivateFunction(RECORD_CONCAT_STRICT,
OpenARecordTypeComputer.INSTANCE, true);
addFunction(ADD_FIELDS, RecordAddFieldsTypeComputer.INSTANCE, true);
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordMergeTypeComputer.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordMergeTypeComputer.java
index 109f7c6..f61d964 100644
---
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordMergeTypeComputer.java
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordMergeTypeComputer.java
@@ -32,6 +32,7 @@
import org.apache.asterix.om.types.AUnionType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.types.TypeHelper;
+import org.apache.asterix.om.utils.RecordUtil;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -43,9 +44,17 @@
public class RecordMergeTypeComputer implements IResultTypeComputer {
public static final RecordMergeTypeComputer INSTANCE = new
RecordMergeTypeComputer();
+    public static final RecordMergeTypeComputer INSTANCE_UUID_IGNORE_DUPLICATE = new RecordMergeTypeComputer(true);
+
+    /**
+     * True only for {@link #INSTANCE_UUID_IGNORE_DUPLICATE}. In that mode computeType returns a fully open left
+     * record type unchanged, and when a field exists in both inputs with different type tags it returns the left
+     * input's type instead of raising COMPILATION_DUPLICATE_FIELD_NAME.
+     */
+    private final boolean isUuidIgnoreDuplicate;
+
private RecordMergeTypeComputer() {
+        this(false);
}
+
+    private RecordMergeTypeComputer(boolean isUuidIgnoreDuplicate) {
+        this.isUuidIgnoreDuplicate = isUuidIgnoreDuplicate;
+    }
+
@Override
public IAType computeType(ILogicalExpression expression,
IVariableTypeEnvironment env,
@@ -64,6 +73,12 @@
ARecordType recType1 = TypeComputeUtils.extractRecordType(t1);
if (recType1 == null) {
throw new TypeMismatchException(f.getSourceLocation(), funcId, 1,
t1.getTypeTag(), ATypeTag.OBJECT);
+ }
+
+ // If the left record is fully open, and this is a primary uuid key
merge, then we can't tell if we need
+ // to merge or not, so just return the left record and let the
evaluator handle it
+ if (recType0.deepEqual(RecordUtil.FULLY_OPEN_RECORD_TYPE) &&
isUuidIgnoreDuplicate) {
+ return recType0;
}
List<String> resultFieldNames = new ArrayList<>();
@@ -85,15 +100,21 @@
List<String> additionalFieldNames = new ArrayList<>();
List<IAType> additionalFieldTypes = new ArrayList<>();
- String fieldNames[] = recType1.getFieldNames();
- IAType fieldTypes[] = recType1.getFieldTypes();
+ String[] fieldNames = recType1.getFieldNames();
+ IAType[] fieldTypes = recType1.getFieldTypes();
for (int i = 0; i < fieldNames.length; ++i) {
int pos = Collections.binarySearch(resultFieldNames,
fieldNames[i]);
if (pos >= 0) {
IAType resultFieldType = resultFieldTypes.get(pos);
if (resultFieldType.getTypeTag() !=
fieldTypes[i].getTypeTag()) {
- throw new
CompilationException(ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME,
f.getSourceLocation(),
- fieldNames[i]);
+ // This flag is true for checking the presence of the
primary key uuid field, if it is present,
+ // return the left record and don't throw any exceptions
+ if (isUuidIgnoreDuplicate) {
+ return t0;
+ } else {
+ throw new
CompilationException(ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME,
+ f.getSourceLocation(), fieldNames[i]);
+ }
}
// Assuming fieldTypes[i].getTypeTag() =
resultFieldType.getTypeTag()
if (fieldTypes[i].getTypeTag() == ATypeTag.OBJECT) {
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
index 45a85b2..bc297bf 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
@@ -18,43 +18,20 @@
*/
package org.apache.asterix.runtime.evaluators.functions.records;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.common.annotations.MissingNullInOutFunction;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.om.functions.IFunctionTypeInferer;
-import org.apache.asterix.om.pointables.ARecordVisitablePointable;
-import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.runtime.RuntimeRecordTypeInfo;
import
org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
-import org.apache.asterix.runtime.evaluators.comparisons.DeepEqualAssessor;
-import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
import org.apache.asterix.runtime.functions.FunctionTypeInferers;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.accessors.UTF8StringBinaryComparatorFactory;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
/**
* record merge evaluator is used to combine two records with no matching
fieldnames
@@ -82,15 +59,13 @@
};
private static final long serialVersionUID = 1L;
- private ARecordType outRecType;
- private ARecordType inRecType0;
- private ARecordType inRecType1;
+    // Slot 0: output record type, slot 1: left input record type, slot 2: right input record type.
+    private final IAType[] argTypes = new IAType[3];
@Override
public void setImmutableStates(Object... states) {
- outRecType = TypeComputeUtils.extractRecordType((IAType) states[0]);
- inRecType0 = TypeComputeUtils.extractRecordType((IAType) states[1]);
- inRecType1 = TypeComputeUtils.extractRecordType((IAType) states[2]);
+        for (int slot = 0; slot < argTypes.length; slot++) {
+            argTypes[slot] = TypeComputeUtils.extractRecordType((IAType) states[slot]);
+        }
}
@Override
@@ -100,156 +75,7 @@
@Override
public IScalarEvaluator createScalarEvaluator(final
IHyracksTaskContext ctx) throws HyracksDataException {
- final PointableAllocator pa = new PointableAllocator();
- final IVisitablePointable vp0 =
pa.allocateRecordValue(inRecType0);
- final IVisitablePointable vp1 =
pa.allocateRecordValue(inRecType1);
-
- final IPointable argPtr0 = new VoidPointable();
- final IPointable argPtr1 = new VoidPointable();
-
- final IScalarEvaluator eval0 =
args[0].createScalarEvaluator(ctx);
- final IScalarEvaluator eval1 =
args[1].createScalarEvaluator(ctx);
-
- final List<RecordBuilder> rbStack = new ArrayList<>();
-
- final ArrayBackedValueStorage tabvs = new
ArrayBackedValueStorage();
- final IBinaryComparator stringBinaryComparator =
-
UTF8StringBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-
- return new IScalarEvaluator() {
-
- private final RuntimeRecordTypeInfo runtimeRecordTypeInfo
= new RuntimeRecordTypeInfo();
- private final DeepEqualAssessor deepEqualAssesor = new
DeepEqualAssessor();
- private ArrayBackedValueStorage resultStorage = new
ArrayBackedValueStorage();
- private DataOutput out = resultStorage.getDataOutput();
-
- @Override
- public void evaluate(IFrameTupleReference tuple,
IPointable result) throws HyracksDataException {
- resultStorage.reset();
- eval0.evaluate(tuple, argPtr0);
- eval1.evaluate(tuple, argPtr1);
-
- if (PointableHelper.checkAndSetMissingOrNull(result,
argPtr0, argPtr1)) {
- return;
- }
-
- vp0.set(argPtr0);
- vp1.set(argPtr1);
-
- ARecordVisitablePointable rp0 =
(ARecordVisitablePointable) vp0;
- ARecordVisitablePointable rp1 =
(ARecordVisitablePointable) vp1;
-
- try {
- mergeFields(outRecType, rp0, rp1, true, 0);
- rbStack.get(0).write(out, true);
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- result.set(resultStorage);
- }
-
- private void mergeFields(ARecordType combinedType,
ARecordVisitablePointable leftRecord,
- ARecordVisitablePointable rightRecord, boolean
openFromParent, int nestedLevel)
- throws IOException {
- if (rbStack.size() < (nestedLevel + 1)) {
- rbStack.add(new RecordBuilder());
- }
-
- rbStack.get(nestedLevel).reset(combinedType);
- rbStack.get(nestedLevel).init();
-
- //Add all fields from left record
- for (int i = 0; i < leftRecord.getFieldNames().size();
i++) {
- IVisitablePointable leftName =
leftRecord.getFieldNames().get(i);
- IVisitablePointable leftValue =
leftRecord.getFieldValues().get(i);
- IVisitablePointable leftType =
leftRecord.getFieldTypeTags().get(i);
- boolean foundMatch = false;
- for (int j = 0; j <
rightRecord.getFieldNames().size(); j++) {
- IVisitablePointable rightName =
rightRecord.getFieldNames().get(j);
- IVisitablePointable rightValue =
rightRecord.getFieldValues().get(j);
- IVisitablePointable rightType =
rightRecord.getFieldTypeTags().get(j);
- // Check if same fieldname
- if (PointableHelper.isEqual(leftName,
rightName, stringBinaryComparator)
- &&
!deepEqualAssesor.isEqual(leftValue, rightValue)) {
- //Field was found on the right and are
subrecords, merge them
- if
(PointableHelper.sameType(ATypeTag.OBJECT, rightType)
- &&
PointableHelper.sameType(ATypeTag.OBJECT, leftType)) {
- //We are merging two sub records
- addFieldToSubRecord(combinedType,
leftName, leftValue, rightValue,
- openFromParent, nestedLevel);
- foundMatch = true;
- } else {
- throw new
RuntimeDataException(ErrorCode.DUPLICATE_FIELD_NAME, getIdentifier());
- }
- }
- }
- if (!foundMatch) {
- addFieldToSubRecord(combinedType, leftName,
leftValue, null, openFromParent,
- nestedLevel);
- }
-
- }
- //Repeat for right side (ignoring duplicates this time)
- for (int j = 0; j <
rightRecord.getFieldNames().size(); j++) {
- IVisitablePointable rightName =
rightRecord.getFieldNames().get(j);
- IVisitablePointable rightValue =
rightRecord.getFieldValues().get(j);
- boolean foundMatch = false;
- for (int i = 0; i <
leftRecord.getFieldNames().size(); i++) {
- IVisitablePointable leftName =
leftRecord.getFieldNames().get(i);
- if (rightName.equals(leftName)) {
- foundMatch = true;
- }
- }
- if (!foundMatch) {
- addFieldToSubRecord(combinedType, rightName,
rightValue, null, openFromParent,
- nestedLevel);
- }
- }
- }
-
- /*
- * Takes in a record type, field name, and the field
values (which are record) from two records
- * Merges them into one record of combinedType
- * And adds that record as a field to the Record in subrb
- * the second value can be null, indicated that you just
add the value of left as a field to subrb
- *
- */
- private void addFieldToSubRecord(ARecordType combinedType,
IVisitablePointable fieldNamePointable,
- IVisitablePointable leftValue, IVisitablePointable
rightValue, boolean openFromParent,
- int nestedLevel) throws IOException {
-
- runtimeRecordTypeInfo.reset(combinedType);
- int pos =
runtimeRecordTypeInfo.getFieldIndex(fieldNamePointable.getByteArray(),
- fieldNamePointable.getStartOffset() + 1,
fieldNamePointable.getLength() - 1);
-
- //Add the merged field
- if (combinedType != null && pos >= 0) {
- if (rightValue == null) {
- rbStack.get(nestedLevel).addField(pos,
leftValue);
- } else {
- mergeFields((ARecordType)
combinedType.getFieldTypes()[pos],
- (ARecordVisitablePointable) leftValue,
(ARecordVisitablePointable) rightValue,
- false, nestedLevel + 1);
-
- tabvs.reset();
- rbStack.get(nestedLevel +
1).write(tabvs.getDataOutput(), true);
- rbStack.get(nestedLevel).addField(pos, tabvs);
- }
- } else {
- if (rightValue == null) {
-
rbStack.get(nestedLevel).addField(fieldNamePointable, leftValue);
- } else {
-
mergeFields(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE,
- (ARecordVisitablePointable) leftValue,
(ARecordVisitablePointable) rightValue,
- false, nestedLevel + 1);
- tabvs.reset();
- rbStack.get(nestedLevel +
1).write(tabvs.getDataOutput(), true);
-
rbStack.get(nestedLevel).addField(fieldNamePointable, tabvs);
- }
- }
- }
-
- };
+        // The merge logic now lives in RecordMergeEvaluator, shared with RecordMergeIgnoreDuplicatesDescriptor.
+        // Passing false keeps the strict behavior: conflicting non-record duplicate fields raise DUPLICATE_FIELD_NAME.
+ return new RecordMergeEvaluator(ctx, args, argTypes,
sourceLoc, getIdentifier(), false);
}
};
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeEvaluator.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeEvaluator.java
new file mode 100644
index 0000000..fffa903
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeEvaluator.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions.records;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.common.annotations.MissingNullInOutFunction;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.pointables.ARecordVisitablePointable;
+import org.apache.asterix.om.pointables.PointableAllocator;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.runtime.RuntimeRecordTypeInfo;
+import org.apache.asterix.runtime.evaluators.comparisons.DeepEqualAssessor;
+import org.apache.asterix.runtime.evaluators.functions.AbstractScalarEval;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryComparatorFactory;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * Evaluator shared by {@code RecordMergeDescriptor} and {@code RecordMergeIgnoreDuplicatesDescriptor}. It combines
+ * two records recursively. Fields that exist in only one input are copied to the output. A field that exists in both
+ * inputs with record values on each side is merged recursively. For instance, if both records have a nested field
+ * called "metadata", where metadata from A is {"comments":"this rocks"} and metadata from B is
+ * {"index":7, "priority":5}, the result has a nested "metadata" record with all three fields. A field whose values
+ * are deep-equal on both sides is emitted once.
+ * <p>
+ * If both inputs carry the same field name with different values that are not both records:
+ * <ul>
+ * <li>when {@code isUuidIgnoreDuplicate} is false, evaluation fails with {@code DUPLICATE_FIELD_NAME};</li>
+ * <li>when it is true, the left record's value is kept, the right one is dropped, and all other fields are still
+ * merged. NOTE(review): this assumes the output record type (argTypes[0]) can hold every merged field; confirm for
+ * callers whose right record carries more than the duplicated field.</li>
+ * </ul>
+ * A missing or null argument short-circuits evaluation via {@code PointableHelper.checkAndSetMissingOrNull}.
+ */
+@MissingNullInOutFunction
+public class RecordMergeEvaluator extends AbstractScalarEval {
+
+    // When true, a field present on both sides with different, non-record values keeps the left value.
+    private final boolean isUuidIgnoreDuplicate;
+    // Function reported by getIdentifier() (and therefore in DUPLICATE_FIELD_NAME errors).
+    private final FunctionIdentifier functionIdentifier;
+    private final ARecordType outRecType;
+    private final ARecordType inRecType0;
+    private final ARecordType inRecType1;
+
+    private final PointableAllocator pa = new PointableAllocator();
+    private final IVisitablePointable vp0;
+    private final IVisitablePointable vp1;
+
+    private final IPointable argPtr0 = new VoidPointable();
+    private final IPointable argPtr1 = new VoidPointable();
+
+    private final IScalarEvaluator eval0;
+    private final IScalarEvaluator eval1;
+
+    // One record builder per nesting level, created on demand by mergeFields and reused across evaluations.
+    private final List<RecordBuilder> rbStack = new ArrayList<>();
+
+    // Scratch buffer holding a serialized merged sub-record before it is added to its parent builder.
+    private final ArrayBackedValueStorage tabvs = new ArrayBackedValueStorage();
+    private final IBinaryComparator stringBinaryComparator =
+            UTF8StringBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+
+    private final RuntimeRecordTypeInfo runtimeRecordTypeInfo = new RuntimeRecordTypeInfo();
+    private final DeepEqualAssessor deepEqualAssesor = new DeepEqualAssessor();
+    private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+    private final DataOutput out = resultStorage.getDataOutput();
+
+    /**
+     * Creates a merge evaluator.
+     *
+     * @param ctx task context used to create the argument evaluators
+     * @param args factories for the left (index 0) and right (index 1) record arguments
+     * @param argTypes record types: [0] output, [1] left input, [2] right input
+     * @param sourceLocation source location forwarded to {@link AbstractScalarEval}
+     * @param identifier function reported by {@link #getIdentifier()}; falls back to RECORD_MERGE when null
+     * @param isUuidIgnoreDuplicate whether conflicting non-record duplicate fields keep the left value instead of
+     *            failing
+     * @throws HyracksDataException if an argument evaluator cannot be created
+     */
+    RecordMergeEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory[] args, IAType[] argTypes,
+            SourceLocation sourceLocation, FunctionIdentifier identifier, boolean isUuidIgnoreDuplicate)
+            throws HyracksDataException {
+        super(sourceLocation, identifier);
+        this.isUuidIgnoreDuplicate = isUuidIgnoreDuplicate;
+        this.functionIdentifier = identifier != null ? identifier : BuiltinFunctions.RECORD_MERGE;
+
+        eval0 = args[0].createScalarEvaluator(ctx);
+        eval1 = args[1].createScalarEvaluator(ctx);
+
+        outRecType = (ARecordType) argTypes[0];
+        inRecType0 = (ARecordType) argTypes[1];
+        inRecType1 = (ARecordType) argTypes[2];
+
+        vp0 = pa.allocateRecordValue(inRecType0);
+        vp1 = pa.allocateRecordValue(inRecType1);
+    }
+
+    @Override
+    public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+        resultStorage.reset();
+        eval0.evaluate(tuple, argPtr0);
+        eval1.evaluate(tuple, argPtr1);
+
+        if (PointableHelper.checkAndSetMissingOrNull(result, argPtr0, argPtr1)) {
+            return;
+        }
+
+        vp0.set(argPtr0);
+        vp1.set(argPtr1);
+
+        ARecordVisitablePointable rp0 = (ARecordVisitablePointable) vp0;
+        ARecordVisitablePointable rp1 = (ARecordVisitablePointable) vp1;
+
+        try {
+            // mergeFields only fills the builders; the top-level record is serialized exactly once, here.
+            mergeFields(outRecType, rp0, rp1, true, 0);
+            rbStack.get(0).write(out, true);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        result.set(resultStorage);
+    }
+
+    /*
+     * Builds, in rbStack[nestedLevel], the merge of leftRecord and rightRecord laid out per combinedType.
+     * Left fields come first (merged with their right counterpart when both are records); right fields whose
+     * names do not occur on the left are appended afterwards.
+     */
+    private void mergeFields(ARecordType combinedType, ARecordVisitablePointable leftRecord,
+            ARecordVisitablePointable rightRecord, boolean openFromParent, int nestedLevel) throws IOException {
+        if (rbStack.size() < (nestedLevel + 1)) {
+            rbStack.add(new RecordBuilder());
+        }
+
+        rbStack.get(nestedLevel).reset(combinedType);
+        rbStack.get(nestedLevel).init();
+
+        //Add all fields from left record
+        for (int i = 0; i < leftRecord.getFieldNames().size(); i++) {
+            IVisitablePointable leftName = leftRecord.getFieldNames().get(i);
+            IVisitablePointable leftValue = leftRecord.getFieldValues().get(i);
+            IVisitablePointable leftType = leftRecord.getFieldTypeTags().get(i);
+            boolean foundMatch = false;
+            for (int j = 0; j < rightRecord.getFieldNames().size(); j++) {
+                IVisitablePointable rightName = rightRecord.getFieldNames().get(j);
+                IVisitablePointable rightValue = rightRecord.getFieldValues().get(j);
+                IVisitablePointable rightType = rightRecord.getFieldTypeTags().get(j);
+                // Same field name on both sides with values that are not deep-equal
+                if (PointableHelper.isEqual(leftName, rightName, stringBinaryComparator)
+                        && !deepEqualAssesor.isEqual(leftValue, rightValue)) {
+                    if (PointableHelper.sameType(ATypeTag.OBJECT, rightType)
+                            && PointableHelper.sameType(ATypeTag.OBJECT, leftType)) {
+                        //We are merging two sub records
+                        addFieldToSubRecord(combinedType, leftName, leftValue, rightValue, openFromParent, nestedLevel);
+                        foundMatch = true;
+                    } else if (!isUuidIgnoreDuplicate) {
+                        throw new RuntimeDataException(ErrorCode.DUPLICATE_FIELD_NAME, getIdentifier());
+                    }
+                    // Ignore-duplicate mode: foundMatch stays false, so the left value is added below and the
+                    // right-side pass skips this name because it also exists on the left. Nothing is written to
+                    // resultStorage here, so evaluate() still serializes one well-formed top-level record.
+                }
+            }
+            if (!foundMatch) {
+                addFieldToSubRecord(combinedType, leftName, leftValue, null, openFromParent, nestedLevel);
+            }
+        }
+
+        //Add right fields whose names are absent on the left (shared names were handled above)
+        for (int j = 0; j < rightRecord.getFieldNames().size(); j++) {
+            IVisitablePointable rightName = rightRecord.getFieldNames().get(j);
+            IVisitablePointable rightValue = rightRecord.getFieldValues().get(j);
+            boolean foundMatch = false;
+            for (int i = 0; i < leftRecord.getFieldNames().size(); i++) {
+                IVisitablePointable leftName = leftRecord.getFieldNames().get(i);
+                // Same name comparison as the left-side pass, so both passes agree on what a duplicate is
+                if (PointableHelper.isEqual(rightName, leftName, stringBinaryComparator)) {
+                    foundMatch = true;
+                    break;
+                }
+            }
+            if (!foundMatch) {
+                addFieldToSubRecord(combinedType, rightName, rightValue, null, openFromParent, nestedLevel);
+            }
+        }
+    }
+
+    /*
+     * Adds a field to the record being built at nestedLevel.
+     * If rightValue is null, leftValue is added as-is. Otherwise both values are records: they are merged one
+     * level deeper (using the declared field type from combinedType when the field is declared there, or a nested
+     * open record type otherwise) and the serialized result is added as the field value.
+     */
+    private void addFieldToSubRecord(ARecordType combinedType, IVisitablePointable fieldNamePointable,
+            IVisitablePointable leftValue, IVisitablePointable rightValue, boolean openFromParent, int nestedLevel)
+            throws IOException {
+
+        runtimeRecordTypeInfo.reset(combinedType);
+        int pos = runtimeRecordTypeInfo.getFieldIndex(fieldNamePointable.getByteArray(),
+                fieldNamePointable.getStartOffset() + 1, fieldNamePointable.getLength() - 1);
+
+        //Add the merged field
+        if (combinedType != null && pos >= 0) {
+            if (rightValue == null) {
+                rbStack.get(nestedLevel).addField(pos, leftValue);
+            } else {
+                mergeFields((ARecordType) combinedType.getFieldTypes()[pos], (ARecordVisitablePointable) leftValue,
+                        (ARecordVisitablePointable) rightValue, false, nestedLevel + 1);
+
+                tabvs.reset();
+                rbStack.get(nestedLevel + 1).write(tabvs.getDataOutput(), true);
+                rbStack.get(nestedLevel).addField(pos, tabvs);
+            }
+        } else {
+            if (rightValue == null) {
+                rbStack.get(nestedLevel).addField(fieldNamePointable, leftValue);
+            } else {
+                mergeFields(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE, (ARecordVisitablePointable) leftValue,
+                        (ARecordVisitablePointable) rightValue, false, nestedLevel + 1);
+                tabvs.reset();
+                rbStack.get(nestedLevel + 1).write(tabvs.getDataOutput(), true);
+                rbStack.get(nestedLevel).addField(fieldNamePointable, tabvs);
+            }
+        }
+    }
+
+    /**
+     * Returns the function this evaluator was created for (the identifier passed by its descriptor).
+     */
+    public FunctionIdentifier getIdentifier() {
+        return functionIdentifier;
+    }
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeIgnoreDuplicatesDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeIgnoreDuplicatesDescriptor.java
new file mode 100644
index 0000000..e8d73d1
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeIgnoreDuplicatesDescriptor.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions.records;
+
+import org.apache.asterix.common.annotations.MissingNullInOutFunction;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
+import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
+import org.apache.asterix.om.types.IAType;
+import
org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Descriptor for the internal {@code RECORD_MERGE_UUID_IGNORE_DUPLICATE} function, used by
+ * IntroduceAutogenerateIDRule to merge an autogenerated primary key into a record (for example during an upsert).
+ * Like {@link RecordMergeDescriptor}, it combines two records recursively, so nested records with the same field
+ * name are merged. Unlike RecordMergeDescriptor, when both records carry the same field name with conflicting
+ * non-record values it does not fail with a duplicate-field error. Instead the conflict is resolved in favor of the
+ * left record; see the {@code isUuidIgnoreDuplicate} handling in {@link RecordMergeEvaluator}.
+ */
+@MissingNullInOutFunction
+public class RecordMergeIgnoreDuplicatesDescriptor extends AbstractScalarFunctionDynamicDescriptor {
+
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new RecordMergeIgnoreDuplicatesDescriptor();
+        }
+
+        @Override
+        public IFunctionTypeInferer createFunctionTypeInferer() {
+            return new FunctionTypeInferers.RecordMergeTypeInferer();
+        }
+    };
+
+    private static final long serialVersionUID = 1L;
+    // Slot 0: output record type, slot 1: left input record type, slot 2: right input record type.
+    private final IAType[] argTypes = new IAType[3];
+
+    @Override
+    public void setImmutableStates(Object... states) {
+        argTypes[0] = TypeComputeUtils.extractRecordType((IAType) states[0]);
+        argTypes[1] = TypeComputeUtils.extractRecordType((IAType) states[1]);
+        argTypes[2] = TypeComputeUtils.extractRecordType((IAType) states[2]);
+    }
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IScalarEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException {
+                // true = conflicting duplicate fields do not raise DUPLICATE_FIELD_NAME
+                return new RecordMergeEvaluator(ctx, args, argTypes, sourceLoc, getIdentifier(), true);
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.RECORD_MERGE_UUID_IGNORE_DUPLICATE;
+    }
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 6269582..6038450 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -452,6 +452,7 @@
import
org.apache.asterix.runtime.evaluators.functions.records.RecordConcatStrictDescriptor;
import
org.apache.asterix.runtime.evaluators.functions.records.RecordLengthDescriptor;
import
org.apache.asterix.runtime.evaluators.functions.records.RecordMergeDescriptor;
+import
org.apache.asterix.runtime.evaluators.functions.records.RecordMergeIgnoreDuplicatesDescriptor;
import
org.apache.asterix.runtime.evaluators.functions.records.RecordNamesDescriptor;
import
org.apache.asterix.runtime.evaluators.functions.records.RecordPairsDescriptor;
import
org.apache.asterix.runtime.evaluators.functions.records.RecordPutDescriptor;
@@ -1007,6 +1008,7 @@
fc.add(GetRecordFieldValueDescriptor.FACTORY);
fc.add(DeepEqualityDescriptor.FACTORY);
fc.add(RecordMergeDescriptor.FACTORY);
+ fc.add(RecordMergeIgnoreDuplicatesDescriptor.FACTORY);
fc.add(RecordAddFieldsDescriptor.FACTORY);
fc.add(RecordRemoveFieldsDescriptor.FACTORY);
fc.add(RecordLengthDescriptor.FACTORY);
--
To view, visit https://asterix-gerrit.ics.uci.edu/3370
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I22100d3ff29864b8bfd54b0decb183e5056fdb4a
Gerrit-Change-Number: 3370
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>