Ali Alsuliman has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/3129
Change subject: [FUN][RT] Object creation in array functions
......................................................................
[FUN][RT] Object creation in array functions
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
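- deallocate the objects allocated by the caster (CastTypeEvaluator) used in
  array functions.
- fix array_intersect to pick the smallest list as the starting list
  (previously minSize started at -1, so "nextSize < minSize" never held and
  the first list was always used).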
Change-Id: Ib6c8c55ed3e0a35e00c5976a46e9ed6e432a6e9f
---
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/array_fun/array_intersect/array_intersect.3.query.sqlpp
M
asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_intersect/array_intersect.3.adm
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayAddRemoveEval.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessArraysEval.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
A
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectEval.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayStarDescriptor.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
10 files changed, 398 insertions(+), 291 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/29/3129/1
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/array_fun/array_intersect/array_intersect.3.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/array_fun/array_intersect/array_intersect.3.query.sqlpp
index 3de0126..b5bbbb3 100755
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/array_fun/array_intersect/array_intersect.3.query.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/array_fun/array_intersect/array_intersect.3.query.sqlpp
@@ -20,7 +20,7 @@
use TinySocial;
{
- "t1": (select array_intersect(t.`referred-topics`, {{"t-mobile",
"platform"}}, {{"t-mobile"}}) from TweetMessages t order by t.tweetid),
+ "t1": (select t.tweetid, array_intersect(t.`referred-topics`, {{"t-mobile",
"platform"}}, {{"t-mobile"}}) from TweetMessages t order by t.tweetid),
"t2": (select array_intersect([1, "John", 2], (select value v.id from d1 v),
[2,4,1])),
"t3": (array_intersect([3,5,1], [5,7,3], [3,2,5,1])),
"t4": (array_intersect([3,5.0,1], [5,7,3], [3,2,5,1])),
@@ -36,5 +36,6 @@
"t14": (array_intersect(missing, "non_array", [2,5,1])),
"t15": (array_intersect([], [], [])),
"t16": (array_intersect([], [3,2], [])),
- "t17": (select array_intersect(d.followers, ["John Green", "sth"], ["sth",
"John Green"]) from d1 d)
+ "t17": (select array_intersect(d.followers, ["John Green", "sth"], ["sth",
"John Green"]) from d1 d),
+ "t18": (array_intersect([1,2], [3,2,1], [1,2,3,4]))
};
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_intersect/array_intersect.3.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_intersect/array_intersect.3.adm
index 2276eb4..368becd 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_intersect/array_intersect.3.adm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_intersect/array_intersect.3.adm
@@ -1 +1 @@
-{ "t1": [ { "$1": {{ "t-mobile" }} }, { "$1": {{ }} }, { "$1": {{ }} }, {
"$1": {{ }} }, { "$1": {{ }} }, { "$1": {{ }} }, { "$1": {{ }} }, { "$1":
{{ }} }, { "$1": {{ }} }, { "$1": {{ }} }, { "$1": {{ "t-mobile" }} }, {
"$1": {{ }} } ], "t2": [ { "$2": [ 2, 1 ] } ], "t3": [ 3, 5 ], "t4": [ 3, 5 ],
"t5": [ 3, "a" ], "t6": [ 3 ], "t7": [ ], "t8": [ ], "t9": [ ], "t10": [ ],
"t12": null, "t13": null, "t15": [ ], "t16": [ ], "t17": [ { }, { "$3": [
"John Green" ] } ] }
+{ "t1": [ { "tweetid": "1", "$1": {{ "t-mobile" }} }, { "tweetid": "10", "$1":
{{ }} }, { "tweetid": "11", "$1": {{ }} }, { "tweetid": "12", "$1": {{ }} },
{ "tweetid": "2", "$1": {{ }} }, { "tweetid": "3", "$1": {{ }} }, {
"tweetid": "4", "$1": {{ }} }, { "tweetid": "5", "$1": {{ }} }, { "tweetid":
"6", "$1": {{ }} }, { "tweetid": "7", "$1": {{ }} }, { "tweetid": "8", "$1":
{{ "t-mobile" }} }, { "tweetid": "9", "$1": {{ }} } ], "t2": [ { "$2": [ 2, 1
] } ], "t3": [ 3, 5 ], "t4": [ 3, 5 ], "t5": [ 3, "a" ], "t6": [ 3 ], "t7": [
], "t8": [ ], "t9": [ ], "t10": [ ], "t12": null, "t13": null, "t15": [ ],
"t16": [ ], "t17": [ { }, { "$3": [ "John Green" ] } ], "t18": [ 1, 2 ] }
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayAddRemoveEval.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayAddRemoveEval.java
index 1100a59..a004f89 100755
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayAddRemoveEval.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayAddRemoveEval.java
@@ -136,6 +136,7 @@
}
if (valueTag == ATypeTag.MISSING) {
PointableHelper.setMissing(result);
+ caster.deallocatePointables();
return;
}
if (!acceptNullValues && valueTag == ATypeTag.NULL) {
@@ -145,10 +146,12 @@
if (returnNull) {
PointableHelper.setNull(result);
+ caster.deallocatePointables();
return;
}
if (encounteredNonPrimitive) {
+ caster.deallocatePointables();
throw new RuntimeDataException(ErrorCode.CANNOT_COMPARE_COMPLEX,
sourceLocation);
}
// all arguments are valid
@@ -190,6 +193,8 @@
result.set(storage);
} catch (IOException e) {
throw HyracksDataException.create(e);
+ } finally {
+ caster.deallocatePointables();
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessArraysEval.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessArraysEval.java
index 3258a8d..ff9b38e 100755
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessArraysEval.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessArraysEval.java
@@ -88,6 +88,7 @@
AbstractCollectionType outList = null;
ATypeTag listTag;
for (int i = 0; i < listsEval.length; i++) {
+ // TODO(ali): avoid having to perform evaluate twice
listsEval[i].evaluate(tuple, listsArgs[i]);
if (!returnNull) {
listArgType =
listsArgs[i].getByteArray()[listsArgs[i].getStartOffset()];
@@ -109,6 +110,7 @@
if (returnNull) {
PointableHelper.setNull(result);
+ caster.deallocatePointables();
return;
}
@@ -140,6 +142,7 @@
release();
storageAllocator.reset();
pointableAllocator.reset();
+ caster.deallocatePointables();
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java
index 06381b5..0633fa4 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java
@@ -194,6 +194,7 @@
} finally {
storageAllocator.reset();
listAccessorAllocator.reset();
+ caster.deallocatePointables();
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
index 52335a0..fc4d563 100755
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
@@ -18,54 +18,19 @@
*/
package org.apache.asterix.runtime.evaluators.functions;
-import static
org.apache.asterix.om.types.EnumDeserializer.ATYPETAGDESERIALIZER;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import org.apache.asterix.builders.AbvsBuilderFactory;
-import org.apache.asterix.builders.ArrayListFactory;
-import org.apache.asterix.builders.IAsterixListBuilder;
-import org.apache.asterix.builders.OrderedListBuilder;
-import org.apache.asterix.builders.UnorderedListBuilder;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import
org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
-import
org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
-import
org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
-import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.om.functions.IFunctionTypeInferer;
-import org.apache.asterix.om.pointables.PointableAllocator;
-import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AbstractCollectionType;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.util.container.IObjectPool;
-import org.apache.asterix.om.util.container.ListObjectPool;
import
org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
-import org.apache.asterix.runtime.evaluators.common.ListAccessor;
import org.apache.asterix.runtime.functions.FunctionTypeInferers;
-import org.apache.asterix.runtime.utils.ArrayFunctionsUtil;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IMutableValueStorage;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
/**
* <pre>
@@ -101,31 +66,6 @@
}
};
- public class ValueListIndex implements IValueReference {
- private final IPointable value;
- private int listIndex;
-
- public ValueListIndex(IPointable value, int listIndex) {
- this.value = value;
- this.listIndex = listIndex;
- }
-
- @Override
- public byte[] getByteArray() {
- return value.getByteArray();
- }
-
- @Override
- public int getStartOffset() {
- return value.getStartOffset();
- }
-
- @Override
- public int getLength() {
- return value.getLength();
- }
- }
-
@Override
public FunctionIdentifier getIdentifier() {
return BuiltinFunctions.ARRAY_INTERSECT;
@@ -144,235 +84,8 @@
@Override
public IScalarEvaluator createScalarEvaluator(final
IHyracksTaskContext ctx) throws HyracksDataException {
- return new ArrayIntersectEval(args, ctx);
+ return new ArrayIntersectEval(args, ctx, argTypes, sourceLoc);
}
};
- }
-
- public class ArrayIntersectEval implements IScalarEvaluator {
- private final ListAccessor listAccessor;
- private final IPointable[] listsArgs;
- private final IScalarEvaluator[] listsEval;
- private final IBinaryHashFunction binaryHashFunction;
- private final Int2ObjectMap<List<ValueListIndex>> hashes;
- private final PointableAllocator pointableAllocator;
- private final IObjectPool<IMutableValueStorage, ATypeTag>
storageAllocator;
- private final IObjectPool<List<ValueListIndex>, ATypeTag>
arrayListAllocator;
- private final ArrayBackedValueStorage finalResult;
- private final CastTypeEvaluator caster;
- private final IBinaryComparator comp;
- private IAsterixListBuilder orderedListBuilder;
- private IAsterixListBuilder unorderedListBuilder;
-
- public ArrayIntersectEval(IScalarEvaluatorFactory[] args,
IHyracksTaskContext ctx) throws HyracksDataException {
- orderedListBuilder = null;
- unorderedListBuilder = null;
- pointableAllocator = new PointableAllocator();
- storageAllocator = new ListObjectPool<>(new AbvsBuilderFactory());
- arrayListAllocator = new ListObjectPool<>(new
ArrayListFactory<>());
- hashes = new Int2ObjectOpenHashMap<>();
- finalResult = new ArrayBackedValueStorage();
- listAccessor = new ListAccessor();
- caster = new CastTypeEvaluator();
- comp =
AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
- listsArgs = new IPointable[args.length];
- listsEval = new IScalarEvaluator[args.length];
- for (int i = 0; i < args.length; i++) {
- listsArgs[i] = new VoidPointable();
- listsEval[i] = args[i].createScalarEvaluator(ctx);
- }
- binaryHashFunction =
BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
- .createBinaryHashFunction();
- }
-
- @Override
- public void evaluate(IFrameTupleReference tuple, IPointable result)
throws HyracksDataException {
- byte listArgType;
- boolean returnNull = false;
- AbstractCollectionType outList = null;
- ATypeTag listTag;
- int minListIndex = 0;
- int minSize = -1;
- int nextSize;
- IScalarEvaluator listEval;
- IPointable listArg;
-
- // evaluate all the lists first to make sure they're all actually
lists and of the same list type
- for (int i = 0; i < listsEval.length; i++) {
- listEval = listsEval[i];
- listEval.evaluate(tuple, listsArgs[i]);
- if (!returnNull) {
- listArg = listsArgs[i];
- listArgType =
listArg.getByteArray()[listArg.getStartOffset()];
- listTag = ATYPETAGDESERIALIZER.deserialize(listArgType);
- if (!listTag.isListType()) {
- returnNull = true;
- } else if (outList != null && outList.getTypeTag() !=
listTag) {
- throw new
RuntimeDataException(ErrorCode.DIFFERENT_LIST_TYPE_ARGS, sourceLoc);
- } else {
- if (outList == null) {
- outList = (AbstractCollectionType)
DefaultOpenFieldType.getDefaultOpenFieldType(listTag);
- }
-
- caster.reset(outList, argTypes[i], listsEval[i]);
- caster.evaluate(tuple, listsArgs[i]);
- nextSize = getNumItems(outList,
listArg.getByteArray(), listArg.getStartOffset());
- if (nextSize < minSize) {
- minSize = nextSize;
- minListIndex = i;
- }
- }
- }
- }
-
- if (returnNull) {
- PointableHelper.setNull(result);
- return;
- }
-
- IAsterixListBuilder listBuilder;
- if (outList.getTypeTag() == ATypeTag.ARRAY) {
- if (orderedListBuilder == null) {
- orderedListBuilder = new OrderedListBuilder();
- }
- listBuilder = orderedListBuilder;
- } else {
- if (unorderedListBuilder == null) {
- unorderedListBuilder = new UnorderedListBuilder();
- }
- listBuilder = unorderedListBuilder;
- }
-
- hashes.clear();
- try {
- // first, get distinct items of the most restrictive
(smallest) list, pass listBuilder as null since
- // we're not adding values yet. Values will be added to
listBuilder after inspecting all input lists
- listArg = listsArgs[minListIndex];
- listAccessor.reset(listArg.getByteArray(),
listArg.getStartOffset());
- processList(listAccessor, minListIndex, null, true);
-
- // now process each list one by one
- listBuilder.reset(outList);
- for (int listIndex = 0; listIndex < listsArgs.length;
listIndex++) {
- if (listIndex == minListIndex) {
- incrementSmallest(listIndex, hashes.values());
- } else {
- listArg = listsArgs[listIndex];
- listAccessor.reset(listArg.getByteArray(),
listArg.getStartOffset());
- processList(listAccessor, listIndex, listBuilder,
false);
- }
- }
-
- finalResult.reset();
- listBuilder.write(finalResult.getDataOutput(), true);
- result.set(finalResult);
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- } finally {
- storageAllocator.reset();
- arrayListAllocator.reset();
- pointableAllocator.reset();
- }
- }
-
- private int getNumItems(AbstractCollectionType listType, byte[]
listBytes, int offset) {
- if (listType.getTypeTag() == ATypeTag.ARRAY) {
- return
AOrderedListSerializerDeserializer.getNumberOfItems(listBytes, offset);
- } else {
- return
AUnorderedListSerializerDeserializer.getNumberOfItems(listBytes, offset);
- }
- }
-
- private void processList(ListAccessor listAccessor, int listIndex,
IAsterixListBuilder listBuilder,
- boolean initIntersectList) throws IOException {
- int hash;
- List<ValueListIndex> sameHashes;
- boolean itemInStorage;
- IPointable item = pointableAllocator.allocateEmpty();
- ArrayBackedValueStorage storage = (ArrayBackedValueStorage)
storageAllocator.allocate(null);
- storage.reset();
- for (int j = 0; j < listAccessor.size(); j++) {
- itemInStorage = listAccessor.getOrWriteItem(j, item, storage);
- if
(ATYPETAGDESERIALIZER.deserialize(item.getByteArray()[item.getStartOffset()]).isDerivedType())
{
- throw new
RuntimeDataException(ErrorCode.CANNOT_COMPARE_COMPLEX, sourceLoc);
- }
- if (notNullAndMissing(item)) {
- // look up to see if item exists
- hash = binaryHashFunction.hash(item.getByteArray(),
item.getStartOffset(), item.getLength());
- sameHashes = hashes.get(hash);
- if (initIntersectList && initIntersectList(item, hash,
sameHashes)) {
- // item is used
- item = pointableAllocator.allocateEmpty();
- if (itemInStorage) {
- storage = (ArrayBackedValueStorage)
storageAllocator.allocate(null);
- storage.reset();
- }
- } else {
- incrementCommonValue(item, sameHashes, listIndex,
listBuilder);
- }
- }
- }
- }
-
- // collect the items of the most restrictive list, it initializes the
list index as -1. each successive list
- // should stamp the value with its list index if the list has the
item. It starts with list index = 0
- private boolean initIntersectList(IPointable item, int hash,
List<ValueListIndex> sameHashes)
- throws IOException {
- // add if new item
- if (sameHashes == null) {
- List<ValueListIndex> newHashes =
arrayListAllocator.allocate(null);
- newHashes.clear();
- newHashes.add(new ValueListIndex(item, -1));
- hashes.put(hash, newHashes);
- return true;
- } else if (ArrayFunctionsUtil.findItem(item, sameHashes, comp) ==
null) {
- sameHashes.add(new ValueListIndex(item, -1));
- return true;
- }
- // else ignore for duplicate values in the same list
- return false;
- }
-
- private void incrementCommonValue(IPointable item,
List<ValueListIndex> sameHashes, int listIndex,
- IAsterixListBuilder listBuilder) throws IOException {
- if (sameHashes != null) {
- // look for the same equal item, add to list builder when all
lists have seen this item
- incrementIfExists(sameHashes, item, listIndex, listBuilder);
- }
- }
-
- private boolean notNullAndMissing(IPointable item) {
- byte tag = item.getByteArray()[item.getStartOffset()];
- return tag != ATypeTag.SERIALIZED_NULL_TYPE_TAG && tag !=
ATypeTag.SERIALIZED_MISSING_TYPE_TAG;
- }
-
- // this method is only for the most restrictive list. it avoids
comparison since it is the initial list we start
- // with, so for sure every element in the collection must exist in the
list
- private void incrementSmallest(int listIndex,
Collection<List<ValueListIndex>> commonValues) {
- for (List<ValueListIndex> items : commonValues) {
- for (int i = 0; i < items.size(); i++) {
- // any difference that is not == 1 means either this
current list has already stamped and advanced
- // the stamp or the item is not common among lists because
if it's common then each list should've
- // incremented the item list index up to the current list
index
- if (listIndex - items.get(i).listIndex == 1) {
- items.get(i).listIndex = listIndex;
- }
- }
- }
- }
-
- private void incrementIfExists(List<ValueListIndex> sameHashes,
IPointable item, int listIndex,
- IAsterixListBuilder listBuilder) throws HyracksDataException {
- ValueListIndex sameValue = ArrayFunctionsUtil.findItem(item,
sameHashes, comp);
- if (sameValue != null && listIndex - sameValue.listIndex == 1) {
- // found the item, its stamp is OK (stamp saves the last list
index that has seen this item)
- // increment stamp of this item
- sameValue.listIndex = listIndex;
- if (listIndex == listsArgs.length - 1) {
- // when listIndex is the last list, then it means this
item was found in all previous lists
- listBuilder.addItem(item);
- }
- }
- }
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectEval.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectEval.java
new file mode 100755
index 0000000..e56286f
--- /dev/null
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectEval.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.evaluators.functions;
+
+import static
org.apache.asterix.om.types.EnumDeserializer.ATYPETAGDESERIALIZER;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.asterix.builders.AbvsBuilderFactory;
+import org.apache.asterix.builders.ArrayListFactory;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.builders.UnorderedListBuilder;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import
org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
+import
org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
+import
org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
+import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
+import org.apache.asterix.om.pointables.PointableAllocator;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.container.IObjectFactory;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
+import org.apache.asterix.runtime.evaluators.common.ListAccessor;
+import org.apache.asterix.runtime.utils.ArrayFunctionsUtil;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+public class ArrayIntersectEval implements IScalarEvaluator {
+
+ private final SourceLocation sourceLoc;
+ private final IAType[] argTypes;
+ private final ListAccessor listAccessor;
+ private final IPointable currentItem;
+ private final ArrayBackedValueStorage currentItemStorage;
+ private final IPointable[] listsArgs;
+ private final IScalarEvaluator[] listsEval;
+ private final IBinaryHashFunction binaryHashFunction;
+ private final Int2ObjectMap<List<ValueListIndex>> hashes;
+ private final PointableAllocator pointableAllocator;
+ private final IObjectPool<IMutableValueStorage, ATypeTag> storageAllocator;
+ private final IObjectPool<List<ValueListIndex>, ATypeTag>
arrayListAllocator;
+ private final IObjectPool<ValueListIndex, ATypeTag>
valueListIndexAllocator;
+ private final ArrayBackedValueStorage finalResult;
+ private final CastTypeEvaluator caster;
+ private final IBinaryComparator comp;
+ private IAsterixListBuilder orderedListBuilder;
+ private IAsterixListBuilder unorderedListBuilder;
+
+ public ArrayIntersectEval(IScalarEvaluatorFactory[] args,
IHyracksTaskContext ctx, IAType[] argumentTypes,
+ SourceLocation sourceLocation) throws HyracksDataException {
+ orderedListBuilder = null;
+ unorderedListBuilder = null;
+ argTypes = argumentTypes;
+ sourceLoc = sourceLocation;
+ pointableAllocator = new PointableAllocator();
+ storageAllocator = new ListObjectPool<>(new AbvsBuilderFactory());
+ arrayListAllocator = new ListObjectPool<>(new ArrayListFactory<>());
+ valueListIndexAllocator = new ListObjectPool<>(new
ValueListIndexFactory());
+ hashes = new Int2ObjectOpenHashMap<>();
+ finalResult = new ArrayBackedValueStorage();
+ listAccessor = new ListAccessor();
+ caster = new CastTypeEvaluator();
+ comp =
AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+ listsArgs = new IPointable[args.length];
+ listsEval = new IScalarEvaluator[args.length];
+ currentItem = new VoidPointable();
+ currentItemStorage = new ArrayBackedValueStorage();
+ for (int i = 0; i < args.length; i++) {
+ listsArgs[i] = new VoidPointable();
+ listsEval[i] = args[i].createScalarEvaluator(ctx);
+ }
+ binaryHashFunction =
BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
+ .createBinaryHashFunction();
+ }
+
+    /**
+     * Computes the intersection of all argument lists and writes it to {@code result}.
+     * <p>
+     * Every argument is evaluated and cast to the default open list type first. If any argument is not a list, the
+     * result is null. If ordered and unordered lists are mixed, DIFFERENT_LIST_TYPE_ARGS is raised. Otherwise the
+     * distinct items of the smallest list are collected into {@code hashes}, and each list in turn stamps the items
+     * it also contains. Items are added to the output list builder only after all input lists have been inspected.
+     * <p>
+     * All per-call allocations, including the caster's pointables, are released in the outer {@code finally}. This
+     * covers the null-return path and any exception, e.g. DIFFERENT_LIST_TYPE_ARGS or a failing argument evaluator.
+     *
+     * @throws HyracksDataException on mixed list kinds, comparison errors or serialization failures
+     */
+    @Override
+    public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+        try {
+            boolean returnNull = false;
+            AbstractCollectionType outList = null;
+            int minListIndex = 0;
+            int minSize = -1;
+            IPointable listArg;
+
+            // evaluate all the lists first to make sure they're all actually lists and of the same list type
+            for (int i = 0; i < listsEval.length; i++) {
+                // TODO(ali): avoid evaluating the list constructor twice, one here to identify the list and one in cast
+                listsEval[i].evaluate(tuple, listsArgs[i]);
+                if (!returnNull) {
+                    listArg = listsArgs[i];
+                    ATypeTag listTag =
+                            ATYPETAGDESERIALIZER.deserialize(listArg.getByteArray()[listArg.getStartOffset()]);
+                    if (!listTag.isListType()) {
+                        returnNull = true;
+                    } else if (outList != null && outList.getTypeTag() != listTag) {
+                        // thrown inside the try so that the finally below still releases the caster's pointables
+                        throw new RuntimeDataException(ErrorCode.DIFFERENT_LIST_TYPE_ARGS, sourceLoc);
+                    } else {
+                        if (outList == null) {
+                            outList = (AbstractCollectionType) DefaultOpenFieldType.getDefaultOpenFieldType(listTag);
+                        }
+                        // the cast result is placed back into listsArgs[i]
+                        caster.reset(outList, argTypes[i], listsEval[i]);
+                        caster.evaluate(tuple, listArg);
+                        int nextSize = getNumItems(outList, listArg.getByteArray(), listArg.getStartOffset());
+                        // remember the smallest list: it is the most restrictive starting point
+                        if (minSize == -1 || nextSize < minSize) {
+                            minSize = nextSize;
+                            minListIndex = i;
+                        }
+                    }
+                }
+            }
+
+            if (returnNull) {
+                PointableHelper.setNull(result);
+                return;
+            }
+
+            IAsterixListBuilder listBuilder;
+            if (outList.getTypeTag() == ATypeTag.ARRAY) {
+                if (orderedListBuilder == null) {
+                    orderedListBuilder = new OrderedListBuilder();
+                }
+                listBuilder = orderedListBuilder;
+            } else {
+                if (unorderedListBuilder == null) {
+                    unorderedListBuilder = new UnorderedListBuilder();
+                }
+                listBuilder = unorderedListBuilder;
+            }
+
+            hashes.clear();
+            try {
+                // first, get distinct items of the most restrictive (smallest) list.
+                // values will be added to listBuilder after inspecting all input lists
+                listArg = listsArgs[minListIndex];
+                listAccessor.reset(listArg.getByteArray(), listArg.getStartOffset());
+                buildRestrictiveList(listAccessor);
+
+                // process each list one by one
+                listBuilder.reset(outList);
+                for (int listIndex = 0; listIndex < listsArgs.length; listIndex++) {
+                    if (listIndex == minListIndex) {
+                        incrementSmallest(listIndex, hashes.values(), listBuilder);
+                    } else {
+                        listArg = listsArgs[listIndex];
+                        listAccessor.reset(listArg.getByteArray(), listArg.getStartOffset());
+                        processList(listAccessor, listIndex, listBuilder);
+                    }
+                }
+
+                finalResult.reset();
+                listBuilder.write(finalResult.getDataOutput(), true);
+                result.set(finalResult);
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        } finally {
+            // runs on every exit path: normal, null result, or exception
+            caster.deallocatePointables();
+            valueListIndexAllocator.reset();
+            storageAllocator.reset();
+            arrayListAllocator.reset();
+            pointableAllocator.reset();
+        }
+    }
+
+ private int getNumItems(AbstractCollectionType listType, byte[] listBytes,
int offset) {
+ if (listType.getTypeTag() == ATypeTag.ARRAY) {
+ return
AOrderedListSerializerDeserializer.getNumberOfItems(listBytes, offset);
+ } else {
+ return
AUnorderedListSerializerDeserializer.getNumberOfItems(listBytes, offset);
+ }
+ }
+
+ // puts all the items of the smallest list in "hashes"
+ private void buildRestrictiveList(ListAccessor listAccessor) throws
IOException {
+ if (listAccessor.size() > 0) {
+ int hash;
+ List<ValueListIndex> sameHashes;
+ boolean itemInStorage;
+ IPointable item = pointableAllocator.allocateEmpty();
+ ArrayBackedValueStorage storage = (ArrayBackedValueStorage)
storageAllocator.allocate(null);
+ storage.reset();
+ for (int j = 0; j < listAccessor.size(); j++) {
+ itemInStorage = listAccessor.getOrWriteItem(j, item, storage);
+ validateItem(item);
+ if (notNullAndMissing(item)) {
+ hash = binaryHashFunction.hash(item.getByteArray(),
item.getStartOffset(), item.getLength());
+ sameHashes = hashes.get(hash);
+ if (addToSmallestList(item, hash, sameHashes)) {
+ // item has been added to intersect list and is being
used, allocate new pointable
+ item = pointableAllocator.allocateEmpty();
+ if (itemInStorage) {
+ storage = (ArrayBackedValueStorage)
storageAllocator.allocate(null);
+ storage.reset();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // collects the items of the most restrictive list, it initializes the
list index as -1. each successive list
+ // should stamp the value with its list index if the list has the item. It
starts with list index = 0
+ private boolean addToSmallestList(IPointable item, int hash,
List<ValueListIndex> sameHashes) throws IOException {
+ // add if new item
+ if (sameHashes == null) {
+ List<ValueListIndex> newHashes = arrayListAllocator.allocate(null);
+ newHashes.clear();
+ ValueListIndex valueListIndex =
valueListIndexAllocator.allocate(null);
+ valueListIndex.set(item, -1);
+ newHashes.add(valueListIndex);
+ hashes.put(hash, newHashes);
+ return true;
+ } else if (ArrayFunctionsUtil.findItem(item, sameHashes, comp) ==
null) {
+ ValueListIndex valueListIndex =
valueListIndexAllocator.allocate(null);
+ valueListIndex.set(item, -1);
+ sameHashes.add(valueListIndex);
+ return true;
+ }
+ // else ignore for duplicate values in the same list
+ return false;
+ }
+
+ // inspects all list arguments except the most restrictive one
+ private void processList(ListAccessor listAccessor, int listIndex,
IAsterixListBuilder listBuilder)
+ throws IOException {
+ int hash;
+ List<ValueListIndex> sameHashes;
+ for (int j = 0; j < listAccessor.size(); j++) {
+ listAccessor.getOrWriteItem(j, currentItem, currentItemStorage);
+ validateItem(currentItem);
+ if (notNullAndMissing(currentItem)) {
+ // hash the item and look up to see if it is common
+ hash = binaryHashFunction.hash(currentItem.getByteArray(),
currentItem.getStartOffset(),
+ currentItem.getLength());
+ sameHashes = hashes.get(hash);
+ incrementIfCommonValue(currentItem, sameHashes, listIndex,
listBuilder);
+ }
+ }
+ }
+
+ private void incrementIfCommonValue(IPointable item, List<ValueListIndex>
sameHashes, int listIndex,
+ IAsterixListBuilder listBuilder) throws IOException {
+ if (sameHashes != null) {
+ // look for the same equal item, add to list builder when all
lists have seen this item
+ incrementIfExists(sameHashes, item, listIndex, listBuilder);
+ }
+ }
+
+ private void incrementIfExists(List<ValueListIndex> sameHashes, IPointable
item, int listIndex,
+ IAsterixListBuilder listBuilder) throws HyracksDataException {
+ ValueListIndex sameValue = ArrayFunctionsUtil.findItem(item,
sameHashes, comp);
+ if (sameValue != null && listIndex - sameValue.listIndex == 1) {
+ // found the item, its stamp is OK (stamp saves the index of the
last list that has seen this item)
+ // increment stamp of this item
+ sameValue.listIndex = listIndex;
+ if (listIndex == listsArgs.length - 1) {
+ // if this list is the last to stamp, then add to the final
result
+ listBuilder.addItem(item);
+ }
+ }
+ }
+
+ private boolean notNullAndMissing(IPointable item) {
+ byte tag = item.getByteArray()[item.getStartOffset()];
+ return tag != ATypeTag.SERIALIZED_NULL_TYPE_TAG && tag !=
ATypeTag.SERIALIZED_MISSING_TYPE_TAG;
+ }
+
+ // this method is only for the most restrictive list. it avoids comparison
since it is the initial list we start
+ // with, so for sure every element in the collection must exist in the list
+ private void incrementSmallest(int listIndex,
Collection<List<ValueListIndex>> commonValues,
+ IAsterixListBuilder listBuilder) throws HyracksDataException {
+ // TODO(ali): avoid iterator object creation. Using lambdas currently
causes errors due code gen
+ ValueListIndex commonValue;
+ for (List<ValueListIndex> commonItems : commonValues) {
+ for (int i = 0; i < commonItems.size(); i++) {
+ // any difference that is not == 1 means the item is not
common among lists because if it's common then
+ // each previous list should've incremented the item list
index up to the current list index
+ commonValue = commonItems.get(i);
+ if (listIndex - commonValue.listIndex == 1) {
+ commonValue.listIndex = listIndex;
+ // if this restrictive list is the last to stamp, then add
to the final result
+ if (listIndex == listsArgs.length - 1) {
+ listBuilder.addItem(commonValue);
+ }
+ }
+ }
+ }
+ }
+
+ // validates that the item is not derived, multisets, objects and arrays
are not yet supported
+ private void validateItem(IPointable item) throws RuntimeDataException {
+ if
(ATYPETAGDESERIALIZER.deserialize(item.getByteArray()[item.getStartOffset()]).isDerivedType())
{
+ throw new RuntimeDataException(ErrorCode.CANNOT_COMPARE_COMPLEX,
sourceLoc);
+ }
+ }
+
+ protected class ValueListIndex implements IValueReference {
+ private IPointable value;
+ private int listIndex;
+
+ protected ValueListIndex() {
+ }
+
+ protected void set(IPointable value, int listIndex) {
+ this.value = value;
+ this.listIndex = listIndex;
+ }
+
+ @Override
+ public byte[] getByteArray() {
+ return value.getByteArray();
+ }
+
+ @Override
+ public int getStartOffset() {
+ return value.getStartOffset();
+ }
+
+ @Override
+ public int getLength() {
+ return value.getLength();
+ }
+ }
+
+ public class ValueListIndexFactory implements
IObjectFactory<ValueListIndex, ATypeTag> {
+
+ public ValueListIndexFactory() {
+ }
+
+ @Override
+ public ValueListIndex create(ATypeTag arg) {
+ return new ValueListIndex();
+ }
+ }
+}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
index 032ef32..9a8d267 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
@@ -239,6 +239,8 @@
result.set(storage);
} catch (IOException e) {
throw HyracksDataException.create(e);
+ } finally {
+ caster.deallocatePointables();
}
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayStarDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayStarDescriptor.java
index 4bc885c..87db6c5 100755
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayStarDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayStarDescriptor.java
@@ -213,6 +213,7 @@
throw HyracksDataException.create(e);
} finally {
pointableAllocator.reset();
+ caster.deallocatePointables();
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
index d1879b2..05541a6 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
@@ -98,4 +98,8 @@
return allocator.allocateFieldValue(null);
}
}
+
+ public void deallocatePointables() {
+ allocator.reset();
+ }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/3129
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib6c8c55ed3e0a35e00c5976a46e9ed6e432a6e9f
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-f69489
Gerrit-Owner: Ali Alsuliman <[email protected]>