Yingyi Bu has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1940
Change subject: [RT] Budget the memory usage for pre-clustered group-by.
......................................................................
[RT] Budget the memory usage for pre-clustered group-by.
Change-Id: I670269b0b8f446d06d8dd73202194574aa524e85
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/query-issue810-3/query-issue810-3.3.query.sqlpp
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql/query-issue810/query-issue810.3.query.sqlpp
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch/query-issue810-3/query-issue810-3.3.query.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/listify/listify.1.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
M
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
24 files changed, 184 insertions(+), 31 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/40/1940/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index 5aaf87b..fa066f9 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -182,7 +182,8 @@
columnList.add(varRef.getVariableReference());
}
}
- op.setPhysicalOperator(new
PreclusteredGroupByPOperator(columnList, gby.isGroupAll()));
+ op.setPhysicalOperator(new
PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
+
context.getPhysicalOptimizationConfig().getMaxFramesExternalGroupBy()));
}
}
} else if (((AbstractLogicalOperator)
(r0.getValue())).getOperatorTag()
@@ -196,7 +197,8 @@
columnList.add(varRef.getVariableReference());
}
}
- op.setPhysicalOperator(new
PreclusteredGroupByPOperator(columnList, gby.isGroupAll()));
+ op.setPhysicalOperator(new
PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
+
context.getPhysicalOptimizationConfig().getMaxFramesExternalGroupBy()));
} else {
throw new AlgebricksException("Unsupported nested
operator within a group-by: "
+ ((AbstractLogicalOperator)
(r0.getValue())).getOperatorTag().name());
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp
new file mode 100644
index 0000000..92698ab
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.1.ddl.sqlpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use tpch;
+
+
+create type tpch.LineItemType as
+ closed {
+ l_orderkey : bigint,
+ l_partkey : bigint,
+ l_suppkey : bigint,
+ l_linenumber : bigint,
+ l_quantity : double,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+}
+
+create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp
new file mode 100644
index 0000000..5fe734c
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.2.update.sqlpp
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+
+load dataset LineItem using localfs
((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`))
pre-sorted;
+
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp
new file mode 100644
index 0000000..73d7ab2
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/listify/listify.3.query.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE tpch;
+
+SELECT l_returnflag AS l_returnflag,
+ l_linestatus AS l_linestatus,
+ g
+FROM LineItem AS l
+WHERE l.l_shipdate <= '1998-09-02'
+/* +hash */
+GROUP BY l.l_returnflag AS l_returnflag, l.l_linestatus AS l_linestatus
+GROUP AS g
+ORDER BY l_returnflag, l_linestatus
+;
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/query-issue810-3/query-issue810-3.3.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/query-issue810-3/query-issue810-3.3.query.sqlpp
index 5186119..c5a8933 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/query-issue810-3/query-issue810-3.3.query.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql-sugar/query-issue810-3/query-issue810-3.3.query.sqlpp
@@ -25,6 +25,8 @@
USE tpch;
+SET `compiler.groupmemory` "32MB"
+
SELECT l_returnflag,
l_linestatus,
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql/query-issue810/query-issue810.3.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql/query-issue810/query-issue810.3.query.sqlpp
index 17e37b3..7f21560 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql/query-issue810/query-issue810.3.query.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch-sql/query-issue810/query-issue810.3.query.sqlpp
@@ -25,6 +25,7 @@
USE tpch;
+SET `compiler.groupmemory` "32MB"
SELECT l_returnflag AS l_returnflag,
l_linestatus AS l_linestatus,
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch/query-issue810-3/query-issue810-3.3.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch/query-issue810-3/query-issue810-3.3.query.sqlpp
index f25f99d..a46a0e7 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch/query-issue810-3/query-issue810-3.3.query.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/tpch/query-issue810-3/query-issue810-3.3.query.sqlpp
@@ -25,6 +25,7 @@
use tpch;
+set `compiler.groupmemory` "32MB"
select element
{'l_returnflag':l_returnflag,'l_linestatus':l_linestatus,'count_cheaps':coll_count(cheaps),'avg_expensive_discounts':tpch.coll_avg(expensives),'sum_disc_prices':tpch.coll_sum(disc_prices),'total_charges':tpch.coll_sum(charges)}
from LineItem as l
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/listify/listify.1.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/listify/listify.1.adm
new file mode 100644
index 0000000..583f20b
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/listify/listify.1.adm
@@ -0,0 +1 @@
+{ "avgpay": 2500.0, "workers": [ { "name": "Bill", "salary": 2000 }, { "name":
"Fred", "salary": 3000 } ], "deptno": "K55" }
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 70b3486..cc21741 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -2887,6 +2887,12 @@
<expected-error>Invalid item type: function agg-sum cannot process
item type object in an input array (or multiset)</expected-error>
</compilation-unit>
</test-case>
+ <test-case FilePath="group-by">
+ <compilation-unit name="listify">
+ <output-dir compare="Text">none</output-dir>
+ <expected-error>Group By operator memory usage</expected-error>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="index-join">
<test-case FilePath="index-join">
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
index a636d10..445d4f2 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/PreclusteredGroupByPOperator.java
@@ -43,10 +43,12 @@
public class PreclusteredGroupByPOperator extends
AbstractPreclusteredGroupByPOperator {
private final boolean groupAll;
+ private final int framesLimit;
- public PreclusteredGroupByPOperator(List<LogicalVariable> columnList,
boolean groupAll) {
+ public PreclusteredGroupByPOperator(List<LogicalVariable> columnList,
boolean groupAll, int framesLimit) {
super(columnList);
this.groupAll = groupAll;
+ this.framesLimit = framesLimit;
}
@Override
@@ -84,7 +86,7 @@
context);
PreclusteredGroupOperatorDescriptor opDesc = new
PreclusteredGroupOperatorDescriptor(spec, keys,
- comparatorFactories, aggregatorFactory, recordDescriptor,
groupAll);
+ comparatorFactories, aggregatorFactory, recordDescriptor,
groupAll, framesLimit);
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 7dc59f6..4b31dc2 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -187,7 +187,8 @@
}
}
if (topLevelOp) {
- op.setPhysicalOperator(new
PreclusteredGroupByPOperator(columnList, gby.isGroupAll()));
+ op.setPhysicalOperator(new
PreclusteredGroupByPOperator(columnList, gby.isGroupAll(),
+
context.getPhysicalOptimizationConfig().getMaxFramesExternalGroupBy()));
} else {
op.setPhysicalOperator(new
MicroPreclusteredGroupByPOperator(columnList));
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 893aa61..e901675 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -53,7 +54,8 @@
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDescriptor, int[] keys, int[]
partialKeys) throws HyracksDataException {
+ RecordDescriptor outRecordDescriptor, int[] keys, int[]
partialKeys, long memoryBudget)
+ throws HyracksDataException {
final AggregatorOutput outputWriter = new AggregatorOutput(subplans,
keyFieldIdx.length, decorFieldIdx.length);
final NestedTupleSourceRuntime[] pipelines = new
NestedTupleSourceRuntime[subplans.length];
for (int i = 0; i < subplans.length; i++) {
@@ -86,6 +88,9 @@
@Override
public void aggregate(IFrameTupleAccessor accessor, int tIndex,
IFrameTupleAccessor stateAccessor,
int stateTupleIndex, AggregateState state) throws
HyracksDataException {
+ // Checks the memory usage.
+ memoryUsageCheck();
+
for (int i = 0; i < pipelines.length; i++) {
pipelines[i].writeTuple(accessor.getBuffer(), tIndex);
}
@@ -98,7 +103,10 @@
outputWriter.setInputIdx(i);
pipelines[i].close();
}
- // outputWriter.writeTuple(appender);
+
+ // Checks the memory usage.
+ memoryUsageCheck();
+
tupleBuilder.reset();
ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
byte[] data = tb.getByteArray();
@@ -136,6 +144,16 @@
}
+ // Checks the memory usage.
+ private void memoryUsageCheck() throws HyracksDataException {
+ ArrayTupleBuilder tb = outputWriter.getTupleBuilder();
+ byte[] data = tb.getByteArray();
+ if (memoryBudget > 0 && data.length > memoryBudget) {
+ throw
HyracksDataException.create(ErrorCode.GROUP_BY_MEMORY_BUDGET_EXCEEDS,
data.length,
+ memoryBudget);
+ }
+ }
+
};
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index 8b8e320..c261df8 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -60,7 +60,7 @@
@Override
public IAggregatorDescriptor createAggregator(final IHyracksTaskContext
ctx, RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, int[]
keyFieldsInPartialResults,
- final IFrameWriter writer) throws HyracksDataException {
+ final IFrameWriter writer, long memoryBudget) throws
HyracksDataException {
final RunningAggregatorOutput outputWriter =
new RunningAggregatorOutput(ctx, subplans, keyFieldIdx.length,
decorFieldIdx.length, writer);
// should enforce protocol
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
index e4ac701..1e06c76 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SerializableAggregatorDescriptorFactory.java
@@ -42,8 +42,8 @@
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, final int[]
keyFieldsInPartialResults)
- throws HyracksDataException {
+ RecordDescriptor outRecordDescriptor, int[] keyFields, final int[]
keyFieldsInPartialResults,
+ long memoryBudget) throws HyracksDataException {
final int[] keys = keyFields;
/**
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
index 87f3cb4..50452c1 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/SimpleAlgebricksAccumulatingAggregatorFactory.java
@@ -43,7 +43,8 @@
@Override
public IAggregatorDescriptor createAggregator(final IHyracksTaskContext
ctx, RecordDescriptor inRecordDesc,
- RecordDescriptor outRecordDescriptor, int[] aggKeys, int[]
partialKeys) throws HyracksDataException {
+ RecordDescriptor outRecordDescriptor, int[] aggKeys, int[]
partialKeys, long memoryBudget)
+ throws HyracksDataException {
return new IAggregatorDescriptor() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index e6fbc6f..d8aeed0 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -122,6 +122,7 @@
public static final int UNRECOGNIZED_INDEX_COMPONENT_FILE = 86;
public static final int UNEQUAL_NUM_FILTERS_TREES = 87;
public static final int CANNOT_MODIFY_INDEX_DISK_IS_FULL = 88;
+ public static final int GROUP_BY_MEMORY_BUDGET_EXCEEDS = 89;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index d2e05e3..05e2b1a 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -105,5 +105,6 @@
86 = Found an unrecognized index file %1$s
87 = Unequal number of trees and filters found in %1$s
88 = Cannot modify index (Disk is full)
+89 = Group By operator memory usage (%1$s bytes) exceeds the budget (%2$s
bytes)
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
index 5d13332..d546e5e 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/AbstractAccumulatingAggregatorDescriptorFactory.java
@@ -32,14 +32,14 @@
*/
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int[] keyFields, int[]
keyFieldsInPartialResults, IFrameWriter writer)
- throws HyracksDataException {
- return this
- .createAggregator(ctx, inRecordDescriptor,
outRecordDescriptor, keyFields, keyFieldsInPartialResults);
+ RecordDescriptor outRecordDescriptor, int[] keyFields, int[]
keyFieldsInPartialResults, IFrameWriter writer,
+ long memoryBudget) throws HyracksDataException {
+ return createAggregator(ctx, inRecordDescriptor, outRecordDescriptor,
keyFields, keyFieldsInPartialResults,
+ memoryBudget);
}
abstract protected IAggregatorDescriptor
createAggregator(IHyracksTaskContext ctx,
RecordDescriptor inRecordDescriptor, RecordDescriptor
outRecordDescriptor, int[] keyFields,
- final int[] keyFieldsInPartialResults) throws HyracksDataException;
+ final int[] keyFieldsInPartialResults, long memoryBudget) throws
HyracksDataException;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index cd79a30..43b9685 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -97,7 +97,7 @@
hashFunctionFamilies).createPartitioner(seed);
final IAggregatorDescriptor aggregator =
aggregateFactory.createAggregator(ctx, inRecordDescriptor,
- outRecordDescriptor, keyFields, intermediateResultKeys, null);
+ outRecordDescriptor, keyFields, intermediateResultKeys, null,
-1);
final AggregateState aggregateState =
aggregator.createAggregateStates();
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
index 99c787c..d5ed713 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/IAggregatorDescriptorFactory.java
@@ -29,6 +29,6 @@
IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
RecordDescriptor inRecordDescriptor,
RecordDescriptor outRecordDescriptor, int[] keyFields, final int[]
keyFieldsInPartialResults,
- IFrameWriter writer) throws HyracksDataException;
+ IFrameWriter writer, long memoryBudget) throws
HyracksDataException;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
index e326f39..595e2c4 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/aggregators/MultiFieldsAggregatorFactory.java
@@ -28,7 +28,6 @@
import
org.apache.hyracks.dataflow.std.group.AbstractAccumulatingAggregatorDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.AggregateState;
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
-import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptor;
import org.apache.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
@@ -58,8 +57,8 @@
*/
@Override
public IAggregatorDescriptor createAggregator(IHyracksTaskContext ctx,
RecordDescriptor inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, final int[] keyFields, final
int[] keyFieldsInPartialResults)
- throws HyracksDataException {
+ RecordDescriptor outRecordDescriptor, final int[] keyFields, final
int[] keyFieldsInPartialResults,
+ long memoryBudget) throws HyracksDataException {
final IFieldAggregateDescriptor[] aggregators = new
IFieldAggregateDescriptor[aggregatorFactories.length];
for (int i = 0; i < aggregators.length; i++) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
index 0fe0f54..739de74 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
@@ -35,22 +35,30 @@
private final IBinaryComparatorFactory[] comparatorFactories;
private final IAggregatorDescriptorFactory aggregatorFactory;
private final boolean groupAll;
+ private final int framesLimit;
public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry
spec, int[] groupFields,
IBinaryComparatorFactory[] comparatorFactories,
IAggregatorDescriptorFactory aggregatorFactory,
RecordDescriptor recordDescriptor) {
- this(spec, groupFields, comparatorFactories, aggregatorFactory,
recordDescriptor, false);
+ this(spec, groupFields, comparatorFactories, aggregatorFactory,
recordDescriptor, false, -1);
}
public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry
spec, int[] groupFields,
IBinaryComparatorFactory[] comparatorFactories,
IAggregatorDescriptorFactory aggregatorFactory,
- RecordDescriptor recordDescriptor, boolean groupAll) {
+ RecordDescriptor recordDescriptor, int framesLimit) {
+ this(spec, groupFields, comparatorFactories, aggregatorFactory,
recordDescriptor, false, framesLimit);
+ }
+
+ public PreclusteredGroupOperatorDescriptor(IOperatorDescriptorRegistry
spec, int[] groupFields,
+ IBinaryComparatorFactory[] comparatorFactories,
IAggregatorDescriptorFactory aggregatorFactory,
+ RecordDescriptor recordDescriptor, boolean groupAll, int
framesLimit) {
super(spec, 1, 1);
this.groupFields = groupFields;
this.comparatorFactories = comparatorFactories;
this.aggregatorFactory = aggregatorFactory;
outRecDescs[0] = recordDescriptor;
this.groupAll = groupAll;
+ this.framesLimit = framesLimit;
}
@Override
@@ -58,6 +66,6 @@
final IRecordDescriptorProvider recordDescProvider, int partition,
int nPartitions)
throws HyracksDataException {
return new PreclusteredGroupOperatorNodePushable(ctx, groupFields,
comparatorFactories, aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(getActivityId(),
0), outRecDescs[0], groupAll);
+ recordDescProvider.getInputRecordDescriptor(getActivityId(),
0), outRecDescs[0], groupAll, framesLimit);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
index 2acc4db..0a7444d 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorNodePushable.java
@@ -36,12 +36,14 @@
private final RecordDescriptor inRecordDescriptor;
private final RecordDescriptor outRecordDescriptor;
private final boolean groupAll;
+ private final int frameLimit; // NOTE(review): spelled 'framesLimit' in PreclusteredGroupOperatorDescriptor and PreclusteredGroupWriter; consider aligning the name
private PreclusteredGroupWriter pgw;
PreclusteredGroupOperatorNodePushable(IHyracksTaskContext ctx, int[]
groupFields,
IBinaryComparatorFactory[] comparatorFactories,
IAggregatorDescriptorFactory aggregatorFactory,
- RecordDescriptor inRecordDescriptor, RecordDescriptor
outRecordDescriptor, boolean groupAll) {
+ RecordDescriptor inRecordDescriptor, RecordDescriptor
outRecordDescriptor, boolean groupAll,
+ int frameLimit) { // frame budget, passed through to the PreclusteredGroupWriter this class creates
this.ctx = ctx;
this.groupFields = groupFields;
this.comparatorFactories = comparatorFactories;
@@ -49,6 +51,7 @@
this.inRecordDescriptor = inRecordDescriptor;
this.outRecordDescriptor = outRecordDescriptor;
this.groupAll = groupAll;
+ this.frameLimit = frameLimit;
}
@Override
@@ -58,7 +61,7 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
pgw = new PreclusteredGroupWriter(ctx, groupFields, comparators,
aggregatorFactory, inRecordDescriptor,
- outRecordDescriptor, writer, false, groupAll);
+ outRecordDescriptor, writer, false, groupAll, frameLimit);
pgw.open();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 3105d42..18e29ba 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -51,28 +51,32 @@
private final boolean outputPartial;
private boolean first;
private boolean isFailed = false;
+ private final long memoryLimit; // byte budget handed to the aggregator: framesLimit * ctx.getInitialFrameSize()
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields,
IBinaryComparator[] comparators,
IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor
inRecordDesc,
RecordDescriptor outRecordDesc, IFrameWriter writer) throws
HyracksDataException {
- this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc,
outRecordDesc, writer, false, false);
+ this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc,
outRecordDesc, writer, false, false, -1); // NOTE(review): -1 reads like an 'unlimited' sentinel, but the (..., groupAll, framesLimit) overload computes framesLimit * ctx.getInitialFrameSize(), so this becomes a negative byte budget; confirm the aggregator treats any negative limit as unlimited
}
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields,
IBinaryComparator[] comparators,
IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor
inRecordDesc,
RecordDescriptor outRecordDesc, IFrameWriter writer, boolean
outputPartial) throws HyracksDataException {
this(ctx, groupFields, comparators, aggregatorFactory, inRecordDesc,
outRecordDesc, writer, outputPartial,
- false);
+ false, -1); // NOTE(review): -1 frames is multiplied by ctx.getInitialFrameSize() in the full overload, producing a negative byte limit; confirm that is interpreted as 'unlimited'
}
public PreclusteredGroupWriter(IHyracksTaskContext ctx, int[] groupFields,
IBinaryComparator[] comparators,
IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor
inRecordDesc,
- RecordDescriptor outRecordDesc, IFrameWriter writer, boolean
outputPartial, boolean groupAll)
- throws HyracksDataException {
+ RecordDescriptor outRecordDesc, IFrameWriter writer, boolean
outputPartial, boolean groupAll,
+ int framesLimit) throws HyracksDataException {
this.groupFields = groupFields;
this.comparators = comparators;
+
+ this.memoryLimit = ((long) framesLimit) * ctx.getInitialFrameSize();
this.aggregator =
- aggregatorFactory.createAggregator(ctx, inRecordDesc,
outRecordDesc, groupFields, groupFields, writer);
+ aggregatorFactory.createAggregator(ctx, inRecordDesc,
outRecordDesc, groupFields, groupFields, writer,
+ this.memoryLimit);
this.aggregateState = aggregator.createAggregateStates();
copyFrame = new VSizeFrame(ctx);
inFrameAccessor = new FrameTupleAccessor(inRecordDesc);
--
To view, visit https://asterix-gerrit.ics.uci.edu/1940
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I670269b0b8f446d06d8dd73202194574aa524e85
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>