Yingyi Bu has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1940

Change subject: [RT]Budget the memory usage for pre-clustered group-by.
......................................................................

[RT]Budget the memory usage for pre-clustered group-by.

Change-Id: I670269b0b8f446d06d8dd73202194574aa524e85
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp
M 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/query-issue810-3/query-issue810-3.3.query.sqlpp
M 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql/query-issue810/query-issue810.3.query.sqlpp
M 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch/query-issue810-3/query-issue810-3.3.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/listify/listify.1.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
M 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
24 files changed, 184 insertions(+), 31 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/40/1940/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 5aaf87b..fa066f9 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -182,7 +182,8 @@
                                         
columnList.add(varRef.getVariableReference());
                                     }
                                 }
-                                op.setPhysicalOperator(new 
PreclusteredGroupByPOperator(columnList, gby.isGroupAll()));
+                                op.setPhysicalOperator(new 
PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
+                                        
context.getPhysicalOptimizationConfig().getMaxFramesExternalGroupBy()));
                             }
                         }
                     } else if (((AbstractLogicalOperator) 
(r0.getValue())).getOperatorTag()
@@ -196,7 +197,8 @@
                                 columnList.add(varRef.getVariableReference());
                             }
                         }
-                        op.setPhysicalOperator(new 
PreclusteredGroupByPOperator(columnList, gby.isGroupAll()));
+                        op.setPhysicalOperator(new 
PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
+                                
context.getPhysicalOptimizationConfig().getMaxFramesExternalGroupBy()));
                     } else {
                         throw new AlgebricksException("Unsupported nested 
operator within a group-by: "
                                 + ((AbstractLogicalOperator) 
(r0.getValue())).getOperatorTag().name());
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp
new file mode 100644
index 0000000..92698ab
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+
+create type tpch.LineItemType as
+ closed {
+  l_orderkey : bigint,
+  l_partkey : bigint,
+  l_suppkey : bigint,
+  l_linenumber : bigint,
+  l_quantity : double,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+}
+
+create  dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp
new file mode 100644
index 0000000..5fe734c
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load  dataset LineItem using localfs 
((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`))
 pre-sorted;
+
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp
new file mode 100644
index 0000000..73d7ab2
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SELECT  l_returnflag AS l_returnflag,
+        l_linestatus AS l_linestatus,
+        g
+FROM  LineItem AS l
+WHERE l.l_shipdate <= '1998-09-02'
+/* +hash */
+GROUP BY l.l_returnflag AS l_returnflag, l.l_linestatus AS l_linestatus
+GROUP AS g
+ORDER BY l_returnflag, l_linestatus
+;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/query-issue810-3/query-issue810-3.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/query-issue810-3/query-issue810-3.3.query.sqlpp
index 5186119..c5a8933 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/query-issue810-3/query-issue810-3.3.query.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/query-issue810-3/query-issue810-3.3.query.sqlpp
@@ -25,6 +25,8 @@
 
 USE tpch;
 
+SET `compiler.groupmemory` "32MB"
+
 
 SELECT  l_returnflag,
         l_linestatus,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql/query-issue810/query-issue810.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql/query-issue810/query-issue810.3.query.sqlpp
index 17e37b3..7f21560 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql/query-issue810/query-issue810.3.query.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql/query-issue810/query-issue810.3.query.sqlpp
@@ -25,6 +25,7 @@
 
 USE tpch;
 
+SET `compiler.groupmemory` "32MB"
 
 SELECT l_returnflag AS l_returnflag,
        l_linestatus AS l_linestatus,
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch/query-issue810-3/query-issue810-3.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch/query-issue810-3/query-issue810-3.3.query.sqlpp
index f25f99d..a46a0e7 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch/query-issue810-3/query-issue810-3.3.query.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch/query-issue810-3/query-issue810-3.3.query.sqlpp
@@ -25,6 +25,7 @@
 
 use tpch;
 
+set `compiler.groupmemory` "32MB"
 
 select element 
{'l_returnflag':l_returnflag,'l_linestatus':l_linestatus,'count_cheaps':coll_count(cheaps),'avg_expensive_discounts':tpch.coll_avg(expensives),'sum_disc_prices':tpch.coll_sum(disc_prices),'total_charges':tpch.coll_sum(charges)}
 from  LineItem as l
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/listify/listify.1.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/listify/listify.1.adm
new file mode 100644
index 0000000..583f20b
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/listify/listify.1.adm
@@ -0,0 +1 @@
+{ "avgpay": 2500.0, "workers": [ { "name": "Bill", "salary": 2000 }, { "name": 
"Fred", "salary": 3000 } ], "deptno": "K55" }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 70b3486..cc21741 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -2887,6 +2887,12 @@
         <expected-error>Invalid item type: function agg-sum cannot process 
item type object in an input array (or multiset)</expected-error>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="group-by">
+      <compilation-unit name="listify">
+        <output-dir compare="Text">none</output-dir>
+        <expected-error>Group By operator memory usage</expected-error>
+      </compilation-unit>
+    </test-case>
   </test-group>
   <test-group name="index-join">
     <test-case FilePath="index-join">
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index a636d10..445d4f2 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -43,10 +43,12 @@
 public class PreclusteredGroupByPOperator extends 
AbstractPreclusteredGroupByPOperator {
 
     private final boolean groupAll;
+    private final int framesLimit;
 
-    public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, 
boolean groupAll) {
+    public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, 
boolean groupAll, int framesLimit) {
         super(columnList);
         this.groupAll = groupAll;
+        this.framesLimit = framesLimit;
     }
 
     @Override
@@ -84,7 +86,7 @@
                 context);
 
         PreclusteredGroupOperatorDescriptor opDesc = new 
PreclusteredGroupOperatorDescriptor(spec, keys,
-                comparatorFactories, aggregatorFactory, recordDescriptor, 
groupAll);
+                comparatorFactories, aggregatorFactory, recordDescriptor, 
groupAll, framesLimit);
 
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 7dc59f6..4b31dc2 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -187,7 +187,8 @@
                         }
                     }
                     if (topLevelOp) {
-                        op.setPhysicalOperator(new 
PreclusteredGroupByPOperator(columnList, gby.isGroupAll()));
+                        op.setPhysicalOperator(new 
PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
+                                
context.getPhysicalOptimizationConfig().getMaxFramesExternalGroupBy()));
                     } else {
                         op.setPhysicalOperator(new 
MicroPreclusteredGroupByPOperator(columnList));
                     }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 893aa61..e901675 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -53,7 +54,8 @@
 
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, 
RecordDescriptor inRecordDesc,
-            RecordDescriptor outRecordDescriptor, int[] keys, int[] 
partialKeys) throws HyracksDataException {
+            RecordDescriptor outRecordDescriptor, int[] keys, int[] 
partialKeys, long memoryBudget)
+            throws HyracksDataException {
         final AggregatorOutput outputWriter = new AggregatorOutput(subplans, 
keyFieldIdx.length, decorFieldIdx.length);
         final NestedTupleSourceRuntime[] pipelines = new 
NestedTupleSourceRuntime[subplans.length];
         for (int i = 0; i < subplans.length; i++) {
@@ -86,6 +88,9 @@
             @Override
             public void aggregate(IFrameTupleAccessor accessor, int tIndex, 
IFrameTupleAccessor stateAccessor,
                     int stateTupleIndex, AggregateState state) throws 
HyracksDataException {
+                // Checks the memory usage.
+                memoryUsageCheck();
+
                 for (int i = 0; i < pipelines.length; i++) {
                     pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
                 }
@@ -98,7 +103,10 @@
                     outputWriter.setInputIdx(i);
                     pipelines[i].close();
                 }
-                // outputWriter.writeTuple(appender);
+
+                // Checks the memory usage.
+                memoryUsageCheck();
+
                 tupleBuilder.reset();
                 ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
                 byte[] data = tb.getByteArray();
@@ -136,6 +144,16 @@
 
             }
 
+            // Checks the memory usage.
+            private void memoryUsageCheck() throws HyracksDataException {
+                ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
+                byte[] data = tb.getByteArray();
+                if (memoryBudget > 0 && data.length > memoryBudget) {
+                    throw 
HyracksDataException.create(ErrorCode.GROUP_BY_MEMORY_BUDGET_EXCEEDS, 
data.length,
+                            memoryBudget);
+                }
+            }
+
         };
     }
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 8b8e320..c261df8 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -60,7 +60,7 @@
     @Override
     public IAggregatorDescriptor createAggregator(final IHyracksTaskContext 
ctx, RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, int[] 
keyFieldsInPartialResults,
-            final IFrameWriter writer) throws HyracksDataException {
+            final IFrameWriter writer, long memoryBudget) throws 
HyracksDataException {
         final RunningAggregatorOutput outputWriter =
                 new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, 
decorFieldIdx.length, writer);
         // should enforce protocol
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index e4ac701..1e06c76 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -42,8 +42,8 @@
 
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, 
RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] 
keyFieldsInPartialResults)
-                    throws HyracksDataException {
+            RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] 
keyFieldsInPartialResults,
+            long memoryBudget) throws HyracksDataException {
         final int[] keys = keyFields;
 
         /**
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 87f3cb4..50452c1 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -43,7 +43,8 @@
 
     @Override
     public IAggregatorDescriptor createAggregator(final IHyracksTaskContext 
ctx, RecordDescriptor inRecordDesc,
-            RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] 
partialKeys) throws HyracksDataException {
+            RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] 
partialKeys, long memoryBudget)
+            throws HyracksDataException {
 
         return new IAggregatorDescriptor() {
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index e6fbc6f..d8aeed0 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -122,6 +122,7 @@
     public static final int UNRECOGNIZED_INDEX_COMPONENT_FILE = 86;
     public static final int UNEQUAL_NUM_FILTERS_TREES = 87;
     public static final int CANNOT_MODIFY_INDEX_DISK_IS_FULL = 88;
+    public static final int GROUP_BY_MEMORY_BUDGET_EXCEEDS = 89;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index d2e05e3..05e2b1a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -105,5 +105,6 @@
 86 = Found an unrecognized index file %1$s
 87 = Unequal number of trees and filters found in %1$s
 88 = Cannot modify index (Disk is full)
+89 = Group By operator memory usage (%1$s bytes) exceeds the budget (%2$s 
bytes)
 
 10000 = The given rule collection %1$s is not an instance of the List class.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
index 5d13332..d546e5e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
@@ -32,14 +32,14 @@
      */
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, 
RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] 
keyFieldsInPartialResults, IFrameWriter writer)
-            throws HyracksDataException {
-        return this
-                .createAggregator(ctx, inRecordDescriptor, 
outRecordDescriptor, keyFields, keyFieldsInPartialResults);
+            RecordDescriptor outRecordDescriptor, int[] keyFields, int[] 
keyFieldsInPartialResults, IFrameWriter writer,
+            long memoryBudget) throws HyracksDataException {
+        return createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, 
keyFields, keyFieldsInPartialResults,
+                memoryBudget);
     }
 
     abstract protected IAggregatorDescriptor 
createAggregator(IHyracksTaskContext ctx,
             RecordDescriptor inRecordDescriptor, RecordDescriptor 
outRecordDescriptor, int[] keyFields,
-            final int[] keyFieldsInPartialResults) throws HyracksDataException;
+            final int[] keyFieldsInPartialResults, long memoryBudget) throws 
HyracksDataException;
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index cd79a30..43b9685 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -97,7 +97,7 @@
                 hashFunctionFamilies).createPartitioner(seed);
 
         final IAggregatorDescriptor aggregator = 
aggregateFactory.createAggregator(ctx, inRecordDescriptor,
-                outRecordDescriptor, keyFields, intermediateResultKeys, null);
+                outRecordDescriptor, keyFields, intermediateResultKeys, null, 
-1);
 
         final AggregateState aggregateState = 
aggregator.createAggregateStates();
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
index 99c787c..d5ed713 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
@@ -29,6 +29,6 @@
 
     IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, 
RecordDescriptor inRecordDescriptor,
             RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] 
keyFieldsInPartialResults,
-            IFrameWriter writer) throws HyracksDataException;
+            IFrameWriter writer, long memoryBudget) throws 
HyracksDataException;
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index e326f39..595e2c4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -28,7 +28,6 @@
 import 
org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.AggregateState;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
 import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
 import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
 
@@ -58,8 +57,8 @@
      */
     @Override
     public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, 
RecordDescriptor inRecordDescriptor,
-            RecordDescriptor outRecordDescriptor, final int[] keyFields, final 
int[] keyFieldsInPartialResults)
-            throws HyracksDataException {
+            RecordDescriptor outRecordDescriptor, final int[] keyFields, final 
int[] keyFieldsInPartialResults,
+            long memoryBudget) throws HyracksDataException {
 
         final IFieldAggregateDescriptor[] aggregators = new 
IFieldAggregateDescriptor[aggregatorFactories.length];
         for (int i = 0; i < aggregators.length; i++) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
index 0fe0f54..739de74 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
@@ -35,22 +35,30 @@
     private final IBinaryComparatorFactory[] comparatorFactories;
     private final IAggregatorDescriptorFactory aggregatorFactory;
     private final boolean groupAll;
+    private final int framesLimit;
 
     public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
             IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
             RecordDescriptor recordDescriptor) {
-        this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false);
+        this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false, -1);
     }
 
     public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
             IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
-            RecordDescriptor recordDescriptor, boolean groupAll) {
+            RecordDescriptor recordDescriptor, int framesLimit) {
+        this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false, framesLimit);
+    }
+
+    public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields,
+            IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
+            RecordDescriptor recordDescriptor, boolean groupAll, int framesLimit) {
         super(spec, 1, 1);
         this.groupFields = groupFields;
         this.comparatorFactories = comparatorFactories;
         this.aggregatorFactory = aggregatorFactory;
         outRecDescs[0] = recordDescriptor;
         this.groupAll = groupAll;
+        this.framesLimit = framesLimit;
     }
 
     @Override
@@ -58,6 +66,6 @@
             final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
             throws HyracksDataException {
         return new PreclusteredGroupOperatorNodePushable(ctx, groupFields, comparatorFactories, aggregatorFactory,
-                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), outRecDescs[0], groupAll);
+                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), outRecDescs[0], groupAll, framesLimit);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 2acc4db..0a7444d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -36,12 +36,14 @@
     private final RecordDescriptor inRecordDescriptor;
     private final RecordDescriptor outRecordDescriptor;
     private final boolean groupAll;
+    private final int frameLimit;
 
     private PreclusteredGroupWriter pgw;
 
     PreclusteredGroupOperatorNodePushable(IHyracksTaskContext ctx, int[] groupFields,
             IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
-            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, boolean groupAll) {
+            RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, boolean groupAll,
+            int frameLimit) {
         this.ctx = ctx;
         this.groupFields = groupFields;
         this.comparatorFactories = comparatorFactories;
@@ -49,6 +51,7 @@
         this.inRecordDescriptor = inRecordDescriptor;
         this.outRecordDescriptor = outRecordDescriptor;
         this.groupAll = groupAll;
+        this.frameLimit = frameLimit;
     }
 
     @Override
@@ -58,7 +61,7 @@
             comparators[i] = comparatorFactories[i].createBinaryComparator();
         }
         pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDescriptor,
-                outRecordDescriptor, writer, false, groupAll);
+                outRecordDescriptor, writer, false, groupAll, frameLimit);
         pgw.open();
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 3105d42..18e29ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -51,28 +51,32 @@
     private final boolean outputPartial;
     private boolean first;
     private boolean isFailed = false;
+    private final long memoryLimit;
 
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
             IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
-        this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, false, false);
+        this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, false, false, -1);
     }
 
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
             IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
             RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial) throws HyracksDataException {
         this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, outputPartial,
-                false);
+                false, -1);
     }
 
     public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators,
             IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc,
-            RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial, boolean groupAll)
-            throws HyracksDataException {
+            RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial, boolean groupAll,
+            int framesLimit) throws HyracksDataException {
         this.groupFields = groupFields;
         this.comparators = comparators;
+
+        this.memoryLimit = ((long) framesLimit) * ctx.getInitialFrameSize();
         this.aggregator =
-                aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields, writer);
+                aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields, writer,
+                        this.memoryLimit);
         this.aggregateState = aggregator.createAggregateStates();
         copyFrame = new VSizeFrame(ctx);
         inFrameAccessor = new FrameTupleAccessor(inRecordDesc);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1940
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I670269b0b8f446d06d8dd73202194574aa524e85
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>

Reply via email to