Yingyi Bu has submitted this change and it was merged. Change subject: [ASTERIXDB-1812][RT] Budget the memory usage for pre-clustered group-by.
[ASTERIXDB-1812][RT] Budget the memory usage for pre-clustered group-by. - user model changes: no - storage format changes: no - interface changes: no Details: - make the pre-clustered group-by respect the group-by memory budget (compiler.groupmemory) and fail with an error when a single group exceeds it. - rename PhysicalOptimizationConfig.getMaxFramesExternalGroupBy() to getMaxFramesForGroupBy(), since the budget now applies to all group-by operators. Change-Id: I670269b0b8f446d06d8dd73202194574aa524e85 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1940 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Till Westmann <ti...@apache.org> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java M asterixdb/asterix-app/src/test/resources/runtimets/queries/big-object/big_object_groupby/big_object_groupby.3.query.aql M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_groupby/big_object_groupby.3.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.1.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.2.update.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.3.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp M asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/big-object/big_object_groupby/big_object_groupby.3.ast M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java M hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java 28 files changed, 310 insertions(+), 37 deletions(-) Approvals: Anon. E. Moose #1000171: Till Westmann: Looks good to me, approved Jenkins: Verified; No violations found; ; Verified diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java index 5aaf87b..d22ec54 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java @@ -162,8 +162,8 @@ } ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator( gby.getGroupByList(), - physicalOptimizationConfig.getMaxFramesExternalGroupBy(), - (long) physicalOptimizationConfig.getMaxFramesExternalGroupBy() + physicalOptimizationConfig.getMaxFramesForGroupBy(), + (long) physicalOptimizationConfig.getMaxFramesForGroupBy() * physicalOptimizationConfig.getFrameSize()); generateMergeAggregationExpressions(gby, context); op.setPhysicalOperator(externalGby); @@ -182,7 +182,8 @@ columnList.add(varRef.getVariableReference()); } } - op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll())); + op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(), + context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy())); } } } else if (((AbstractLogicalOperator) (r0.getValue())).getOperatorTag() @@ -196,7 +197,8 @@ columnList.add(varRef.getVariableReference()); } } - op.setPhysicalOperator(new 
PreclusteredGroupByPOperator(columnList, gby.isGroupAll())); + op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(), + context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy())); } else { throw new AlgebricksException("Unsupported nested operator within a group-by: " + ((AbstractLogicalOperator) (r0.getValue())).getOperatorTag().name()); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries/big-object/big_object_groupby/big_object_groupby.3.query.aql b/asterixdb/asterix-app/src/test/resources/runtimets/queries/big-object/big_object_groupby/big_object_groupby.3.query.aql index 493cbb0..4fa4f3e 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries/big-object/big_object_groupby/big_object_groupby.3.query.aql +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries/big-object/big_object_groupby/big_object_groupby.3.query.aql @@ -25,6 +25,8 @@ use dataverse test; +set "compiler.groupmemory" "32MB" + for $i in dataset('Line') group by $partkey := $i.l_partkey with $i order by $partkey diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_groupby/big_object_groupby.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_groupby/big_object_groupby.3.query.sqlpp index b5388a9..5a5009b 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_groupby/big_object_groupby.3.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/big-object/big_object_groupby/big_object_groupby.3.query.sqlpp @@ -25,6 +25,7 @@ use test; +set `compiler.groupmemory` `32MB` select element {'partkey':partkey,'lines': (from g select value i) } from Line as i diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.1.ddl.sqlpp 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.1.ddl.sqlpp new file mode 100644 index 0000000..92698ab --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.1.ddl.sqlpp @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +drop dataverse tpch if exists; +create dataverse tpch; + +use tpch; + + +create type tpch.LineItemType as + closed { + l_orderkey : bigint, + l_partkey : bigint, + l_suppkey : bigint, + l_linenumber : bigint, + l_quantity : double, + l_extendedprice : double, + l_discount : double, + l_tax : double, + l_returnflag : string, + l_linestatus : string, + l_shipdate : string, + l_commitdate : string, + l_receiptdate : string, + l_shipinstruct : string, + l_shipmode : string, + l_comment : string +} + +create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber; + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.2.update.sqlpp new file mode 100644 index 0000000..5fe734c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.2.update.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use tpch; + + +load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted; + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.3.query.sqlpp new file mode 100644 index 0000000..7379646 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify-2/listify-2.3.query.sqlpp @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +USE tpch; + +SET `compiler.groupmemory` "576KB" + +SELECT * +FROM LineItem AS l +WHERE l.l_shipdate <= '1998-09-02' +/* +hash */ +GROUP BY l.l_returnflag AS l_returnflag, l.l_linestatus AS l_linestatus +ORDER BY l_returnflag, l_linestatus +; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp new file mode 100644 index 0000000..92698ab --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +drop dataverse tpch if exists; +create dataverse tpch; + +use tpch; + + +create type tpch.LineItemType as + closed { + l_orderkey : bigint, + l_partkey : bigint, + l_suppkey : bigint, + l_linenumber : bigint, + l_quantity : double, + l_extendedprice : double, + l_discount : double, + l_tax : double, + l_returnflag : string, + l_linestatus : string, + l_shipdate : string, + l_commitdate : string, + l_receiptdate : string, + l_shipinstruct : string, + l_shipmode : string, + l_comment : string +} + +create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber; + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp new file mode 100644 index 0000000..5fe734c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +use tpch; + + +load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`)) pre-sorted; + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp new file mode 100644 index 0000000..ba1e7d4 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +USE tpch; + +SET `compiler.groupmemory` "576KB" + +SELECT l_returnflag AS l_returnflag, + l_linestatus AS l_linestatus, + g +FROM LineItem AS l +WHERE l.l_shipdate <= '1998-09-02' +/* +hash */ +GROUP BY l.l_returnflag AS l_returnflag, l.l_linestatus AS l_linestatus +GROUP AS g +ORDER BY l_returnflag, l_linestatus +; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/big-object/big_object_groupby/big_object_groupby.3.ast b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/big-object/big_object_groupby/big_object_groupby.3.ast index 1b4831f..9747f1e 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/big-object/big_object_groupby/big_object_groupby.3.ast +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results_parser_sqlpp/big-object/big_object_groupby/big_object_groupby.3.ast @@ -1,4 +1,5 @@ DataverseUse test +Set compiler.groupmemory=32MB Query: SELECT ELEMENT [ RecordConstructor [ diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index 70b3486..16d4cb3 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@ -2887,6 +2887,18 @@ <expected-error>Invalid item type: function agg-sum cannot process item type object in an input array (or multiset)</expected-error> </compilation-unit> </test-case> + <test-case FilePath="group-by"> + <compilation-unit name="listify"> + <output-dir compare="Text">none</output-dir> + <expected-error>The byte size of a single group</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="group-by"> + <compilation-unit name="listify-2"> + <output-dir compare="Text">none</output-dir> + <expected-error>The byte size of a single group</expected-error> + </compilation-unit> + </test-case> </test-group> <test-group 
name="index-join"> <test-case FilePath="index-join"> diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java index 5a465f7..78e4795 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java @@ -44,10 +44,12 @@ public class PreclusteredGroupByPOperator extends AbstractPreclusteredGroupByPOperator { private final boolean groupAll; + private final int framesLimit; - public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, boolean groupAll) { + public PreclusteredGroupByPOperator(List<LogicalVariable> columnList, boolean groupAll, int framesLimit) { super(columnList); this.groupAll = groupAll; + this.framesLimit = framesLimit; } @Override @@ -86,7 +88,7 @@ context); PreclusteredGroupOperatorDescriptor opDesc = new PreclusteredGroupOperatorDescriptor(spec, keys, - comparatorFactories, aggregatorFactory, recordDescriptor, groupAll); + comparatorFactories, aggregatorFactory, recordDescriptor, groupAll, framesLimit); contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java index 7340882..a242927 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java @@ -91,7 +91,7 @@ setInt(MAX_FRAMES_FOR_JOIN, frameLimit); } - public int getMaxFramesExternalGroupBy() { + public int getMaxFramesForGroupBy() { int frameSize = getFrameSize(); return getInt(MAX_FRAMES_EXTERNAL_GROUP_BY, (int) (((long) 256 * MB) / frameSize)); } diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java index 576bd62..7ea1327 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushGroupByIntoSortRule.java @@ -102,7 +102,7 @@ //replace preclustered gby with sort gby if (!groupByOperator.isGroupAll()) { op.setPhysicalOperator(new SortGroupByPOperator(groupByOperator.getGroupByList(), - context.getPhysicalOptimizationConfig().getMaxFramesExternalGroupBy(), + context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy(), sortPhysicalOperator.getSortColumns())); } // remove the stable sort operator diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java index 7dc59f6..16ed9cb 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java @@ -167,8 +167,8 @@ if (hasIntermediateAgg) { ExternalGroupByPOperator externalGby = new ExternalGroupByPOperator( gby.getGroupByList(), - physicalOptimizationConfig.getMaxFramesExternalGroupBy(), - (long) physicalOptimizationConfig.getMaxFramesExternalGroupBy() + physicalOptimizationConfig.getMaxFramesForGroupBy(), + (long) physicalOptimizationConfig.getMaxFramesForGroupBy() * physicalOptimizationConfig.getFrameSize()); op.setPhysicalOperator(externalGby); break; @@ -187,7 +187,8 @@ } } if (topLevelOp) { - op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll())); + op.setPhysicalOperator(new PreclusteredGroupByPOperator(columnList, gby.isGroupAll(), + context.getPhysicalOptimizationConfig().getMaxFramesForGroupBy())); } else { op.setPhysicalOperator(new MicroPreclusteredGroupByPOperator(columnList)); } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java index 893aa61..94af04f 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java @@ -29,6 +29,7 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import 
org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; @@ -53,7 +54,8 @@ @Override public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc, - RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys) throws HyracksDataException { + RecordDescriptor outRecordDescriptor, int[] keys, int[] partialKeys, long memoryBudget) + throws HyracksDataException { final AggregatorOutput outputWriter = new AggregatorOutput(subplans, keyFieldIdx.length, decorFieldIdx.length); final NestedTupleSourceRuntime[] pipelines = new NestedTupleSourceRuntime[subplans.length]; for (int i = 0; i < subplans.length; i++) { @@ -86,6 +88,9 @@ @Override public void aggregate(IFrameTupleAccessor accessor, int tIndex, IFrameTupleAccessor stateAccessor, int stateTupleIndex, AggregateState state) throws HyracksDataException { + // Checks the memory usage. + memoryUsageCheck(); + for (int i = 0; i < pipelines.length; i++) { pipelines[i].writeTuple(accessor.getBuffer(), tIndex); } @@ -98,7 +103,10 @@ outputWriter.setInputIdx(i); pipelines[i].close(); } - // outputWriter.writeTuple(appender); + + // Checks the memory usage. + memoryUsageCheck(); + tupleBuilder.reset(); ArrayTupleBuilder tb = outputWriter.getTupleBuilder(); byte[] data = tb.getByteArray(); @@ -136,6 +144,18 @@ } + // Checks the memory usage. 
+ private void memoryUsageCheck() throws HyracksDataException { + if (memoryBudget > 0) { + ArrayTupleBuilder tb = outputWriter.getTupleBuilder(); + byte[] data = tb.getByteArray(); + if (data.length > memoryBudget) { + throw HyracksDataException.create(ErrorCode.GROUP_BY_MEMORY_BUDGET_EXCEEDS, data.length, + memoryBudget); + } + } + } + }; } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java index 8b8e320..c261df8 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java @@ -60,7 +60,7 @@ @Override public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, - final IFrameWriter writer) throws HyracksDataException { + final IFrameWriter writer, long memoryBudget) throws HyracksDataException { final RunningAggregatorOutput outputWriter = new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length, decorFieldIdx.length, writer); // should enforce protocol diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java index e4ac701..1e06c76 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java @@ -42,8 +42,8 @@ @Override public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor, - RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults) - throws HyracksDataException { + RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults, + long memoryBudget) throws HyracksDataException { final int[] keys = keyFields; /** diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java index 87f3cb4..50452c1 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java @@ -43,7 +43,8 @@ @Override public IAggregatorDescriptor createAggregator(final IHyracksTaskContext ctx, RecordDescriptor inRecordDesc, - RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys) throws HyracksDataException { + RecordDescriptor outRecordDescriptor, int[] aggKeys, int[] partialKeys, long memoryBudget) + throws HyracksDataException { return new IAggregatorDescriptor() { diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index e6fbc6f..b054faf 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -122,6 +122,8 @@ public static final int UNRECOGNIZED_INDEX_COMPONENT_FILE = 86; public static final int UNEQUAL_NUM_FILTERS_TREES = 87; public static final int CANNOT_MODIFY_INDEX_DISK_IS_FULL = 88; + public static final int GROUP_BY_MEMORY_BUDGET_EXCEEDS = 89; + public static final int ILLEGAL_MEMORY_BUDGET = 90; // Compilation error codes. public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index d2e05e3..1d2143b 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -105,5 +105,7 @@ 86 = Found an unrecognized index file %1$s 87 = Unequal number of trees and filters found in %1$s 88 = Cannot modify index (Disk is full) +89 = The byte size of a single group (%1$s bytes) exceeds the budget for a group by operator (%2$s bytes) +90 = Memory budget for the %1$s operator (%2$s bytes) is lower than the minimum (%3$s bytes) 10000 = The given rule collection %1$s is not an instance of the List class. 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java index 5d13332..d546e5e 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java @@ -32,14 +32,14 @@ */ @Override public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor, - RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, IFrameWriter writer) - throws HyracksDataException { - return this - .createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, keyFields, keyFieldsInPartialResults); + RecordDescriptor outRecordDescriptor, int[] keyFields, int[] keyFieldsInPartialResults, IFrameWriter writer, + long memoryBudget) throws HyracksDataException { + return createAggregator(ctx, inRecordDescriptor, outRecordDescriptor, keyFields, keyFieldsInPartialResults, + memoryBudget); } abstract protected IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int[] keyFields, - final int[] keyFieldsInPartialResults) throws HyracksDataException; + final int[] keyFieldsInPartialResults, long memoryBudget) throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java index cd79a30..43b9685 100644 
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java @@ -97,7 +97,7 @@ hashFunctionFamilies).createPartitioner(seed); final IAggregatorDescriptor aggregator = aggregateFactory.createAggregator(ctx, inRecordDescriptor, - outRecordDescriptor, keyFields, intermediateResultKeys, null); + outRecordDescriptor, keyFields, intermediateResultKeys, null, -1); final AggregateState aggregateState = aggregator.createAggregateStates(); diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java index 99c787c..d5ed713 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java @@ -29,6 +29,6 @@ IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int[] keyFields, final int[] keyFieldsInPartialResults, - IFrameWriter writer) throws HyracksDataException; + IFrameWriter writer, long memoryBudget) throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java index e326f39..595e2c4 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java @@ -28,7 +28,6 @@ import org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory; import org.apache.hyracks.dataflow.std.group.AggregateState; import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor; -import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory; import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptor; import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory; @@ -58,8 +57,8 @@ */ @Override public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx, RecordDescriptor inRecordDescriptor, - RecordDescriptor outRecordDescriptor, final int[] keyFields, final int[] keyFieldsInPartialResults) - throws HyracksDataException { + RecordDescriptor outRecordDescriptor, final int[] keyFields, final int[] keyFieldsInPartialResults, + long memoryBudget) throws HyracksDataException { final IFieldAggregateDescriptor[] aggregators = new IFieldAggregateDescriptor[aggregatorFactories.length]; for (int i = 0; i < aggregators.length; i++) { diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java index 0fe0f54..739de74 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java +++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java @@ -35,22 +35,30 @@ private final IBinaryComparatorFactory[] comparatorFactories; private final IAggregatorDescriptorFactory aggregatorFactory; private final boolean groupAll; + private final int framesLimit; public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields, IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor recordDescriptor) { - this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false); + this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false, -1); } public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields, IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory, - RecordDescriptor recordDescriptor, boolean groupAll) { + RecordDescriptor recordDescriptor, int framesLimit) { + this(spec, groupFields, comparatorFactories, aggregatorFactory, recordDescriptor, false, framesLimit); + } + + public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] groupFields, + IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory, + RecordDescriptor recordDescriptor, boolean groupAll, int framesLimit) { super(spec, 1, 1); this.groupFields = groupFields; this.comparatorFactories = comparatorFactories; this.aggregatorFactory = aggregatorFactory; outRecDescs[0] = recordDescriptor; this.groupAll = groupAll; + this.framesLimit = framesLimit; } @Override @@ -58,6 +66,6 @@ final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { return new PreclusteredGroupOperatorNodePushable(ctx, groupFields, comparatorFactories, aggregatorFactory, - 
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), outRecDescs[0], groupAll); + recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), outRecDescs[0], groupAll, framesLimit); } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java index 2acc4db..0a7444d 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java @@ -36,12 +36,14 @@ private final RecordDescriptor inRecordDescriptor; private final RecordDescriptor outRecordDescriptor; private final boolean groupAll; + private final int frameLimit; private PreclusteredGroupWriter pgw; PreclusteredGroupOperatorNodePushable(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory, - RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, boolean groupAll) { + RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, boolean groupAll, + int frameLimit) { this.ctx = ctx; this.groupFields = groupFields; this.comparatorFactories = comparatorFactories; @@ -49,6 +51,7 @@ this.inRecordDescriptor = inRecordDescriptor; this.outRecordDescriptor = outRecordDescriptor; this.groupAll = groupAll; + this.frameLimit = frameLimit; } @Override @@ -58,7 +61,7 @@ comparators[i] = comparatorFactories[i].createBinaryComparator(); } pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators, aggregatorFactory, inRecordDescriptor, - outRecordDescriptor, writer, false, 
groupAll); + outRecordDescriptor, writer, false, groupAll, frameLimit); pgw.open(); } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java index 3105d42..db6102e 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java @@ -26,6 +26,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparator; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; @@ -51,28 +52,39 @@ private final boolean outputPartial; private boolean first; private boolean isFailed = false; + private final long memoryLimit; public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators, IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc, RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException { - this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, false, false); + this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, false, false, -1); } public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators, IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc, 
RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial) throws HyracksDataException { this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc, outRecordDesc, writer, outputPartial, - false); + false, -1); } public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields, IBinaryComparator[] comparators, IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor inRecordDesc, - RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial, boolean groupAll) - throws HyracksDataException { + RecordDescriptor outRecordDesc, IFrameWriter writer, boolean outputPartial, boolean groupAll, + int framesLimit) throws HyracksDataException { this.groupFields = groupFields; this.comparators = comparators; + + if (framesLimit >= 0 && framesLimit <= 2) { + throw HyracksDataException.create(ErrorCode.ILLEGAL_MEMORY_BUDGET, "GROUP BY", + Long.toString(((long) (framesLimit)) * ctx.getInitialFrameSize()), + Long.toString(2L * ctx.getInitialFrameSize())); + } + + // Deducts input/output frames. + this.memoryLimit = framesLimit <= 0 ? -1 : ((long) (framesLimit - 2)) * ctx.getInitialFrameSize(); this.aggregator = - aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields, writer); + aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields, writer, + this.memoryLimit); this.aggregateState = aggregator.createAggregateStates(); copyFrame = new VSizeFrame(ctx); inFrameAccessor = new FrameTupleAccessor(inRecordDesc); -- To view, visit https://asterix-gerrit.ics.uci.edu/1940 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I670269b0b8f446d06d8dd73202194574aa524e85 Gerrit-PatchSet: 14 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Yingyi Bu <buyin...@gmail.com> Gerrit-Reviewer: Anon. E. 
Moose #1000171 Gerrit-Reviewer: Dmitry Lychagin <dmitry.lycha...@couchbase.com> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Till Westmann <ti...@apache.org> Gerrit-Reviewer: Yingyi Bu <buyin...@gmail.com>