Ali Alsuliman has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/3391 )


Change subject: [ASTERIXDB-2564][RT] Too many objects created in min() and max()
......................................................................

[ASTERIXDB-2564][RT] Too many objects created in min() and max()

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
During min() and max() aggregation, the functions keep track of
the aggregation type in order to handle heterogeneous lists.
They promote the aggregation type if needed (e.g. when a double is encountered).
Don't switch to a new aggregation type and create a new comparator
when the type of the new input value is the same as that of the
previously aggregated values. The old code did so for every such value,
because canPromote(agg_type, new_val_type) always returns true for
identical types.

Change-Id: I0bb9f0715985ae555de00bbf3173c80371d8968b
---
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
1 file changed, 40 insertions(+), 54 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/91/3391/1

diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
index 86ae924..616bb5a 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
@@ -30,7 +30,6 @@
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
@@ -39,18 +38,16 @@
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;

 public abstract class AbstractMinMaxAggregateFunction extends 
AbstractAggregateFunction {
-    private IPointable inputVal = new VoidPointable();
-    private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
-    private ArrayBackedValueStorage tempValForCasting = new 
ArrayBackedValueStorage();
-
-    protected ArrayBackedValueStorage resultStorage = new 
ArrayBackedValueStorage();
-    private IScalarEvaluator eval;
+    protected final ArrayBackedValueStorage resultStorage = new 
ArrayBackedValueStorage();
+    private final IPointable inputVal = new VoidPointable();
+    private final ArrayBackedValueStorage outputVal = new 
ArrayBackedValueStorage();
+    private final ArrayBackedValueStorage tempValForCasting = new 
ArrayBackedValueStorage();
+    private final IScalarEvaluator eval;
+    private final boolean isMin;
     protected ATypeTag aggType;
     private IBinaryComparator cmp;
-    private ITypeConvertComputer tpc;
-    private final boolean isMin;

-    public AbstractMinMaxAggregateFunction(IScalarEvaluatorFactory[] args, 
IHyracksTaskContext context, boolean isMin,
+    AbstractMinMaxAggregateFunction(IScalarEvaluatorFactory[] args, 
IHyracksTaskContext context, boolean isMin,
             SourceLocation sourceLoc) throws HyracksDataException {
         super(sourceLoc);
         eval = args[0].createScalarEvaluator(context);
@@ -82,9 +79,8 @@
             // First value encountered. Set type, comparator, and initial 
value.
             aggType = typeTag;
             // Set comparator.
-            IBinaryComparatorFactory cmpFactory =
-                    
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, 
isMin);
-            cmp = cmpFactory.createBinaryComparator();
+            cmp = 
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, 
isMin)
+                    .createBinaryComparator();
             // Initialize min value.
             outputVal.assign(inputVal);
         } else if (typeTag != ATypeTag.SYSTEM_NULL && 
!ATypeHierarchy.isCompatible(typeTag, aggType)) {
@@ -94,56 +90,27 @@
                 throw new IncompatibleTypeException(sourceLoc, "min/max", 
aggType.serialize(), typeTag.serialize());
             }
         } else {
-
             // If a system_null is encountered locally, it would be an error; 
otherwise if it is seen
             // by a global aggregator, it is simple ignored.
             if (typeTag == ATypeTag.SYSTEM_NULL) {
                 processSystemNull();
                 return;
             }
-
+            if (aggType == typeTag) {
+                compareAndUpdate(cmp, inputVal, outputVal);
+                return;
+            }
             if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-                tpc = ATypeHierarchy.getTypePromoteComputer(aggType, typeTag);
-                aggType = typeTag;
-                cmp = 
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, 
isMin)
+                // switch to new comp & aggregation type (i.e. current min/max 
is int and new input is double)
+                cmp = 
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(typeTag, 
isMin)
                         .createBinaryComparator();
-                if (tpc != null) {
-                    tempValForCasting.reset();
-                    try {
-                        tpc.convertType(outputVal.getByteArray(), 
outputVal.getStartOffset() + 1,
-                                outputVal.getLength() - 1, 
tempValForCasting.getDataOutput());
-                    } catch (IOException e) {
-                        throw HyracksDataException.create(e);
-                    }
-                    outputVal.assign(tempValForCasting);
-                }
-                if (cmp.compare(inputVal.getByteArray(), 
inputVal.getStartOffset(), inputVal.getLength(),
-                        outputVal.getByteArray(), outputVal.getStartOffset(), 
outputVal.getLength()) < 0) {
-                    outputVal.assign(inputVal);
-                }
-
+                castValue(ATypeHierarchy.getTypePromoteComputer(aggType, 
typeTag), outputVal, tempValForCasting);
+                outputVal.assign(tempValForCasting);
+                compareAndUpdate(cmp, inputVal, outputVal);
+                aggType = typeTag;
             } else {
-                tpc = ATypeHierarchy.getTypePromoteComputer(typeTag, aggType);
-                if (tpc != null) {
-                    tempValForCasting.reset();
-                    try {
-                        tpc.convertType(inputVal.getByteArray(), 
inputVal.getStartOffset() + 1,
-                                inputVal.getLength() - 1, 
tempValForCasting.getDataOutput());
-                    } catch (IOException e) {
-                        throw HyracksDataException.create(e);
-                    }
-                    if (cmp.compare(tempValForCasting.getByteArray(), 
tempValForCasting.getStartOffset(),
-                            tempValForCasting.getLength(), 
outputVal.getByteArray(), outputVal.getStartOffset(),
-                            outputVal.getLength()) < 0) {
-                        outputVal.assign(tempValForCasting);
-                    }
-                } else {
-                    if (cmp.compare(inputVal.getByteArray(), 
inputVal.getStartOffset(), inputVal.getLength(),
-                            outputVal.getByteArray(), 
outputVal.getStartOffset(), outputVal.getLength()) < 0) {
-                        outputVal.assign(inputVal);
-                    }
-                }
-
+                castValue(ATypeHierarchy.getTypePromoteComputer(typeTag, 
aggType), inputVal, tempValForCasting);
+                compareAndUpdate(cmp, tempValForCasting, outputVal);
             }
         }
     }
@@ -187,4 +154,23 @@
     protected abstract void processSystemNull() throws HyracksDataException;

     protected abstract void finishSystemNull() throws IOException;
+
+    private static void compareAndUpdate(IBinaryComparator comp, IPointable 
newVal, ArrayBackedValueStorage oldVal)
+            throws HyracksDataException {
+        if (comp.compare(newVal.getByteArray(), newVal.getStartOffset(), 
newVal.getLength(), oldVal.getByteArray(),
+                oldVal.getStartOffset(), oldVal.getLength()) < 0) {
+            oldVal.assign(newVal);
+        }
+    }
+
+    private static void castValue(ITypeConvertComputer typeConverter, 
IPointable inputValue,
+            ArrayBackedValueStorage tempValForCasting) throws 
HyracksDataException {
+        tempValForCasting.reset();
+        try {
+            typeConverter.convertType(inputValue.getByteArray(), 
inputValue.getStartOffset() + 1,
+                    inputValue.getLength() - 1, 
tempValForCasting.getDataOutput());
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
 }

--
To view, visit https://asterix-gerrit.ics.uci.edu/3391
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0bb9f0715985ae555de00bbf3173c80371d8968b
Gerrit-Change-Number: 3391
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <ali.al.solai...@gmail.com>

Reply via email to