Ali Alsuliman has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/3129

Change subject: [FUN][RT] Object creation in array functions
......................................................................

[FUN][RT] Object creation in array functions

- user model changes: no
- storage format changes: no
- interface changes: no

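Details:
- Deallocate the objects allocated by the caster used in array functions.
- Fix array_intersect to pick the smallest list as the starting list.
- Move ArrayIntersectEval out of ArrayIntersectDescriptor into its own class
  and pool its ValueListIndex objects instead of allocating them per item.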

Change-Id: Ib6c8c55ed3e0a35e00c5976a46e9ed6e432a6e9f
---
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/array_fun/array_intersect/array_intersect.3.query.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_intersect/array_intersect.3.adm
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayAddRemoveEval.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessArraysEval.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
A asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectEval.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayStarDescriptor.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
10 files changed, 398 insertions(+), 291 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/29/3129/1

diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/array_fun/array_intersect/array_intersect.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/array_fun/array_intersect/array_intersect.3.query.sqlpp
index 3de0126..b5bbbb3 100755
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/array_fun/array_intersect/array_intersect.3.query.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/array_fun/array_intersect/array_intersect.3.query.sqlpp
@@ -20,7 +20,7 @@
 use TinySocial;
 
 {
-  "t1": (select array_intersect(t.`referred-topics`, {{"t-mobile", 
"platform"}}, {{"t-mobile"}}) from TweetMessages t order by t.tweetid),
+  "t1": (select t.tweetid, array_intersect(t.`referred-topics`, {{"t-mobile", 
"platform"}}, {{"t-mobile"}}) from TweetMessages t order by t.tweetid),
   "t2": (select array_intersect([1, "John", 2], (select value v.id from d1 v), 
[2,4,1])),
   "t3": (array_intersect([3,5,1], [5,7,3], [3,2,5,1])),
   "t4": (array_intersect([3,5.0,1], [5,7,3], [3,2,5,1])),
@@ -36,5 +36,6 @@
   "t14": (array_intersect(missing, "non_array", [2,5,1])),
   "t15": (array_intersect([], [], [])),
   "t16": (array_intersect([], [3,2], [])),
-  "t17": (select array_intersect(d.followers, ["John Green", "sth"], ["sth", 
"John Green"]) from d1 d)
+  "t17": (select array_intersect(d.followers, ["John Green", "sth"], ["sth", 
"John Green"]) from d1 d),
+  "t18": (array_intersect([1,2], [3,2,1], [1,2,3,4]))
 };
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_intersect/array_intersect.3.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_intersect/array_intersect.3.adm
index 2276eb4..368becd 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_intersect/array_intersect.3.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_intersect/array_intersect.3.adm
@@ -1 +1 @@
-{ "t1": [ { "$1": {{ "t-mobile" }} }, { "$1": {{  }} }, { "$1": {{  }} }, { 
"$1": {{  }} }, { "$1": {{  }} }, { "$1": {{  }} }, { "$1": {{  }} }, { "$1": 
{{  }} }, { "$1": {{  }} }, { "$1": {{  }} }, { "$1": {{ "t-mobile" }} }, { 
"$1": {{  }} } ], "t2": [ { "$2": [ 2, 1 ] } ], "t3": [ 3, 5 ], "t4": [ 3, 5 ], 
"t5": [ 3, "a" ], "t6": [ 3 ], "t7": [  ], "t8": [  ], "t9": [  ], "t10": [  ], 
"t12": null, "t13": null, "t15": [  ], "t16": [  ], "t17": [ {  }, { "$3": [ 
"John Green" ] } ] }
+{ "t1": [ { "tweetid": "1", "$1": {{ "t-mobile" }} }, { "tweetid": "10", "$1": 
{{  }} }, { "tweetid": "11", "$1": {{  }} }, { "tweetid": "12", "$1": {{  }} }, 
{ "tweetid": "2", "$1": {{  }} }, { "tweetid": "3", "$1": {{  }} }, { 
"tweetid": "4", "$1": {{  }} }, { "tweetid": "5", "$1": {{  }} }, { "tweetid": 
"6", "$1": {{  }} }, { "tweetid": "7", "$1": {{  }} }, { "tweetid": "8", "$1": 
{{ "t-mobile" }} }, { "tweetid": "9", "$1": {{  }} } ], "t2": [ { "$2": [ 2, 1 
] } ], "t3": [ 3, 5 ], "t4": [ 3, 5 ], "t5": [ 3, "a" ], "t6": [ 3 ], "t7": [  
], "t8": [  ], "t9": [  ], "t10": [  ], "t12": null, "t13": null, "t15": [  ], 
"t16": [  ], "t17": [ {  }, { "$3": [ "John Green" ] } ], "t18": [ 1, 2 ] }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayAddRemoveEval.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayAddRemoveEval.java
index 1100a59..a004f89 100755
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayAddRemoveEval.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayAddRemoveEval.java
@@ -136,6 +136,7 @@
             }
             if (valueTag == ATypeTag.MISSING) {
                 PointableHelper.setMissing(result);
+                caster.deallocatePointables();
                 return;
             }
             if (!acceptNullValues && valueTag == ATypeTag.NULL) {
@@ -145,10 +146,12 @@
 
         if (returnNull) {
             PointableHelper.setNull(result);
+            caster.deallocatePointables();
             return;
         }
 
         if (encounteredNonPrimitive) {
+            caster.deallocatePointables();
             throw new RuntimeDataException(ErrorCode.CANNOT_COMPARE_COMPLEX, 
sourceLocation);
         }
         // all arguments are valid
@@ -190,6 +193,8 @@
             result.set(storage);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
+        } finally {
+            caster.deallocatePointables();
         }
     }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessArraysEval.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessArraysEval.java
index 3258a8d..ff9b38e 100755
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessArraysEval.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessArraysEval.java
@@ -88,6 +88,7 @@
         AbstractCollectionType outList = null;
         ATypeTag listTag;
         for (int i = 0; i < listsEval.length; i++) {
+            // TODO(ali): avoid having to perform evaluate twice
             listsEval[i].evaluate(tuple, listsArgs[i]);
             if (!returnNull) {
                 listArgType = 
listsArgs[i].getByteArray()[listsArgs[i].getStartOffset()];
@@ -109,6 +110,7 @@
 
         if (returnNull) {
             PointableHelper.setNull(result);
+            caster.deallocatePointables();
             return;
         }
 
@@ -140,6 +142,7 @@
             release();
             storageAllocator.reset();
             pointableAllocator.reset();
+            caster.deallocatePointables();
         }
     }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java
index 06381b5..0633fa4 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java
@@ -194,6 +194,7 @@
             } finally {
                 storageAllocator.reset();
                 listAccessorAllocator.reset();
+                caster.deallocatePointables();
             }
         }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
index 52335a0..fc4d563 100755
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
@@ -18,54 +18,19 @@
  */
 package org.apache.asterix.runtime.evaluators.functions;
 
-import static 
org.apache.asterix.om.types.EnumDeserializer.ATYPETAGDESERIALIZER;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import org.apache.asterix.builders.AbvsBuilderFactory;
-import org.apache.asterix.builders.ArrayListFactory;
-import org.apache.asterix.builders.IAsterixListBuilder;
-import org.apache.asterix.builders.OrderedListBuilder;
-import org.apache.asterix.builders.UnorderedListBuilder;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import 
org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
-import 
org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
-import 
org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
-import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.om.functions.IFunctionTypeInferer;
-import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AbstractCollectionType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.container.IObjectPool;
-import org.apache.asterix.om.util.container.ListObjectPool;
 import 
org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
-import org.apache.asterix.runtime.evaluators.common.ListAccessor;
 import org.apache.asterix.runtime.functions.FunctionTypeInferers;
-import org.apache.asterix.runtime.utils.ArrayFunctionsUtil;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IMutableValueStorage;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 /**
  * <pre>
@@ -101,31 +66,6 @@
         }
     };
 
-    public class ValueListIndex implements IValueReference {
-        private final IPointable value;
-        private int listIndex;
-
-        public ValueListIndex(IPointable value, int listIndex) {
-            this.value = value;
-            this.listIndex = listIndex;
-        }
-
-        @Override
-        public byte[] getByteArray() {
-            return value.getByteArray();
-        }
-
-        @Override
-        public int getStartOffset() {
-            return value.getStartOffset();
-        }
-
-        @Override
-        public int getLength() {
-            return value.getLength();
-        }
-    }
-
     @Override
     public FunctionIdentifier getIdentifier() {
         return BuiltinFunctions.ARRAY_INTERSECT;
@@ -144,235 +84,8 @@
 
             @Override
             public IScalarEvaluator createScalarEvaluator(final 
IHyracksTaskContext ctx) throws HyracksDataException {
-                return new ArrayIntersectEval(args, ctx);
+                return new ArrayIntersectEval(args, ctx, argTypes, sourceLoc);
             }
         };
-    }
-
-    public class ArrayIntersectEval implements IScalarEvaluator {
-        private final ListAccessor listAccessor;
-        private final IPointable[] listsArgs;
-        private final IScalarEvaluator[] listsEval;
-        private final IBinaryHashFunction binaryHashFunction;
-        private final Int2ObjectMap<List<ValueListIndex>> hashes;
-        private final PointableAllocator pointableAllocator;
-        private final IObjectPool<IMutableValueStorage, ATypeTag> 
storageAllocator;
-        private final IObjectPool<List<ValueListIndex>, ATypeTag> 
arrayListAllocator;
-        private final ArrayBackedValueStorage finalResult;
-        private final CastTypeEvaluator caster;
-        private final IBinaryComparator comp;
-        private IAsterixListBuilder orderedListBuilder;
-        private IAsterixListBuilder unorderedListBuilder;
-
-        public ArrayIntersectEval(IScalarEvaluatorFactory[] args, 
IHyracksTaskContext ctx) throws HyracksDataException {
-            orderedListBuilder = null;
-            unorderedListBuilder = null;
-            pointableAllocator = new PointableAllocator();
-            storageAllocator = new ListObjectPool<>(new AbvsBuilderFactory());
-            arrayListAllocator = new ListObjectPool<>(new 
ArrayListFactory<>());
-            hashes = new Int2ObjectOpenHashMap<>();
-            finalResult = new ArrayBackedValueStorage();
-            listAccessor = new ListAccessor();
-            caster = new CastTypeEvaluator();
-            comp = 
AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-            listsArgs = new IPointable[args.length];
-            listsEval = new IScalarEvaluator[args.length];
-            for (int i = 0; i < args.length; i++) {
-                listsArgs[i] = new VoidPointable();
-                listsEval[i] = args[i].createScalarEvaluator(ctx);
-            }
-            binaryHashFunction = 
BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
-                    .createBinaryHashFunction();
-        }
-
-        @Override
-        public void evaluate(IFrameTupleReference tuple, IPointable result) 
throws HyracksDataException {
-            byte listArgType;
-            boolean returnNull = false;
-            AbstractCollectionType outList = null;
-            ATypeTag listTag;
-            int minListIndex = 0;
-            int minSize = -1;
-            int nextSize;
-            IScalarEvaluator listEval;
-            IPointable listArg;
-
-            // evaluate all the lists first to make sure they're all actually 
lists and of the same list type
-            for (int i = 0; i < listsEval.length; i++) {
-                listEval = listsEval[i];
-                listEval.evaluate(tuple, listsArgs[i]);
-                if (!returnNull) {
-                    listArg = listsArgs[i];
-                    listArgType = 
listArg.getByteArray()[listArg.getStartOffset()];
-                    listTag = ATYPETAGDESERIALIZER.deserialize(listArgType);
-                    if (!listTag.isListType()) {
-                        returnNull = true;
-                    } else if (outList != null && outList.getTypeTag() != 
listTag) {
-                        throw new 
RuntimeDataException(ErrorCode.DIFFERENT_LIST_TYPE_ARGS, sourceLoc);
-                    } else {
-                        if (outList == null) {
-                            outList = (AbstractCollectionType) 
DefaultOpenFieldType.getDefaultOpenFieldType(listTag);
-                        }
-
-                        caster.reset(outList, argTypes[i], listsEval[i]);
-                        caster.evaluate(tuple, listsArgs[i]);
-                        nextSize = getNumItems(outList, 
listArg.getByteArray(), listArg.getStartOffset());
-                        if (nextSize < minSize) {
-                            minSize = nextSize;
-                            minListIndex = i;
-                        }
-                    }
-                }
-            }
-
-            if (returnNull) {
-                PointableHelper.setNull(result);
-                return;
-            }
-
-            IAsterixListBuilder listBuilder;
-            if (outList.getTypeTag() == ATypeTag.ARRAY) {
-                if (orderedListBuilder == null) {
-                    orderedListBuilder = new OrderedListBuilder();
-                }
-                listBuilder = orderedListBuilder;
-            } else {
-                if (unorderedListBuilder == null) {
-                    unorderedListBuilder = new UnorderedListBuilder();
-                }
-                listBuilder = unorderedListBuilder;
-            }
-
-            hashes.clear();
-            try {
-                // first, get distinct items of the most restrictive 
(smallest) list, pass listBuilder as null since
-                // we're not adding values yet. Values will be added to 
listBuilder after inspecting all input lists
-                listArg = listsArgs[minListIndex];
-                listAccessor.reset(listArg.getByteArray(), 
listArg.getStartOffset());
-                processList(listAccessor, minListIndex, null, true);
-
-                // now process each list one by one
-                listBuilder.reset(outList);
-                for (int listIndex = 0; listIndex < listsArgs.length; 
listIndex++) {
-                    if (listIndex == minListIndex) {
-                        incrementSmallest(listIndex, hashes.values());
-                    } else {
-                        listArg = listsArgs[listIndex];
-                        listAccessor.reset(listArg.getByteArray(), 
listArg.getStartOffset());
-                        processList(listAccessor, listIndex, listBuilder, 
false);
-                    }
-                }
-
-                finalResult.reset();
-                listBuilder.write(finalResult.getDataOutput(), true);
-                result.set(finalResult);
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
-            } finally {
-                storageAllocator.reset();
-                arrayListAllocator.reset();
-                pointableAllocator.reset();
-            }
-        }
-
-        private int getNumItems(AbstractCollectionType listType, byte[] 
listBytes, int offset) {
-            if (listType.getTypeTag() == ATypeTag.ARRAY) {
-                return 
AOrderedListSerializerDeserializer.getNumberOfItems(listBytes, offset);
-            } else {
-                return 
AUnorderedListSerializerDeserializer.getNumberOfItems(listBytes, offset);
-            }
-        }
-
-        private void processList(ListAccessor listAccessor, int listIndex, 
IAsterixListBuilder listBuilder,
-                boolean initIntersectList) throws IOException {
-            int hash;
-            List<ValueListIndex> sameHashes;
-            boolean itemInStorage;
-            IPointable item = pointableAllocator.allocateEmpty();
-            ArrayBackedValueStorage storage = (ArrayBackedValueStorage) 
storageAllocator.allocate(null);
-            storage.reset();
-            for (int j = 0; j < listAccessor.size(); j++) {
-                itemInStorage = listAccessor.getOrWriteItem(j, item, storage);
-                if 
(ATYPETAGDESERIALIZER.deserialize(item.getByteArray()[item.getStartOffset()]).isDerivedType())
 {
-                    throw new 
RuntimeDataException(ErrorCode.CANNOT_COMPARE_COMPLEX, sourceLoc);
-                }
-                if (notNullAndMissing(item)) {
-                    // look up to see if item exists
-                    hash = binaryHashFunction.hash(item.getByteArray(), 
item.getStartOffset(), item.getLength());
-                    sameHashes = hashes.get(hash);
-                    if (initIntersectList && initIntersectList(item, hash, 
sameHashes)) {
-                        // item is used
-                        item = pointableAllocator.allocateEmpty();
-                        if (itemInStorage) {
-                            storage = (ArrayBackedValueStorage) 
storageAllocator.allocate(null);
-                            storage.reset();
-                        }
-                    } else {
-                        incrementCommonValue(item, sameHashes, listIndex, 
listBuilder);
-                    }
-                }
-            }
-        }
-
-        // collect the items of the most restrictive list, it initializes the 
list index as -1. each successive list
-        // should stamp the value with its list index if the list has the 
item. It starts with list index = 0
-        private boolean initIntersectList(IPointable item, int hash, 
List<ValueListIndex> sameHashes)
-                throws IOException {
-            // add if new item
-            if (sameHashes == null) {
-                List<ValueListIndex> newHashes = 
arrayListAllocator.allocate(null);
-                newHashes.clear();
-                newHashes.add(new ValueListIndex(item, -1));
-                hashes.put(hash, newHashes);
-                return true;
-            } else if (ArrayFunctionsUtil.findItem(item, sameHashes, comp) == 
null) {
-                sameHashes.add(new ValueListIndex(item, -1));
-                return true;
-            }
-            // else ignore for duplicate values in the same list
-            return false;
-        }
-
-        private void incrementCommonValue(IPointable item, 
List<ValueListIndex> sameHashes, int listIndex,
-                IAsterixListBuilder listBuilder) throws IOException {
-            if (sameHashes != null) {
-                // look for the same equal item, add to list builder when all 
lists have seen this item
-                incrementIfExists(sameHashes, item, listIndex, listBuilder);
-            }
-        }
-
-        private boolean notNullAndMissing(IPointable item) {
-            byte tag = item.getByteArray()[item.getStartOffset()];
-            return tag != ATypeTag.SERIALIZED_NULL_TYPE_TAG && tag != 
ATypeTag.SERIALIZED_MISSING_TYPE_TAG;
-        }
-
-        // this method is only for the most restrictive list. it avoids 
comparison since it is the initial list we start
-        // with, so for sure every element in the collection must exist in the 
list
-        private void incrementSmallest(int listIndex, 
Collection<List<ValueListIndex>> commonValues) {
-            for (List<ValueListIndex> items : commonValues) {
-                for (int i = 0; i < items.size(); i++) {
-                    // any difference that is not == 1 means either this 
current list has already stamped and advanced
-                    // the stamp or the item is not common among lists because 
if it's common then each list should've
-                    // incremented the item list index up to the current list 
index
-                    if (listIndex - items.get(i).listIndex == 1) {
-                        items.get(i).listIndex = listIndex;
-                    }
-                }
-            }
-        }
-
-        private void incrementIfExists(List<ValueListIndex> sameHashes, 
IPointable item, int listIndex,
-                IAsterixListBuilder listBuilder) throws HyracksDataException {
-            ValueListIndex sameValue = ArrayFunctionsUtil.findItem(item, 
sameHashes, comp);
-            if (sameValue != null && listIndex - sameValue.listIndex == 1) {
-                // found the item, its stamp is OK (stamp saves the last list 
index that has seen this item)
-                // increment stamp of this item
-                sameValue.listIndex = listIndex;
-                if (listIndex == listsArgs.length - 1) {
-                    // when listIndex is the last list, then it means this 
item was found in all previous lists
-                    listBuilder.addItem(item);
-                }
-            }
-        }
     }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectEval.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectEval.java
new file mode 100755
index 0000000..e56286f
--- /dev/null
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectEval.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions;
+
+import static 
org.apache.asterix.om.types.EnumDeserializer.ATYPETAGDESERIALIZER;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.asterix.builders.AbvsBuilderFactory;
+import org.apache.asterix.builders.ArrayListFactory;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.builders.UnorderedListBuilder;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import 
org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
+import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
+import org.apache.asterix.om.pointables.PointableAllocator;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.container.IObjectFactory;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
+import org.apache.asterix.runtime.evaluators.common.ListAccessor;
+import org.apache.asterix.runtime.utils.ArrayFunctionsUtil;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+public class ArrayIntersectEval implements IScalarEvaluator {
+
+    private final SourceLocation sourceLoc;
+    private final IAType[] argTypes;
+    private final ListAccessor listAccessor;
+    private final IPointable currentItem;
+    private final ArrayBackedValueStorage currentItemStorage;
+    private final IPointable[] listsArgs;
+    private final IScalarEvaluator[] listsEval;
+    private final IBinaryHashFunction binaryHashFunction;
+    private final Int2ObjectMap<List<ValueListIndex>> hashes;
+    private final PointableAllocator pointableAllocator;
+    private final IObjectPool<IMutableValueStorage, ATypeTag> storageAllocator;
+    private final IObjectPool<List<ValueListIndex>, ATypeTag> 
arrayListAllocator;
+    private final IObjectPool<ValueListIndex, ATypeTag> 
valueListIndexAllocator;
+    private final ArrayBackedValueStorage finalResult;
+    private final CastTypeEvaluator caster;
+    private final IBinaryComparator comp;
+    private IAsterixListBuilder orderedListBuilder;
+    private IAsterixListBuilder unorderedListBuilder;
+
+    public ArrayIntersectEval(IScalarEvaluatorFactory[] args, 
IHyracksTaskContext ctx, IAType[] argumentTypes,
+            SourceLocation sourceLocation) throws HyracksDataException {
+        orderedListBuilder = null;
+        unorderedListBuilder = null;
+        argTypes = argumentTypes;
+        sourceLoc = sourceLocation;
+        pointableAllocator = new PointableAllocator();
+        storageAllocator = new ListObjectPool<>(new AbvsBuilderFactory());
+        arrayListAllocator = new ListObjectPool<>(new ArrayListFactory<>());
+        valueListIndexAllocator = new ListObjectPool<>(new 
ValueListIndexFactory());
+        hashes = new Int2ObjectOpenHashMap<>();
+        finalResult = new ArrayBackedValueStorage();
+        listAccessor = new ListAccessor();
+        caster = new CastTypeEvaluator();
+        comp = 
AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        listsArgs = new IPointable[args.length];
+        listsEval = new IScalarEvaluator[args.length];
+        currentItem = new VoidPointable();
+        currentItemStorage = new ArrayBackedValueStorage();
+        for (int i = 0; i < args.length; i++) {
+            listsArgs[i] = new VoidPointable();
+            listsEval[i] = args[i].createScalarEvaluator(ctx);
+        }
+        binaryHashFunction = 
BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
+                .createBinaryHashFunction();
+    }
+
+    @Override
+    public void evaluate(IFrameTupleReference tuple, IPointable result) throws 
HyracksDataException {
+        byte listArgType;
+        boolean returnNull = false;
+        AbstractCollectionType outList = null;
+        ATypeTag listTag;
+        int minListIndex = 0;
+        int minSize = -1;
+        int nextSize;
+        IScalarEvaluator listEval;
+        IPointable listArg;
+
+        // evaluate all the lists first to make sure they're all actually 
lists and of the same list type
+        for (int i = 0; i < listsEval.length; i++) {
+            // TODO(ali): avoid evaluating the list constructor twice, one 
here to identify the list and one in cast
+            listEval = listsEval[i];
+            listEval.evaluate(tuple, listsArgs[i]);
+            if (!returnNull) {
+                listArg = listsArgs[i];
+                listArgType = listArg.getByteArray()[listArg.getStartOffset()];
+                listTag = ATYPETAGDESERIALIZER.deserialize(listArgType);
+                if (!listTag.isListType()) {
+                    returnNull = true;
+                } else if (outList != null && outList.getTypeTag() != listTag) 
{
+                    throw new 
RuntimeDataException(ErrorCode.DIFFERENT_LIST_TYPE_ARGS, sourceLoc);
+                } else {
+                    if (outList == null) {
+                        outList = (AbstractCollectionType) 
DefaultOpenFieldType.getDefaultOpenFieldType(listTag);
+                    }
+
+                    caster.reset(outList, argTypes[i], listsEval[i]);
+                    caster.evaluate(tuple, listsArgs[i]);
+                    nextSize = getNumItems(outList, listArg.getByteArray(), 
listArg.getStartOffset());
+                    if (nextSize < minSize || minSize == -1) {
+                        minSize = nextSize;
+                        minListIndex = i;
+                    }
+                }
+            }
+        }
+
+        if (returnNull) {
+            PointableHelper.setNull(result);
+            caster.deallocatePointables();
+            return;
+        }
+
+        IAsterixListBuilder listBuilder;
+        if (outList.getTypeTag() == ATypeTag.ARRAY) {
+            if (orderedListBuilder == null) {
+                orderedListBuilder = new OrderedListBuilder();
+            }
+            listBuilder = orderedListBuilder;
+        } else {
+            if (unorderedListBuilder == null) {
+                unorderedListBuilder = new UnorderedListBuilder();
+            }
+            listBuilder = unorderedListBuilder;
+        }
+
+        hashes.clear();
+        try {
+            // first, get distinct items of the most restrictive (smallest) 
list.
+            // values will be added to listBuilder after inspecting all input 
lists
+            listArg = listsArgs[minListIndex];
+            listAccessor.reset(listArg.getByteArray(), 
listArg.getStartOffset());
+            buildRestrictiveList(listAccessor);
+
+            // process each list one by one
+            listBuilder.reset(outList);
+            for (int listIndex = 0; listIndex < listsArgs.length; listIndex++) 
{
+                if (listIndex == minListIndex) {
+                    incrementSmallest(listIndex, hashes.values(), listBuilder);
+                } else {
+                    listArg = listsArgs[listIndex];
+                    listAccessor.reset(listArg.getByteArray(), 
listArg.getStartOffset());
+                    processList(listAccessor, listIndex, listBuilder);
+                }
+            }
+
+            finalResult.reset();
+            listBuilder.write(finalResult.getDataOutput(), true);
+            result.set(finalResult);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        } finally {
+            caster.deallocatePointables();
+            valueListIndexAllocator.reset();
+            storageAllocator.reset();
+            arrayListAllocator.reset();
+            pointableAllocator.reset();
+        }
+    }
+
+    // Reads the item count of a serialized collection: ARRAY goes through the ordered-list serde, every other
+    // collection tag through the unordered-list serde.
+    private int getNumItems(AbstractCollectionType listType, byte[] listBytes, int offset) {
+        boolean isArray = listType.getTypeTag() == ATypeTag.ARRAY;
+        return isArray ? AOrderedListSerializerDeserializer.getNumberOfItems(listBytes, offset)
+                : AUnorderedListSerializerDeserializer.getNumberOfItems(listBytes, offset);
+    }
+    }
+
+    // Registers every distinct, non-null/non-missing item of the smallest list in "hashes" (via addToSmallestList).
+    // The working pointable, and its storage when the item had to be copied into storage, is replaced only after
+    // the map has retained it; otherwise the same objects are reused for the next item.
+    private void buildRestrictiveList(ListAccessor listAccessor) throws IOException {
+        if (listAccessor.size() <= 0) {
+            return;
+        }
+        IPointable candidate = pointableAllocator.allocateEmpty();
+        ArrayBackedValueStorage candidateStorage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
+        candidateStorage.reset();
+        for (int pos = 0; pos < listAccessor.size(); pos++) {
+            boolean copiedToStorage = listAccessor.getOrWriteItem(pos, candidate, candidateStorage);
+            validateItem(candidate);
+            if (!notNullAndMissing(candidate)) {
+                continue;
+            }
+            int bucketHash = binaryHashFunction.hash(candidate.getByteArray(), candidate.getStartOffset(),
+                    candidate.getLength());
+            if (!addToSmallestList(candidate, bucketHash, hashes.get(bucketHash))) {
+                continue;
+            }
+            // the map now references "candidate" (and possibly its storage), so move on to fresh objects
+            candidate = pointableAllocator.allocateEmpty();
+            if (copiedToStorage) {
+                candidateStorage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
+                candidateStorage.reset();
+            }
+        }
+    }
+
+    // Registers an item of the most restrictive list under its hash, stamped with list index -1. Every following
+    // list that contains the item advances the stamp to its own index (the first list being index 0).
+    // Returns true if the item was stored, false if an equal item of this same list was already registered.
+    private boolean addToSmallestList(IPointable item, int hash, List<ValueListIndex> sameHashes) throws IOException {
+        List<ValueListIndex> bucket = sameHashes;
+        boolean newBucket = bucket == null;
+        if (newBucket) {
+            bucket = arrayListAllocator.allocate(null);
+            bucket.clear();
+        } else if (ArrayFunctionsUtil.findItem(item, bucket, comp) != null) {
+            // duplicate within the same list: the first occurrence is enough
+            return false;
+        }
+        ValueListIndex entry = valueListIndexAllocator.allocate(null);
+        entry.set(item, -1);
+        bucket.add(entry);
+        if (newBucket) {
+            hashes.put(hash, bucket);
+        }
+        return true;
+    }
+
+    // Walks one of the non-restrictive input lists and advances the stamp of every registered value it contains.
+    private void processList(ListAccessor listAccessor, int listIndex, IAsterixListBuilder listBuilder)
+            throws IOException {
+        for (int pos = 0; pos < listAccessor.size(); pos++) {
+            listAccessor.getOrWriteItem(pos, currentItem, currentItemStorage);
+            validateItem(currentItem);
+            if (!notNullAndMissing(currentItem)) {
+                // null and missing never take part in the intersection
+                continue;
+            }
+            int bucketHash = binaryHashFunction.hash(currentItem.getByteArray(), currentItem.getStartOffset(),
+                    currentItem.getLength());
+            incrementIfCommonValue(currentItem, hashes.get(bucketHash), listIndex, listBuilder);
+        }
+    }
+
+    // Advances the stamp of "item" only when some registered value shares its hash bucket.
+    private void incrementIfCommonValue(IPointable item, List<ValueListIndex> sameHashes, int listIndex,
+            IAsterixListBuilder listBuilder) throws IOException {
+        if (sameHashes == null) {
+            // nothing from the restrictive list hashes here, so the item cannot be common
+            return;
+        }
+        incrementIfExists(sameHashes, item, listIndex, listBuilder);
+    }
+
+    // Finds a value equal to "item" among the hash collisions. Its stamp holds the index of the last list that
+    // contained it, so it is advanced only if the immediately preceding list (listIndex - 1) stamped it. When the
+    // final input list advances a stamp, the item is emitted to the result.
+    private void incrementIfExists(List<ValueListIndex> sameHashes, IPointable item, int listIndex,
+            IAsterixListBuilder listBuilder) throws HyracksDataException {
+        ValueListIndex match = ArrayFunctionsUtil.findItem(item, sameHashes, comp);
+        if (match == null || match.listIndex != listIndex - 1) {
+            return;
+        }
+        match.listIndex = listIndex;
+        boolean lastList = listIndex == listsArgs.length - 1;
+        if (lastList) {
+            listBuilder.addItem(item);
+        }
+    }
+
+    private boolean notNullAndMissing(IPointable item) {
+        byte tag = item.getByteArray()[item.getStartOffset()];
+        return tag != ATypeTag.SERIALIZED_NULL_TYPE_TAG && tag != 
ATypeTag.SERIALIZED_MISSING_TYPE_TAG;
+    }
+
+    // Stamps the most restrictive list itself. No comparison is needed: every registered value came from this
+    // list, so it is present by construction. A value is advanced only if the preceding list (listIndex - 1)
+    // stamped it; any other stamp means some earlier list lacked it.
+    private void incrementSmallest(int listIndex, Collection<List<ValueListIndex>> commonValues,
+            IAsterixListBuilder listBuilder) throws HyracksDataException {
+        // TODO(ali): avoid iterator object creation. Using lambdas currently causes errors due to code gen
+        boolean lastList = listIndex == listsArgs.length - 1;
+        for (List<ValueListIndex> bucket : commonValues) {
+            for (int k = 0; k < bucket.size(); k++) {
+                ValueListIndex candidate = bucket.get(k);
+                if (candidate.listIndex != listIndex - 1) {
+                    continue;
+                }
+                candidate.listIndex = listIndex;
+                // if the restrictive list is the last to stamp, the value goes straight into the result
+                if (lastList) {
+                    listBuilder.addItem(candidate);
+                }
+            }
+        }
+    }
+
+    // Rejects derived (complex) items: multisets, objects and arrays cannot be compared here yet.
+    private void validateItem(IPointable item) throws RuntimeDataException {
+        byte tagByte = item.getByteArray()[item.getStartOffset()];
+        ATypeTag itemType = ATYPETAGDESERIALIZER.deserialize(tagByte);
+        if (itemType.isDerivedType()) {
+            throw new RuntimeDataException(ErrorCode.CANNOT_COMPARE_COMPLEX, sourceLoc);
+        }
+    }
+
+    // Pairs a value taken from the most restrictive list with a stamp: the index of the last input list seen to
+    // contain it (-1 right after registration). Declared static because it never uses the enclosing evaluator;
+    // as a non-static inner class every instance would carry a hidden reference to it.
+    protected static class ValueListIndex implements IValueReference {
+        private IPointable value;
+        private int listIndex;
+
+        protected ValueListIndex() {
+        }
+
+        // rebinds this object to a value and its stamp; instances are obtained from valueListIndexAllocator
+        protected void set(IPointable value, int listIndex) {
+            this.value = value;
+            this.listIndex = listIndex;
+        }
+
+        @Override
+        public byte[] getByteArray() {
+            return value.getByteArray();
+        }
+
+        @Override
+        public int getStartOffset() {
+            return value.getStartOffset();
+        }
+
+        @Override
+        public int getLength() {
+            return value.getLength();
+        }
+    }
+
+    // Creates ValueListIndex objects for the allocator. Static for the same reason as ValueListIndex; the
+    // implicit public no-arg constructor replaces the former explicit empty one.
+    public static class ValueListIndexFactory implements IObjectFactory<ValueListIndex, ATypeTag> {
+
+        @Override
+        public ValueListIndex create(ATypeTag arg) {
+            return new ValueListIndex();
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
index 032ef32..9a8d267 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
@@ -239,6 +239,8 @@
                 result.set(storage);
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
+            } finally {
+                caster.deallocatePointables();
             }
         }
     }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayStarDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayStarDescriptor.java
index 4bc885c..87db6c5 100755
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayStarDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayStarDescriptor.java
@@ -213,6 +213,7 @@
                 throw HyracksDataException.create(e);
             } finally {
                 pointableAllocator.reset();
+                caster.deallocatePointables();
             }
         }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
index d1879b2..05541a6 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
@@ -98,4 +98,8 @@
                 return allocator.allocateFieldValue(null);
         }
     }
+
+    public void deallocatePointables() {
+        allocator.reset(); // frees the objects this caster allocated; array functions call this in their finally block
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/3129
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib6c8c55ed3e0a35e00c5976a46e9ed6e432a6e9f
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-f69489
Gerrit-Owner: Ali Alsuliman <[email protected]>

Reply via email to