Hussain Towaileb has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3370 )


Change subject: [WIP] Fix autogenerated uuid present in upsert issue
......................................................................

[WIP] Fix autogenerated uuid present in upsert issue

Change-Id: I22100d3ff29864b8bfd54b0decb183e5056fdb4a
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordMergeTypeComputer.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeEvaluator.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeIgnoreDuplicatesDescriptor.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
7 files changed, 364 insertions(+), 184 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/70/3370/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
index f1b20d8..a1a788f 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceAutogenerateIDRule.java
@@ -233,7 +233,7 @@
         recordMergeFnArgs.add(new MutableObject<>(rec0));
         recordMergeFnArgs.add(new MutableObject<>(rec1));
         AbstractFunctionCallExpression recordMergeFn = new 
ScalarFunctionCallExpression(
-                FunctionUtil.getFunctionInfo(BuiltinFunctions.RECORD_MERGE), 
recordMergeFnArgs);
+                
FunctionUtil.getFunctionInfo(BuiltinFunctions.RECORD_MERGE_UUID_IGNORE_DUPLICATE),
 recordMergeFnArgs);
         recordMergeFn.setSourceLocation(sourceLoc);
         return recordMergeFn;
     }
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
index df2a868..cf4861e 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java
@@ -254,6 +254,8 @@
     // objects
     public static final FunctionIdentifier RECORD_MERGE =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, 
"object-merge", 2);
+    public static final FunctionIdentifier RECORD_MERGE_UUID_IGNORE_DUPLICATE =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, 
"object-merge", 2);
     public static final FunctionIdentifier RECORD_CONCAT =
             new FunctionIdentifier(FunctionConstants.ASTERIX_NS, 
"object-concat", FunctionIdentifier.VARARGS);
     public static final FunctionIdentifier RECORD_CONCAT_STRICT =
@@ -2181,6 +2183,7 @@

         // objects
         addFunction(RECORD_MERGE, RecordMergeTypeComputer.INSTANCE, true);
+        addFunction(RECORD_MERGE_UUID_IGNORE_DUPLICATE, 
RecordMergeTypeComputer.INSTANCE_UUID_IGNORE_DUPLICATE, true);
         addFunction(RECORD_CONCAT, OpenARecordTypeComputer.INSTANCE, true);
         addPrivateFunction(RECORD_CONCAT_STRICT, 
OpenARecordTypeComputer.INSTANCE, true);
         addFunction(ADD_FIELDS, RecordAddFieldsTypeComputer.INSTANCE, true);
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordMergeTypeComputer.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordMergeTypeComputer.java
index 109f7c6..f61d964 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordMergeTypeComputer.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/RecordMergeTypeComputer.java
@@ -32,6 +32,7 @@
 import org.apache.asterix.om.types.AUnionType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeHelper;
+import org.apache.asterix.om.utils.RecordUtil;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -43,9 +44,17 @@

 public class RecordMergeTypeComputer implements IResultTypeComputer {
     public static final RecordMergeTypeComputer INSTANCE = new 
RecordMergeTypeComputer();
+    public static final RecordMergeTypeComputer INSTANCE_UUID_IGNORE_DUPLICATE 
= new RecordMergeTypeComputer(true);

     private RecordMergeTypeComputer() {
+        this(false);
     }
+
+    private RecordMergeTypeComputer(boolean isUuidIgnoreDuplicate) {
+        this.isUuidIgnoreDuplicate = isUuidIgnoreDuplicate;
+    }
+
+    private final boolean isUuidIgnoreDuplicate;

     @Override
     public IAType computeType(ILogicalExpression expression, 
IVariableTypeEnvironment env,
@@ -64,6 +73,12 @@
         ARecordType recType1 = TypeComputeUtils.extractRecordType(t1);
         if (recType1 == null) {
             throw new TypeMismatchException(f.getSourceLocation(), funcId, 1, 
t1.getTypeTag(), ATypeTag.OBJECT);
+        }
+
+        // If the left record is fully open, and this is a primary uuid key 
merge, then we can't tell if we need
+        // to merge or not, so just return the left record and let the 
evaluator handle it
+        if (recType0.deepEqual(RecordUtil.FULLY_OPEN_RECORD_TYPE) && 
isUuidIgnoreDuplicate) {
+            return recType0;
         }

         List<String> resultFieldNames = new ArrayList<>();
@@ -85,15 +100,21 @@

         List<String> additionalFieldNames = new ArrayList<>();
         List<IAType> additionalFieldTypes = new ArrayList<>();
-        String fieldNames[] = recType1.getFieldNames();
-        IAType fieldTypes[] = recType1.getFieldTypes();
+        String[] fieldNames = recType1.getFieldNames();
+        IAType[] fieldTypes = recType1.getFieldTypes();
         for (int i = 0; i < fieldNames.length; ++i) {
             int pos = Collections.binarySearch(resultFieldNames, 
fieldNames[i]);
             if (pos >= 0) {
                 IAType resultFieldType = resultFieldTypes.get(pos);
                 if (resultFieldType.getTypeTag() != 
fieldTypes[i].getTypeTag()) {
-                    throw new 
CompilationException(ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME, 
f.getSourceLocation(),
-                            fieldNames[i]);
+                    // This flag is true for checking the presence of the 
primary key uuid field, if it is present,
+                    // return the left record and don't throw any exceptions
+                    if (isUuidIgnoreDuplicate) {
+                        return t0;
+                    } else {
+                        throw new 
CompilationException(ErrorCode.COMPILATION_DUPLICATE_FIELD_NAME,
+                                f.getSourceLocation(), fieldNames[i]);
+                    }
                 }
                 // Assuming fieldTypes[i].getTypeTag() = 
resultFieldType.getTypeTag()
                 if (fieldTypes[i].getTypeTag() == ATypeTag.OBJECT) {
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
index 45a85b2..bc297bf 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeDescriptor.java
@@ -18,43 +18,20 @@
  */
 package org.apache.asterix.runtime.evaluators.functions.records;

-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.common.annotations.MissingNullInOutFunction;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.om.functions.IFunctionTypeInferer;
-import org.apache.asterix.om.pointables.ARecordVisitablePointable;
-import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
-import org.apache.asterix.om.pointables.base.IVisitablePointable;
 import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.runtime.RuntimeRecordTypeInfo;
 import 
org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
-import org.apache.asterix.runtime.evaluators.comparisons.DeepEqualAssessor;
-import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
 import org.apache.asterix.runtime.functions.FunctionTypeInferers;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.accessors.UTF8StringBinaryComparatorFactory;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;

 /**
  * record merge evaluator is used to combine two records with no matching 
fieldnames
@@ -82,15 +59,13 @@
     };

     private static final long serialVersionUID = 1L;
-    private ARecordType outRecType;
-    private ARecordType inRecType0;
-    private ARecordType inRecType1;
+    private final IAType[] argTypes = new IAType[3];

     @Override
     public void setImmutableStates(Object... states) {
-        outRecType = TypeComputeUtils.extractRecordType((IAType) states[0]);
-        inRecType0 = TypeComputeUtils.extractRecordType((IAType) states[1]);
-        inRecType1 = TypeComputeUtils.extractRecordType((IAType) states[2]);
+        argTypes[0] = TypeComputeUtils.extractRecordType((IAType) states[0]);
+        argTypes[1] = TypeComputeUtils.extractRecordType((IAType) states[1]);
+        argTypes[2] = TypeComputeUtils.extractRecordType((IAType) states[2]);
     }

     @Override
@@ -100,156 +75,7 @@

             @Override
             public IScalarEvaluator createScalarEvaluator(final 
IHyracksTaskContext ctx) throws HyracksDataException {
-                final PointableAllocator pa = new PointableAllocator();
-                final IVisitablePointable vp0 = 
pa.allocateRecordValue(inRecType0);
-                final IVisitablePointable vp1 = 
pa.allocateRecordValue(inRecType1);
-
-                final IPointable argPtr0 = new VoidPointable();
-                final IPointable argPtr1 = new VoidPointable();
-
-                final IScalarEvaluator eval0 = 
args[0].createScalarEvaluator(ctx);
-                final IScalarEvaluator eval1 = 
args[1].createScalarEvaluator(ctx);
-
-                final List<RecordBuilder> rbStack = new ArrayList<>();
-
-                final ArrayBackedValueStorage tabvs = new 
ArrayBackedValueStorage();
-                final IBinaryComparator stringBinaryComparator =
-                        
UTF8StringBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-
-                return new IScalarEvaluator() {
-
-                    private final RuntimeRecordTypeInfo runtimeRecordTypeInfo 
= new RuntimeRecordTypeInfo();
-                    private final DeepEqualAssessor deepEqualAssesor = new 
DeepEqualAssessor();
-                    private ArrayBackedValueStorage resultStorage = new 
ArrayBackedValueStorage();
-                    private DataOutput out = resultStorage.getDataOutput();
-
-                    @Override
-                    public void evaluate(IFrameTupleReference tuple, 
IPointable result) throws HyracksDataException {
-                        resultStorage.reset();
-                        eval0.evaluate(tuple, argPtr0);
-                        eval1.evaluate(tuple, argPtr1);
-
-                        if (PointableHelper.checkAndSetMissingOrNull(result, 
argPtr0, argPtr1)) {
-                            return;
-                        }
-
-                        vp0.set(argPtr0);
-                        vp1.set(argPtr1);
-
-                        ARecordVisitablePointable rp0 = 
(ARecordVisitablePointable) vp0;
-                        ARecordVisitablePointable rp1 = 
(ARecordVisitablePointable) vp1;
-
-                        try {
-                            mergeFields(outRecType, rp0, rp1, true, 0);
-                            rbStack.get(0).write(out, true);
-                        } catch (IOException e) {
-                            throw HyracksDataException.create(e);
-                        }
-                        result.set(resultStorage);
-                    }
-
-                    private void mergeFields(ARecordType combinedType, 
ARecordVisitablePointable leftRecord,
-                            ARecordVisitablePointable rightRecord, boolean 
openFromParent, int nestedLevel)
-                            throws IOException {
-                        if (rbStack.size() < (nestedLevel + 1)) {
-                            rbStack.add(new RecordBuilder());
-                        }
-
-                        rbStack.get(nestedLevel).reset(combinedType);
-                        rbStack.get(nestedLevel).init();
-
-                        //Add all fields from left record
-                        for (int i = 0; i < leftRecord.getFieldNames().size(); 
i++) {
-                            IVisitablePointable leftName = 
leftRecord.getFieldNames().get(i);
-                            IVisitablePointable leftValue = 
leftRecord.getFieldValues().get(i);
-                            IVisitablePointable leftType = 
leftRecord.getFieldTypeTags().get(i);
-                            boolean foundMatch = false;
-                            for (int j = 0; j < 
rightRecord.getFieldNames().size(); j++) {
-                                IVisitablePointable rightName = 
rightRecord.getFieldNames().get(j);
-                                IVisitablePointable rightValue = 
rightRecord.getFieldValues().get(j);
-                                IVisitablePointable rightType = 
rightRecord.getFieldTypeTags().get(j);
-                                // Check if same fieldname
-                                if (PointableHelper.isEqual(leftName, 
rightName, stringBinaryComparator)
-                                        && 
!deepEqualAssesor.isEqual(leftValue, rightValue)) {
-                                    //Field was found on the right and are 
subrecords, merge them
-                                    if 
(PointableHelper.sameType(ATypeTag.OBJECT, rightType)
-                                            && 
PointableHelper.sameType(ATypeTag.OBJECT, leftType)) {
-                                        //We are merging two sub records
-                                        addFieldToSubRecord(combinedType, 
leftName, leftValue, rightValue,
-                                                openFromParent, nestedLevel);
-                                        foundMatch = true;
-                                    } else {
-                                        throw new 
RuntimeDataException(ErrorCode.DUPLICATE_FIELD_NAME, getIdentifier());
-                                    }
-                                }
-                            }
-                            if (!foundMatch) {
-                                addFieldToSubRecord(combinedType, leftName, 
leftValue, null, openFromParent,
-                                        nestedLevel);
-                            }
-
-                        }
-                        //Repeat for right side (ignoring duplicates this time)
-                        for (int j = 0; j < 
rightRecord.getFieldNames().size(); j++) {
-                            IVisitablePointable rightName = 
rightRecord.getFieldNames().get(j);
-                            IVisitablePointable rightValue = 
rightRecord.getFieldValues().get(j);
-                            boolean foundMatch = false;
-                            for (int i = 0; i < 
leftRecord.getFieldNames().size(); i++) {
-                                IVisitablePointable leftName = 
leftRecord.getFieldNames().get(i);
-                                if (rightName.equals(leftName)) {
-                                    foundMatch = true;
-                                }
-                            }
-                            if (!foundMatch) {
-                                addFieldToSubRecord(combinedType, rightName, 
rightValue, null, openFromParent,
-                                        nestedLevel);
-                            }
-                        }
-                    }
-
-                    /*
-                     * Takes in a record type, field name, and the field 
values (which are record) from two records
-                     * Merges them into one record of combinedType
-                     * And adds that record as a field to the Record in subrb
-                     * the second value can be null, indicated that you just 
add the value of left as a field to subrb
-                     *
-                     */
-                    private void addFieldToSubRecord(ARecordType combinedType, 
IVisitablePointable fieldNamePointable,
-                            IVisitablePointable leftValue, IVisitablePointable 
rightValue, boolean openFromParent,
-                            int nestedLevel) throws IOException {
-
-                        runtimeRecordTypeInfo.reset(combinedType);
-                        int pos = 
runtimeRecordTypeInfo.getFieldIndex(fieldNamePointable.getByteArray(),
-                                fieldNamePointable.getStartOffset() + 1, 
fieldNamePointable.getLength() - 1);
-
-                        //Add the merged field
-                        if (combinedType != null && pos >= 0) {
-                            if (rightValue == null) {
-                                rbStack.get(nestedLevel).addField(pos, 
leftValue);
-                            } else {
-                                mergeFields((ARecordType) 
combinedType.getFieldTypes()[pos],
-                                        (ARecordVisitablePointable) leftValue, 
(ARecordVisitablePointable) rightValue,
-                                        false, nestedLevel + 1);
-
-                                tabvs.reset();
-                                rbStack.get(nestedLevel + 
1).write(tabvs.getDataOutput(), true);
-                                rbStack.get(nestedLevel).addField(pos, tabvs);
-                            }
-                        } else {
-                            if (rightValue == null) {
-                                
rbStack.get(nestedLevel).addField(fieldNamePointable, leftValue);
-                            } else {
-                                
mergeFields(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE,
-                                        (ARecordVisitablePointable) leftValue, 
(ARecordVisitablePointable) rightValue,
-                                        false, nestedLevel + 1);
-                                tabvs.reset();
-                                rbStack.get(nestedLevel + 
1).write(tabvs.getDataOutput(), true);
-                                
rbStack.get(nestedLevel).addField(fieldNamePointable, tabvs);
-                            }
-                        }
-                    }
-
-                };
+                return new RecordMergeEvaluator(ctx, args, argTypes, 
sourceLoc, getIdentifier(), false);
             }
         };
     }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeEvaluator.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeEvaluator.java
new file mode 100644
index 0000000..fffa903
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeEvaluator.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions.records;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.common.annotations.MissingNullInOutFunction;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.pointables.ARecordVisitablePointable;
+import org.apache.asterix.om.pointables.PointableAllocator;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.runtime.RuntimeRecordTypeInfo;
+import org.apache.asterix.runtime.evaluators.comparisons.DeepEqualAssessor;
+import org.apache.asterix.runtime.evaluators.functions.AbstractScalarEval;
+import org.apache.asterix.runtime.evaluators.functions.PointableHelper;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.accessors.UTF8StringBinaryComparatorFactory;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+/**
+ * record merge evaluator is used to combine two records with no matching 
fieldnames
+ * If both records have the same fieldname for a non-record field anywhere in 
the schema, the merge will fail
+ * This function is performed on a recursive level, meaning that nested 
records can be combined
+ * for instance if both records have a nested field called "metadata"
+ * where metadata from A is {"comments":"this rocks"} and metadata from B is 
{"index":7, "priority":5}
+ * Records A and B can be combined yielding a nested record called "metadata"
+ * That will have all three fields
+ */
+
+@MissingNullInOutFunction
+public class RecordMergeEvaluator extends AbstractScalarEval {
+
+    private final boolean isUuidIgnoreDuplicate;
+    private final ARecordType outRecType;
+    private final ARecordType inRecType0;
+    private final ARecordType inRecType1;
+
+    private final PointableAllocator pa = new PointableAllocator();
+    private final IVisitablePointable vp0;
+    private final IVisitablePointable vp1;
+
+    private final IPointable argPtr0 = new VoidPointable();
+    private final IPointable argPtr1 = new VoidPointable();
+
+    private final IScalarEvaluator eval0;
+    private final IScalarEvaluator eval1;
+
+    private final List<RecordBuilder> rbStack = new ArrayList<>();
+
+    private final ArrayBackedValueStorage tabvs = new 
ArrayBackedValueStorage();
+    private final IBinaryComparator stringBinaryComparator =
+            
UTF8StringBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+
+    private final RuntimeRecordTypeInfo runtimeRecordTypeInfo = new 
RuntimeRecordTypeInfo();
+    private final DeepEqualAssessor deepEqualAssesor = new DeepEqualAssessor();
+    private ArrayBackedValueStorage resultStorage = new 
ArrayBackedValueStorage();
+    private DataOutput out = resultStorage.getDataOutput();
+
+    RecordMergeEvaluator(IHyracksTaskContext ctx, IScalarEvaluatorFactory[] 
args, IAType[] argTypes,
+            SourceLocation sourceLocation, FunctionIdentifier identifier, 
boolean isUuidIgnoreDuplicate)
+            throws HyracksDataException {
+        super(sourceLocation, identifier);
+        this.isUuidIgnoreDuplicate = isUuidIgnoreDuplicate;
+
+        eval0 = args[0].createScalarEvaluator(ctx);
+        eval1 = args[1].createScalarEvaluator(ctx);
+
+        outRecType = (ARecordType) argTypes[0];
+        inRecType0 = (ARecordType) argTypes[1];
+        inRecType1 = (ARecordType) argTypes[2];
+
+        vp0 = pa.allocateRecordValue(inRecType0);
+        vp1 = pa.allocateRecordValue(inRecType1);
+    }
+
+    @Override
+    public void evaluate(IFrameTupleReference tuple, IPointable result) throws 
HyracksDataException {
+        resultStorage.reset();
+        eval0.evaluate(tuple, argPtr0);
+        eval1.evaluate(tuple, argPtr1);
+
+        if (PointableHelper.checkAndSetMissingOrNull(result, argPtr0, 
argPtr1)) {
+            return;
+        }
+
+        vp0.set(argPtr0);
+        vp1.set(argPtr1);
+
+        ARecordVisitablePointable rp0 = (ARecordVisitablePointable) vp0;
+        ARecordVisitablePointable rp1 = (ARecordVisitablePointable) vp1;
+
+        try {
+            mergeFields(outRecType, rp0, rp1, true, 0);
+            rbStack.get(0).write(out, true);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        result.set(resultStorage);
+    }
+
+    private void mergeFields(ARecordType combinedType, 
ARecordVisitablePointable leftRecord,
+            ARecordVisitablePointable rightRecord, boolean openFromParent, int 
nestedLevel) throws IOException {
+        if (rbStack.size() < (nestedLevel + 1)) {
+            rbStack.add(new RecordBuilder());
+        }
+
+        rbStack.get(nestedLevel).reset(combinedType);
+        rbStack.get(nestedLevel).init();
+
+        //Add all fields from left record
+        for (int i = 0; i < leftRecord.getFieldNames().size(); i++) {
+            IVisitablePointable leftName = leftRecord.getFieldNames().get(i);
+            IVisitablePointable leftValue = leftRecord.getFieldValues().get(i);
+            IVisitablePointable leftType = 
leftRecord.getFieldTypeTags().get(i);
+            boolean foundMatch = false;
+            for (int j = 0; j < rightRecord.getFieldNames().size(); j++) {
+                IVisitablePointable rightName = 
rightRecord.getFieldNames().get(j);
+                IVisitablePointable rightValue = 
rightRecord.getFieldValues().get(j);
+                IVisitablePointable rightType = 
rightRecord.getFieldTypeTags().get(j);
+                // Check if same fieldname
+                if (PointableHelper.isEqual(leftName, rightName, 
stringBinaryComparator)
+                        && !deepEqualAssesor.isEqual(leftValue, rightValue)) {
+                    //Field was found on the right and are subrecords, merge 
them
+                    if (PointableHelper.sameType(ATypeTag.OBJECT, rightType)
+                            && PointableHelper.sameType(ATypeTag.OBJECT, 
leftType)) {
+                        //We are merging two sub records
+                        addFieldToSubRecord(combinedType, leftName, leftValue, 
rightValue, openFromParent, nestedLevel);
+                        foundMatch = true;
+                    } else {
+                        // This flag is true for checking the presence of the 
primary key uuid field, if it is present,
+                        // return the left record and don't throw any 
exceptions
+                        if (isUuidIgnoreDuplicate) {
+                            resultStorage.set(leftRecord);
+                            return;
+                        } else {
+                            throw new 
RuntimeDataException(ErrorCode.DUPLICATE_FIELD_NAME, getIdentifier());
+                        }
+                    }
+                }
+            }
+            if (!foundMatch) {
+                addFieldToSubRecord(combinedType, leftName, leftValue, null, 
openFromParent, nestedLevel);
+            }
+
+        }
+        //Repeat for right side (ignoring duplicates this time)
+        for (int j = 0; j < rightRecord.getFieldNames().size(); j++) {
+            IVisitablePointable rightName = rightRecord.getFieldNames().get(j);
+            IVisitablePointable rightValue = 
rightRecord.getFieldValues().get(j);
+            boolean foundMatch = false;
+            for (int i = 0; i < leftRecord.getFieldNames().size(); i++) {
+                IVisitablePointable leftName = 
leftRecord.getFieldNames().get(i);
+                if (rightName.equals(leftName)) {
+                    foundMatch = true;
+                }
+            }
+            if (!foundMatch) {
+                addFieldToSubRecord(combinedType, rightName, rightValue, null, 
openFromParent, nestedLevel);
+            }
+        }
+    }
+
+    /*
+     * Takes in a record type, field name, and the field values (which are 
record) from two records
+     * Merges them into one record of combinedType
+     * And adds that record as a field to the Record in subrb
+     * the second value can be null, indicated that you just add the value of 
left as a field to subrb
+     *
+     */
+    private void addFieldToSubRecord(ARecordType combinedType, 
IVisitablePointable fieldNamePointable,
+            IVisitablePointable leftValue, IVisitablePointable rightValue, 
boolean openFromParent, int nestedLevel)
+            throws IOException {
+
+        runtimeRecordTypeInfo.reset(combinedType);
+        int pos = 
runtimeRecordTypeInfo.getFieldIndex(fieldNamePointable.getByteArray(),
+                fieldNamePointable.getStartOffset() + 1, 
fieldNamePointable.getLength() - 1);
+
+        //Add the merged field
+        if (combinedType != null && pos >= 0) {
+            if (rightValue == null) {
+                rbStack.get(nestedLevel).addField(pos, leftValue);
+            } else {
+                mergeFields((ARecordType) combinedType.getFieldTypes()[pos], 
(ARecordVisitablePointable) leftValue,
+                        (ARecordVisitablePointable) rightValue, false, 
nestedLevel + 1);
+
+                tabvs.reset();
+                rbStack.get(nestedLevel + 1).write(tabvs.getDataOutput(), 
true);
+                rbStack.get(nestedLevel).addField(pos, tabvs);
+            }
+        } else {
+            if (rightValue == null) {
+                rbStack.get(nestedLevel).addField(fieldNamePointable, 
leftValue);
+            } else {
+                mergeFields(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE, 
(ARecordVisitablePointable) leftValue,
+                        (ARecordVisitablePointable) rightValue, false, 
nestedLevel + 1);
+                tabvs.reset();
+                rbStack.get(nestedLevel + 1).write(tabvs.getDataOutput(), 
true);
+                rbStack.get(nestedLevel).addField(fieldNamePointable, tabvs);
+            }
+        }
+    }
+
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.RECORD_MERGE;
+    }
+}
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeIgnoreDuplicatesDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeIgnoreDuplicatesDescriptor.java
new file mode 100644
index 0000000..e8d73d1
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/records/RecordMergeIgnoreDuplicatesDescriptor.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions.records;
+
+import org.apache.asterix.common.annotations.MissingNullInOutFunction;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptor;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
+import org.apache.asterix.om.typecomputer.impl.TypeComputeUtils;
+import org.apache.asterix.om.types.IAType;
+import 
org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Descriptor for the RECORD_MERGE_UUID_IGNORE_DUPLICATE builtin function.
+ * It creates a {@link RecordMergeEvaluator}, handing it the argument
+ * evaluator factories and the three types captured by setImmutableStates.
+ * Nested records are merged recursively: for instance, if both records have
+ * a nested field called "metadata", where metadata from A is
+ * {"comments":"this rocks"} and metadata from B is {"index":7, "priority":5},
+ * records A and B can be combined yielding a nested record called "metadata"
+ * that has all three fields.
+ * NOTE(review): the earlier wording (apparently copied from the plain record
+ * merge descriptor) stated that the merge fails when both records have the
+ * same field name for a non-record field. The class name and the
+ * {@code true} flag passed to the evaluator suggest such duplicates are
+ * ignored here instead -- TODO confirm against RecordMergeEvaluator.
+ */
+
+@MissingNullInOutFunction
+public class RecordMergeIgnoreDuplicatesDescriptor extends 
AbstractScalarFunctionDynamicDescriptor {
+
+    public static final IFunctionDescriptorFactory FACTORY = new 
IFunctionDescriptorFactory() {
+        @Override
+        public IFunctionDescriptor createFunctionDescriptor() {
+            return new RecordMergeIgnoreDuplicatesDescriptor();
+        }
+
+        @Override
+        public IFunctionTypeInferer createFunctionTypeInferer() {
+            return new FunctionTypeInferers.RecordMergeTypeInferer();
+        }
+    };
+
+    private static final long serialVersionUID = 1L;
+    private final IAType[] argTypes = new IAType[3]; // filled from states[0..2] in setImmutableStates
+
+    @Override
+    public void setImmutableStates(Object... states) { // NOTE(review): presumably output type, then the two input types -- confirm
+        argTypes[0] = TypeComputeUtils.extractRecordType((IAType) states[0]);
+        argTypes[1] = TypeComputeUtils.extractRecordType((IAType) states[1]);
+        argTypes[2] = TypeComputeUtils.extractRecordType((IAType) states[2]);
+    }
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(final 
IScalarEvaluatorFactory[] args) {
+        return new IScalarEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(final 
IHyracksTaskContext ctx) throws HyracksDataException {
+                return new RecordMergeEvaluator(ctx, args, argTypes, 
sourceLoc, getIdentifier(), true); // NOTE(review): trailing true presumably turns on ignore-duplicates -- confirm
+            }
+        };
+    }
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.RECORD_MERGE_UUID_IGNORE_DUPLICATE;
+    }
+}
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
index 6269582..6038450 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/functions/FunctionCollection.java
@@ -452,6 +452,7 @@
 import 
org.apache.asterix.runtime.evaluators.functions.records.RecordConcatStrictDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.records.RecordLengthDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.records.RecordMergeDescriptor;
+import 
org.apache.asterix.runtime.evaluators.functions.records.RecordMergeIgnoreDuplicatesDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.records.RecordNamesDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.records.RecordPairsDescriptor;
 import 
org.apache.asterix.runtime.evaluators.functions.records.RecordPutDescriptor;
@@ -1007,6 +1008,7 @@
         fc.add(GetRecordFieldValueDescriptor.FACTORY);
         fc.add(DeepEqualityDescriptor.FACTORY);
         fc.add(RecordMergeDescriptor.FACTORY);
+        fc.add(RecordMergeIgnoreDuplicatesDescriptor.FACTORY);
         fc.add(RecordAddFieldsDescriptor.FACTORY);
         fc.add(RecordRemoveFieldsDescriptor.FACTORY);
         fc.add(RecordLengthDescriptor.FACTORY);

--
To view, visit https://asterix-gerrit.ics.uci.edu/3370
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I22100d3ff29864b8bfd54b0decb183e5056fdb4a
Gerrit-Change-Number: 3370
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>

Reply via email to